简化redis

This commit is contained in:
liulu 2024-10-31 14:23:15 +08:00
parent f7dc9427ac
commit ac339d668e
6 changed files with 1021 additions and 1039 deletions

View File

@ -1,7 +1,5 @@
package com.sunyard.ssp.redis; package com.sunyard.ssp.redis;
import com.alibaba.fastjson.JSONObject;
import com.sunyard.ssp.server.Config;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -141,10 +139,10 @@ public class JedisClusterConfig extends CachingConfigurerSupport {
@Bean @Bean
public JedisPool redisPoolFactory(){ public JedisPool redisPoolFactory(){
JSONObject node = Config.global.messenger.redisCluster.getJSONObject("node"); // JSONObject node = Config.global.messenger.redisCluster.getJSONObject("node");
String password = Config.global.messenger.redisCluster.getString("password"); // String password = Config.global.messenger.redisCluster.getString("password");
String host = node.getString("ip"); // String host = node.getString("ip");
Integer port = node.getInteger("port"); // Integer port = node.getInteger("port");
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10); jedisPoolConfig.setMaxTotal(10);
@ -158,7 +156,7 @@ public class JedisClusterConfig extends CachingConfigurerSupport {
jedisPoolConfig.setTestOnBorrow(true); jedisPoolConfig.setTestOnBorrow(true);
jedisPoolConfig.setTestOnReturn(true); jedisPoolConfig.setTestOnReturn(true);
JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, 10000, password); JedisPool jedisPool = new JedisPool(jedisPoolConfig, host2, port2, 10000, password);
return jedisPool; return jedisPool;
} }

View File

@ -1,72 +1,72 @@
package com.sunyard.ssp.redis; //package com.sunyard.ssp.redis;
//
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ListOperations; //import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.SetOperations; //import org.springframework.data.redis.core.SetOperations;
import org.springframework.data.redis.core.StringRedisTemplate; //import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations; //import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.util.List; //import java.util.List;
import java.util.Set; //import java.util.Set;
import java.util.concurrent.TimeUnit; //import java.util.concurrent.TimeUnit;
//
/** ///**
* @author:tsz // * @author:tsz
* @date:2020/3/6 // * @date:2020/3/6
* @description: redis String 基本操作 // * @description: redis String 基本操作
*/ // */
@Component //@Component
public class RedisOperation { //public class RedisOperation {
//
@Autowired // @Autowired
private StringRedisTemplate redisTemplate; // private StringRedisTemplate redisTemplate;
//
/*** // /***
* 操作普通字符串 // * 操作普通字符串
*/ // */
public void StringSet(String key, String value) { // public void StringSet(String key, String value) {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue(); // ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
valueOperations.set(key, value); // valueOperations.set(key, value);
} // }
//
/*** // /***
* 操作列表 // * 操作列表
*/ // */
public void ListSet(String key, List<String> values) { // public void ListSet(String key, List<String> values) {
ListOperations<String, String> listOperations = redisTemplate.opsForList(); // ListOperations<String, String> listOperations = redisTemplate.opsForList();
values.forEach(value -> listOperations.leftPush(key, value)); // values.forEach(value -> listOperations.leftPush(key, value));
} // }
//
/*** // /***
* 操作集合 // * 操作集合
*/ // */
public void SetSet(String key, Set<String> values) { // public void SetSet(String key, Set<String> values) {
SetOperations<String, String> setOperations = redisTemplate.opsForSet(); // SetOperations<String, String> setOperations = redisTemplate.opsForSet();
values.forEach(value -> setOperations.add(key, value)); // values.forEach(value -> setOperations.add(key, value));
} // }
//
/*** // /***
* 获取字符串 // * 获取字符串
*/ // */
public String StringGet(String key) { // public String StringGet(String key) {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue(); // ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
return valueOperations.get(key); // return valueOperations.get(key);
} // }
//
/*** // /***
* 列表弹出元素 // * 列表弹出元素
*/ // */
public String ListLeftPop(String key) { // public String ListLeftPop(String key) {
ListOperations<String, String> listOperations = redisTemplate.opsForList(); // ListOperations<String, String> listOperations = redisTemplate.opsForList();
return listOperations.leftPop(key, 2, TimeUnit.SECONDS); // return listOperations.leftPop(key, 2, TimeUnit.SECONDS);
} // }
//
/*** // /***
* 集合弹出元素 // * 集合弹出元素
*/ // */
public String SetPop(String key) { // public String SetPop(String key) {
SetOperations<String, String> setOperations = redisTemplate.opsForSet(); // SetOperations<String, String> setOperations = redisTemplate.opsForSet();
return setOperations.pop(key); // return setOperations.pop(key);
} // }
} //}

View File

