From 409eba5b5af395619bedfe4d3fe3acf726b815f9 Mon Sep 17 00:00:00 2001 From: zy_Java <654600784@qq.com> Date: Wed, 9 Jun 2021 09:40:12 +0800 Subject: [PATCH] =?UTF-8?q?20210609=E5=BA=B7=E5=A4=8D=E6=B7=BB=E5=8A=A0ws?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../game/netty/wsserver/WebSocketDecoder.java | 1 - game/src/main/resources/application.yml | 4 +- pom.xml | 2 +- .../ccsens/recovery/RecoveryApplication.java | 12 +- .../recovery/bean/message/AckMessageDto.java | 29 ++ .../recovery/bean/message/AuthMessageDto.java | 30 ++ .../message/AuthMessageWithAnswerDto.java | 29 ++ .../recovery/bean/message/BaseMessageDto.java | 24 +- .../bean/message/HeartMessageDto.java | 20 + .../ccsens/recovery/netty/ChannelManager.java | 388 ++++++++++++++++++ .../recovery/netty/WrapperedChannel.java | 239 +++++++++++ .../netty/wsserver/NettyWsServer.java | 76 ++++ .../netty/wsserver/WebSocketDecoder.java | 33 ++ .../netty/wsserver/WebSocketEncoder.java | 25 ++ .../netty/wsserver/WebSocketHandler.java | 151 +++++++ .../recovery/service/IMessageService.java | 19 + .../recovery/service/MessageService.java | 60 +++ recovery/src/main/resources/application.yml | 4 +- .../util/bean/message/common/InMessage.java | 50 +-- .../bean/message/common/MessageConstant.java | 105 +++-- .../ccsens/util/message/BaseMessageDto.java | 24 +- 21 files changed, 1219 insertions(+), 106 deletions(-) create mode 100644 recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketEncoder.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketHandler.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/service/IMessageService.java create mode 100644 recovery/src/main/java/com/ccsens/recovery/service/MessageService.java diff --git a/game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java b/game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java index 76c43f54..0d359b13 100644 --- a/game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java +++ b/game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java @@ -10,7 +10,6 @@ import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.List; /** diff --git a/game/src/main/resources/application.yml b/game/src/main/resources/application.yml index d082c0ea..5c2cd5c4 100644 --- a/game/src/main/resources/application.yml +++ b/game/src/main/resources/application.yml @@ -1,4 +1,4 @@ spring: profiles: - active: prod - include: common, util-prod \ No newline at end of file + active: dev + include: common, util-dev \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7562a898..21b355a2 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ tall recovery - + game mt wisdomcar diff --git a/recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java b/recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java index 6bcf6394..d34d2d88 100644 --- a/recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java +++ b/recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java @@ -1,6 +1,8 @@ package com.ccsens.recovery; +import com.ccsens.recovery.netty.wsserver.NettyWsServer; import org.mybatis.spring.annotation.MapperScan; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; @@ -8,6 +10,8 @@ import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.scheduling.annotation.EnableAsync; +import javax.annotation.Resource; + /** * @author 逗 @@ -19,10 +23,16 @@ import org.springframework.scheduling.annotation.EnableAsync; @EnableCircuitBreaker @EnableFeignClients(basePackages = "com.ccsens.cloudutil.feign") @SpringBootApplication(scanBasePackages = "com.ccsens") -public class RecoveryApplication { +public class RecoveryApplication implements CommandLineRunner { + @Resource + private NettyWsServer nettyWsServer; public static void main(String[] args) { SpringApplication.run(RecoveryApplication.class, args); } + @Override + public void run(String... args) throws Exception { + nettyWsServer.start(); + } } diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java new file mode 100644 index 00000000..3cd8c8aa --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java @@ -0,0 +1,29 @@ +package com.ccsens.recovery.bean.message; + +import com.ccsens.util.WebConstant; +import lombok.Data; + +@Data +public class AckMessageDto extends BaseMessageDto { + @lombok.Data + public static class Data { + Long msgId; + } + + private Data data; + + public AckMessageDto(){ + setType(WebConstant.Message_Type.Ack.phase); + setEvent(WebConstant.Message_Ack_Event.Ack.phase); + setTime(System.currentTimeMillis()); + } + + public AckMessageDto(Long projectId, Long msgId){ + this(); + setProjectId(projectId); + + Data d = new Data(); + d.setMsgId(msgId); + setData(d); + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java new file mode 100644 index 00000000..e5f77bda --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java @@ -0,0 +1,30 @@ +package com.ccsens.recovery.bean.message; + +import com.ccsens.util.WebConstant; +import lombok.Data; + +@Data +public class AuthMessageDto extends BaseMessageDto { + @lombok.Data + public static class Data{ + private String userId; + private String channelId; + private String token; + } + + private Data data; + + public AuthMessageDto(){ + setType(WebConstant.Message_Type.Auth.phase); + setEvent(WebConstant.Message_Auth_Event.Auth.phase); + setTime(System.currentTimeMillis()); + } + + public AuthMessageDto(String userId, String channelId){ + this(); + Data d = new Data(); + d.setUserId(userId); + d.setChannelId(channelId); + setData(d); + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java new file mode 100644 index 00000000..7639ad96 --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java @@ -0,0 +1,29 @@ +package com.ccsens.recovery.bean.message; + +import com.ccsens.util.WebConstant; +import lombok.Data; + +@Data +public class AuthMessageWithAnswerDto extends BaseMessageDto{ + @lombok.Data + public static class Data{ + private Boolean success; + private String phase; + } + + private Data data; + + public AuthMessageWithAnswerDto(){ + setType(WebConstant.Message_Type.Auth.phase); + setEvent(WebConstant.Message_Auth_Event.Answer.phase); + setTime(System.currentTimeMillis()); + } + + public AuthMessageWithAnswerDto(Boolean success, String phase){ + this(); + Data d = new Data(); + d.setSuccess(success); + d.setPhase(phase); + setData(d); + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java index 4a1e5983..f6ffe329 100644 --- a/recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java +++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java @@ -51,16 +51,16 @@ public class BaseMessageDto { private List receivers; // private Object data; - public Set receiversTransTos() { - Set tos = new HashSet<>(); - if (CollectionUtil.isEmpty(receivers)) { - return tos; - } - receivers.forEach(receiver -> { - InMessage.To to = new InMessage.To(receiver.getUserId()); - tos.add(JSONObject.toJSONString(to)); - }); - - return tos; - } +// public Set receiversTransTos() { +// Set tos = new HashSet<>(); +// if (CollectionUtil.isEmpty(receivers)) { +// return tos; +// } +// receivers.forEach(receiver -> { +// InMessage.To to = new InMessage.To(receiver.getUserId()); +// tos.add(JSONObject.toJSONString(to)); +// }); +// +// return tos; +// } } diff --git a/recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java b/recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java new file mode 100644 index 00000000..c69931df --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java @@ -0,0 +1,20 @@ +package com.ccsens.recovery.bean.message; + +import com.ccsens.util.WebConstant; +import lombok.Data; + +@Data +public class HeartMessageDto extends BaseMessageDto{ + @lombok.Data + public static class Data{ + private int major; + private int minor; + } + private Data data; + + public HeartMessageDto(){ + setType(WebConstant.Message_Type.Heart.phase); + setEvent(WebConstant.Message_Heart_Event.Heart.phase); + setTime(System.currentTimeMillis()); + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java b/recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java new file mode 100644 index 00000000..1a3b4a38 --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java @@ -0,0 +1,388 @@ +package com.ccsens.recovery.netty; + +import cn.hutool.core.collection.CollectionUtil; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author wei + */ +@Slf4j +@Component +public class ChannelManager { + private static Logger logger = LoggerFactory.getLogger(ChannelManager.class); + private static ThreadLocal threadLocal = new ThreadLocal<>(); + + /** + * UserId,WrappedChannel authed channels; + */ + private static Map> authedChannels; + /** + * Channel,WrapperedChannel + */ + private static Map rawChannels; + + static { + authedChannels = new ConcurrentHashMap<>(); + rawChannels = new ConcurrentHashMap<>(); + } + + /** + * 私有构造,不允许生成该类对象 + */ + private ChannelManager(){} + + /** + * 设置当前正在错误的channel + * @param channel netty ws/tcp连接 + */ + public static void setCurrentChannel(Channel channel){ + threadLocal.set(channel); + } + + /** + * 获取当前线程的channel对象 + * @return channel + */ + public static Channel getCurrentChannel(){ + return threadLocal.get(); + } + + /** + * 删除当前线程的channel对象 + */ + public static void removeCurrentChannel(){ + threadLocal.remove(); + } + + /** + * 添加一个新的连接(Channel) + * @param channel 新的连接 + * @param serverType 服务器类型 + */ + public static synchronized void addChannel(Channel channel,String serverType){ + logger.debug("Invoke addChannel({},{})",channel,serverType); + if(null != channel) { + rawChannels.put(channel, new WrapperedChannel(channel, serverType)); + logger.debug("Add a new channel: {},{}",channel.id().asLongText(),serverType); + }else{ + logger.error("channel is null"); + } + } + + /** + * 用户认证 + * @param channel 连接 + * @param userId 用户ID + * @param major 客户端主版本号 + * @param minor 客户端次版本号 + * @param message 业务相关参数 + * @param mqType 硬件设备消息转发mq类型 + */ + public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor, String message, Byte mqType){ + logger.debug("Invoke authedChannels({},{},{},{})",channel.id().asLongText(),userId,major,minor); + major = major != null ? major : 0; + minor = minor != null ? minor : 0; + + WrapperedChannel wrapperedChannel = rawChannels.get(channel); + if(wrapperedChannel != null){ + wrapperedChannel.whenAuthed(userId,major,minor, message, mqType); + Set authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>()); + authedWchannelSet.add(wrapperedChannel); + logger.debug("Authed channel {} with user {}", channel.id().asLongText(), userId); + }else{ + logger.error("Authed channel,but wrappedChannel is null."); + } + } + /** + * 用户认证 + * @param channel 连接 + * @param userId 用户ID + * @param major 客户端主版本号 + * @param minor 客户端次版本号 + */ + public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor){ + major = major != null ? major : 0; + minor = minor != null ? minor : 0; + WrapperedChannel wrapperedChannel = rawChannels.get(channel); + if(wrapperedChannel != null){ + wrapperedChannel.whenAuthed(userId,major,minor); + Set authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>()); + + authedWchannelSet.add(wrapperedChannel); + }else{ + log.error("Authed channel,but wrappedChannel is null."); + } + } + + /** + * 移除一个连接(Channel) + * @param channel 要移除的连接 + */ + public static synchronized void removeChannel(Channel channel){ + logger.debug("Invoke removeChannel({})",channel.id().asLongText()); + WrapperedChannel wrapperedChannel = rawChannels.get(channel); + if(wrapperedChannel != null){ + //从rawChannels集合中删除 + rawChannels.remove(channel); + logger.debug("Remove a channel from rawChannels: {}",channel.id().asLongText()); + if(wrapperedChannel.isAuthed()){ + Set authedChannelSet = authedChannels.get(wrapperedChannel.getUserId()); + //从authedChannel集合的value(set)中删除 + if(CollectionUtil.isNotEmpty(authedChannelSet)){ + authedChannelSet.remove(wrapperedChannel); + logger.debug("Remove a channel from authedChannelSet: {}, {}",wrapperedChannel.getUserId(),channel.id().asLongText()); + } + //从authedChannel中删除,此处不用else,因为if中语句执行完毕之后,authedChannelSet也可能变成空集合 + if(CollectionUtil.isEmpty(authedChannelSet)){ + authedChannels.remove(wrapperedChannel.getUserId()); + logger.debug("Remove a user from authedChannels: {}",wrapperedChannel.getUserId()); + } + } + }else{ + logger.error("Remove channel,but wrappedChannel is null."); + } + + if(channel.isOpen() || channel.isActive()){ + channel.close(); + } + } + + /** + * 移除一个用户 + * @param userId 要移除的用户 + */ + public static synchronized void removeUser(String userId){ + logger.debug("Invoke remove user : {}",userId); + Set wChannelSet = authedChannels.get(userId); + if(CollectionUtil.isNotEmpty(wChannelSet)){ + for(WrapperedChannel wChannel : wChannelSet){ + //从rawChannel中依次删除 + rawChannels.remove(wChannel.getChannel()); + logger.debug("Remove a channel from rawChannels: {}",wChannel.getChannel().id().asLongText()); + } + } + //从authedChannel中删除 + authedChannels.remove(userId); + logger.debug("Remove a user from authedChannels: {}",userId); + } + + /** + * 添加版本号 + * 只能给已认证的请求添加版本号 + * @param channel 连接 + * @param major 主版本号 + * @param minor 次版本号 + */ + public static synchronized void versionChannel(Channel channel,int major,int minor){ + logger.debug("Invoke Version channel({},{},{}))",channel.id().asLongText(),major,minor); + WrapperedChannel wChannel = rawChannels.get(channel); + if(wChannel != null){ + wChannel.setVersion(major,minor); + logger.debug("Version Channel: {},{},{}",channel.id().asLongText(),major,minor); + }else{ + logger.error("Remove channel,but wrappedChannel is null."); + } + } + + /** + * 发送广播消息给所有已认证Channel + * @param message 消息 + */ + public static synchronized void broadCastAuthed(Object message) { + logger.debug("Invoke broadCastAuthed({})",message); + for (Map.Entry entry : rawChannels.entrySet()) { + WrapperedChannel wChannel = entry.getValue(); + if(wChannel.isAuthed()) { + wChannel.writeAndFlush(message); + logger.debug("Send Message {} to {},{}",message,wChannel.getUserId(),wChannel.getId()); + } + } + } + + /** + * 发送广播消息 + * @param message 消息 + */ + public static synchronized void broadCast(Object message) { + logger.debug("Invoke broadCast({})",message); + for (Map.Entry entry : rawChannels.entrySet()) { + entry.getValue().writeAndFlush(message); + logger.debug("Send Message {} to {}",message,entry.getValue().getId()); + } + } + + /** + * 发送消息给某个连接 + * @param channel 连接 + * @param message 消息 + */ + public static synchronized void sendTo(Channel channel,Object message){ + log.info("Invoke sendTo({},{})",channel.id().asLongText(),message); + WrapperedChannel wrapperedChannel = rawChannels.get(channel); + if(wrapperedChannel != null) { + wrapperedChannel.writeAndFlush(message); + log.info("Write message {} to Channel {}",message,channel); + }else{ + log.info("can't find channel from rawChannels"); + } + } + + /** + * 发送消息给某个用户 + * @param userId 用户ID + * @param message 消息 + */ + public static synchronized boolean sendTo(String userId,Object message){ + logger.info("Invoke sendTo({},{})",userId,message); + Set wChannelSet = authedChannels.get(userId); + if(CollectionUtil.isNotEmpty(wChannelSet)){ + for(WrapperedChannel wChannel:wChannelSet){ + wChannel.writeAndFlush(message); + logger.info("Send message {} to channel {}",message,userId); + } + return true; + }else{ + return false; + } + } + + /** + * 刷新最后一次接收数据时间 每次接收数据需要调用该函数 + * @param channel 接收数据的连接 + */ + public static synchronized void flushReceiveTimestamp(Channel channel){ + logger.debug("Invoke flushReceiveTimestamp({})",channel.id().asLongText()); + WrapperedChannel wrapperedChannel = rawChannels.get(channel); + if(wrapperedChannel != null){ + wrapperedChannel.whenReceivedData(); + }else{ + logger.error("can find channel from rawChannels"); + } + } + + /** + * 关闭所有未认证的连接 + * @param unAuthedChannelsMaxAliveTimeInSeconds 未认证的最大时长(s) + * @throws Exception + * @deprecated 已过期,该方法在多线程并发下可能出现问题,建议使用messageService中的同名方法 + */ + @Deprecated + public static synchronized void closeUnAuthedChannels(Long unAuthedChannelsMaxAliveTimeInSeconds) throws Exception { + logger.debug("Inovke closeUnAuthedChannels({})",unAuthedChannelsMaxAliveTimeInSeconds); + Iterator> it = rawChannels.entrySet().iterator(); + while(it.hasNext()){ + Map.Entry entry = it.next(); + WrapperedChannel wrapperedChannel = entry.getValue(); + if((!wrapperedChannel.isAuthed()) && (wrapperedChannel.getConnectedSeconds() > unAuthedChannelsMaxAliveTimeInSeconds)) { + it.remove(); + //关闭连接 + wrapperedChannel.getChannel().close(); + logger.debug("Remove a unAuthed Channel {}, which has connected {}s,maxOutTime is {}s", wrapperedChannel.getId(),wrapperedChannel.getConnectedSeconds() , + unAuthedChannelsMaxAliveTimeInSeconds ); + } + } + } + + /** + * 判断Channel是否存在 + * @param channel 连接 + * @return 是否存在 + */ + public static synchronized boolean channelExist(Channel channel){ + return rawChannels.containsKey(channel); + } + + /** + * 判断Channel是否已经认证 + * @param channel 连接 + * @return 是否认证 + */ + public static synchronized boolean channelAuthed(Channel channel){ + return rawChannels.containsKey(channel) && rawChannels.get(channel).isAuthed(); + } + + /** + * 判断用户是否在线 + * @param userId 用户ID + * @return 是否认证 + */ + public static synchronized boolean isUserOnline(String userId){ + return authedChannels.containsKey(userId); + } + + /** + * 根据Channel获取对应的用户ID + * @param channel 连接 + * @return 用户ID + */ + public static synchronized String getUserIdByChannel(Channel channel) { + return rawChannels.containsKey(channel) ? rawChannels.get(channel).getUserId() : null; + } + + /** + * 获取所有在线用户 + * @return 所有用户的集合列表 + */ + public static synchronized Set getOnlineUsers(){ + return authedChannels.keySet(); + } + + /** + * 获取某种类型的所有在线用户的连接 + * @param type 客户端类型(ws/tcp modebus/tcp text) + * @return 所有在西安channel的集合列表 + */ + public static synchronized Set getOnlineChannels(String type){ + Set onLineChannels = CollectionUtil.newHashSet(); + for(Map.Entry entry: rawChannels.entrySet()){ + WrapperedChannel wrapperedChannel = entry.getValue(); + if(wrapperedChannel.isAuthed() && wrapperedChannel.getType().equals(type)){ + onLineChannels.add(entry.getKey()); + } + } + return onLineChannels; + } + + /** + * 获取一个rawChannel的副本 + * @return 副本 + */ + public static Map getCopyOfAllChannels() { + Map copyMap = new HashMap<>(rawChannels.size()); + copyMap.putAll(rawChannels); + return copyMap; + } + + /** + * 根据Channel获取WrapperedChannel + * @param channel channel + * @return 对应的wrapperedChannel + */ + public static WrapperedChannel getWrapperedChannelByChannel(Channel channel) { + return rawChannels.get(channel); + } + + public static synchronized void showAuthedChannels(){ + for(Map.Entry> entry : authedChannels.entrySet()){ + for (WrapperedChannel channel:entry.getValue()){ + logger.debug("{}-->{}",entry.getKey(),channel.toString()); + } + } + } + public static synchronized void showAllChannels(){ + for(Map.Entry entry : rawChannels.entrySet()){ + logger.debug(entry.getValue().toString()); + } + } + + public static Set getAllOnlineUsers() { + return authedChannels.keySet(); + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java b/recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java new file mode 100644 index 00000000..d15dbf92 --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java @@ -0,0 +1,239 @@ +package com.ccsens.recovery.netty; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.ccsens.util.DateUtil; +import com.ccsens.util.JacksonUtil; +import io.netty.channel.Channel; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +/** + * @author wei + */ +@Slf4j +@Getter +@Setter +public class WrapperedChannel { + @Getter + @Setter + private static class ClientVersion{ + int major; + int minor; + public ClientVersion(int major,int minor){ + this.major = major; + this.minor = minor; + } + } + + /** + * Netty's Channel + */ + private Channel channel; + /** + * userId + */ + private String userId; + /** + * 客户端版本号 + */ + private ClientVersion version; + + /** + * 客户端类型(哪种服务器) + */ + private String type; + /** + * 是否认证 + */ + private boolean authed; + /** + * 连接建立时间 + */ + private Long createdAtInSeconds; + /** + * 认证时间(s) + */ + private Long authedAtInSeconds; + /** + * 最后一次接收有效数据时间(包括心跳包) + */ + private Long lastDataReceivedInSeconds; + /** + * 最后一次发送有效数据时间(包括心跳包) + */ + private Long lastDataSendInSeconds; + /** + * 接收到数据条数(每接收到一条协议加1,包括心跳) + */ + private long dataReceivedCount; + /** + * 发送数据条数(每发送到一条协议加1,包括心跳) + */ + private long dataSendCount; + + /** + * mqType,仅用于硬件设备认证时指定消息转发给哪个mq + */ + private Byte mqType; + /**业务相关参数*/ + private String message; + /**寄存器 硬件系统*/ + private Map register = new HashMap<>(); + + public WrapperedChannel(Channel channel){ + this.channel = channel; + this.createdAtInSeconds = DateUtil.currentSeconds(); + } + + public WrapperedChannel(Channel channel,String type){ + this(channel); + this.type = type; + } + + public WrapperedChannel(Channel channel,String type,int major,int minor){ + this(channel,type); + this.version = new ClientVersion(major,minor); + } + + public String getVersion(){ + if(ObjectUtil.isNotNull(version)){ + return "v" + version.getMajor() + "." + version.getMinor(); + } + return null; + } + + public void setVersion(int major,int minor){ + if (ObjectUtil.isNull(version)) { + version = new ClientVersion(major, minor); + } else { + version.setMajor(major); + version.setMinor(minor); + } + } + + public String getRemoteAddr(){ + if(ObjectUtil.isNotNull(channel)){ + InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress(); + if(ObjectUtil.isNotNull(insocket)) { + return insocket.getAddress().getHostAddress(); + } + } + return null; + } + public Integer getRemotePort(){ + if(ObjectUtil.isNotNull(channel)){ + InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress(); + if(ObjectUtil.isNotNull(insocket)) { + return insocket.getPort(); + } + } + return null; + } + + public String getId(){ + if(ObjectUtil.isNotNull(channel)){ + return channel.id().asLongText(); + } + return null; + } + + public void whenReceivedData(){ + lastDataReceivedInSeconds = DateUtil.currentSeconds(); + dataReceivedCount++; + } + + public void whenSendData(){ + lastDataSendInSeconds = DateUtil.currentSeconds(); + dataSendCount++; + } + + public void whenAuthed(){ + authed = true; + authedAtInSeconds = DateUtil.currentSeconds(); + } + public void whenAuthed(String userId,int major,int minor){ + whenAuthed(); + setVersion(major,minor); + this.userId = userId; + } + public void whenAuthed(String userId,int major,int minor, String message, Byte mqType){ + whenAuthed(); + setVersion(major,minor); + this.userId = userId; + this.message = message; + this.mqType = mqType; + if (StrUtil.isNotEmpty(message)) { + try { + Map map = JacksonUtil.jsonToMap(message); + for (String key: map.keySet()) { + Object o = map.get(key); + if (o instanceof Integer) { + register.put(key, (long)(Integer) map.get(key)); + } else { + register.put(key, (long) map.get(key)); + } + + } + } catch (IOException e) { + log.error("message转map异常", e); + } + } + + } + + public void writeAndFlush(Object message){ + if(channel != null && channel.isActive()){ + channel.writeAndFlush(message); + whenSendData(); + } + } + + public Long getConnectedSeconds(){ + return createdAtInSeconds == null ? null : DateUtil.currentSeconds() - createdAtInSeconds; + } + + public Long getOnlineSeconds(){ + return authedAtInSeconds == null ? null : DateUtil.currentSeconds() - authedAtInSeconds ; + } + + @Override + public boolean equals(Object obj) { + if(ObjectUtil.isNull(obj)) { + return false; + } + if(this == obj) { + return true; + } + if(ObjectUtil.isNull(channel)) { + return false; + } + if(obj.getClass() == this.getClass()){ + WrapperedChannel other = (WrapperedChannel)obj; + return channel.equals(other.channel); + }else if(obj.getClass() == channel.getClass()){ + Channel other = (Channel)obj; + return channel.equals(other); + } + return false; + } + + @Override + public int hashCode() { + if(ObjectUtil.isNotNull(channel)){ + return channel.hashCode(); + } + return 0; + } + + @Override + public String toString() { + return String.format("id: %s, type: %s, authed: %b, userId: %s, version: %s",getId(),type,authed,userId,getVersion()); + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java new file mode 100644 index 00000000..1fa7202a --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java @@ -0,0 +1,76 @@ +package com.ccsens.recovery.netty.wsserver; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * @author wei + */ +@Slf4j +@Component +public class NettyWsServer { + private static final String URI = "/recovery/ws"; + private static final short PORT = 7191; + + @Autowired + private WebSocketHandler webSocketHandler; + + @Async + public void start() { + // Configure the server. + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new IdleStateHandler(40, + 0, 0, TimeUnit.SECONDS)); + p.addLast(new HttpServerCodec()); + p.addLast(new HttpObjectAggregator(64 * 1024)); + p.addLast(new ChunkedWriteHandler()); + p.addLast(new WebSocketServerProtocolHandler(URI)); + p.addLast(new WebSocketDecoder()); + p.addLast(new WebSocketEncoder()); + p.addLast(webSocketHandler); + } + }); + // Start the server. + ChannelFuture f = b.bind(PORT).sync(); + log.info("WS端口号:{}",PORT); + // Wait until the server socket is closed. + f.channel().closeFuture().sync(); + log.info("WS端口号:{}",PORT); + } catch(Exception e){ + log.error("ss",e); + e.printStackTrace(); + }finally { + // Shut down all event loops to terminate all threads. + log.info("WS端口号:{}",PORT); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java new file mode 100644 index 00000000..bed82669 --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java @@ -0,0 +1,33 @@ +package com.ccsens.recovery.netty.wsserver; + +import com.ccsens.util.JacksonUtil; +import com.ccsens.util.bean.message.common.InMessage; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * @author wei + */ +public class WebSocketDecoder extends MessageToMessageDecoder { + private static Logger logger = LoggerFactory.getLogger(WebSocketDecoder.class); + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg, List out) throws Exception { + String text = msg.text(); + logger.info("Websocket received: {}",text); + + try { + out.add(JacksonUtil.jsonToBean(text, InMessage.class)); + }catch(IOException e){ + e.printStackTrace(); + logger.error("Websocket Read Error: {}",text); + throw e; + } + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketEncoder.java b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketEncoder.java new file mode 100644 index 00000000..320bc44c --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketEncoder.java @@ -0,0 +1,25 @@ +package com.ccsens.recovery.netty.wsserver; + +import com.ccsens.util.JacksonUtil; +import com.ccsens.util.bean.message.common.OutMessageSet; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author wei + */ +public class WebSocketEncoder extends MessageToByteEncoder { + private static Logger logger = LoggerFactory.getLogger(WebSocketEncoder.class); + + @Override + protected void encode(ChannelHandlerContext ctx, OutMessageSet outMessageSet, ByteBuf out) throws Exception { + String msg = JacksonUtil.beanToJson(outMessageSet); + ctx.writeAndFlush(new TextWebSocketFrame(msg)); + + logger.info("Websocket send: {}",msg); + } +} diff --git a/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketHandler.java b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketHandler.java new file mode 100644 index 00000000..0e5e8bf0 --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketHandler.java @@ -0,0 +1,151 @@ +package com.ccsens.recovery.netty.wsserver; + + +import cn.hutool.core.util.StrUtil; +import com.ccsens.recovery.bean.message.AckMessageDto; +import com.ccsens.recovery.bean.message.AuthMessageDto; +import com.ccsens.recovery.bean.message.BaseMessageDto; +import com.ccsens.recovery.netty.ChannelManager; +import com.ccsens.recovery.service.IMessageService; +import com.ccsens.util.JacksonUtil; +import com.ccsens.util.WebConstant; +import com.ccsens.util.bean.message.client.PingMessage; +import com.ccsens.util.bean.message.common.InMessage; +import com.ccsens.util.bean.message.common.MessageConstant; +import com.ccsens.util.bean.message.common.OutMessage; +import com.ccsens.util.bean.message.common.OutMessageSet; +import com.ccsens.util.bean.message.server.PongMessage; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + + +/** + * @author wei + */ +@Slf4j +@ChannelHandler.Sharable +@Component +public class WebSocketHandler extends SimpleChannelInboundHandler { + private static final String TYPE = "netty_ws"; + + @Resource + private IMessageService messageService; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ChannelManager.addChannel(ctx.channel(), TYPE); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ChannelManager.removeChannel(ctx.channel()); + } + +// @Override +// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +// try { +// ChannelManager.setCurrentChannel(ctx.channel()); +// MessageHandler.handleMessage( +// InMessage.newToServerMessage(MessageConstant.DomainType.User,new UnExceptedErrorMessage(cause.getMessage()))); +// }catch(Exception e){ +// e.printStackTrace(); +// log.error("Ws exceptionCaught handler error: {}",e.getMessage()); +// }finally { +// ChannelManager.removeCurrentChannel(); +// } +// cause.printStackTrace(); +// ctx.close(); +// log.error("Channel closed. Ws get a exception: {}", cause.getMessage()); +// } +// +// @Override +// public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { +// if (evt instanceof IdleStateEvent) { +// IdleStateEvent idleStateEvent = (IdleStateEvent) evt; +// if (idleStateEvent.state() == IdleState.READER_IDLE) { +// try { +// ChannelManager.setCurrentChannel(ctx.channel()); +// MessageHandler.handleMessage( +// InMessage.newToServerMessage(MessageConstant.DomainType.User,new ClientIdleClosedMessage())); +// }catch(Exception e){ +// e.printStackTrace(); +// log.error("Ws exceptionCaught handler error: {}",e.getMessage()); +// }finally { +// ChannelManager.removeCurrentChannel(); +// } +// ctx.channel().close(); +// log.error("Ws channel idle,closed."); +// } +// } else { +// super.userEventTriggered(ctx, evt); +// } +// } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, InMessage msg) throws Exception { + log.info("接受到消息的时间+++++++++++++++++++++++{}:{}", msg, System.currentTimeMillis()); + if (StrUtil.isBlank(msg.getData())) { + return; + } + try { + OutMessage outMessage = null; + BaseMessageDto baseMessage = JacksonUtil.jsonToBean(msg.getData(), BaseMessageDto.class); + MessageConstant.ClientMessageType gameClientMessageType = MessageConstant.ClientMessageType.valueOf(baseMessage.getType()); + switch (gameClientMessageType) { + case Ping: { + + PingMessage inSysData = JacksonUtil.jsonToBean(msg.getData(), PingMessage.class); + if (null != inSysData.getData()) { + ChannelManager.versionChannel(ChannelManager.getCurrentChannel(), inSysData.getData().getMajor(), inSysData.getData().getMinor()); + } + outMessage = new OutMessage(JacksonUtil.beanToJson(new PongMessage())); + + break; + } + case Ack: { + AckMessageDto message = JacksonUtil.jsonToBean(msg.getData(), AckMessageDto.class); + String userId = ChannelManager.getUserIdByChannel(ctx.channel()); + messageService.doAckMessageWithAck(userId, message); + break; + } + case Auth: + log.info("认证:-------------{}", baseMessage); + AuthMessageDto theMessage = JacksonUtil.jsonToBean(msg.getData(), AuthMessageDto.class); + theMessage.getData().setChannelId(ctx.channel().id().asLongText()); + outMessage = doAuthMessage(ctx, theMessage); + break; + default: + break; + } + //2.结果应答 + if (null != outMessage) { + ChannelManager.sendTo(ctx.channel(), OutMessageSet.newInstance().ackId(null).add(outMessage)); + } + } catch (Exception e) { + e.printStackTrace(); + log.error("Websocket Process Message Failed: {}", e.getMessage()); + throw e; + } finally { + ChannelManager.removeCurrentChannel(); + + } + + } + + private OutMessage doAuthMessage(ChannelHandlerContext ctx, AuthMessageDto message) throws Exception { + WebConstant.Message_Auth_Event event = WebConstant.Message_Auth_Event.phaseOf(message.getEvent()); + switch (event) { + case Auth: + return messageService.doAuthMessageWithAuth(ctx, message); + default: + break; + } + return null; + } + +} diff --git a/recovery/src/main/java/com/ccsens/recovery/service/IMessageService.java b/recovery/src/main/java/com/ccsens/recovery/service/IMessageService.java new file mode 100644 index 00000000..8e09693d --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/service/IMessageService.java @@ -0,0 +1,19 @@ +package com.ccsens.recovery.service; + +import com.ccsens.recovery.bean.message.AckMessageDto; +import com.ccsens.recovery.bean.message.AuthMessageDto; +import com.ccsens.util.bean.message.common.OutMessage; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.netty.channel.ChannelHandlerContext; + +import java.util.List; +import java.util.Set; + +public interface IMessageService { + + void doAckMessageWithAck(String userId, AckMessageDto message); + + OutMessage doAuthMessageWithAuth(ChannelHandlerContext ctx, AuthMessageDto message) throws JsonProcessingException, Exception; + +} + diff --git a/recovery/src/main/java/com/ccsens/recovery/service/MessageService.java b/recovery/src/main/java/com/ccsens/recovery/service/MessageService.java new file mode 100644 index 00000000..cabd6c5e --- /dev/null +++ b/recovery/src/main/java/com/ccsens/recovery/service/MessageService.java @@ -0,0 +1,60 @@ +package com.ccsens.recovery.service; + +import com.ccsens.cloudutil.feign.TallFeignClient; +import com.ccsens.recovery.bean.message.AckMessageDto; +import com.ccsens.recovery.bean.message.AuthMessageDto; +import com.ccsens.recovery.bean.message.AuthMessageWithAnswerDto; +import com.ccsens.recovery.netty.ChannelManager; +import com.ccsens.util.JacksonUtil; +import com.ccsens.util.bean.message.common.MessageConstant; +import com.ccsens.util.bean.message.common.OutMessage; +import com.ccsens.util.bean.message.server.ChannelStatusMessage; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + + +import static com.ccsens.recovery.netty.ChannelManager.sendTo; + +@Slf4j +@Service +public class MessageService implements IMessageService { + @Autowired + private AmqpTemplate rabbitTemplate; + @Autowired + private TallFeignClient tallFeignClient; + + + @Override + public void doAckMessageWithAck(String userId, AckMessageDto message) { +// System.out.println(message); +// //1.存储到MongoDB(ack消息不需要存储,修改响应消息的发送成功标志即可) +//// Long userId = message.getSender().getUserId(); +// Long msgId = message.getData().getMsgId(); +// messageLogDao.updateMessageLogSendSuccess(userId,msgId,1); + } + + @Override + public OutMessage doAuthMessageWithAuth(ChannelHandlerContext ctx, AuthMessageDto message) throws Exception { + log.info("{}",message); + //ws连接认证 + String userId = message.getData().getUserId(); + if (userId == null) { + String token = message.getData().getToken(); + userId = tallFeignClient.getUserId(token); + } + ChannelManager.authChannel(ctx.channel(), userId, 1, 1); + AuthMessageWithAnswerDto messageWithAnswerDto = new AuthMessageWithAnswerDto(true, "ok"); + OutMessage outMessage = new OutMessage(JacksonUtil.beanToJson( + new ChannelStatusMessage(true,0L, MessageConstant.Error.Ok)) + ); +// sendTo(ctx.channel(), messageWithAnswerDto); + return outMessage; + + } + + +} diff --git a/recovery/src/main/resources/application.yml b/recovery/src/main/resources/application.yml index 3981cf45..12eaa1a7 100644 --- a/recovery/src/main/resources/application.yml +++ b/recovery/src/main/resources/application.yml @@ -1,6 +1,6 @@ spring: profiles: - active: prod - include: util-prod,common + active: dev + include: util-dev,common diff --git a/util/src/main/java/com/ccsens/util/bean/message/common/InMessage.java b/util/src/main/java/com/ccsens/util/bean/message/common/InMessage.java index f1babf46..a45da423 100644 --- a/util/src/main/java/com/ccsens/util/bean/message/common/InMessage.java +++ b/util/src/main/java/com/ccsens/util/bean/message/common/InMessage.java @@ -1,16 +1,11 @@ package com.ccsens.util.bean.message.common; -import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DateUtil; -import com.alibaba.fastjson.JSONObject; import com.ccsens.util.JacksonUtil; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; -import io.swagger.annotations.ApiModel; import lombok.Data; -import java.util.HashSet; -import java.util.List; import java.util.Set; /** @@ -41,7 +36,16 @@ public class InMessage { /** * 消息要发送到哪个域 */ - private MessageConstant.DomainType toDomain; + private MessageConstant.DomainType toDomain = MessageConstant.DomainType.User; + /** + * 调用者信息 + * User : netty,无需配置 + * Queue: 配置Queue Name + * Rest : 配置URL,发送方式 + * Server: 无需配置 + */ + private MessageConstant.InvokerMessage invokerMessage; + /** * 接受者信息(列表) */ @@ -62,6 +66,7 @@ public class InMessage { */ private String data; + public InMessage(){ this.time = DateUtil.currentSeconds(); } @@ -97,37 +102,4 @@ public class InMessage { //TODO //添加方便链式调用的构造方法,类似builder - - - - @Data - @ApiModel("接收消息者") - public static class To{ - private Long id; - - public To() { - } - public To(Long id) { - this.id = id; - } - - } - - /** - * 将userids列表转成tos格式 - * @param userIds - * @return - */ - public static Set transTos(List userIds) { - - Set sets = new HashSet<>(); - if (CollectionUtil.isEmpty(userIds)) { - return sets; - } - userIds.forEach(userId -> { - To to = new To(userId); - sets.add(JSONObject.toJSONString(to)); - }); - return sets; - } } diff --git a/util/src/main/java/com/ccsens/util/bean/message/common/MessageConstant.java b/util/src/main/java/com/ccsens/util/bean/message/common/MessageConstant.java index 74de022a..4e83401f 100644 --- a/util/src/main/java/com/ccsens/util/bean/message/common/MessageConstant.java +++ b/util/src/main/java/com/ccsens/util/bean/message/common/MessageConstant.java @@ -1,24 +1,70 @@ package com.ccsens.util.bean.message.common; import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; /** * @author wei * 消息相关常量 */ public class MessageConstant { + + /** + * redis参数 + */ + public static class Redis{ + /**mq集合key*/ + public static final String KEY_MQ_COLLECTION = "mqCollection"; + } + + + /** + * 寄存器地址 + */ + public enum Register{ + /**授权*/ + Auth((byte) 0x00, (byte) 0x01), + /**心跳*/ + Ping((byte) 0x00, (byte) 0x02), + ; + + public byte first; + public byte second; + + Register(byte first, byte second){ + this.first = first; + this.second = second; + + } + + public long getAddr(){ + long addr = ((long)first << 8 | second) & 0xFFFF; + return addr; + } + + public static Register valueOf(byte first, byte second) { + for(Register register : Register.values()){ + if (register.first == first && register.second == second) { + return register; + } + } + return null; + } + } + public enum ClientMessageType{ //客户端心跳 Ping(0x00), //客户端认证 Auth(0x01), + //客户端收到消息ACK Ack(0x02), //客户端收到消息ACK HasRead(0x03), //客户端请求连接状态 GetChannelStatus(0x04), - + ModBus(0x05), //不可预期的错误 UnExceptedError(0x21), ClientIdleClosed(0x22), @@ -56,7 +102,9 @@ public class MessageConstant { //客户端收到消息ACK Ack(0x02), //客户端请求连接状态 - ChannelStatus(0x03); + ChannelStatus(0x03), + // 未认证 + UnAuth(0x04); //撤销某个消息 //DelMessage(0x04), //客户端请求连接状态 @@ -129,7 +177,10 @@ public class MessageConstant { //认证超时断开连接 ChannelAuthTimeOut(1303,"连接断开:认证超时"), //不可预期错误 - UnExpectedError(1304,"不可预期异常"); + UnExpectedError(1304,"不可预期异常"), + QUEUE_NEW_FAIL(1305, "队列创建失败"), + QUEUE_NAME_EMPTY(1306, "不予许消息队列名称为空"), + QUEUE_NOT_EXISTED(1307, "不予许消息队列名称为空"); public int code; public String text; @@ -180,6 +231,21 @@ public class MessageConstant { } } + /** + * 被调用者信息 + */ + @Data + public static class InvokerMessage{ + /**地址,ws不必设置,mq为Queue name, rest为url*/ + private String address; + /**发送方式,get请求...,主要用于rest类型*/ + private SendMethod method; + } + + public enum SendMethod{ + GET, POST, POST_JSON; + } + public enum Status{ //未决状态(未完成) Pending(0), @@ -215,37 +281,4 @@ public class MessageConstant { } } - public enum GameClientMessageType{ - //客户端心跳 - Heart(0x00), - //客户端认证 - Auth(0x01), - //客户端收到消息ACK - Ack(0x02), - //滑动 - Count(0x03), - //状态改变 - ChangeStatus(0x04) - ; - - public int value; - - GameClientMessageType(int value){ - this.value = value; - } - - /** - * 从int到enum的转换函数 - * @param value 枚举int值 - * @return 对应的枚举,找不到则返回null - */ - public static GameClientMessageType valueOf(int value) { - for(GameClientMessageType type : values()){ - if(type.value == value){ - return type; - } - } - return null; - } - } } diff --git a/util/src/main/java/com/ccsens/util/message/BaseMessageDto.java b/util/src/main/java/com/ccsens/util/message/BaseMessageDto.java index 293ee6fb..4d83960c 100644 --- a/util/src/main/java/com/ccsens/util/message/BaseMessageDto.java +++ b/util/src/main/java/com/ccsens/util/message/BaseMessageDto.java @@ -51,16 +51,16 @@ public class BaseMessageDto { private List receivers; // private Object data; - public Set receiversTransTos() { - Set tos = new HashSet<>(); - if (CollectionUtil.isEmpty(receivers)) { - return tos; - } - receivers.forEach(receiver -> { - InMessage.To to = new InMessage.To(receiver.getUserId()); - tos.add(JSONObject.toJSONString(to)); - }); - - return tos; - } +// public Set receiversTransTos() { +// Set tos = new HashSet<>(); +// if (CollectionUtil.isEmpty(receivers)) { +// return tos; +// } +// receivers.forEach(receiver -> { +// InMessage.To to = new InMessage.To(receiver.getUserId()); +// tos.add(JSONObject.toJSONString(to)); +// }); +// +// return tos; +// } }