diff --git a/game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java b/game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java
index 76c43f54..0d359b13 100644
--- a/game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java
+++ b/game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java
@@ -10,7 +10,6 @@ import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.List;
/**
diff --git a/game/src/main/resources/application.yml b/game/src/main/resources/application.yml
index d082c0ea..5c2cd5c4 100644
--- a/game/src/main/resources/application.yml
+++ b/game/src/main/resources/application.yml
@@ -1,4 +1,4 @@
spring:
profiles:
- active: prod
- include: common, util-prod
\ No newline at end of file
+ active: dev
+ include: common, util-dev
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 7562a898..21b355a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
tall
recovery
-
+ game
mt
wisdomcar
diff --git a/recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java b/recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java
index 6bcf6394..d34d2d88 100644
--- a/recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java
+++ b/recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java
@@ -1,6 +1,8 @@
package com.ccsens.recovery;
+import com.ccsens.recovery.netty.wsserver.NettyWsServer;
import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
@@ -8,6 +10,8 @@ import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
+import javax.annotation.Resource;
+
/**
* @author 逗
@@ -19,10 +23,16 @@ import org.springframework.scheduling.annotation.EnableAsync;
@EnableCircuitBreaker
@EnableFeignClients(basePackages = "com.ccsens.cloudutil.feign")
@SpringBootApplication(scanBasePackages = "com.ccsens")
-public class RecoveryApplication {
+public class RecoveryApplication implements CommandLineRunner {
+ @Resource
+ private NettyWsServer nettyWsServer;
public static void main(String[] args) {
SpringApplication.run(RecoveryApplication.class, args);
}
+ @Override
+ public void run(String... args) throws Exception {
+ nettyWsServer.start();
+ }
}
diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java
new file mode 100644
index 00000000..3cd8c8aa
--- /dev/null
+++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java
@@ -0,0 +1,29 @@
+package com.ccsens.recovery.bean.message;
+
+import com.ccsens.util.WebConstant;
+import lombok.Data;
+
+@Data
+public class AckMessageDto extends BaseMessageDto {
+ @lombok.Data
+ public static class Data {
+ Long msgId;
+ }
+
+ private Data data;
+
+ public AckMessageDto(){
+ setType(WebConstant.Message_Type.Ack.phase);
+ setEvent(WebConstant.Message_Ack_Event.Ack.phase);
+ setTime(System.currentTimeMillis());
+ }
+
+ public AckMessageDto(Long projectId, Long msgId){
+ this();
+ setProjectId(projectId);
+
+ Data d = new Data();
+ d.setMsgId(msgId);
+ setData(d);
+ }
+}
diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java
new file mode 100644
index 00000000..e5f77bda
--- /dev/null
+++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java
@@ -0,0 +1,30 @@
+package com.ccsens.recovery.bean.message;
+
+import com.ccsens.util.WebConstant;
+import lombok.Data;
+
+@Data
+public class AuthMessageDto extends BaseMessageDto {
+ @lombok.Data
+ public static class Data{
+ private String userId;
+ private String channelId;
+ private String token;
+ }
+
+ private Data data;
+
+ public AuthMessageDto(){
+ setType(WebConstant.Message_Type.Auth.phase);
+ setEvent(WebConstant.Message_Auth_Event.Auth.phase);
+ setTime(System.currentTimeMillis());
+ }
+
+ public AuthMessageDto(String userId, String channelId){
+ this();
+ Data d = new Data();
+ d.setUserId(userId);
+ d.setChannelId(channelId);
+ setData(d);
+ }
+}
diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java
new file mode 100644
index 00000000..7639ad96
--- /dev/null
+++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java
@@ -0,0 +1,29 @@
+package com.ccsens.recovery.bean.message;
+
+import com.ccsens.util.WebConstant;
+import lombok.Data;
+
+@Data
+public class AuthMessageWithAnswerDto extends BaseMessageDto{
+ @lombok.Data
+ public static class Data{
+ private Boolean success;
+ private String phase;
+ }
+
+ private Data data;
+
+ public AuthMessageWithAnswerDto(){
+ setType(WebConstant.Message_Type.Auth.phase);
+ setEvent(WebConstant.Message_Auth_Event.Answer.phase);
+ setTime(System.currentTimeMillis());
+ }
+
+ public AuthMessageWithAnswerDto(Boolean success, String phase){
+ this();
+ Data d = new Data();
+ d.setSuccess(success);
+ d.setPhase(phase);
+ setData(d);
+ }
+}
diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java
index 4a1e5983..f6ffe329 100644
--- a/recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java
+++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java
@@ -51,16 +51,16 @@ public class BaseMessageDto {
private List receivers;
// private Object data;
- public Set receiversTransTos() {
- Set tos = new HashSet<>();
- if (CollectionUtil.isEmpty(receivers)) {
- return tos;
- }
- receivers.forEach(receiver -> {
- InMessage.To to = new InMessage.To(receiver.getUserId());
- tos.add(JSONObject.toJSONString(to));
- });
-
- return tos;
- }
+// public Set receiversTransTos() {
+// Set tos = new HashSet<>();
+// if (CollectionUtil.isEmpty(receivers)) {
+// return tos;
+// }
+// receivers.forEach(receiver -> {
+// InMessage.To to = new InMessage.To(receiver.getUserId());
+// tos.add(JSONObject.toJSONString(to));
+// });
+//
+// return tos;
+// }
}
diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java
new file mode 100644
index 00000000..c69931df
--- /dev/null
+++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java
@@ -0,0 +1,20 @@
+package com.ccsens.recovery.bean.message;
+
+import com.ccsens.util.WebConstant;
+import lombok.Data;
+
+@Data
+public class HeartMessageDto extends BaseMessageDto{
+ @lombok.Data
+ public static class Data{
+ private int major;
+ private int minor;
+ }
+ private Data data;
+
+ public HeartMessageDto(){
+ setType(WebConstant.Message_Type.Heart.phase);
+ setEvent(WebConstant.Message_Heart_Event.Heart.phase);
+ setTime(System.currentTimeMillis());
+ }
+}
diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java b/recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java
new file mode 100644
index 00000000..1a3b4a38
--- /dev/null
+++ b/recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java
@@ -0,0 +1,388 @@
+package com.ccsens.recovery.netty;
+
+import cn.hutool.core.collection.CollectionUtil;
+import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author wei
+ */
+@Slf4j
+@Component
+public class ChannelManager {
+ private static Logger logger = LoggerFactory.getLogger(ChannelManager.class);
+ private static ThreadLocal threadLocal = new ThreadLocal<>();
+
+ /**
+ * UserId,WrappedChannel authed channels;
+ */
+ private static Map> authedChannels;
+ /**
+ * Channel,WrapperedChannel
+ */
+ private static Map rawChannels;
+
+ static {
+ authedChannels = new ConcurrentHashMap<>();
+ rawChannels = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * 私有构造,不允许生成该类对象
+ */
+ private ChannelManager(){}
+
+ /**
+ * 设置当前正在错误的channel
+ * @param channel netty ws/tcp连接
+ */
+ public static void setCurrentChannel(Channel channel){
+ threadLocal.set(channel);
+ }
+
+ /**
+ * 获取当前线程的channel对象
+ * @return channel
+ */
+ public static Channel getCurrentChannel(){
+ return threadLocal.get();
+ }
+
+ /**
+ * 删除当前线程的channel对象
+ */
+ public static void removeCurrentChannel(){
+ threadLocal.remove();
+ }
+
+ /**
+ * 添加一个新的连接(Channel)
+ * @param channel 新的连接
+ * @param serverType 服务器类型
+ */
+ public static synchronized void addChannel(Channel channel,String serverType){
+ logger.debug("Invoke addChannel({},{})",channel,serverType);
+ if(null != channel) {
+ rawChannels.put(channel, new WrapperedChannel(channel, serverType));
+ logger.debug("Add a new channel: {},{}",channel.id().asLongText(),serverType);
+ }else{
+ logger.error("channel is null");
+ }
+ }
+
+ /**
+ * 用户认证
+ * @param channel 连接
+ * @param userId 用户ID
+ * @param major 客户端主版本号
+ * @param minor 客户端次版本号
+ * @param message 业务相关参数
+ * @param mqType 硬件设备消息转发mq类型
+ */
+ public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor, String message, Byte mqType){
+ logger.debug("Invoke authedChannels({},{},{},{})",channel.id().asLongText(),userId,major,minor);
+ major = major != null ? major : 0;
+ minor = minor != null ? minor : 0;
+
+ WrapperedChannel wrapperedChannel = rawChannels.get(channel);
+ if(wrapperedChannel != null){
+ wrapperedChannel.whenAuthed(userId,major,minor, message, mqType);
+ Set authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>());
+ authedWchannelSet.add(wrapperedChannel);
+ logger.debug("Authed channel {} with user {}", channel.id().asLongText(), userId);
+ }else{
+ logger.error("Authed channel,but wrappedChannel is null.");
+ }
+ }
+ /**
+ * 用户认证
+ * @param channel 连接
+ * @param userId 用户ID
+ * @param major 客户端主版本号
+ * @param minor 客户端次版本号
+ */
+ public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor){
+ major = major != null ? major : 0;
+ minor = minor != null ? minor : 0;
+ WrapperedChannel wrapperedChannel = rawChannels.get(channel);
+ if(wrapperedChannel != null){
+ wrapperedChannel.whenAuthed(userId,major,minor);
+ Set authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>());
+
+ authedWchannelSet.add(wrapperedChannel);
+ }else{
+ log.error("Authed channel,but wrappedChannel is null.");
+ }
+ }
+
+ /**
+ * 移除一个连接(Channel)
+ * @param channel 要移除的连接
+ */
+ public static synchronized void removeChannel(Channel channel){
+ logger.debug("Invoke removeChannel({})",channel.id().asLongText());
+ WrapperedChannel wrapperedChannel = rawChannels.get(channel);
+ if(wrapperedChannel != null){
+ //从rawChannels集合中删除
+ rawChannels.remove(channel);
+ logger.debug("Remove a channel from rawChannels: {}",channel.id().asLongText());
+ if(wrapperedChannel.isAuthed()){
+ Set authedChannelSet = authedChannels.get(wrapperedChannel.getUserId());
+ //从authedChannel集合的value(set)中删除
+ if(CollectionUtil.isNotEmpty(authedChannelSet)){
+ authedChannelSet.remove(wrapperedChannel);
+ logger.debug("Remove a channel from authedChannelSet: {}, {}",wrapperedChannel.getUserId(),channel.id().asLongText());
+ }
+ //从authedChannel中删除,此处不用else,因为if中语句执行完毕之后,authedChannelSet也可能变成空集合
+ if(CollectionUtil.isEmpty(authedChannelSet)){
+ authedChannels.remove(wrapperedChannel.getUserId());
+ logger.debug("Remove a user from authedChannels: {}",wrapperedChannel.getUserId());
+ }
+ }
+ }else{
+ logger.error("Remove channel,but wrappedChannel is null.");
+ }
+
+ if(channel.isOpen() || channel.isActive()){
+ channel.close();
+ }
+ }
+
+ /**
+ * 移除一个用户
+ * @param userId 要移除的用户
+ */
+ public static synchronized void removeUser(String userId){
+ logger.debug("Invoke remove user : {}",userId);
+ Set wChannelSet = authedChannels.get(userId);
+ if(CollectionUtil.isNotEmpty(wChannelSet)){
+ for(WrapperedChannel wChannel : wChannelSet){
+ //从rawChannel中依次删除
+ rawChannels.remove(wChannel.getChannel());
+ logger.debug("Remove a channel from rawChannels: {}",wChannel.getChannel().id().asLongText());
+ }
+ }
+ //从authedChannel中删除
+ authedChannels.remove(userId);
+ logger.debug("Remove a user from authedChannels: {}",userId);
+ }
+
+ /**
+ * 添加版本号
+ * 只能给已认证的请求添加版本号
+ * @param channel 连接
+ * @param major 主版本号
+ * @param minor 次版本号
+ */
+ public static synchronized void versionChannel(Channel channel,int major,int minor){
+ logger.debug("Invoke Version channel({},{},{}))",channel.id().asLongText(),major,minor);
+ WrapperedChannel wChannel = rawChannels.get(channel);
+ if(wChannel != null){
+ wChannel.setVersion(major,minor);
+ logger.debug("Version Channel: {},{},{}",channel.id().asLongText(),major,minor);
+ }else{
+ logger.error("Remove channel,but wrappedChannel is null.");
+ }
+ }
+
+ /**
+ * 发送广播消息给所有已认证Channel
+ * @param message 消息
+ */
+ public static synchronized void broadCastAuthed(Object message) {
+ logger.debug("Invoke broadCastAuthed({})",message);
+ for (Map.Entry entry : rawChannels.entrySet()) {
+ WrapperedChannel wChannel = entry.getValue();
+ if(wChannel.isAuthed()) {
+ wChannel.writeAndFlush(message);
+ logger.debug("Send Message {} to {},{}",message,wChannel.getUserId(),wChannel.getId());
+ }
+ }
+ }
+
+ /**
+ * 发送广播消息
+ * @param message 消息
+ */
+ public static synchronized void broadCast(Object message) {
+ logger.debug("Invoke broadCast({})",message);
+ for (Map.Entry entry : rawChannels.entrySet()) {
+ entry.getValue().writeAndFlush(message);
+ logger.debug("Send Message {} to {}",message,entry.getValue().getId());
+ }
+ }
+
+ /**
+ * 发送消息给某个连接
+ * @param channel 连接
+ * @param message 消息
+ */
+ public static synchronized void sendTo(Channel channel,Object message){
+ log.info("Invoke sendTo({},{})",channel.id().asLongText(),message);
+ WrapperedChannel wrapperedChannel = rawChannels.get(channel);
+ if(wrapperedChannel != null) {
+ wrapperedChannel.writeAndFlush(message);
+ log.info("Write message {} to Channel {}",message,channel);
+ }else{
+ log.info("can't find channel from rawChannels");
+ }
+ }
+
+ /**
+ * 发送消息给某个用户
+ * @param userId 用户ID
+ * @param message 消息
+ */
+ public static synchronized boolean sendTo(String userId,Object message){
+ logger.info("Invoke sendTo({},{})",userId,message);
+ Set wChannelSet = authedChannels.get(userId);
+ if(CollectionUtil.isNotEmpty(wChannelSet)){
+ for(WrapperedChannel wChannel:wChannelSet){
+ wChannel.writeAndFlush(message);
+ logger.info("Send message {} to channel {}",message,userId);
+ }
+ return true;
+ }else{
+ return false;
+ }
+ }
+
+ /**
+ * 刷新最后一次接收数据时间 每次接收数据需要调用该函数
+ * @param channel 接收数据的连接
+ */
+ public static synchronized void flushReceiveTimestamp(Channel channel){
+ logger.debug("Invoke flushReceiveTimestamp({})",channel.id().asLongText());
+ WrapperedChannel wrapperedChannel = rawChannels.get(channel);
+ if(wrapperedChannel != null){
+ wrapperedChannel.whenReceivedData();
+ }else{
+ logger.error("can find channel from rawChannels");
+ }
+ }
+
+ /**
+ * 关闭所有未认证的连接
+ * @param unAuthedChannelsMaxAliveTimeInSeconds 未认证的最大时长(s)
+ * @throws Exception
+ * @deprecated 已过期,该方法在多线程并发下可能出现问题,建议使用messageService中的同名方法
+ */
+ @Deprecated
+ public static synchronized void closeUnAuthedChannels(Long unAuthedChannelsMaxAliveTimeInSeconds) throws Exception {
+ logger.debug("Inovke closeUnAuthedChannels({})",unAuthedChannelsMaxAliveTimeInSeconds);
+ Iterator> it = rawChannels.entrySet().iterator();
+ while(it.hasNext()){
+ Map.Entry entry = it.next();
+ WrapperedChannel wrapperedChannel = entry.getValue();
+ if((!wrapperedChannel.isAuthed()) && (wrapperedChannel.getConnectedSeconds() > unAuthedChannelsMaxAliveTimeInSeconds)) {
+ it.remove();
+ //关闭连接
+ wrapperedChannel.getChannel().close();
+ logger.debug("Remove a unAuthed Channel {}, which has connected {}s,maxOutTime is {}s", wrapperedChannel.getId(),wrapperedChannel.getConnectedSeconds() ,
+ unAuthedChannelsMaxAliveTimeInSeconds );
+ }
+ }
+ }
+
+ /**
+ * 判断Channel是否存在
+ * @param channel 连接
+ * @return 是否存在
+ */
+ public static synchronized boolean channelExist(Channel channel){
+ return rawChannels.containsKey(channel);
+ }
+
+ /**
+ * 判断Channel是否已经认证
+ * @param channel 连接
+ * @return 是否认证
+ */
+ public static synchronized boolean channelAuthed(Channel channel){
+ return rawChannels.containsKey(channel) && rawChannels.get(channel).isAuthed();
+ }
+
+ /**
+ * 判断用户是否在线
+ * @param userId 用户ID
+ * @return 是否认证
+ */
+ public static synchronized boolean isUserOnline(String userId){
+ return authedChannels.containsKey(userId);
+ }
+
+ /**
+ * 根据Channel获取对应的用户ID
+ * @param channel 连接
+ * @return 用户ID
+ */
+ public static synchronized String getUserIdByChannel(Channel channel) {
+ return rawChannels.containsKey(channel) ? rawChannels.get(channel).getUserId() : null;
+ }
+
+ /**
+ * 获取所有在线用户
+ * @return 所有用户的集合列表
+ */
+ public static synchronized Set getOnlineUsers(){
+ return authedChannels.keySet();
+ }
+
+ /**
+ * 获取某种类型的所有在线用户的连接
+ * @param type 客户端类型(ws/tcp modebus/tcp text)
+ * @return 所有在西安channel的集合列表
+ */
+ public static synchronized Set getOnlineChannels(String type){
+ Set onLineChannels = CollectionUtil.newHashSet();
+ for(Map.Entry entry: rawChannels.entrySet()){
+ WrapperedChannel wrapperedChannel = entry.getValue();
+ if(wrapperedChannel.isAuthed() && wrapperedChannel.getType().equals(type)){
+ onLineChannels.add(entry.getKey());
+ }
+ }
+ return onLineChannels;
+ }
+
+ /**
+ * 获取一个rawChannel的副本
+ * @return 副本
+ */
+ public static Map getCopyOfAllChannels() {
+ Map copyMap = new HashMap<>(rawChannels.size());
+ copyMap.putAll(rawChannels);
+ return copyMap;
+ }
+
+ /**
+ * 根据Channel获取WrapperedChannel
+ * @param channel channel
+ * @return 对应的wrapperedChannel
+ */
+ public static WrapperedChannel getWrapperedChannelByChannel(Channel channel) {
+ return rawChannels.get(channel);
+ }
+
+ public static synchronized void showAuthedChannels(){
+ for(Map.Entry> entry : authedChannels.entrySet()){
+ for (WrapperedChannel channel:entry.getValue()){
+ logger.debug("{}-->{}",entry.getKey(),channel.toString());
+ }
+ }
+ }
+ public static synchronized void showAllChannels(){
+ for(Map.Entry entry : rawChannels.entrySet()){
+ logger.debug(entry.getValue().toString());
+ }
+ }
+
+ public static Set getAllOnlineUsers() {
+ return authedChannels.keySet();
+ }
+}
diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java b/recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java
new file mode 100644
index 00000000..d15dbf92
--- /dev/null
+++ b/recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java
@@ -0,0 +1,239 @@
+package com.ccsens.recovery.netty;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.ccsens.util.DateUtil;
+import com.ccsens.util.JacksonUtil;
+import io.netty.channel.Channel;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author wei
+ */
+@Slf4j
+@Getter
+@Setter
+public class WrapperedChannel {
+ @Getter
+ @Setter
+ private static class ClientVersion{
+ int major;
+ int minor;
+ public ClientVersion(int major,int minor){
+ this.major = major;
+ this.minor = minor;
+ }
+ }
+
+ /**
+ * Netty's Channel
+ */
+ private Channel channel;
+ /**
+ * userId
+ */
+ private String userId;
+ /**
+ * 客户端版本号
+ */
+ private ClientVersion version;
+
+ /**
+ * 客户端类型(哪种服务器)
+ */
+ private String type;
+ /**
+ * 是否认证
+ */
+ private boolean authed;
+ /**
+ * 连接建立时间
+ */
+ private Long createdAtInSeconds;
+ /**
+ * 认证时间(s)
+ */
+ private Long authedAtInSeconds;
+ /**
+ * 最后一次接收有效数据时间(包括心跳包)
+ */
+ private Long lastDataReceivedInSeconds;
+ /**
+ * 最后一次发送有效数据时间(包括心跳包)
+ */
+ private Long lastDataSendInSeconds;
+ /**
+ * 接收到数据条数(每接收到一条协议加1,包括心跳)
+ */
+ private long dataReceivedCount;
+ /**
+ * 发送数据条数(每发送到一条协议加1,包括心跳)
+ */
+ private long dataSendCount;
+
+ /**
+ * mqType,仅用于硬件设备认证时指定消息转发给哪个mq
+ */
+ private Byte mqType;
+ /**业务相关参数*/
+ private String message;
+ /**寄存器 硬件系统*/
+ private Map register = new HashMap<>();
+
+ public WrapperedChannel(Channel channel){
+ this.channel = channel;
+ this.createdAtInSeconds = DateUtil.currentSeconds();
+ }
+
+ public WrapperedChannel(Channel channel,String type){
+ this(channel);
+ this.type = type;
+ }
+
+ public WrapperedChannel(Channel channel,String type,int major,int minor){
+ this(channel,type);
+ this.version = new ClientVersion(major,minor);
+ }
+
+ public String getVersion(){
+ if(ObjectUtil.isNotNull(version)){
+ return "v" + version.getMajor() + "." + version.getMinor();
+ }
+ return null;
+ }
+
+ public void setVersion(int major,int minor){
+ if (ObjectUtil.isNull(version)) {
+ version = new ClientVersion(major, minor);
+ } else {
+ version.setMajor(major);
+ version.setMinor(minor);
+ }
+ }
+
+ public String getRemoteAddr(){
+ if(ObjectUtil.isNotNull(channel)){
+ InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
+ if(ObjectUtil.isNotNull(insocket)) {
+ return insocket.getAddress().getHostAddress();
+ }
+ }
+ return null;
+ }
+ public Integer getRemotePort(){
+ if(ObjectUtil.isNotNull(channel)){
+ InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
+ if(ObjectUtil.isNotNull(insocket)) {
+ return insocket.getPort();
+ }
+ }
+ return null;
+ }
+
+ public String getId(){
+ if(ObjectUtil.isNotNull(channel)){
+ return channel.id().asLongText();
+ }
+ return null;
+ }
+
+ public void whenReceivedData(){
+ lastDataReceivedInSeconds = DateUtil.currentSeconds();
+ dataReceivedCount++;
+ }
+
+ public void whenSendData(){
+ lastDataSendInSeconds = DateUtil.currentSeconds();
+ dataSendCount++;
+ }
+
+ public void whenAuthed(){
+ authed = true;
+ authedAtInSeconds = DateUtil.currentSeconds();
+ }
+ public void whenAuthed(String userId,int major,int minor){
+ whenAuthed();
+ setVersion(major,minor);
+ this.userId = userId;
+ }
+ public void whenAuthed(String userId,int major,int minor, String message, Byte mqType){
+ whenAuthed();
+ setVersion(major,minor);
+ this.userId = userId;
+ this.message = message;
+ this.mqType = mqType;
+ if (StrUtil.isNotEmpty(message)) {
+ try {
+ Map map = JacksonUtil.jsonToMap(message);
+ for (String key: map.keySet()) {
+ Object o = map.get(key);
+ if (o instanceof Integer) {
+ register.put(key, (long)(Integer) map.get(key));
+ } else {
+ register.put(key, (long) map.get(key));
+ }
+
+ }
+ } catch (IOException e) {
+ log.error("message转map异常", e);
+ }
+ }
+
+ }
+
+ public void writeAndFlush(Object message){
+ if(channel != null && channel.isActive()){
+ channel.writeAndFlush(message);
+ whenSendData();
+ }
+ }
+
+ public Long getConnectedSeconds(){
+ return createdAtInSeconds == null ? null : DateUtil.currentSeconds() - createdAtInSeconds;
+ }
+
+ public Long getOnlineSeconds(){
+ return authedAtInSeconds == null ? null : DateUtil.currentSeconds() - authedAtInSeconds ;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(ObjectUtil.isNull(obj)) {
+ return false;
+ }
+ if(this == obj) {
+ return true;
+ }
+ if(ObjectUtil.isNull(channel)) {
+ return false;
+ }
+ if(obj.getClass() == this.getClass()){
+ WrapperedChannel other = (WrapperedChannel)obj;
+ return channel.equals(other.channel);
+ }else if(obj.getClass() == channel.getClass()){
+ Channel other = (Channel)obj;
+ return channel.equals(other);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ if(ObjectUtil.isNotNull(channel)){
+ return channel.hashCode();
+ }
+ return 0;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("id: %s, type: %s, authed: %b, userId: %s, version: %s",getId(),type,authed,userId,getVersion());
+ }
+}
diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java
new file mode 100644
index 00000000..1fa7202a
--- /dev/null
+++ b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java
@@ -0,0 +1,76 @@
+package com.ccsens.recovery.netty.wsserver;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author wei
+ */
+@Slf4j
+@Component
+public class NettyWsServer {
+ private static final String URI = "/recovery/ws";
+ private static final short PORT = 7191;
+
+ @Autowired
+ private WebSocketHandler webSocketHandler;
+
+ @Async
+ public void start() {
+ // Configure the server.
+ EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 100)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new IdleStateHandler(40,
+ 0, 0, TimeUnit.SECONDS));
+ p.addLast(new HttpServerCodec());
+ p.addLast(new HttpObjectAggregator(64 * 1024));
+ p.addLast(new ChunkedWriteHandler());
+ p.addLast(new WebSocketServerProtocolHandler(URI));
+ p.addLast(new WebSocketDecoder());
+ p.addLast(new WebSocketEncoder());
+ p.addLast(webSocketHandler);
+ }
+ });
+ // Start the server.
+ ChannelFuture f = b.bind(PORT).sync();
+ log.info("WS端口号:{}",PORT);
+ // Wait until the server socket is closed.
+ f.channel().closeFuture().sync();
+ log.info("WS端口号:{}",PORT);
+ } catch(Exception e){
+ log.error("ss",e);
+ e.printStackTrace();
+ }finally {
+ // Shut down all event loops to terminate all threads.
+ log.info("WS端口号:{}",PORT);
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+ }
+}
diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java
new file mode 100644
index 00000000..bed82669
--- /dev/null
+++ b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java
@@ -0,0 +1,33 @@
+package com.ccsens.recovery.netty.wsserver;
+
+import com.ccsens.util.JacksonUtil;
+import com.ccsens.util.bean.message.common.InMessage;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author wei
+ */
+public class WebSocketDecoder extends MessageToMessageDecoder {
+ private static Logger logger = LoggerFactory.getLogger(WebSocketDecoder.class);
+
+ @Override
+ protected void decode(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg, List