@ -1,433 +1,433 @@
package com.sunyard.ssp.server; //package com.sunyard.ssp.server;
//
import cn.hutool.core.util.ObjectUtil; //import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; //import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray; //import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; //import com.alibaba.fastjson.JSONObject;
import com.google.protobuf.Any; //import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException; //import com.google.protobuf.InvalidProtocolBufferException;
import com.sunyard.ssp.common.exception.SspwebException; //import com.sunyard.ssp.common.exception.SspwebException;
import com.sunyard.ssp.proto.control.Control; //import com.sunyard.ssp.proto.control.Control;
import lombok.AllArgsConstructor; //import lombok.AllArgsConstructor;
import lombok.Getter; //import lombok.Getter;
import lombok.SneakyThrows; //import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import lombok.val; //import lombok.val;
import lombok.var; //import lombok.var;
import redis.clients.jedis.BinaryJedisPubSub; //import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.HostAndPort; //import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster; //import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool; //import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig; //import redis.clients.jedis.JedisPoolConfig;
//
import java.util.HashSet; //import java.util.HashSet;
import java.util.Iterator; //import java.util.Iterator;
import java.util.Set; //import java.util.Set;
import java.util.UUID; //import java.util.UUID;
import java.util.concurrent.Callable; //import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; //import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; //import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException; //import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; //import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; //import java.util.concurrent.Executors;
import java.util.concurrent.Future; //import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; //import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; //import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; //import java.util.concurrent.locks.ReentrantLock;
//
//
/** ///**
* 封装到 Server 的通信 // * 封装到 Server 的通信
* 异步转同步 // * 异步转同步
*/ // */
//
@Slf4j //@Slf4j
public class Messenger { //public class Messenger {
//
private static String channel = "MANAGER_CLUSTER"; // private static String channel = "MANAGER_CLUSTER";
private static JedisCluster jedisCluster; // private static JedisCluster jedisCluster;
private static Redis redisSubscribe; // private static Redis redisSubscribe;
private static Redis redisSubscribe2; // private static Redis redisSubscribe2;
private static Redis redis;//Publish // private static Redis redis;//Publish
private static Thread thread; // private static Thread thread;
private static ExecutorService executorService = Executors.newFixedThreadPool(3 ); // private static ExecutorService executorService = Executors.newFixedThreadPool(3 );
private static ConcurrentHashMap<String, Record> pendingMessageRecord = new ConcurrentHashMap<>(); // private static ConcurrentHashMap<String, Record> pendingMessageRecord = new ConcurrentHashMap<>();
//
private static JedisPool jedisPool = null; // private static JedisPool jedisPool = null;
private static JedisPool jedisPool2 = null; // private static JedisPool jedisPool2 = null;
private static JedisPool jedisPool3 = null; // private static JedisPool jedisPool3 = null;
//
private static String host ; // private static String host ;
private static Integer port ; // private static Integer port ;
private static String password; // private static String password;
private static String mode; // private static String mode;
static { // static {
//
// 只有 CLI 需要重置 channel // // 只有 CLI 需要重置 channel
//channel = Control.Req.Origin.CLI + "_" + Config.global.system.name; // //channel = Control.Req.Origin.CLI + "_" + Config.global.system.name;
//
// 初始化 // // 初始化
// 链接到 redis // // 链接到 redis
Set<HostAndPort> nodes = new HashSet<>(); // Set<HostAndPort> nodes = new HashSet<>();
JSONArray ns = Config.global.messenger.redisCluster.getJSONArray("nodes"); // JSONArray ns = Config.global.messenger.redisCluster.getJSONArray("nodes");
password = Config.global.messenger.redisCluster.getString("password"); // password = Config.global.messenger.redisCluster.getString("password");
mode = Config.global.messenger.redisCluster.getString("mode"); // mode = Config.global.messenger.redisCluster.getString("mode");
JSONObject node = Config.global.messenger.redisCluster.getJSONObject("node"); // JSONObject node = Config.global.messenger.redisCluster.getJSONObject("node");
host = node.getString("ip"); // host = node.getString("ip");
port = node.getInteger("port"); // port = node.getInteger("port");
for (Object n : ns) { // for (Object n : ns) {
JSONObject no = (JSONObject) n; // JSONObject no = (JSONObject) n;
nodes.add(new HostAndPort(no.getString("ip"), no.getInteger("port"))); // nodes.add(new HostAndPort(no.getString("ip"), no.getInteger("port")));
} // }
//
@AllArgsConstructor // @AllArgsConstructor
class Msg extends BinaryJedisPubSub { // class Msg extends BinaryJedisPubSub {
//
@Override // @Override
public void onMessage(byte[] channel, byte[] message) { // public void onMessage(byte[] channel, byte[] message) {
//
//
// message 解析为 Res 对象 // // message 解析为 Res 对象
try { // try {
val res = Control.Res.parseFrom( message ); // val res = Control.Res.parseFrom( message );
val id = res.getId(); // val id = res.getId();
//
val record = pendingMessageRecord.get(id); // val record = pendingMessageRecord.get(id);
if ( null == record ){ // if ( null == record ){
log.debug("没有找到消息 {} 的记录, 遗弃。", id); // log.debug("没有找到消息 {} 的记录, 遗弃。", id);
} else { // } else {
//
record.lock.lock(); // record.lock.lock();
try { // try {
if ( record instanceof SoloRecord ){ // if ( record instanceof SoloRecord ){
log.debug("找到 Solo 消息 {} 的记录, 触发信号。", id); // log.debug("找到 Solo 消息 {} 的记录, 触发信号。", id);
((SoloRecord) record).res = res; // ((SoloRecord) record).res = res;
((SoloRecord) record).getCondition().signal(); // ((SoloRecord) record).getCondition().signal();
} else if ( record instanceof ClusterRecord ){ // } else if ( record instanceof ClusterRecord ){
log.debug("找到 Cluster 消息 {} 的记录, 触发阶段信号。", id); // log.debug("找到 Cluster 消息 {} 的记录, 触发阶段信号。", id);
((ClusterRecord) record).reslist.add( res ); // ((ClusterRecord) record).reslist.add( res );
((ClusterRecord) record).getStage().signal(); // ((ClusterRecord) record).getStage().signal();
} // }
}finally { // }finally {
record.lock.unlock(); // record.lock.unlock();
} // }
//
} // }
//
} catch (InvalidProtocolBufferException e) { // } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); // e.printStackTrace();
} // }
//
} // }
} // }
//
JedisPoolConfig config = new JedisPoolConfig(); // JedisPoolConfig config = new JedisPoolConfig();
// config.setTestOnBorrow(false); // // config.setTestOnBorrow(false);
jedisCluster = new JedisCluster(nodes, 1000, 2000, 5, password, config); // jedisCluster = new JedisCluster(nodes, 1000, 2000, 5, password, config);
if (jedisPool ==null){ // if (jedisPool ==null){
//调试的时候的配置 // //调试的时候的配置
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); // JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//连接池中的最大空闲连接 // //连接池中的最大空闲连接
jedisPoolConfig.setMaxIdle(1000); // jedisPoolConfig.setMaxIdle(1000);
//连接池最大阻塞等待时间 // //连接池最大阻塞等待时间
//jedisPoolConfig.setMaxWaitMillis(10000); // //jedisPoolConfig.setMaxWaitMillis(10000);
//在获取连接的时候检查有效性 // //在获取连接的时候检查有效性
//jedisPoolConfig.setTestOnBorrow(true); // //jedisPoolConfig.setTestOnBorrow(true);
// 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true // // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
jedisPoolConfig.setBlockWhenExhausted(false); // jedisPoolConfig.setBlockWhenExhausted(false);
// 是否启用pool的jmx管理功能, 默认true // // 是否启用pool的jmx管理功能, 默认true
jedisPoolConfig.setJmxEnabled(true); // jedisPoolConfig.setJmxEnabled(true);
jedisPool = new JedisPool(jedisPoolConfig,host,port,10000,password); // jedisPool = new JedisPool(jedisPoolConfig,host,port,10000,password);
} // }
if (jedisPool2 ==null){ // if (jedisPool2 ==null){
//调试的时候的配置 // //调试的时候的配置
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); // JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//连接池中的最大空闲连接 // //连接池中的最大空闲连接
jedisPoolConfig.setMaxIdle(1000); // jedisPoolConfig.setMaxIdle(1000);
//连接池最大阻塞等待时间 // //连接池最大阻塞等待时间
//jedisPoolConfig.setMaxWaitMillis(10000); // //jedisPoolConfig.setMaxWaitMillis(10000);
//在获取连接的时候检查有效性 // //在获取连接的时候检查有效性
//jedisPoolConfig.setTestOnBorrow(true); // //jedisPoolConfig.setTestOnBorrow(true);
// 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true // // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
jedisPoolConfig.setBlockWhenExhausted(false); // jedisPoolConfig.setBlockWhenExhausted(false);
// 是否启用pool的jmx管理功能, 默认true // // 是否启用pool的jmx管理功能, 默认true
jedisPoolConfig.setJmxEnabled(true); // jedisPoolConfig.setJmxEnabled(true);
jedisPool2 = new JedisPool(jedisPoolConfig,host,port,10000,password); // jedisPool2 = new JedisPool(jedisPoolConfig,host,port,10000,password);
} // }
if (jedisPool3 ==null){ // if (jedisPool3 ==null){
//
} // }
redis = new Redis(jedisCluster, jedisPool2, password, mode); // redis = new Redis(jedisCluster, jedisPool2, password, mode);
redisSubscribe = new Redis(jedisCluster, jedisPool, password, mode); // redisSubscribe = new Redis(jedisCluster, jedisPool, password, mode);
//
thread = new Thread(new Runnable() { // thread = new Thread(new Runnable() {
@Override // @Override
public void run() { // public void run() {
log.info("监听反馈消息 {}", channel); // log.info("监听反馈消息 {}", channel);
redisSubscribe.subscribe(new Msg(), channel.getBytes() ); // redisSubscribe.subscribe(new Msg(), channel.getBytes() );
//
} // }
}); // });
thread.setName("messenger"); // thread.setName("messenger");
thread.start(); // thread.start();
} // }
//
//
public static Thread subscribe(final BinaryJedisPubSub jedisPubSub, String channel){ // public static Thread subscribe(final BinaryJedisPubSub jedisPubSub, String channel){
return new Thread(new Runnable() { // return new Thread(new Runnable() {
@Override // @Override
public void run() { // public void run() {
//调试的时候的配置 // //调试的时候的配置
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); // JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
//连接池中的最大空闲连接 // //连接池中的最大空闲连接
jedisPoolConfig.setMaxIdle(1000); // jedisPoolConfig.setMaxIdle(1000);
//连接池最大阻塞等待时间 // //连接池最大阻塞等待时间
//jedisPoolConfig.setMaxWaitMillis(10000); // //jedisPoolConfig.setMaxWaitMillis(10000);
//在获取连接的时候检查有效性 // //在获取连接的时候检查有效性
//jedisPoolConfig.setTestOnBorrow(true); // //jedisPoolConfig.setTestOnBorrow(true);
// 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true // // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
jedisPoolConfig.setBlockWhenExhausted(false); // jedisPoolConfig.setBlockWhenExhausted(false);
// 是否启用pool的jmx管理功能, 默认true // // 是否启用pool的jmx管理功能, 默认true
jedisPoolConfig.setJmxEnabled(true); // jedisPoolConfig.setJmxEnabled(true);
jedisPool3 = new JedisPool(jedisPoolConfig,host,port,10000,password); // jedisPool3 = new JedisPool(jedisPoolConfig,host,port,10000,password);
//
redisSubscribe2 = new Redis(jedisCluster, jedisPool3, password, mode); // redisSubscribe2 = new Redis(jedisCluster, jedisPool3, password, mode);
//
log.info("监听Check消息 {}", channel); // log.info("监听Check消息 {}", channel);
redisSubscribe2.subscribe(jedisPubSub, channel.getBytes() ); // redisSubscribe2.subscribe(jedisPubSub, channel.getBytes() );
} // }
}); // });
} // }
//
public static String genId(){ // public static String genId(){
return UUID.randomUUID().toString().replace("-", ""); // return UUID.randomUUID().toString().replace("-", "");
} // }
//
//
public static Control.Res primarySyncCmd( String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { // public static Control.Res primarySyncCmd( String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException {
var c = System.currentTimeMillis(); // var c = System.currentTimeMillis();
var last = c - 120000L; // var last = c - 120000L;
redis.zremrangeByScore("SERVER_CLUSTER", 0, last); // redis.zremrangeByScore("SERVER_CLUSTER", 0, last);
val servers = redis.zrevrange("SERVER_CLUSTER", 0, 9); // val servers = redis.zrevrange("SERVER_CLUSTER", 0, 9);
if (servers.size()<1){ // if (servers.size()<1){
throw new SspwebException("安全服务异常"); // throw new SspwebException("安全服务异常");
} // }
//
//
Iterator<String> it = servers.iterator(); // Iterator<String> it = servers.iterator();
String channel = servers.iterator().next(); // String channel = servers.iterator().next();
//
// 方便调试的逻辑 // // 方便调试的逻辑
// 如果配置了调试服务器优先使用调试服务器 // // 如果配置了调试服务器优先使用调试服务器
val debugServer = redis.get("DEBUG_SERVER"); // val debugServer = redis.get("DEBUG_SERVER");
if ( null != debugServer && servers.contains( debugServer ) ) { // if ( null != debugServer && servers.contains( debugServer ) ) {
channel = debugServer; // channel = debugServer;
} // }
//
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd(cmd) // .setCmd(cmd)
.setData(Any.pack(data)) // .setData(Any.pack(data))
.build(); // .build();
return soloSyncCmd("SERVER_" + channel, req); // return soloSyncCmd("SERVER_" + channel, req);
} // }
//
//
public static Control.Res soloSyncCmd(String channel, String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { // public static Control.Res soloSyncCmd(String channel, String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException {
//
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd(cmd) // .setCmd(cmd)
.setData(Any.pack(data)) // .setData(Any.pack(data))
.build(); // .build();
return soloSyncCmd(channel, req); // return soloSyncCmd(channel, req);
} // }
/** // /**
* 同步通信 // * 同步通信
* // *
* @param req // * @param req
* @return // * @return
*/ // */
public static Control.Res soloSyncCmd(String channel, Control.Req req) throws ExecutionException, InterruptedException { // public static Control.Res soloSyncCmd(String channel, Control.Req req) throws ExecutionException, InterruptedException {
//
var c = System.currentTimeMillis(); // var c = System.currentTimeMillis();
var last = c - 120000L; // var last = c - 120000L;
redis.zremrangeByScore("SERVER_CLUSTER", 0, last); // redis.zremrangeByScore("SERVER_CLUSTER", 0, last);
val servers = redis.zrevrange("SERVER_CLUSTER", 0, 9); // val servers = redis.zrevrange("SERVER_CLUSTER", 0, 9);
if (servers.size()<1){ // if (servers.size()<1){
throw new SspwebException("安全服务异常"); // throw new SspwebException("安全服务异常");
} // }
log.info("SoloSyncCmd 发送消息到 sspserver id={} cmd={}", channel, req.getCmd()); // log.info("SoloSyncCmd 发送消息到 sspserver id={} cmd={}", channel, req.getCmd());
//
// 发送请求 // // 发送请求
redis.publish(channel.getBytes(), req.toByteArray() ); // redis.publish(channel.getBytes(), req.toByteArray() );
//
// 单点通信创建一个信号量 // // 单点通信创建一个信号量
final SoloRecord record = new SoloRecord(); // final SoloRecord record = new SoloRecord();
//
// 记录请求 // // 记录请求
pendingMessageRecord.put( req.getId(), record ); // pendingMessageRecord.put( req.getId(), record );
Future future = executorService.submit(new Callable<Control.Res>() { // Future future = executorService.submit(new Callable<Control.Res>() {
//
@Override // @Override
public Control.Res call() throws Exception { // public Control.Res call() throws Exception {
//Thread.sleep(3000); // //Thread.sleep(3000);
try { // try {
record.getLock().lock(); // record.getLock().lock();
// 等待信号量 // // 等待信号量
record.getCondition().await( Config.global.messenger.maxWaitTime , TimeUnit.SECONDS ); // record.getCondition().await( Config.global.messenger.maxWaitTime , TimeUnit.SECONDS );
//
}catch ( InterruptedException e ){ // }catch ( InterruptedException e ){
log.error("Solo 请求 {} {} 被打断from={}, to={}", req.getId() , req.getCmd() , req.getFrom(), channel); // log.error("Solo 请求 {} {} 被打断from={}, to={}", req.getId() , req.getCmd() , req.getFrom(), channel);
} finally { // } finally {
record.getLock().unlock(); // record.getLock().unlock();
} // }
//
// 返回结果 // // 返回结果
return record.res; // return record.res;
} // }
}); // });
// 阻塞,等待子线程结束 // // 阻塞,等待子线程结束
return (Control.Res) future.get(); // return (Control.Res) future.get();
//
} // }
//
//
public static ConcurrentLinkedQueue<Control.Res> clusterSyncCmd( String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { // public static ConcurrentLinkedQueue<Control.Res> clusterSyncCmd( String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException {
if (ObjectUtil.isNull(data) || StrUtil.isEmpty(data.toString())){ // if (ObjectUtil.isNull(data) || StrUtil.isEmpty(data.toString())){
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd(cmd) // .setCmd(cmd)
.build(); // .build();
return clusterSyncCmd("SERVER_CLUSTER", req, null); // return clusterSyncCmd("SERVER_CLUSTER", req, null);
} else { // } else {
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd(cmd) // .setCmd(cmd)
.setData(Any.pack(data)) // .setData(Any.pack(data))
.build(); // .build();
return clusterSyncCmd("SERVER_CLUSTER", req, null); // return clusterSyncCmd("SERVER_CLUSTER", req, null);
} // }
//
} // }
public static ConcurrentLinkedQueue<Control.Res> clusterSyncCmd(String channel, Control.Req req) throws ExecutionException, InterruptedException { // public static ConcurrentLinkedQueue<Control.Res> clusterSyncCmd(String channel, Control.Req req) throws ExecutionException, InterruptedException {
return clusterSyncCmd(channel, req, null); // return clusterSyncCmd(channel, req, null);
} // }
//
/** // /**
* 集群同步通信 // * 集群同步通信
* 一个发送多个返回收到是个数组 // * 一个发送多个返回收到是个数组
* // *
* @param req // * @param req
* @return // * @return
*/ // */
public static ConcurrentLinkedQueue<Control.Res> clusterSyncCmd(String channel, Control.Req req, ClusterRecordMoniter moniter) throws ExecutionException, InterruptedException { // public static ConcurrentLinkedQueue<Control.Res> clusterSyncCmd(String channel, Control.Req req, ClusterRecordMoniter moniter) throws ExecutionException, InterruptedException {
//
// 发送请求 // // 发送请求
redis.publish(channel.getBytes(), req.toByteArray() ); // redis.publish(channel.getBytes(), req.toByteArray() );
//
// 单点通信创建一个信号量 // // 单点通信创建一个信号量
final ClusterRecord record = new ClusterRecord(); // final ClusterRecord record = new ClusterRecord();
//
// 记录请求 // // 记录请求
pendingMessageRecord.put( req.getId(), record ); // pendingMessageRecord.put( req.getId(), record );
//
// 监听阶段数据 // // 监听阶段数据
executorService.submit(new Runnable() { // executorService.submit(new Runnable() {
//
@SneakyThrows // @SneakyThrows
@Override // @Override
public void run() { // public void run() {
//
try { // try {
record.getLock().lock(); // record.getLock().lock();
//
log.debug("等待阶段信号"); // log.debug("等待阶段信号");
record.getStage().await(Config.global.messenger.maxWaitTime , TimeUnit.SECONDS); // record.getStage().await(Config.global.messenger.maxWaitTime , TimeUnit.SECONDS);
log.debug("收到阶段信号"); // log.debug("收到阶段信号");
//
// 查询应该响应的数量 // // 查询应该响应的数量
val servers = redis.zrange("SERVER_CLUSTER", 0, 9); // val servers = redis.zrange("SERVER_CLUSTER", 0, 9);
log.debug("应该收到的消息数 {} 目前收到的消息数 {}", servers.size(), record.getReslist().size() ); // log.debug("应该收到的消息数 {} 目前收到的消息数 {}", servers.size(), record.getReslist().size() );
//
// 全部收到 // // 全部收到
if ( servers.size() == record.getReslist().size() ){ // if ( servers.size() == record.getReslist().size() ){
record.getCondition().signal(); // record.getCondition().signal();
} // }
//
//
if ( null != moniter ){ // if ( null != moniter ){
moniter.onStage( servers.size(), record.getReslist(), record.getCondition() ); // moniter.onStage( servers.size(), record.getReslist(), record.getCondition() );
} // }
//
} finally { // } finally {
record.getLock().unlock(); // record.getLock().unlock();
//
} // }
} // }
}); // });
//
// 监听整体 // // 监听整体
Future future = executorService.submit(new Callable<ConcurrentLinkedQueue<Control.Res>>() { // Future future = executorService.submit(new Callable<ConcurrentLinkedQueue<Control.Res>>() {
@Override // @Override
public ConcurrentLinkedQueue<Control.Res> call() throws Exception { // public ConcurrentLinkedQueue<Control.Res> call() throws Exception {
//
try { // try {
record.getLock().lock(); // record.getLock().lock();
record.getCondition().await(Config.global.messenger.maxWaitTime , TimeUnit.SECONDS); // record.getCondition().await(Config.global.messenger.maxWaitTime , TimeUnit.SECONDS);
return record.getReslist(); // return record.getReslist();
} finally { // } finally {
record.getLock().unlock(); // record.getLock().unlock();
} // }
} // }
}); // });
//
// 阻塞,等待子线程结束 // // 阻塞,等待子线程结束
return (ConcurrentLinkedQueue<Control.Res>) future.get(); // return (ConcurrentLinkedQueue<Control.Res>) future.get();
} // }
//
@Getter // @Getter
private abstract static class Record { // private abstract static class Record {
private final ReentrantLock lock = new ReentrantLock(); // private final ReentrantLock lock = new ReentrantLock();
} // }
//
@Getter // @Getter
private static class SoloRecord extends Record { // private static class SoloRecord extends Record {
private Condition condition = getLock().newCondition(); // private Condition condition = getLock().newCondition();
// 返回一个结果 // // 返回一个结果
private Control.Res res; // private Control.Res res;
} // }
//
@Getter // @Getter
private static class ClusterRecord extends Record { // private static class ClusterRecord extends Record {
private Condition stage = getLock().newCondition(); // private Condition stage = getLock().newCondition();
private Condition condition = getLock().newCondition(); // private Condition condition = getLock().newCondition();
//
// 返回一组结果 // // 返回一组结果
private final ConcurrentLinkedQueue<Control.Res> reslist = new ConcurrentLinkedQueue<>(); // private final ConcurrentLinkedQueue<Control.Res> reslist = new ConcurrentLinkedQueue<>();
} // }
//
/** // /**
* 允许监听集群命令的中间阶段 // * 允许监听集群命令的中间阶段
*/ // */
public abstract static class ClusterRecordMoniter { // public abstract static class ClusterRecordMoniter {
//
/** // /**
* // *
* @param curAliveServerNum - 当前集群中存活的服务数量及期望响应的数量 // * @param curAliveServerNum - 当前集群中存活的服务数量及期望响应的数量
* @param reslist - 当前已经收集的返回结果 // * @param reslist - 当前已经收集的返回结果
* @param condition - 终结信号量 condition.signal(); 可以提前终结等待 // * @param condition - 终结信号量 condition.signal(); 可以提前终结等待
*/ // */
public abstract void onStage( int curAliveServerNum, ConcurrentLinkedQueue<Control.Res> reslist , Condition condition ); // public abstract void onStage( int curAliveServerNum, ConcurrentLinkedQueue<Control.Res> reslist , Condition condition );
//
} // }
//
} //}

View File

@ -1,288 +1,288 @@
package com.sunyard.ssp.server; //package com.sunyard.ssp.server;
//
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.BinaryJedisPubSub; //import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.Jedis; //import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster; //import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool; //import redis.clients.jedis.JedisPool;
//
import java.util.Set; //import java.util.Set;
//
// 包装集群和单点 //// 包装集群和单点
@Slf4j //@Slf4j
public class Redis { //public class Redis {
private JedisCluster jedisCluster; // private JedisCluster jedisCluster;
//
private JedisPool jedisPool; // private JedisPool jedisPool;
//
private String mode; // private String mode;
//
private String password; // private String password;
//
public void close() { // public void close() {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
if (null != jedisPool) { // if (null != jedisPool) {
try { // try {
jedisPool.close(); // jedisPool.close();
} catch (Exception e) { // } catch (Exception e) {
} // }
} // }
} else { // } else {
if (null != jedisCluster) { // if (null != jedisCluster) {
try { // try {
jedisCluster.close(); // jedisCluster.close();
} catch (Exception e) { // } catch (Exception e) {
} // }
} // }
jedisCluster = null; // jedisCluster = null;
} // }
} // }
//
public Redis(JedisCluster jedisCluster, JedisPool jedisPool, String password, String mode) { // public Redis(JedisCluster jedisCluster, JedisPool jedisPool, String password, String mode) {
this.jedisCluster = jedisCluster; // this.jedisCluster = jedisCluster;
this.jedisPool = jedisPool; // this.jedisPool = jedisPool;
this.mode = mode; // this.mode = mode;
this.password = password; // this.password = password;
} // }
//
public String get(String key) { // public String get(String key) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
log.info("jedis publish[{}]",jedis); // log.info("jedis publish[{}]",jedis);
try { // try {
//
return jedis.get(key); // return jedis.get(key);
} finally { // } finally {
if(jedis!=null){ // if(jedis!=null){
jedis.close(); // jedis.close();
} // }
//close(); // //close();
} // }
} else { // } else {
return jedisCluster.get( key ); // return jedisCluster.get( key );
} // }
} // }
//
public void subscribe(BinaryJedisPubSub jedisPubSub, final byte[]... channels) { // public void subscribe(BinaryJedisPubSub jedisPubSub, final byte[]... channels) {
//
//
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
log.info("jedis publish[{}]",jedis); // log.info("jedis publish[{}]",jedis);
try { // try {
jedis.auth(password); // jedis.auth(password);
//
jedis.subscribe(jedisPubSub, channels); // jedis.subscribe(jedisPubSub, channels);
} finally { // } finally {
if(jedis!=null){ // if(jedis!=null){
jedis.close(); // jedis.close();
} // }
//close(); // //close();
} // }
} else { // } else {
jedisCluster.subscribe(jedisPubSub, channels); // jedisCluster.subscribe(jedisPubSub, channels);
} // }
//
} // }
//
public Long zrem(final String key, final String... members) { // public Long zrem(final String key, final String... members) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
try { // try {
jedis.auth(password); // jedis.auth(password);
return jedis.zrem(key, members); // return jedis.zrem(key, members);
} finally { // } finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
return jedisCluster.zrem(key, members); // return jedisCluster.zrem(key, members);
} // }
} // }
//
public Long zremrangeByScore(final byte[] key, final double min, final double max) { // public Long zremrangeByScore(final byte[] key, final double min, final double max) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
try { // try {
jedis.auth(password); // jedis.auth(password);
return jedis.zremrangeByScore(key, min, max); // return jedis.zremrangeByScore(key, min, max);
} finally { // } finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
return jedisCluster.zremrangeByScore(key, min, max); // return jedisCluster.zremrangeByScore(key, min, max);
} // }
} // }
//
public Long zremrangeByScore(final String key, final double min, final double max) { // public Long zremrangeByScore(final String key, final double min, final double max) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
try { // try {
jedis.auth(password); // jedis.auth(password);
return jedis.zremrangeByScore(key, min, max); // return jedis.zremrangeByScore(key, min, max);
} finally { // } finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
return jedisCluster.zremrangeByScore(key, min, max); // return jedisCluster.zremrangeByScore(key, min, max);
} // }
} // }
//
public Set<String> zrevrangeByScore(String key, double min, double max) { // public Set<String> zrevrangeByScore(String key, double min, double max) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
try { // try {
jedis.auth(password); // jedis.auth(password);
return jedis.zrevrangeByScore(key, min, max); // return jedis.zrevrangeByScore(key, min, max);
} finally { // } finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
return jedisCluster.zrevrangeByScore(key, min, max); // return jedisCluster.zrevrangeByScore(key, min, max);
} // }
} // }
//
public Set<String> zrangeByScore(String key, double min, double max) { // public Set<String> zrangeByScore(String key, double min, double max) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
try { // try {
jedis.auth(password); // jedis.auth(password);
return jedis.zrangeByScore(key, min, max); // return jedis.zrangeByScore(key, min, max);
} finally { // } finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
return jedisCluster.zrangeByScore(key, min, max); // return jedisCluster.zrangeByScore(key, min, max);
} // }
} // }
//
public Long zadd(final String key, final double score, final String member) { // public Long zadd(final String key, final double score, final String member) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
jedis.auth(password); // jedis.auth(password);
return jedis.zadd(key, score, member); // return jedis.zadd(key, score, member);
} else { // } else {
return jedisCluster.zadd(key, score, member); // return jedisCluster.zadd(key, score, member);
} // }
} // }
//
public Long zadd(final byte[] key, final double score, final byte[] member) { // public Long zadd(final byte[] key, final double score, final byte[] member) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
jedis.auth(password); // jedis.auth(password);
return jedis.zadd(key, score, member); // return jedis.zadd(key, score, member);
} else { // } else {
return jedisCluster.zadd(key, score, member); // return jedisCluster.zadd(key, score, member);
} // }
} // }
//
public Set<String> zrevrange(final String key, final long start, final long stop) { // public Set<String> zrevrange(final String key, final long start, final long stop) {
//
//
Set<String> zrange; // Set<String> zrange;
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
try { // try {
jedis.auth(password); // jedis.auth(password);
zrange = jedis.zrevrange(key, start, stop); // zrange = jedis.zrevrange(key, start, stop);
} finally { // } finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
zrange = jedisCluster.zrevrange(key, start, stop); // zrange = jedisCluster.zrevrange(key, start, stop);
//
} // }
return zrange; // return zrange;
//
} // }
public Set<String> zrange(final String key, final long start, final long stop) { // public Set<String> zrange(final String key, final long start, final long stop) {
//
//
Set<String> zrange; // Set<String> zrange;
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
try { // try {
jedis.auth(password); // jedis.auth(password);
zrange = jedis.zrange(key, start, stop); // zrange = jedis.zrange(key, start, stop);
} finally { // } finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
zrange = jedisCluster.zrange(key, start, stop); // zrange = jedisCluster.zrange(key, start, stop);
//
} // }
return zrange; // return zrange;
//
} // }
//
public Long sadd(final String key, final String... members) { // public Long sadd(final String key, final String... members) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
jedis.auth(password); // jedis.auth(password);
return jedis.sadd(key, members); // return jedis.sadd(key, members);
} else { // } else {
return jedisCluster.sadd(key, members); // return jedisCluster.sadd(key, members);
} // }
} // }
//
public Long del(final String key) { // public Long del(final String key) {
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
try { // try {
jedis.auth(password); // jedis.auth(password);
return jedis.del(key); // return jedis.del(key);
} finally { // } finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
return jedisCluster.del(key); // return jedisCluster.del(key);
} // }
} // }
//
//
public Long publish(final byte[] channel, final byte[] message) { // public Long publish(final byte[] channel, final byte[] message) {
Long publish = null; // Long publish = null;
if ("solo".equals(mode)) { // if ("solo".equals(mode)) {
Jedis jedis = jedisPool.getResource(); // Jedis jedis = jedisPool.getResource();
log.info("jedis publish[{}]",jedis); // log.info("jedis publish[{}]",jedis);
try { // try {
jedis.auth(password); // jedis.auth(password);
publish = jedis.publish(channel, message); // publish = jedis.publish(channel, message);
//
}catch (Exception e){ // }catch (Exception e){
e.printStackTrace(); // e.printStackTrace();
}finally { // }finally {
if (jedis != null) { // if (jedis != null) {
jedis.close(); // jedis.close();
//close(); // //close();
} // }
} // }
} else { // } else {
publish = jedisCluster.publish(channel, message); // publish = jedisCluster.publish(channel, message);
} // }
return publish; // return publish;
} // }
} //}

