From ac339d668eaa3767958a3a770ac4f5551e031202 Mon Sep 17 00:00:00 2001 From: liulu Date: Thu, 31 Oct 2024 14:23:15 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sunyard/ssp/redis/JedisClusterConfig.java | 12 +- .../com/sunyard/ssp/redis/RedisOperation.java | 144 +-- .../com/sunyard/ssp/server/Messenger.java | 866 +++++++++--------- .../java/com/sunyard/ssp/server/Redis.java | 576 ++++++------ .../com/sunyard/ssp/server/ServerTest.java | 446 ++++----- config/redis.config.json | 16 - 6 files changed, 1021 insertions(+), 1039 deletions(-) delete mode 100644 config/redis.config.json diff --git a/chsm-web-manage/src/main/java/com/sunyard/ssp/redis/JedisClusterConfig.java b/chsm-web-manage/src/main/java/com/sunyard/ssp/redis/JedisClusterConfig.java index 404c0e1..6a70128 100644 --- a/chsm-web-manage/src/main/java/com/sunyard/ssp/redis/JedisClusterConfig.java +++ b/chsm-web-manage/src/main/java/com/sunyard/ssp/redis/JedisClusterConfig.java @@ -1,7 +1,5 @@ package com.sunyard.ssp.redis; -import com.alibaba.fastjson.JSONObject; -import com.sunyard.ssp.server.Config; import lombok.extern.slf4j.Slf4j; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.factory.annotation.Value; @@ -141,10 +139,10 @@ public class JedisClusterConfig extends CachingConfigurerSupport { @Bean public JedisPool redisPoolFactory(){ - JSONObject node = Config.global.messenger.redisCluster.getJSONObject("node"); - String password = Config.global.messenger.redisCluster.getString("password"); - String host = node.getString("ip"); - Integer port = node.getInteger("port"); +// JSONObject node = Config.global.messenger.redisCluster.getJSONObject("node"); +// String password = Config.global.messenger.redisCluster.getString("password"); +// String host = node.getString("ip"); +// Integer port = node.getInteger("port"); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(10); @@ -158,7 +156,7 @@ public class JedisClusterConfig extends CachingConfigurerSupport { jedisPoolConfig.setTestOnBorrow(true); jedisPoolConfig.setTestOnReturn(true); - JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, 10000, password); + JedisPool jedisPool = new JedisPool(jedisPoolConfig, host2, port2, 10000, password); return jedisPool; } diff --git a/chsm-web-manage/src/main/java/com/sunyard/ssp/redis/RedisOperation.java b/chsm-web-manage/src/main/java/com/sunyard/ssp/redis/RedisOperation.java index 4f95210..1a0ccac 100644 --- a/chsm-web-manage/src/main/java/com/sunyard/ssp/redis/RedisOperation.java +++ b/chsm-web-manage/src/main/java/com/sunyard/ssp/redis/RedisOperation.java @@ -1,72 +1,72 @@ -package com.sunyard.ssp.redis; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.ListOperations; -import org.springframework.data.redis.core.SetOperations; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.data.redis.core.ValueOperations; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * @author:tsz - * @date:2020/3/6 - * @description: redis String 基本操作 - */ -@Component -public class RedisOperation { - - @Autowired - private StringRedisTemplate redisTemplate; - - /*** - * 操作普通字符串 - */ - public void StringSet(String key, String value) { - ValueOperations valueOperations = redisTemplate.opsForValue(); - valueOperations.set(key, value); - } - - /*** - * 操作列表 - */ - public void ListSet(String key, List values) { - ListOperations listOperations = redisTemplate.opsForList(); - values.forEach(value -> listOperations.leftPush(key, value)); - } - - /*** - * 操作集合 - */ - public void SetSet(String key, Set values) { - SetOperations setOperations = redisTemplate.opsForSet(); - values.forEach(value -> setOperations.add(key, value)); - } - - /*** - * 获取字符串 - */ - public String StringGet(String key) { - ValueOperations valueOperations = redisTemplate.opsForValue(); - return valueOperations.get(key); - } - - /*** - * 列表弹出元素 - */ - public String ListLeftPop(String key) { - ListOperations listOperations = redisTemplate.opsForList(); - return listOperations.leftPop(key, 2, TimeUnit.SECONDS); - } - - /*** - * 集合弹出元素 - */ - public String SetPop(String key) { - SetOperations setOperations = redisTemplate.opsForSet(); - return setOperations.pop(key); - } -} +//package com.sunyard.ssp.redis; +// +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.ListOperations; +//import org.springframework.data.redis.core.SetOperations; +//import org.springframework.data.redis.core.StringRedisTemplate; +//import org.springframework.data.redis.core.ValueOperations; +//import org.springframework.stereotype.Component; +// +//import java.util.List; +//import java.util.Set; +//import java.util.concurrent.TimeUnit; +// +///** +// * @author:tsz +// * @date:2020/3/6 +// * @description: redis String 基本操作 +// */ +//@Component +//public class RedisOperation { +// +// @Autowired +// private StringRedisTemplate redisTemplate; +// +// /*** +// * 操作普通字符串 +// */ +// public void StringSet(String key, String value) { +// ValueOperations valueOperations = redisTemplate.opsForValue(); +// valueOperations.set(key, value); +// } +// +// /*** +// * 操作列表 +// */ +// public void ListSet(String key, List values) { +// ListOperations listOperations = redisTemplate.opsForList(); +// values.forEach(value -> listOperations.leftPush(key, value)); +// } +// +// /*** +// * 操作集合 +// */ +// public void SetSet(String key, Set values) { +// SetOperations setOperations = redisTemplate.opsForSet(); +// values.forEach(value -> setOperations.add(key, value)); +// } +// +// /*** +// * 获取字符串 +// */ +// public String StringGet(String key) { +// ValueOperations valueOperations = redisTemplate.opsForValue(); +// return valueOperations.get(key); +// } +// +// /*** +// * 列表弹出元素 +// */ +// public String ListLeftPop(String key) { +// ListOperations listOperations = redisTemplate.opsForList(); +// return listOperations.leftPop(key, 2, TimeUnit.SECONDS); +// } +// +// /*** +// * 集合弹出元素 +// */ +// public String SetPop(String key) { +// SetOperations setOperations = redisTemplate.opsForSet(); +// return setOperations.pop(key); +// } +//} diff --git a/chsm-web-manage/src/main/java/com/sunyard/ssp/server/Messenger.java b/chsm-web-manage/src/main/java/com/sunyard/ssp/server/Messenger.java index bd917f2..47a2b14 100644 --- a/chsm-web-manage/src/main/java/com/sunyard/ssp/server/Messenger.java +++ b/chsm-web-manage/src/main/java/com/sunyard/ssp/server/Messenger.java @@ -1,433 +1,433 @@ -package com.sunyard.ssp.server; - -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; -import com.sunyard.ssp.common.exception.SspwebException; -import com.sunyard.ssp.proto.control.Control; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import lombok.var; -import redis.clients.jedis.BinaryJedisPubSub; -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; - -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * 封装到 Server 的通信 - * 异步转同步 - */ - -@Slf4j -public class Messenger { - - private static String channel = "MANAGER_CLUSTER"; - private static JedisCluster jedisCluster; - private static Redis redisSubscribe; - private static Redis redisSubscribe2; - private static Redis redis;//Publish - private static Thread thread; - private static ExecutorService executorService = Executors.newFixedThreadPool(3 ); - private static ConcurrentHashMap pendingMessageRecord = new ConcurrentHashMap<>(); - - private static JedisPool jedisPool = null; - private static JedisPool jedisPool2 = null; - private static JedisPool jedisPool3 = null; - - private static String host ; - private static Integer port ; - private static String password; - private static String mode; - static { - - // 只有 CLI 需要重置 channel - //channel = Control.Req.Origin.CLI + "_" + Config.global.system.name; - - // 初始化 - // 链接到 redis - Set nodes = new HashSet<>(); - JSONArray ns = Config.global.messenger.redisCluster.getJSONArray("nodes"); - password = Config.global.messenger.redisCluster.getString("password"); - mode = Config.global.messenger.redisCluster.getString("mode"); - JSONObject node = Config.global.messenger.redisCluster.getJSONObject("node"); - host = node.getString("ip"); - port = node.getInteger("port"); - for (Object n : ns) { - JSONObject no = (JSONObject) n; - nodes.add(new HostAndPort(no.getString("ip"), no.getInteger("port"))); - } - - @AllArgsConstructor - class Msg extends BinaryJedisPubSub { - - @Override - public void onMessage(byte[] channel, byte[] message) { - - - // 将 message 解析为 Res 对象 - try { - val res = Control.Res.parseFrom( message ); - val id = res.getId(); - - val record = pendingMessageRecord.get(id); - if ( null == record ){ - log.debug("没有找到消息 {} 的记录, 遗弃。", id); - } else { - - record.lock.lock(); - try { - if ( record instanceof SoloRecord ){ - log.debug("找到 Solo 消息 {} 的记录, 触发信号。", id); - ((SoloRecord) record).res = res; - ((SoloRecord) record).getCondition().signal(); - } else if ( record instanceof ClusterRecord ){ - log.debug("找到 Cluster 消息 {} 的记录, 触发阶段信号。", id); - ((ClusterRecord) record).reslist.add( res ); - ((ClusterRecord) record).getStage().signal(); - } - }finally { - record.lock.unlock(); - } - - } - - } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); - } - - } - } - - JedisPoolConfig config = new JedisPoolConfig(); - // config.setTestOnBorrow(false); - jedisCluster = new JedisCluster(nodes, 1000, 2000, 5, password, config); - if (jedisPool ==null){ - //调试的时候的配置 - JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); - //连接池中的最大空闲连接 - jedisPoolConfig.setMaxIdle(1000); - //连接池最大阻塞等待时间 - //jedisPoolConfig.setMaxWaitMillis(10000); - //在获取连接的时候检查有效性 - //jedisPoolConfig.setTestOnBorrow(true); - // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true - jedisPoolConfig.setBlockWhenExhausted(false); - // 是否启用pool的jmx管理功能, 默认true - jedisPoolConfig.setJmxEnabled(true); - jedisPool = new JedisPool(jedisPoolConfig,host,port,10000,password); - } - if (jedisPool2 ==null){ - //调试的时候的配置 - JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); - //连接池中的最大空闲连接 - jedisPoolConfig.setMaxIdle(1000); - //连接池最大阻塞等待时间 - //jedisPoolConfig.setMaxWaitMillis(10000); - //在获取连接的时候检查有效性 - //jedisPoolConfig.setTestOnBorrow(true); - // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true - jedisPoolConfig.setBlockWhenExhausted(false); - // 是否启用pool的jmx管理功能, 默认true - jedisPoolConfig.setJmxEnabled(true); - jedisPool2 = new JedisPool(jedisPoolConfig,host,port,10000,password); - } - if (jedisPool3 ==null){ - - } - redis = new Redis(jedisCluster, jedisPool2, password, mode); - redisSubscribe = new Redis(jedisCluster, jedisPool, password, mode); - - thread = new Thread(new Runnable() { - @Override - public void run() { - log.info("监听反馈消息 {}", channel); - redisSubscribe.subscribe(new Msg(), channel.getBytes() ); - - } - }); - thread.setName("messenger"); - thread.start(); - } - - - public static Thread subscribe(final BinaryJedisPubSub jedisPubSub, String channel){ - return new Thread(new Runnable() { - @Override - public void run() { - //调试的时候的配置 - JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); - //连接池中的最大空闲连接 - jedisPoolConfig.setMaxIdle(1000); - //连接池最大阻塞等待时间 - //jedisPoolConfig.setMaxWaitMillis(10000); - //在获取连接的时候检查有效性 - //jedisPoolConfig.setTestOnBorrow(true); - // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true - jedisPoolConfig.setBlockWhenExhausted(false); - // 是否启用pool的jmx管理功能, 默认true - jedisPoolConfig.setJmxEnabled(true); - jedisPool3 = new JedisPool(jedisPoolConfig,host,port,10000,password); - - redisSubscribe2 = new Redis(jedisCluster, jedisPool3, password, mode); - - log.info("监听Check消息 {}", channel); - redisSubscribe2.subscribe(jedisPubSub, channel.getBytes() ); - } - }); - } - - public static String genId(){ - return UUID.randomUUID().toString().replace("-", ""); - } - - - public static Control.Res primarySyncCmd( String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { - var c = System.currentTimeMillis(); - var last = c - 120000L; - redis.zremrangeByScore("SERVER_CLUSTER", 0, last); - val servers = redis.zrevrange("SERVER_CLUSTER", 0, 9); - if (servers.size()<1){ - throw new SspwebException("安全服务异常"); - } - - - Iterator it = servers.iterator(); - String channel = servers.iterator().next(); - - // 方便调试的逻辑 - // 如果配置了调试服务器,优先使用调试服务器 - val debugServer = redis.get("DEBUG_SERVER"); - if ( null != debugServer && servers.contains( debugServer ) ) { - channel = debugServer; - } - - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd(cmd) - .setData(Any.pack(data)) - .build(); - return soloSyncCmd("SERVER_" + channel, req); - } - - - public static Control.Res soloSyncCmd(String channel, String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { - - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd(cmd) - .setData(Any.pack(data)) - .build(); - return soloSyncCmd(channel, req); - } - /** - * 同步通信 - * - * @param req - * @return - */ - public static Control.Res soloSyncCmd(String channel, Control.Req req) throws ExecutionException, InterruptedException { - - var c = System.currentTimeMillis(); - var last = c - 120000L; - redis.zremrangeByScore("SERVER_CLUSTER", 0, last); - val servers = redis.zrevrange("SERVER_CLUSTER", 0, 9); - if (servers.size()<1){ - throw new SspwebException("安全服务异常"); - } - log.info("SoloSyncCmd 发送消息到 sspserver id={} cmd={}", channel, req.getCmd()); - - // 发送请求 - redis.publish(channel.getBytes(), req.toByteArray() ); - - // 单点通信创建一个信号量 - final SoloRecord record = new SoloRecord(); - - // 记录请求 - pendingMessageRecord.put( req.getId(), record ); - Future future = executorService.submit(new Callable() { - - @Override - public Control.Res call() throws Exception { - //Thread.sleep(3000); - try { - record.getLock().lock(); - // 等待信号量 - record.getCondition().await( Config.global.messenger.maxWaitTime , TimeUnit.SECONDS ); - - }catch ( InterruptedException e ){ - log.error("Solo 请求 {} {} 被打断,from={}, to={}", req.getId() , req.getCmd() , req.getFrom(), channel); - } finally { - record.getLock().unlock(); - } - - // 返回结果 - return record.res; - } - }); - // 阻塞,等待子线程结束 - return (Control.Res) future.get(); - - } - - - public static ConcurrentLinkedQueue clusterSyncCmd( String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { - if (ObjectUtil.isNull(data) || StrUtil.isEmpty(data.toString())){ - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd(cmd) - .build(); - return clusterSyncCmd("SERVER_CLUSTER", req, null); - } else { - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd(cmd) - .setData(Any.pack(data)) - .build(); - return clusterSyncCmd("SERVER_CLUSTER", req, null); - } - - } - public static ConcurrentLinkedQueue clusterSyncCmd(String channel, Control.Req req) throws ExecutionException, InterruptedException { - return clusterSyncCmd(channel, req, null); - } - - /** - * 集群同步通信 - * 一个发送多个返回,收到是个数组。 - * - * @param req - * @return - */ - public static ConcurrentLinkedQueue clusterSyncCmd(String channel, Control.Req req, ClusterRecordMoniter moniter) throws ExecutionException, InterruptedException { - - // 发送请求 - redis.publish(channel.getBytes(), req.toByteArray() ); - - // 单点通信创建一个信号量 - final ClusterRecord record = new ClusterRecord(); - - // 记录请求 - pendingMessageRecord.put( req.getId(), record ); - - // 监听阶段数据 - executorService.submit(new Runnable() { - - @SneakyThrows - @Override - public void run() { - - try { - record.getLock().lock(); - - log.debug("等待阶段信号"); - record.getStage().await(Config.global.messenger.maxWaitTime , TimeUnit.SECONDS); - log.debug("收到阶段信号"); - - // 查询应该响应的数量 - val servers = redis.zrange("SERVER_CLUSTER", 0, 9); - log.debug("应该收到的消息数 {}, 目前收到的消息数 {}", servers.size(), record.getReslist().size() ); - - // 全部收到 - if ( servers.size() == record.getReslist().size() ){ - record.getCondition().signal(); - } - - - if ( null != moniter ){ - moniter.onStage( servers.size(), record.getReslist(), record.getCondition() ); - } - - } finally { - record.getLock().unlock(); - - } - } - }); - - // 监听整体 - Future future = executorService.submit(new Callable>() { - @Override - public ConcurrentLinkedQueue call() throws Exception { - - try { - record.getLock().lock(); - record.getCondition().await(Config.global.messenger.maxWaitTime , TimeUnit.SECONDS); - return record.getReslist(); - } finally { - record.getLock().unlock(); - } - } - }); - - // 阻塞,等待子线程结束 - return (ConcurrentLinkedQueue) future.get(); - } - - @Getter - private abstract static class Record { - private final ReentrantLock lock = new ReentrantLock(); - } - - @Getter - private static class SoloRecord extends Record { - private Condition condition = getLock().newCondition(); - // 返回一个结果 - private Control.Res res; - } - - @Getter - private static class ClusterRecord extends Record { - private Condition stage = getLock().newCondition(); - private Condition condition = getLock().newCondition(); - - // 返回一组结果 - private final ConcurrentLinkedQueue reslist = new ConcurrentLinkedQueue<>(); - } - - /** - * 允许监听集群命令的中间阶段 - */ - public abstract static class ClusterRecordMoniter { - - /** - * - * @param curAliveServerNum - 当前集群中存活的服务数量,及期望响应的数量 - * @param reslist - 当前已经收集的返回结果 - * @param condition - 终结信号量 condition.signal(); 可以提前终结等待。 - */ - public abstract void onStage( int curAliveServerNum, ConcurrentLinkedQueue reslist , Condition condition ); - - } - -} +//package com.sunyard.ssp.server; +// +//import cn.hutool.core.util.ObjectUtil; +//import cn.hutool.core.util.StrUtil; +//import com.alibaba.fastjson.JSONArray; +//import com.alibaba.fastjson.JSONObject; +//import com.google.protobuf.Any; +//import com.google.protobuf.InvalidProtocolBufferException; +//import com.sunyard.ssp.common.exception.SspwebException; +//import com.sunyard.ssp.proto.control.Control; +//import lombok.AllArgsConstructor; +//import lombok.Getter; +//import lombok.SneakyThrows; +//import lombok.extern.slf4j.Slf4j; +//import lombok.val; +//import lombok.var; +//import redis.clients.jedis.BinaryJedisPubSub; +//import redis.clients.jedis.HostAndPort; +//import redis.clients.jedis.JedisCluster; +//import redis.clients.jedis.JedisPool; +//import redis.clients.jedis.JedisPoolConfig; +// +//import java.util.HashSet; +//import java.util.Iterator; +//import java.util.Set; +//import java.util.UUID; +//import java.util.concurrent.Callable; +//import java.util.concurrent.ConcurrentHashMap; +//import java.util.concurrent.ConcurrentLinkedQueue; +//import java.util.concurrent.ExecutionException; +//import java.util.concurrent.ExecutorService; +//import java.util.concurrent.Executors; +//import java.util.concurrent.Future; +//import java.util.concurrent.TimeUnit; +//import java.util.concurrent.locks.Condition; +//import java.util.concurrent.locks.ReentrantLock; +// +// +///** +// * 封装到 Server 的通信 +// * 异步转同步 +// */ +// +//@Slf4j +//public class Messenger { +// +// private static String channel = "MANAGER_CLUSTER"; +// private static JedisCluster jedisCluster; +// private static Redis redisSubscribe; +// private static Redis redisSubscribe2; +// private static Redis redis;//Publish +// private static Thread thread; +// private static ExecutorService executorService = Executors.newFixedThreadPool(3 ); +// private static ConcurrentHashMap pendingMessageRecord = new ConcurrentHashMap<>(); +// +// private static JedisPool jedisPool = null; +// private static JedisPool jedisPool2 = null; +// private static JedisPool jedisPool3 = null; +// +// private static String host ; +// private static Integer port ; +// private static String password; +// private static String mode; +// static { +// +// // 只有 CLI 需要重置 channel +// //channel = Control.Req.Origin.CLI + "_" + Config.global.system.name; +// +// // 初始化 +// // 链接到 redis +// Set nodes = new HashSet<>(); +// JSONArray ns = Config.global.messenger.redisCluster.getJSONArray("nodes"); +// password = Config.global.messenger.redisCluster.getString("password"); +// mode = Config.global.messenger.redisCluster.getString("mode"); +// JSONObject node = Config.global.messenger.redisCluster.getJSONObject("node"); +// host = node.getString("ip"); +// port = node.getInteger("port"); +// for (Object n : ns) { +// JSONObject no = (JSONObject) n; +// nodes.add(new HostAndPort(no.getString("ip"), no.getInteger("port"))); +// } +// +// @AllArgsConstructor +// class Msg extends BinaryJedisPubSub { +// +// @Override +// public void onMessage(byte[] channel, byte[] message) { +// +// +// // 将 message 解析为 Res 对象 +// try { +// val res = Control.Res.parseFrom( message ); +// val id = res.getId(); +// +// val record = pendingMessageRecord.get(id); +// if ( null == record ){ +// log.debug("没有找到消息 {} 的记录, 遗弃。", id); +// } else { +// +// record.lock.lock(); +// try { +// if ( record instanceof SoloRecord ){ +// log.debug("找到 Solo 消息 {} 的记录, 触发信号。", id); +// ((SoloRecord) record).res = res; +// ((SoloRecord) record).getCondition().signal(); +// } else if ( record instanceof ClusterRecord ){ +// log.debug("找到 Cluster 消息 {} 的记录, 触发阶段信号。", id); +// ((ClusterRecord) record).reslist.add( res ); +// ((ClusterRecord) record).getStage().signal(); +// } +// }finally { +// record.lock.unlock(); +// } +// +// } +// +// } catch (InvalidProtocolBufferException e) { +// e.printStackTrace(); +// } +// +// } +// } +// +// JedisPoolConfig config = new JedisPoolConfig(); +// // config.setTestOnBorrow(false); +// jedisCluster = new JedisCluster(nodes, 1000, 2000, 5, password, config); +// if (jedisPool ==null){ +// //调试的时候的配置 +// JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); +// //连接池中的最大空闲连接 +// jedisPoolConfig.setMaxIdle(1000); +// //连接池最大阻塞等待时间 +// //jedisPoolConfig.setMaxWaitMillis(10000); +// //在获取连接的时候检查有效性 +// //jedisPoolConfig.setTestOnBorrow(true); +// // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true +// jedisPoolConfig.setBlockWhenExhausted(false); +// // 是否启用pool的jmx管理功能, 默认true +// jedisPoolConfig.setJmxEnabled(true); +// jedisPool = new JedisPool(jedisPoolConfig,host,port,10000,password); +// } +// if (jedisPool2 ==null){ +// //调试的时候的配置 +// JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); +// //连接池中的最大空闲连接 +// jedisPoolConfig.setMaxIdle(1000); +// //连接池最大阻塞等待时间 +// //jedisPoolConfig.setMaxWaitMillis(10000); +// //在获取连接的时候检查有效性 +// //jedisPoolConfig.setTestOnBorrow(true); +// // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true +// jedisPoolConfig.setBlockWhenExhausted(false); +// // 是否启用pool的jmx管理功能, 默认true +// jedisPoolConfig.setJmxEnabled(true); +// jedisPool2 = new JedisPool(jedisPoolConfig,host,port,10000,password); +// } +// if (jedisPool3 ==null){ +// +// } +// redis = new Redis(jedisCluster, jedisPool2, password, mode); +// redisSubscribe = new Redis(jedisCluster, jedisPool, password, mode); +// +// thread = new Thread(new Runnable() { +// @Override +// public void run() { +// log.info("监听反馈消息 {}", channel); +// redisSubscribe.subscribe(new Msg(), channel.getBytes() ); +// +// } +// }); +// thread.setName("messenger"); +// thread.start(); +// } +// +// +// public static Thread subscribe(final BinaryJedisPubSub jedisPubSub, String channel){ +// return new Thread(new Runnable() { +// @Override +// public void run() { +// //调试的时候的配置 +// JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); +// //连接池中的最大空闲连接 +// jedisPoolConfig.setMaxIdle(1000); +// //连接池最大阻塞等待时间 +// //jedisPoolConfig.setMaxWaitMillis(10000); +// //在获取连接的时候检查有效性 +// //jedisPoolConfig.setTestOnBorrow(true); +// // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true +// jedisPoolConfig.setBlockWhenExhausted(false); +// // 是否启用pool的jmx管理功能, 默认true +// jedisPoolConfig.setJmxEnabled(true); +// jedisPool3 = new JedisPool(jedisPoolConfig,host,port,10000,password); +// +// redisSubscribe2 = new Redis(jedisCluster, jedisPool3, password, mode); +// +// log.info("监听Check消息 {}", channel); +// redisSubscribe2.subscribe(jedisPubSub, channel.getBytes() ); +// } +// }); +// } +// +// public static String genId(){ +// return UUID.randomUUID().toString().replace("-", ""); +// } +// +// +// public static Control.Res primarySyncCmd( String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { +// var c = System.currentTimeMillis(); +// var last = c - 120000L; +// redis.zremrangeByScore("SERVER_CLUSTER", 0, last); +// val servers = redis.zrevrange("SERVER_CLUSTER", 0, 9); +// if (servers.size()<1){ +// throw new SspwebException("安全服务异常"); +// } +// +// +// Iterator it = servers.iterator(); +// String channel = servers.iterator().next(); +// +// // 方便调试的逻辑 +// // 如果配置了调试服务器,优先使用调试服务器 +// val debugServer = redis.get("DEBUG_SERVER"); +// if ( null != debugServer && servers.contains( debugServer ) ) { +// channel = debugServer; +// } +// +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd(cmd) +// .setData(Any.pack(data)) +// .build(); +// return soloSyncCmd("SERVER_" + channel, req); +// } +// +// +// public static Control.Res soloSyncCmd(String channel, String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { +// +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd(cmd) +// .setData(Any.pack(data)) +// .build(); +// return soloSyncCmd(channel, req); +// } +// /** +// * 同步通信 +// * +// * @param req +// * @return +// */ +// public static Control.Res soloSyncCmd(String channel, Control.Req req) throws ExecutionException, InterruptedException { +// +// var c = System.currentTimeMillis(); +// var last = c - 120000L; +// redis.zremrangeByScore("SERVER_CLUSTER", 0, last); +// val servers = redis.zrevrange("SERVER_CLUSTER", 0, 9); +// if (servers.size()<1){ +// throw new SspwebException("安全服务异常"); +// } +// log.info("SoloSyncCmd 发送消息到 sspserver id={} cmd={}", channel, req.getCmd()); +// +// // 发送请求 +// redis.publish(channel.getBytes(), req.toByteArray() ); +// +// // 单点通信创建一个信号量 +// final SoloRecord record = new SoloRecord(); +// +// // 记录请求 +// pendingMessageRecord.put( req.getId(), record ); +// Future future = executorService.submit(new Callable() { +// +// @Override +// public Control.Res call() throws Exception { +// //Thread.sleep(3000); +// try { +// record.getLock().lock(); +// // 等待信号量 +// record.getCondition().await( Config.global.messenger.maxWaitTime , TimeUnit.SECONDS ); +// +// }catch ( InterruptedException e ){ +// log.error("Solo 请求 {} {} 被打断,from={}, to={}", req.getId() , req.getCmd() , req.getFrom(), channel); +// } finally { +// record.getLock().unlock(); +// } +// +// // 返回结果 +// return record.res; +// } +// }); +// // 阻塞,等待子线程结束 +// return (Control.Res) future.get(); +// +// } +// +// +// public static ConcurrentLinkedQueue clusterSyncCmd( String cmd, com.google.protobuf.GeneratedMessageV3 data) throws ExecutionException, InterruptedException { +// if (ObjectUtil.isNull(data) || StrUtil.isEmpty(data.toString())){ +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd(cmd) +// .build(); +// return clusterSyncCmd("SERVER_CLUSTER", req, null); +// } else { +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd(cmd) +// .setData(Any.pack(data)) +// .build(); +// return clusterSyncCmd("SERVER_CLUSTER", req, null); +// } +// +// } +// public static ConcurrentLinkedQueue clusterSyncCmd(String channel, Control.Req req) throws ExecutionException, InterruptedException { +// return clusterSyncCmd(channel, req, null); +// } +// +// /** +// * 集群同步通信 +// * 一个发送多个返回,收到是个数组。 +// * +// * @param req +// * @return +// */ +// public static ConcurrentLinkedQueue clusterSyncCmd(String channel, Control.Req req, ClusterRecordMoniter moniter) throws ExecutionException, InterruptedException { +// +// // 发送请求 +// redis.publish(channel.getBytes(), req.toByteArray() ); +// +// // 单点通信创建一个信号量 +// final ClusterRecord record = new ClusterRecord(); +// +// // 记录请求 +// pendingMessageRecord.put( req.getId(), record ); +// +// // 监听阶段数据 +// executorService.submit(new Runnable() { +// +// @SneakyThrows +// @Override +// public void run() { +// +// try { +// record.getLock().lock(); +// +// log.debug("等待阶段信号"); +// record.getStage().await(Config.global.messenger.maxWaitTime , TimeUnit.SECONDS); +// log.debug("收到阶段信号"); +// +// // 查询应该响应的数量 +// val servers = redis.zrange("SERVER_CLUSTER", 0, 9); +// log.debug("应该收到的消息数 {}, 目前收到的消息数 {}", servers.size(), record.getReslist().size() ); +// +// // 全部收到 +// if ( servers.size() == record.getReslist().size() ){ +// record.getCondition().signal(); +// } +// +// +// if ( null != moniter ){ +// moniter.onStage( servers.size(), record.getReslist(), record.getCondition() ); +// } +// +// } finally { +// record.getLock().unlock(); +// +// } +// } +// }); +// +// // 监听整体 +// Future future = executorService.submit(new Callable>() { +// @Override +// public ConcurrentLinkedQueue call() throws Exception { +// +// try { +// record.getLock().lock(); +// record.getCondition().await(Config.global.messenger.maxWaitTime , TimeUnit.SECONDS); +// return record.getReslist(); +// } finally { +// record.getLock().unlock(); +// } +// } +// }); +// +// // 阻塞,等待子线程结束 +// return (ConcurrentLinkedQueue) future.get(); +// } +// +// @Getter +// private abstract static class Record { +// private final ReentrantLock lock = new ReentrantLock(); +// } +// +// @Getter +// private static class SoloRecord extends Record { +// private Condition condition = getLock().newCondition(); +// // 返回一个结果 +// private Control.Res res; +// } +// +// @Getter +// private static class ClusterRecord extends Record { +// private Condition stage = getLock().newCondition(); +// private Condition condition = getLock().newCondition(); +// +// // 返回一组结果 +// private final ConcurrentLinkedQueue reslist = new ConcurrentLinkedQueue<>(); +// } +// +// /** +// * 允许监听集群命令的中间阶段 +// */ +// public abstract static class ClusterRecordMoniter { +// +// /** +// * +// * @param curAliveServerNum - 当前集群中存活的服务数量,及期望响应的数量 +// * @param reslist - 当前已经收集的返回结果 +// * @param condition - 终结信号量 condition.signal(); 可以提前终结等待。 +// */ +// public abstract void onStage( int curAliveServerNum, ConcurrentLinkedQueue reslist , Condition condition ); +// +// } +// +//} diff --git a/chsm-web-manage/src/main/java/com/sunyard/ssp/server/Redis.java b/chsm-web-manage/src/main/java/com/sunyard/ssp/server/Redis.java index ee8355f..c08860f 100644 --- a/chsm-web-manage/src/main/java/com/sunyard/ssp/server/Redis.java +++ b/chsm-web-manage/src/main/java/com/sunyard/ssp/server/Redis.java @@ -1,288 +1,288 @@ -package com.sunyard.ssp.server; - -import lombok.extern.slf4j.Slf4j; -import redis.clients.jedis.BinaryJedisPubSub; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisPool; - -import java.util.Set; - -// 包装集群和单点 -@Slf4j -public class Redis { - private JedisCluster jedisCluster; - - private JedisPool jedisPool; - - private String mode; - - private String password; - - public void close() { - if ("solo".equals(mode)) { - if (null != jedisPool) { - try { - jedisPool.close(); - } catch (Exception e) { - } - } - } else { - if (null != jedisCluster) { - try { - jedisCluster.close(); - } catch (Exception e) { - } - } - jedisCluster = null; - } - } - - public Redis(JedisCluster jedisCluster, JedisPool jedisPool, String password, String mode) { - this.jedisCluster = jedisCluster; - this.jedisPool = jedisPool; - this.mode = mode; - this.password = password; - } - - public String get(String key) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - log.info("jedis publish[{}]",jedis); - try { - - return jedis.get(key); - } finally { - if(jedis!=null){ - jedis.close(); - } - //close(); - } - } else { - return jedisCluster.get( key ); - } - } - - public void subscribe(BinaryJedisPubSub jedisPubSub, final byte[]... channels) { - - - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - log.info("jedis publish[{}]",jedis); - try { - jedis.auth(password); - - jedis.subscribe(jedisPubSub, channels); - } finally { - if(jedis!=null){ - jedis.close(); - } - //close(); - } - } else { - jedisCluster.subscribe(jedisPubSub, channels); - } - - } - - public Long zrem(final String key, final String... members) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - try { - jedis.auth(password); - return jedis.zrem(key, members); - } finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - return jedisCluster.zrem(key, members); - } - } - - public Long zremrangeByScore(final byte[] key, final double min, final double max) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - try { - jedis.auth(password); - return jedis.zremrangeByScore(key, min, max); - } finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - return jedisCluster.zremrangeByScore(key, min, max); - } - } - - public Long zremrangeByScore(final String key, final double min, final double max) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - try { - jedis.auth(password); - return jedis.zremrangeByScore(key, min, max); - } finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - return jedisCluster.zremrangeByScore(key, min, max); - } - } - - public Set zrevrangeByScore(String key, double min, double max) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - try { - jedis.auth(password); - return jedis.zrevrangeByScore(key, min, max); - } finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - return jedisCluster.zrevrangeByScore(key, min, max); - } - } - - public Set zrangeByScore(String key, double min, double max) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - try { - jedis.auth(password); - return jedis.zrangeByScore(key, min, max); - } finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - return jedisCluster.zrangeByScore(key, min, max); - } - } - - public Long zadd(final String key, final double score, final String member) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - jedis.auth(password); - return jedis.zadd(key, score, member); - } else { - return jedisCluster.zadd(key, score, member); - } - } - - public Long zadd(final byte[] key, final double score, final byte[] member) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - jedis.auth(password); - return jedis.zadd(key, score, member); - } else { - return jedisCluster.zadd(key, score, member); - } - } - - public Set zrevrange(final String key, final long start, final long stop) { - - - Set zrange; - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - try { - jedis.auth(password); - zrange = jedis.zrevrange(key, start, stop); - } finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - zrange = jedisCluster.zrevrange(key, start, stop); - - } - return zrange; - - } - public Set zrange(final String key, final long start, final long stop) { - - - Set zrange; - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - try { - jedis.auth(password); - zrange = jedis.zrange(key, start, stop); - } finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - zrange = jedisCluster.zrange(key, start, stop); - - } - return zrange; - - } - - public Long sadd(final String key, final String... members) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - jedis.auth(password); - return jedis.sadd(key, members); - } else { - return jedisCluster.sadd(key, members); - } - } - - public Long del(final String key) { - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - try { - jedis.auth(password); - return jedis.del(key); - } finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - return jedisCluster.del(key); - } - } - - - public Long publish(final byte[] channel, final byte[] message) { - Long publish = null; - if ("solo".equals(mode)) { - Jedis jedis = jedisPool.getResource(); - log.info("jedis publish[{}]",jedis); - try { - jedis.auth(password); - publish = jedis.publish(channel, message); - - }catch (Exception e){ - e.printStackTrace(); - }finally { - if (jedis != null) { - jedis.close(); - //close(); - } - } - } else { - publish = jedisCluster.publish(channel, message); - } - return publish; - } -} +//package com.sunyard.ssp.server; +// +//import lombok.extern.slf4j.Slf4j; +//import redis.clients.jedis.BinaryJedisPubSub; +//import redis.clients.jedis.Jedis; +//import redis.clients.jedis.JedisCluster; +//import redis.clients.jedis.JedisPool; +// +//import java.util.Set; +// +//// 包装集群和单点 +//@Slf4j +//public class Redis { +// private JedisCluster jedisCluster; +// +// private JedisPool jedisPool; +// +// private String mode; +// +// private String password; +// +// public void close() { +// if ("solo".equals(mode)) { +// if (null != jedisPool) { +// try { +// jedisPool.close(); +// } catch (Exception e) { +// } +// } +// } else { +// if (null != jedisCluster) { +// try { +// jedisCluster.close(); +// } catch (Exception e) { +// } +// } +// jedisCluster = null; +// } +// } +// +// public Redis(JedisCluster jedisCluster, JedisPool jedisPool, String password, String mode) { +// this.jedisCluster = jedisCluster; +// this.jedisPool = jedisPool; +// this.mode = mode; +// this.password = password; +// } +// +// public String get(String key) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// log.info("jedis publish[{}]",jedis); +// try { +// +// return jedis.get(key); +// } finally { +// if(jedis!=null){ +// jedis.close(); +// } +// //close(); +// } +// } else { +// return jedisCluster.get( key ); +// } +// } +// +// public void subscribe(BinaryJedisPubSub jedisPubSub, final byte[]... channels) { +// +// +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// log.info("jedis publish[{}]",jedis); +// try { +// jedis.auth(password); +// +// jedis.subscribe(jedisPubSub, channels); +// } finally { +// if(jedis!=null){ +// jedis.close(); +// } +// //close(); +// } +// } else { +// jedisCluster.subscribe(jedisPubSub, channels); +// } +// +// } +// +// public Long zrem(final String key, final String... members) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// try { +// jedis.auth(password); +// return jedis.zrem(key, members); +// } finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// return jedisCluster.zrem(key, members); +// } +// } +// +// public Long zremrangeByScore(final byte[] key, final double min, final double max) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// try { +// jedis.auth(password); +// return jedis.zremrangeByScore(key, min, max); +// } finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// return jedisCluster.zremrangeByScore(key, min, max); +// } +// } +// +// public Long zremrangeByScore(final String key, final double min, final double max) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// try { +// jedis.auth(password); +// return jedis.zremrangeByScore(key, min, max); +// } finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// return jedisCluster.zremrangeByScore(key, min, max); +// } +// } +// +// public Set zrevrangeByScore(String key, double min, double max) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// try { +// jedis.auth(password); +// return jedis.zrevrangeByScore(key, min, max); +// } finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// return jedisCluster.zrevrangeByScore(key, min, max); +// } +// } +// +// public Set zrangeByScore(String key, double min, double max) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// try { +// jedis.auth(password); +// return jedis.zrangeByScore(key, min, max); +// } finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// return jedisCluster.zrangeByScore(key, min, max); +// } +// } +// +// public Long zadd(final String key, final double score, final String member) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// jedis.auth(password); +// return jedis.zadd(key, score, member); +// } else { +// return jedisCluster.zadd(key, score, member); +// } +// } +// +// public Long zadd(final byte[] key, final double score, final byte[] member) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// jedis.auth(password); +// return jedis.zadd(key, score, member); +// } else { +// return jedisCluster.zadd(key, score, member); +// } +// } +// +// public Set zrevrange(final String key, final long start, final long stop) { +// +// +// Set zrange; +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// try { +// jedis.auth(password); +// zrange = jedis.zrevrange(key, start, stop); +// } finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// zrange = jedisCluster.zrevrange(key, start, stop); +// +// } +// return zrange; +// +// } +// public Set zrange(final String key, final long start, final long stop) { +// +// +// Set zrange; +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// try { +// jedis.auth(password); +// zrange = jedis.zrange(key, start, stop); +// } finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// zrange = jedisCluster.zrange(key, start, stop); +// +// } +// return zrange; +// +// } +// +// public Long sadd(final String key, final String... members) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// jedis.auth(password); +// return jedis.sadd(key, members); +// } else { +// return jedisCluster.sadd(key, members); +// } +// } +// +// public Long del(final String key) { +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// try { +// jedis.auth(password); +// return jedis.del(key); +// } finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// return jedisCluster.del(key); +// } +// } +// +// +// public Long publish(final byte[] channel, final byte[] message) { +// Long publish = null; +// if ("solo".equals(mode)) { +// Jedis jedis = jedisPool.getResource(); +// log.info("jedis publish[{}]",jedis); +// try { +// jedis.auth(password); +// publish = jedis.publish(channel, message); +// +// }catch (Exception e){ +// e.printStackTrace(); +// }finally { +// if (jedis != null) { +// jedis.close(); +// //close(); +// } +// } +// } else { +// publish = jedisCluster.publish(channel, message); +// } +// return publish; +// } +//} diff --git a/chsm-web-manage/src/main/java/com/sunyard/ssp/server/ServerTest.java b/chsm-web-manage/src/main/java/com/sunyard/ssp/server/ServerTest.java index 1fddcd0..f5e78e1 100644 --- a/chsm-web-manage/src/main/java/com/sunyard/ssp/server/ServerTest.java +++ b/chsm-web-manage/src/main/java/com/sunyard/ssp/server/ServerTest.java @@ -1,223 +1,223 @@ -package com.sunyard.ssp.server; - -import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.UInt64Value; -import com.sunyard.ssp.proto.control.Control; -import lombok.NoArgsConstructor; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import redis.clients.jedis.BinaryJedisPubSub; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -/** - * @author:tsz - * @date:2020/3/19 - * @description: - */ -@Slf4j -public class ServerTest { - - /** - *@author :tsz - *@date :2020/03/19 15:56 - *@description : 测试是否联通渠道 - */ - public static void connect() { - try { - - // 准备消息 - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd("test.ping") - .build(); - //Manager 监听通道 MANAGER_CLUSTER - // SERVER_CLUSTER - val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); - if (null == res) { - System.err.println("接收命令响应超时!"); - } else { -// Control.CMD_ping p = res.getData().unpack(Control.CMD_ping.class); - - } - } catch (Exception e) { - e.printStackTrace(); - } - - } - - /** - *@author :tsz - *@date :2020/3/19 16:06 - *@param : 传递渠道id? - *@return : - *@description : 渠道更新 - */ - public static void channelUpdate(String channelId) { - try { - - // 准备消息 - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd("channel.update.enable") - .setData(Any.pack(UInt64Value.of(Long.valueOf(channelId)))) - .build(); - //Manager 监听通道 MANAGER_CLUSTER - // SERVER_CLUSTER - val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); - if (null == res) { - System.err.println("接收命令响应超时!"); - } else { - - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - *@author :tsz - *@date :2020/3/19 16:44 - *@param : - *@return : - *@description : 列出所有渠道 - */ - public static void channelList() { - try { - - // 准备消息 - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd("channel.list") - .build(); - //Manager 监听通道 MANAGER_CLUSTER - // SERVER_CLUSTER - val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); - if (null == res) { - System.err.println("接收命令响应超时!"); - } else { - val list = res.getData().unpack( Control.VOChannelList.class ); - - } - } catch (Exception e) { - e.printStackTrace(); - } - - } - - /** - *@author :tsz - *@date :2020/3/19 16:49 - *@param : - *@return : - *@description : 列出所有服务器 - */ - public static void serverList(){ - try { - - // 准备消息 - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd("server.list") - .build(); - //Manager 监听通道 MANAGER_CLUSTER - // SERVER_CLUSTER - val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); - if (null == res) { - System.err.println("接收命令响应超时!"); - } else { - val list = res.getData().unpack( Control.VOChannelList.class ); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - *@author :tsz - *@date :2020/03/24 15:38 - *@description : 渠道验证 - */ - public static void VOChannelCheck(Long channelId,String checkId) { - - - val buff = new HashMap>(); - - - - @NoArgsConstructor - @Setter - class Msg extends BinaryJedisPubSub { - - private Thread t; - - - @Override - public void onMessage(byte[] channel, byte[] message) { - - - // 将 message 解析为 Res 对象 - try { - val res = Control.VOChannelCheckRes.parseFrom( message ); - if ( !buff.containsKey(checkId)){ - buff.put(checkId,new ArrayList<>()); - } - - val list = buff.get(checkId); - - - - list.add(res); - if ( res.getType().equals("end") ){ - t.stop(); - } - - - } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); - } - - } - } - - // - val msg = new Msg(); - Thread t = Messenger.subscribe(msg, checkId ); - msg.setT( t ); - t.start(); - - - try { - // 准备消息 - val req = Control.Req.newBuilder() - .setId(Messenger.genId()) - .setOrigin(Control.Req.Origin.MANAGER) - .setFrom("CLUSTER") - .setCmd("channel.check") - .setData(Any.pack(Control.VOChannelCheck.newBuilder().setChannelId(channelId).setCheckId(checkId).build())) - .build(); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - - public static void main(String[] args) { - VOChannelCheck(11L,"111"); - } - - -} +//package com.sunyard.ssp.server; +// +//import com.google.protobuf.Any; +//import com.google.protobuf.InvalidProtocolBufferException; +//import com.google.protobuf.UInt64Value; +//import com.sunyard.ssp.proto.control.Control; +//import lombok.NoArgsConstructor; +//import lombok.Setter; +//import lombok.extern.slf4j.Slf4j; +//import lombok.val; +//import redis.clients.jedis.BinaryJedisPubSub; +// +//import java.util.ArrayList; +//import java.util.HashMap; +//import java.util.List; +// +///** +// * @author:tsz +// * @date:2020/3/19 +// * @description: +// */ +//@Slf4j +//public class ServerTest { +// +// /** +// *@author :tsz +// *@date :2020/03/19 15:56 +// *@description : 测试是否联通渠道 +// */ +// public static void connect() { +// try { +// +// // 准备消息 +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd("test.ping") +// .build(); +// //Manager 监听通道 MANAGER_CLUSTER +// // SERVER_CLUSTER +// val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); +// if (null == res) { +// System.err.println("接收命令响应超时!"); +// } else { +//// Control.CMD_ping p = res.getData().unpack(Control.CMD_ping.class); +// +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } +// +// } +// +// /** +// *@author :tsz +// *@date :2020/3/19 16:06 +// *@param : 传递渠道id? +// *@return : +// *@description : 渠道更新 +// */ +// public static void channelUpdate(String channelId) { +// try { +// +// // 准备消息 +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd("channel.update.enable") +// .setData(Any.pack(UInt64Value.of(Long.valueOf(channelId)))) +// .build(); +// //Manager 监听通道 MANAGER_CLUSTER +// // SERVER_CLUSTER +// val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); +// if (null == res) { +// System.err.println("接收命令响应超时!"); +// } else { +// +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// +// /** +// *@author :tsz +// *@date :2020/3/19 16:44 +// *@param : +// *@return : +// *@description : 列出所有渠道 +// */ +// public static void channelList() { +// try { +// +// // 准备消息 +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd("channel.list") +// .build(); +// //Manager 监听通道 MANAGER_CLUSTER +// // SERVER_CLUSTER +// val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); +// if (null == res) { +// System.err.println("接收命令响应超时!"); +// } else { +// val list = res.getData().unpack( Control.VOChannelList.class ); +// +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } +// +// } +// +// /** +// *@author :tsz +// *@date :2020/3/19 16:49 +// *@param : +// *@return : +// *@description : 列出所有服务器 +// */ +// public static void serverList(){ +// try { +// +// // 准备消息 +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd("server.list") +// .build(); +// //Manager 监听通道 MANAGER_CLUSTER +// // SERVER_CLUSTER +// val res = Messenger.soloSyncCmd("SERVER_CLUSTER", req); +// if (null == res) { +// System.err.println("接收命令响应超时!"); +// } else { +// val list = res.getData().unpack( Control.VOChannelList.class ); +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// +// /** +// *@author :tsz +// *@date :2020/03/24 15:38 +// *@description : 渠道验证 +// */ +// public static void VOChannelCheck(Long channelId,String checkId) { +// +// +// val buff = new HashMap>(); +// +// +// +// @NoArgsConstructor +// @Setter +// class Msg extends BinaryJedisPubSub { +// +// private Thread t; +// +// +// @Override +// public void onMessage(byte[] channel, byte[] message) { +// +// +// // 将 message 解析为 Res 对象 +// try { +// val res = Control.VOChannelCheckRes.parseFrom( message ); +// if ( !buff.containsKey(checkId)){ +// buff.put(checkId,new ArrayList<>()); +// } +// +// val list = buff.get(checkId); +// +// +// +// list.add(res); +// if ( res.getType().equals("end") ){ +// t.stop(); +// } +// +// +// } catch (InvalidProtocolBufferException e) { +// e.printStackTrace(); +// } +// +// } +// } +// +// // +// val msg = new Msg(); +// Thread t = Messenger.subscribe(msg, checkId ); +// msg.setT( t ); +// t.start(); +// +// +// try { +// // 准备消息 +// val req = Control.Req.newBuilder() +// .setId(Messenger.genId()) +// .setOrigin(Control.Req.Origin.MANAGER) +// .setFrom("CLUSTER") +// .setCmd("channel.check") +// .setData(Any.pack(Control.VOChannelCheck.newBuilder().setChannelId(channelId).setCheckId(checkId).build())) +// .build(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// +// } +// +// +// public static void main(String[] args) { +// VOChannelCheck(11L,"111"); +// } +// +// +//} diff --git a/config/redis.config.json b/config/redis.config.json deleted file mode 100644 index 9395349..0000000 --- a/config/redis.config.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "password" : "sunyard2", - "mode" : "solo", - "node" : { - "ip" : "172.16.17.163", - "port" : 6379 - }, - "nodes" : [ - { - "ip" : "172.16.17.163", - "port" : 6379 - } - ] - - -}