View File

@ -1,223 +1,223 @@
package com.sunyard.ssp.server; //package com.sunyard.ssp.server;
//
import com.google.protobuf.Any; //import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException; //import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.UInt64Value; //import com.google.protobuf.UInt64Value;
import com.sunyard.ssp.proto.control.Control; //import com.sunyard.ssp.proto.control.Control;
import lombok.NoArgsConstructor; //import lombok.NoArgsConstructor;
import lombok.Setter; //import lombok.Setter;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import lombok.val; //import lombok.val;
import redis.clients.jedis.BinaryJedisPubSub; //import redis.clients.jedis.BinaryJedisPubSub;
//
import java.util.ArrayList; //import java.util.ArrayList;
import java.util.HashMap; //import java.util.HashMap;
import java.util.List; //import java.util.List;
//
/** ///**
* @author:tsz // * @author:tsz
* @date:2020/3/19 // * @date:2020/3/19
* @description: // * @description:
*/ // */
@Slf4j //@Slf4j
public class ServerTest { //public class ServerTest {
//
/** // /**
*@author tsz // *@author tsz
*@date 2020/03/19 15:56 // *@date 2020/03/19 15:56
*@description : 测试是否联通渠道 // *@description : 测试是否联通渠道
*/ // */
public static void connect() { // public static void connect() {
try { // try {
//
// 准备消息 // // 准备消息
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd("test.ping") // .setCmd("test.ping")
.build(); // .build();
//Manager 监听通道 MANAGER_CLUSTER // //Manager 监听通道 MANAGER_CLUSTER
// SERVER_CLUSTER // // SERVER_CLUSTER
val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); // val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req);
if (null == res) { // if (null == res) {
System.err.println("接收命令响应超时!"); // System.err.println("接收命令响应超时!");
} else { // } else {
// Control.CMD_ping p = res.getData().unpack(Control.CMD_ping.class); //// Control.CMD_ping p = res.getData().unpack(Control.CMD_ping.class);
//
} // }
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} // }
//
} // }
//
/** // /**
*@author tsz // *@author tsz
*@date 2020/3/19 16:06 // *@date 2020/3/19 16:06
*@param : 传递渠道id // *@param : 传递渠道id
*@return : // *@return :
*@description : 渠道更新 // *@description : 渠道更新
*/ // */
public static void channelUpdate(String channelId) { // public static void channelUpdate(String channelId) {
try { // try {
//
// 准备消息 // // 准备消息
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd("channel.update.enable") // .setCmd("channel.update.enable")
.setData(Any.pack(UInt64Value.of(Long.valueOf(channelId)))) // .setData(Any.pack(UInt64Value.of(Long.valueOf(channelId))))
.build(); // .build();
//Manager 监听通道 MANAGER_CLUSTER // //Manager 监听通道 MANAGER_CLUSTER
// SERVER_CLUSTER // // SERVER_CLUSTER
val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); // val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req);
if (null == res) { // if (null == res) {
System.err.println("接收命令响应超时!"); // System.err.println("接收命令响应超时!");
} else { // } else {
//
} // }
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} // }
} // }
//
/** // /**
*@author tsz // *@author tsz
*@date 2020/3/19 16:44 // *@date 2020/3/19 16:44
*@param : // *@param :
*@return : // *@return :
*@description : 列出所有渠道 // *@description : 列出所有渠道
*/ // */
public static void channelList() { // public static void channelList() {
try { // try {
//
// 准备消息 // // 准备消息
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd("channel.list") // .setCmd("channel.list")
.build(); // .build();
//Manager 监听通道 MANAGER_CLUSTER // //Manager 监听通道 MANAGER_CLUSTER
// SERVER_CLUSTER // // SERVER_CLUSTER
val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); // val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req);
if (null == res) { // if (null == res) {
System.err.println("接收命令响应超时!"); // System.err.println("接收命令响应超时!");
} else { // } else {
val list = res.getData().unpack( Control.VOChannelList.class ); // val list = res.getData().unpack( Control.VOChannelList.class );
//
} // }
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} // }
//
} // }
//
/** // /**
*@author tsz // *@author tsz
*@date 2020/3/19 16:49 // *@date 2020/3/19 16:49
*@param : // *@param :
*@return : // *@return :
*@description : 列出所有服务器 // *@description : 列出所有服务器
*/ // */
public static void serverList(){ // public static void serverList(){
try { // try {
//
// 准备消息 // // 准备消息
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd("server.list") // .setCmd("server.list")
.build(); // .build();
//Manager 监听通道 MANAGER_CLUSTER // //Manager 监听通道 MANAGER_CLUSTER
// SERVER_CLUSTER // // SERVER_CLUSTER
val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); // val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req);
if (null == res) { // if (null == res) {
System.err.println("接收命令响应超时!"); // System.err.println("接收命令响应超时!");
} else { // } else {
val list = res.getData().unpack( Control.VOChannelList.class ); // val list = res.getData().unpack( Control.VOChannelList.class );
} // }
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} // }
} // }
//
/** // /**
*@author tsz // *@author tsz
*@date 2020/03/24 15:38 // *@date 2020/03/24 15:38
*@description : 渠道验证 // *@description : 渠道验证
*/ // */
public static void VOChannelCheck(Long channelId,String checkId) { // public static void VOChannelCheck(Long channelId,String checkId) {
//
//
val buff = new HashMap<String, List<Control.VOChannelCheckRes>>(); // val buff = new HashMap<String, List<Control.VOChannelCheckRes>>();
//
//
//
@NoArgsConstructor // @NoArgsConstructor
@Setter // @Setter
class Msg extends BinaryJedisPubSub { // class Msg extends BinaryJedisPubSub {
//
private Thread t; // private Thread t;
//
//
@Override // @Override
public void onMessage(byte[] channel, byte[] message) { // public void onMessage(byte[] channel, byte[] message) {
//
//
// message 解析为 Res 对象 // // message 解析为 Res 对象
try { // try {
val res = Control.VOChannelCheckRes.parseFrom( message ); // val res = Control.VOChannelCheckRes.parseFrom( message );
if ( !buff.containsKey(checkId)){ // if ( !buff.containsKey(checkId)){
buff.put(checkId,new ArrayList<>()); // buff.put(checkId,new ArrayList<>());
} // }
//
val list = buff.get(checkId); // val list = buff.get(checkId);
//
//
//
list.add(res); // list.add(res);
if ( res.getType().equals("end") ){ // if ( res.getType().equals("end") ){
t.stop(); // t.stop();
} // }
//
//
} catch (InvalidProtocolBufferException e) { // } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); // e.printStackTrace();
} // }
//
} // }
} // }
//
// // //
val msg = new Msg(); // val msg = new Msg();
Thread t = Messenger.subscribe(msg, checkId ); // Thread t = Messenger.subscribe(msg, checkId );
msg.setT( t ); // msg.setT( t );
t.start(); // t.start();
//
//
try { // try {
// 准备消息 // // 准备消息
val req = Control.Req.newBuilder() // val req = Control.Req.newBuilder()
.setId(Messenger.genId()) // .setId(Messenger.genId())
.setOrigin(Control.Req.Origin.MANAGER) // .setOrigin(Control.Req.Origin.MANAGER)
.setFrom("CLUSTER") // .setFrom("CLUSTER")
.setCmd("channel.check") // .setCmd("channel.check")
.setData(Any.pack(Control.VOChannelCheck.newBuilder().setChannelId(channelId).setCheckId(checkId).build())) // .setData(Any.pack(Control.VOChannelCheck.newBuilder().setChannelId(channelId).setCheckId(checkId).build()))
.build(); // .build();
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} // }
//
} // }
//
//
public static void main(String[] args) { // public static void main(String[] args) {
VOChannelCheck(11L,"111"); // VOChannelCheck(11L,"111");
} // }
//
//
} //}

View File

@ -1,16 +0,0 @@
{
"password" : "sunyard2",
"mode" : "solo",
"node" : {
"ip" : "172.16.17.163",
"port" : 6379
},
"nodes" : [
{
"ip" : "172.16.17.163",
"port" : 6379
}
]
}