Browse Source

20210609康复添加ws消息

pt
zy_Java 4 years ago
parent
commit
409eba5b5a
  1. 1
      game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java
  2. 4
      game/src/main/resources/application.yml
  3. 2
      pom.xml
  4. 12
      recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java
  5. 29
      recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java
  6. 30
      recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java
  7. 29
      recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java
  8. 24
      recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java
  9. 20
      recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java
  10. 388
      recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java
  11. 239
      recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java
  12. 76
      recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java
  13. 33
      recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java
  14. 25
      recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketEncoder.java
  15. 151
      recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketHandler.java
  16. 19
      recovery/src/main/java/com/ccsens/recovery/service/IMessageService.java
  17. 60
      recovery/src/main/java/com/ccsens/recovery/service/MessageService.java
  18. 4
      recovery/src/main/resources/application.yml
  19. 50
      util/src/main/java/com/ccsens/util/bean/message/common/InMessage.java
  20. 105
      util/src/main/java/com/ccsens/util/bean/message/common/MessageConstant.java
  21. 24
      util/src/main/java/com/ccsens/util/message/BaseMessageDto.java

1
game/src/main/java/com/ccsens/game/netty/wsserver/WebSocketDecoder.java

@ -10,7 +10,6 @@ import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**

4
game/src/main/resources/application.yml

@ -1,4 +1,4 @@
spring:
profiles:
active: prod
include: common, util-prod
active: dev
include: common, util-dev

2
pom.xml

@ -11,7 +11,7 @@
<module>tall</module>
<module>recovery</module>
<!-- <module>ht</module>-->
<!-- <module>game</module>-->
<module>game</module>
<module>mt</module>
<module>wisdomcar</module>
<!-- <module>beneficiation</module>-->

12
recovery/src/main/java/com/ccsens/recovery/RecoveryApplication.java

@ -1,6 +1,8 @@
package com.ccsens.recovery;
import com.ccsens.recovery.netty.wsserver.NettyWsServer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
@ -8,6 +10,8 @@ import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
import javax.annotation.Resource;
/**
* @author
@ -19,10 +23,16 @@ import org.springframework.scheduling.annotation.EnableAsync;
@EnableCircuitBreaker
@EnableFeignClients(basePackages = "com.ccsens.cloudutil.feign")
@SpringBootApplication(scanBasePackages = "com.ccsens")
public class RecoveryApplication {
public class RecoveryApplication implements CommandLineRunner {
@Resource
private NettyWsServer nettyWsServer;
public static void main(String[] args) {
SpringApplication.run(RecoveryApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
nettyWsServer.start();
}
}

29
recovery/src/main/java/com/ccsens/recovery/bean/message/AckMessageDto.java

@ -0,0 +1,29 @@
package com.ccsens.recovery.bean.message;
import com.ccsens.util.WebConstant;
import lombok.Data;
@Data
public class AckMessageDto extends BaseMessageDto {
@lombok.Data
public static class Data {
Long msgId;
}
private Data data;
public AckMessageDto(){
setType(WebConstant.Message_Type.Ack.phase);
setEvent(WebConstant.Message_Ack_Event.Ack.phase);
setTime(System.currentTimeMillis());
}
public AckMessageDto(Long projectId, Long msgId){
this();
setProjectId(projectId);
Data d = new Data();
d.setMsgId(msgId);
setData(d);
}
}

30
recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageDto.java

@ -0,0 +1,30 @@
package com.ccsens.recovery.bean.message;
import com.ccsens.util.WebConstant;
import lombok.Data;
@Data
public class AuthMessageDto extends BaseMessageDto {
@lombok.Data
public static class Data{
private String userId;
private String channelId;
private String token;
}
private Data data;
public AuthMessageDto(){
setType(WebConstant.Message_Type.Auth.phase);
setEvent(WebConstant.Message_Auth_Event.Auth.phase);
setTime(System.currentTimeMillis());
}
public AuthMessageDto(String userId, String channelId){
this();
Data d = new Data();
d.setUserId(userId);
d.setChannelId(channelId);
setData(d);
}
}

29
recovery/src/main/java/com/ccsens/recovery/bean/message/AuthMessageWithAnswerDto.java

@ -0,0 +1,29 @@
package com.ccsens.recovery.bean.message;
import com.ccsens.util.WebConstant;
import lombok.Data;
@Data
public class AuthMessageWithAnswerDto extends BaseMessageDto{
@lombok.Data
public static class Data{
private Boolean success;
private String phase;
}
private Data data;
public AuthMessageWithAnswerDto(){
setType(WebConstant.Message_Type.Auth.phase);
setEvent(WebConstant.Message_Auth_Event.Answer.phase);
setTime(System.currentTimeMillis());
}
public AuthMessageWithAnswerDto(Boolean success, String phase){
this();
Data d = new Data();
d.setSuccess(success);
d.setPhase(phase);
setData(d);
}
}

24
recovery/src/main/java/com/ccsens/recovery/bean/message/BaseMessageDto.java

@ -51,16 +51,16 @@ public class BaseMessageDto {
private List<MessageUser> receivers;
// private Object data;
public Set<String> receiversTransTos() {
Set<String> tos = new HashSet<>();
if (CollectionUtil.isEmpty(receivers)) {
return tos;
}
receivers.forEach(receiver -> {
InMessage.To to = new InMessage.To(receiver.getUserId());
tos.add(JSONObject.toJSONString(to));
});
return tos;
}
// public Set<String> receiversTransTos() {
// Set<String> tos = new HashSet<>();
// if (CollectionUtil.isEmpty(receivers)) {
// return tos;
// }
// receivers.forEach(receiver -> {
// InMessage.To to = new InMessage.To(receiver.getUserId());
// tos.add(JSONObject.toJSONString(to));
// });
//
// return tos;
// }
}

20
recovery/src/main/java/com/ccsens/recovery/bean/message/HeartMessageDto.java

@ -0,0 +1,20 @@
package com.ccsens.recovery.bean.message;
import com.ccsens.util.WebConstant;
import lombok.Data;
@Data
public class HeartMessageDto extends BaseMessageDto{
@lombok.Data
public static class Data{
private int major;
private int minor;
}
private Data data;
public HeartMessageDto(){
setType(WebConstant.Message_Type.Heart.phase);
setEvent(WebConstant.Message_Heart_Event.Heart.phase);
setTime(System.currentTimeMillis());
}
}

388
recovery/src/main/java/com/ccsens/recovery/netty/ChannelManager.java

@ -0,0 +1,388 @@
package com.ccsens.recovery.netty;
import cn.hutool.core.collection.CollectionUtil;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author wei
*/
@Slf4j
@Component
public class ChannelManager {
private static Logger logger = LoggerFactory.getLogger(ChannelManager.class);
private static ThreadLocal<Channel> threadLocal = new ThreadLocal<>();
/**
* UserId,WrappedChannel authed channels;
*/
private static Map<String,Set<WrapperedChannel>> authedChannels;
/**
* Channel,WrapperedChannel
*/
private static Map<Channel,WrapperedChannel> rawChannels;
static {
authedChannels = new ConcurrentHashMap<>();
rawChannels = new ConcurrentHashMap<>();
}
/**
* 私有构造,不允许生成该类对象
*/
private ChannelManager(){}
/**
* 设置当前正在错误的channel
* @param channel netty ws/tcp连接
*/
public static void setCurrentChannel(Channel channel){
threadLocal.set(channel);
}
/**
* 获取当前线程的channel对象
* @return channel
*/
public static Channel getCurrentChannel(){
return threadLocal.get();
}
/**
* 删除当前线程的channel对象
*/
public static void removeCurrentChannel(){
threadLocal.remove();
}
/**
* 添加一个新的连接(Channel)
* @param channel 新的连接
* @param serverType 服务器类型
*/
public static synchronized void addChannel(Channel channel,String serverType){
logger.debug("Invoke addChannel({},{})",channel,serverType);
if(null != channel) {
rawChannels.put(channel, new WrapperedChannel(channel, serverType));
logger.debug("Add a new channel: {},{}",channel.id().asLongText(),serverType);
}else{
logger.error("channel is null");
}
}
/**
* 用户认证
* @param channel 连接
* @param userId 用户ID
* @param major 客户端主版本号
* @param minor 客户端次版本号
* @param message 业务相关参数
* @param mqType 硬件设备消息转发mq类型
*/
public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor, String message, Byte mqType){
logger.debug("Invoke authedChannels({},{},{},{})",channel.id().asLongText(),userId,major,minor);
major = major != null ? major : 0;
minor = minor != null ? minor : 0;
WrapperedChannel wrapperedChannel = rawChannels.get(channel);
if(wrapperedChannel != null){
wrapperedChannel.whenAuthed(userId,major,minor, message, mqType);
Set<WrapperedChannel> authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>());
authedWchannelSet.add(wrapperedChannel);
logger.debug("Authed channel {} with user {}", channel.id().asLongText(), userId);
}else{
logger.error("Authed channel,but wrappedChannel is null.");
}
}
/**
* 用户认证
* @param channel 连接
* @param userId 用户ID
* @param major 客户端主版本号
* @param minor 客户端次版本号
*/
public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor){
major = major != null ? major : 0;
minor = minor != null ? minor : 0;
WrapperedChannel wrapperedChannel = rawChannels.get(channel);
if(wrapperedChannel != null){
wrapperedChannel.whenAuthed(userId,major,minor);
Set<WrapperedChannel> authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>());
authedWchannelSet.add(wrapperedChannel);
}else{
log.error("Authed channel,but wrappedChannel is null.");
}
}
/**
* 移除一个连接(Channel)
* @param channel 要移除的连接
*/
public static synchronized void removeChannel(Channel channel){
logger.debug("Invoke removeChannel({})",channel.id().asLongText());
WrapperedChannel wrapperedChannel = rawChannels.get(channel);
if(wrapperedChannel != null){
//从rawChannels集合中删除
rawChannels.remove(channel);
logger.debug("Remove a channel from rawChannels: {}",channel.id().asLongText());
if(wrapperedChannel.isAuthed()){
Set<WrapperedChannel> authedChannelSet = authedChannels.get(wrapperedChannel.getUserId());
//从authedChannel集合的value(set)中删除
if(CollectionUtil.isNotEmpty(authedChannelSet)){
authedChannelSet.remove(wrapperedChannel);
logger.debug("Remove a channel from authedChannelSet: {}, {}",wrapperedChannel.getUserId(),channel.id().asLongText());
}
//从authedChannel中删除,此处不用else,因为if中语句执行完毕之后,authedChannelSet也可能变成空集合
if(CollectionUtil.isEmpty(authedChannelSet)){
authedChannels.remove(wrapperedChannel.getUserId());
logger.debug("Remove a user from authedChannels: {}",wrapperedChannel.getUserId());
}
}
}else{
logger.error("Remove channel,but wrappedChannel is null.");
}
if(channel.isOpen() || channel.isActive()){
channel.close();
}
}
/**
* 移除一个用户
* @param userId 要移除的用户
*/
public static synchronized void removeUser(String userId){
logger.debug("Invoke remove user : {}",userId);
Set<WrapperedChannel> wChannelSet = authedChannels.get(userId);
if(CollectionUtil.isNotEmpty(wChannelSet)){
for(WrapperedChannel wChannel : wChannelSet){
//从rawChannel中依次删除
rawChannels.remove(wChannel.getChannel());
logger.debug("Remove a channel from rawChannels: {}",wChannel.getChannel().id().asLongText());
}
}
//从authedChannel中删除
authedChannels.remove(userId);
logger.debug("Remove a user from authedChannels: {}",userId);
}
/**
* 添加版本号
* 只能给已认证的请求添加版本号
* @param channel 连接
* @param major 主版本号
* @param minor 次版本号
*/
public static synchronized void versionChannel(Channel channel,int major,int minor){
logger.debug("Invoke Version channel({},{},{}))",channel.id().asLongText(),major,minor);
WrapperedChannel wChannel = rawChannels.get(channel);
if(wChannel != null){
wChannel.setVersion(major,minor);
logger.debug("Version Channel: {},{},{}",channel.id().asLongText(),major,minor);
}else{
logger.error("Remove channel,but wrappedChannel is null.");
}
}
/**
* 发送广播消息给所有已认证Channel
* @param message 消息
*/
public static synchronized void broadCastAuthed(Object message) {
logger.debug("Invoke broadCastAuthed({})",message);
for (Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()) {
WrapperedChannel wChannel = entry.getValue();
if(wChannel.isAuthed()) {
wChannel.writeAndFlush(message);
logger.debug("Send Message {} to {},{}",message,wChannel.getUserId(),wChannel.getId());
}
}
}
/**
* 发送广播消息
* @param message 消息
*/
public static synchronized void broadCast(Object message) {
logger.debug("Invoke broadCast({})",message);
for (Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()) {
entry.getValue().writeAndFlush(message);
logger.debug("Send Message {} to {}",message,entry.getValue().getId());
}
}
/**
* 发送消息给某个连接
* @param channel 连接
* @param message 消息
*/
public static synchronized void sendTo(Channel channel,Object message){
log.info("Invoke sendTo({},{})",channel.id().asLongText(),message);
WrapperedChannel wrapperedChannel = rawChannels.get(channel);
if(wrapperedChannel != null) {
wrapperedChannel.writeAndFlush(message);
log.info("Write message {} to Channel {}",message,channel);
}else{
log.info("can't find channel from rawChannels");
}
}
/**
* 发送消息给某个用户
* @param userId 用户ID
* @param message 消息
*/
public static synchronized boolean sendTo(String userId,Object message){
logger.info("Invoke sendTo({},{})",userId,message);
Set<WrapperedChannel> wChannelSet = authedChannels.get(userId);
if(CollectionUtil.isNotEmpty(wChannelSet)){
for(WrapperedChannel wChannel:wChannelSet){
wChannel.writeAndFlush(message);
logger.info("Send message {} to channel {}",message,userId);
}
return true;
}else{
return false;
}
}
/**
* 刷新最后一次接收数据时间 每次接收数据需要调用该函数
* @param channel 接收数据的连接
*/
public static synchronized void flushReceiveTimestamp(Channel channel){
logger.debug("Invoke flushReceiveTimestamp({})",channel.id().asLongText());
WrapperedChannel wrapperedChannel = rawChannels.get(channel);
if(wrapperedChannel != null){
wrapperedChannel.whenReceivedData();
}else{
logger.error("can find channel from rawChannels");
}
}
/**
* 关闭所有未认证的连接
* @param unAuthedChannelsMaxAliveTimeInSeconds 未认证的最大时长(s)
* @throws Exception
* @deprecated 已过期该方法在多线程并发下可能出现问题建议使用messageService中的同名方法
*/
@Deprecated
public static synchronized void closeUnAuthedChannels(Long unAuthedChannelsMaxAliveTimeInSeconds) throws Exception {
logger.debug("Inovke closeUnAuthedChannels({})",unAuthedChannelsMaxAliveTimeInSeconds);
Iterator<Map.Entry<Channel,WrapperedChannel>> it = rawChannels.entrySet().iterator();
while(it.hasNext()){
Map.Entry<Channel,WrapperedChannel> entry = it.next();
WrapperedChannel wrapperedChannel = entry.getValue();
if((!wrapperedChannel.isAuthed()) && (wrapperedChannel.getConnectedSeconds() > unAuthedChannelsMaxAliveTimeInSeconds)) {
it.remove();
//关闭连接
wrapperedChannel.getChannel().close();
logger.debug("Remove a unAuthed Channel {}, which has connected {}s,maxOutTime is {}s", wrapperedChannel.getId(),wrapperedChannel.getConnectedSeconds() ,
unAuthedChannelsMaxAliveTimeInSeconds );
}
}
}
/**
* 判断Channel是否存在
* @param channel 连接
* @return 是否存在
*/
public static synchronized boolean channelExist(Channel channel){
return rawChannels.containsKey(channel);
}
/**
* 判断Channel是否已经认证
* @param channel 连接
* @return 是否认证
*/
public static synchronized boolean channelAuthed(Channel channel){
return rawChannels.containsKey(channel) && rawChannels.get(channel).isAuthed();
}
/**
* 判断用户是否在线
* @param userId 用户ID
* @return 是否认证
*/
public static synchronized boolean isUserOnline(String userId){
return authedChannels.containsKey(userId);
}
/**
* 根据Channel获取对应的用户ID
* @param channel 连接
* @return 用户ID
*/
public static synchronized String getUserIdByChannel(Channel channel) {
return rawChannels.containsKey(channel) ? rawChannels.get(channel).getUserId() : null;
}
/**
* 获取所有在线用户
* @return 所有用户的集合列表
*/
public static synchronized Set<String> getOnlineUsers(){
return authedChannels.keySet();
}
/**
* 获取某种类型的所有在线用户的连接
* @param type 客户端类型(ws/tcp modebus/tcp text)
* @return 所有在西安channel的集合列表
*/
public static synchronized Set<Channel> getOnlineChannels(String type){
Set<Channel> onLineChannels = CollectionUtil.newHashSet();
for(Map.Entry<Channel,WrapperedChannel> entry: rawChannels.entrySet()){
WrapperedChannel wrapperedChannel = entry.getValue();
if(wrapperedChannel.isAuthed() && wrapperedChannel.getType().equals(type)){
onLineChannels.add(entry.getKey());
}
}
return onLineChannels;
}
/**
* 获取一个rawChannel的副本
* @return 副本
*/
public static Map<Channel, WrapperedChannel> getCopyOfAllChannels() {
Map<Channel,WrapperedChannel> copyMap = new HashMap<>(rawChannels.size());
copyMap.putAll(rawChannels);
return copyMap;
}
/**
* 根据Channel获取WrapperedChannel
* @param channel channel
* @return 对应的wrapperedChannel
*/
public static WrapperedChannel getWrapperedChannelByChannel(Channel channel) {
return rawChannels.get(channel);
}
public static synchronized void showAuthedChannels(){
for(Map.Entry<String,Set<WrapperedChannel>> entry : authedChannels.entrySet()){
for (WrapperedChannel channel:entry.getValue()){
logger.debug("{}-->{}",entry.getKey(),channel.toString());
}
}
}
public static synchronized void showAllChannels(){
for(Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()){
logger.debug(entry.getValue().toString());
}
}
public static Set<String> getAllOnlineUsers() {
return authedChannels.keySet();
}
}

239
recovery/src/main/java/com/ccsens/recovery/netty/WrapperedChannel.java

@ -0,0 +1,239 @@
package com.ccsens.recovery.netty;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.ccsens.util.DateUtil;
import com.ccsens.util.JacksonUtil;
import io.netty.channel.Channel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* @author wei
*/
@Slf4j
@Getter
@Setter
public class WrapperedChannel {
@Getter
@Setter
private static class ClientVersion{
int major;
int minor;
public ClientVersion(int major,int minor){
this.major = major;
this.minor = minor;
}
}
/**
* Netty's Channel
*/
private Channel channel;
/**
* userId
*/
private String userId;
/**
* 客户端版本号
*/
private ClientVersion version;
/**
* 客户端类型(哪种服务器)
*/
private String type;
/**
* 是否认证
*/
private boolean authed;
/**
* 连接建立时间
*/
private Long createdAtInSeconds;
/**
* 认证时间(s)
*/
private Long authedAtInSeconds;
/**
* 最后一次接收有效数据时间(包括心跳包)
*/
private Long lastDataReceivedInSeconds;
/**
* 最后一次发送有效数据时间(包括心跳包)
*/
private Long lastDataSendInSeconds;
/**
* 接收到数据条数(每接收到一条协议加1包括心跳)
*/
private long dataReceivedCount;
/**
* 发送数据条数(每发送到一条协议加1包括心跳)
*/
private long dataSendCount;
/**
* mqType,仅用于硬件设备认证时指定消息转发给哪个mq
*/
private Byte mqType;
/**业务相关参数*/
private String message;
/**寄存器 硬件系统*/
private Map<String, Long> register = new HashMap<>();
public WrapperedChannel(Channel channel){
this.channel = channel;
this.createdAtInSeconds = DateUtil.currentSeconds();
}
public WrapperedChannel(Channel channel,String type){
this(channel);
this.type = type;
}
public WrapperedChannel(Channel channel,String type,int major,int minor){
this(channel,type);
this.version = new ClientVersion(major,minor);
}
public String getVersion(){
if(ObjectUtil.isNotNull(version)){
return "v" + version.getMajor() + "." + version.getMinor();
}
return null;
}
public void setVersion(int major,int minor){
if (ObjectUtil.isNull(version)) {
version = new ClientVersion(major, minor);
} else {
version.setMajor(major);
version.setMinor(minor);
}
}
public String getRemoteAddr(){
if(ObjectUtil.isNotNull(channel)){
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
if(ObjectUtil.isNotNull(insocket)) {
return insocket.getAddress().getHostAddress();
}
}
return null;
}
public Integer getRemotePort(){
if(ObjectUtil.isNotNull(channel)){
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
if(ObjectUtil.isNotNull(insocket)) {
return insocket.getPort();
}
}
return null;
}
public String getId(){
if(ObjectUtil.isNotNull(channel)){
return channel.id().asLongText();
}
return null;
}
public void whenReceivedData(){
lastDataReceivedInSeconds = DateUtil.currentSeconds();
dataReceivedCount++;
}
public void whenSendData(){
lastDataSendInSeconds = DateUtil.currentSeconds();
dataSendCount++;
}
public void whenAuthed(){
authed = true;
authedAtInSeconds = DateUtil.currentSeconds();
}
public void whenAuthed(String userId,int major,int minor){
whenAuthed();
setVersion(major,minor);
this.userId = userId;
}
public void whenAuthed(String userId,int major,int minor, String message, Byte mqType){
whenAuthed();
setVersion(major,minor);
this.userId = userId;
this.message = message;
this.mqType = mqType;
if (StrUtil.isNotEmpty(message)) {
try {
Map<String, Object> map = JacksonUtil.jsonToMap(message);
for (String key: map.keySet()) {
Object o = map.get(key);
if (o instanceof Integer) {
register.put(key, (long)(Integer) map.get(key));
} else {
register.put(key, (long) map.get(key));
}
}
} catch (IOException e) {
log.error("message转map异常", e);
}
}
}
public void writeAndFlush(Object message){
if(channel != null && channel.isActive()){
channel.writeAndFlush(message);
whenSendData();
}
}
public Long getConnectedSeconds(){
return createdAtInSeconds == null ? null : DateUtil.currentSeconds() - createdAtInSeconds;
}
public Long getOnlineSeconds(){
return authedAtInSeconds == null ? null : DateUtil.currentSeconds() - authedAtInSeconds ;
}
@Override
public boolean equals(Object obj) {
if(ObjectUtil.isNull(obj)) {
return false;
}
if(this == obj) {
return true;
}
if(ObjectUtil.isNull(channel)) {
return false;
}
if(obj.getClass() == this.getClass()){
WrapperedChannel other = (WrapperedChannel)obj;
return channel.equals(other.channel);
}else if(obj.getClass() == channel.getClass()){
Channel other = (Channel)obj;
return channel.equals(other);
}
return false;
}
@Override
public int hashCode() {
if(ObjectUtil.isNotNull(channel)){
return channel.hashCode();
}
return 0;
}
@Override
public String toString() {
return String.format("id: %s, type: %s, authed: %b, userId: %s, version: %s",getId(),type,authed,userId,getVersion());
}
}

76
recovery/src/main/java/com/ccsens/recovery/netty/wsserver/NettyWsServer.java

@ -0,0 +1,76 @@
package com.ccsens.recovery.netty.wsserver;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author wei
*/
@Slf4j
@Component
public class NettyWsServer {
private static final String URI = "/recovery/ws";
private static final short PORT = 7191;
@Autowired
private WebSocketHandler webSocketHandler;
@Async
public void start() {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new IdleStateHandler(40,
0, 0, TimeUnit.SECONDS));
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(64 * 1024));
p.addLast(new ChunkedWriteHandler());
p.addLast(new WebSocketServerProtocolHandler(URI));
p.addLast(new WebSocketDecoder());
p.addLast(new WebSocketEncoder());
p.addLast(webSocketHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
log.info("WS端口号:{}",PORT);
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
log.info("WS端口号:{}",PORT);
} catch(Exception e){
log.error("ss",e);
e.printStackTrace();
}finally {
// Shut down all event loops to terminate all threads.
log.info("WS端口号:{}",PORT);
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

33
recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketDecoder.java

@ -0,0 +1,33 @@
package com.ccsens.recovery.netty.wsserver;
import com.ccsens.util.JacksonUtil;
import com.ccsens.util.bean.message.common.InMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* @author wei
*/
public class WebSocketDecoder extends MessageToMessageDecoder<TextWebSocketFrame> {
private static Logger logger = LoggerFactory.getLogger(WebSocketDecoder.class);
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg, List<Object> out) throws Exception {
String text = msg.text();
logger.info("Websocket received: {}",text);
try {
out.add(JacksonUtil.jsonToBean(text, InMessage.class));
}catch(IOException e){
e.printStackTrace();
logger.error("Websocket Read Error: {}",text);
throw e;
}
}
}

25
recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketEncoder.java

@ -0,0 +1,25 @@
package com.ccsens.recovery.netty.wsserver;
import com.ccsens.util.JacksonUtil;
import com.ccsens.util.bean.message.common.OutMessageSet;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wei
*/
public class WebSocketEncoder extends MessageToByteEncoder<OutMessageSet> {
private static Logger logger = LoggerFactory.getLogger(WebSocketEncoder.class);
@Override
protected void encode(ChannelHandlerContext ctx, OutMessageSet outMessageSet, ByteBuf out) throws Exception {
String msg = JacksonUtil.beanToJson(outMessageSet);
ctx.writeAndFlush(new TextWebSocketFrame(msg));
logger.info("Websocket send: {}",msg);
}
}

151
recovery/src/main/java/com/ccsens/recovery/netty/wsserver/WebSocketHandler.java

@ -0,0 +1,151 @@
package com.ccsens.recovery.netty.wsserver;
import cn.hutool.core.util.StrUtil;
import com.ccsens.recovery.bean.message.AckMessageDto;
import com.ccsens.recovery.bean.message.AuthMessageDto;
import com.ccsens.recovery.bean.message.BaseMessageDto;
import com.ccsens.recovery.netty.ChannelManager;
import com.ccsens.recovery.service.IMessageService;
import com.ccsens.util.JacksonUtil;
import com.ccsens.util.WebConstant;
import com.ccsens.util.bean.message.client.PingMessage;
import com.ccsens.util.bean.message.common.InMessage;
import com.ccsens.util.bean.message.common.MessageConstant;
import com.ccsens.util.bean.message.common.OutMessage;
import com.ccsens.util.bean.message.common.OutMessageSet;
import com.ccsens.util.bean.message.server.PongMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author wei
*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class WebSocketHandler extends SimpleChannelInboundHandler<InMessage> {
private static final String TYPE = "netty_ws";
@Resource
private IMessageService messageService;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ChannelManager.addChannel(ctx.channel(), TYPE);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ChannelManager.removeChannel(ctx.channel());
}
// @Override
// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// try {
// ChannelManager.setCurrentChannel(ctx.channel());
// MessageHandler.handleMessage(
// InMessage.newToServerMessage(MessageConstant.DomainType.User,new UnExceptedErrorMessage(cause.getMessage())));
// }catch(Exception e){
// e.printStackTrace();
// log.error("Ws exceptionCaught handler error: {}",e.getMessage());
// }finally {
// ChannelManager.removeCurrentChannel();
// }
// cause.printStackTrace();
// ctx.close();
// log.error("Channel closed. Ws get a exception: {}", cause.getMessage());
// }
//
// @Override
// public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// if (evt instanceof IdleStateEvent) {
// IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// if (idleStateEvent.state() == IdleState.READER_IDLE) {
// try {
// ChannelManager.setCurrentChannel(ctx.channel());
// MessageHandler.handleMessage(
// InMessage.newToServerMessage(MessageConstant.DomainType.User,new ClientIdleClosedMessage()));
// }catch(Exception e){
// e.printStackTrace();
// log.error("Ws exceptionCaught handler error: {}",e.getMessage());
// }finally {
// ChannelManager.removeCurrentChannel();
// }
// ctx.channel().close();
// log.error("Ws channel idle,closed.");
// }
// } else {
// super.userEventTriggered(ctx, evt);
// }
// }
@Override
protected void channelRead0(ChannelHandlerContext ctx, InMessage msg) throws Exception {
log.info("接受到消息的时间+++++++++++++++++++++++{}:{}", msg, System.currentTimeMillis());
if (StrUtil.isBlank(msg.getData())) {
return;
}
try {
OutMessage outMessage = null;
BaseMessageDto baseMessage = JacksonUtil.jsonToBean(msg.getData(), BaseMessageDto.class);
MessageConstant.ClientMessageType gameClientMessageType = MessageConstant.ClientMessageType.valueOf(baseMessage.getType());
switch (gameClientMessageType) {
case Ping: {
PingMessage inSysData = JacksonUtil.jsonToBean(msg.getData(), PingMessage.class);
if (null != inSysData.getData()) {
ChannelManager.versionChannel(ChannelManager.getCurrentChannel(), inSysData.getData().getMajor(), inSysData.getData().getMinor());
}
outMessage = new OutMessage(JacksonUtil.beanToJson(new PongMessage()));
break;
}
case Ack: {
AckMessageDto message = JacksonUtil.jsonToBean(msg.getData(), AckMessageDto.class);
String userId = ChannelManager.getUserIdByChannel(ctx.channel());
messageService.doAckMessageWithAck(userId, message);
break;
}
case Auth:
log.info("认证:-------------{}", baseMessage);
AuthMessageDto theMessage = JacksonUtil.jsonToBean(msg.getData(), AuthMessageDto.class);
theMessage.getData().setChannelId(ctx.channel().id().asLongText());
outMessage = doAuthMessage(ctx, theMessage);
break;
default:
break;
}
//2.结果应答
if (null != outMessage) {
ChannelManager.sendTo(ctx.channel(), OutMessageSet.newInstance().ackId(null).add(outMessage));
}
} catch (Exception e) {
e.printStackTrace();
log.error("Websocket Process Message Failed: {}", e.getMessage());
throw e;
} finally {
ChannelManager.removeCurrentChannel();
}
}
private OutMessage doAuthMessage(ChannelHandlerContext ctx, AuthMessageDto message) throws Exception {
WebConstant.Message_Auth_Event event = WebConstant.Message_Auth_Event.phaseOf(message.getEvent());
switch (event) {
case Auth:
return messageService.doAuthMessageWithAuth(ctx, message);
default:
break;
}
return null;
}
}

19
recovery/src/main/java/com/ccsens/recovery/service/IMessageService.java

@ -0,0 +1,19 @@
package com.ccsens.recovery.service;
import com.ccsens.recovery.bean.message.AckMessageDto;
import com.ccsens.recovery.bean.message.AuthMessageDto;
import com.ccsens.util.bean.message.common.OutMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.Set;
public interface IMessageService {
void doAckMessageWithAck(String userId, AckMessageDto message);
OutMessage doAuthMessageWithAuth(ChannelHandlerContext ctx, AuthMessageDto message) throws JsonProcessingException, Exception;
}

60
recovery/src/main/java/com/ccsens/recovery/service/MessageService.java

@ -0,0 +1,60 @@
package com.ccsens.recovery.service;
import com.ccsens.cloudutil.feign.TallFeignClient;
import com.ccsens.recovery.bean.message.AckMessageDto;
import com.ccsens.recovery.bean.message.AuthMessageDto;
import com.ccsens.recovery.bean.message.AuthMessageWithAnswerDto;
import com.ccsens.recovery.netty.ChannelManager;
import com.ccsens.util.JacksonUtil;
import com.ccsens.util.bean.message.common.MessageConstant;
import com.ccsens.util.bean.message.common.OutMessage;
import com.ccsens.util.bean.message.server.ChannelStatusMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static com.ccsens.recovery.netty.ChannelManager.sendTo;
@Slf4j
@Service
public class MessageService implements IMessageService {
@Autowired
private AmqpTemplate rabbitTemplate;
@Autowired
private TallFeignClient tallFeignClient;
@Override
public void doAckMessageWithAck(String userId, AckMessageDto message) {
// System.out.println(message);
// //1.存储到MongoDB(ack消息不需要存储,修改响应消息的发送成功标志即可)
//// Long userId = message.getSender().getUserId();
// Long msgId = message.getData().getMsgId();
// messageLogDao.updateMessageLogSendSuccess(userId,msgId,1);
}
@Override
public OutMessage doAuthMessageWithAuth(ChannelHandlerContext ctx, AuthMessageDto message) throws Exception {
log.info("{}",message);
//ws连接认证
String userId = message.getData().getUserId();
if (userId == null) {
String token = message.getData().getToken();
userId = tallFeignClient.getUserId(token);
}
ChannelManager.authChannel(ctx.channel(), userId, 1, 1);
AuthMessageWithAnswerDto messageWithAnswerDto = new AuthMessageWithAnswerDto(true, "ok");
OutMessage outMessage = new OutMessage(JacksonUtil.beanToJson(
new ChannelStatusMessage(true,0L, MessageConstant.Error.Ok))
);
// sendTo(ctx.channel(), messageWithAnswerDto);
return outMessage;
}
}

4
recovery/src/main/resources/application.yml

@ -1,6 +1,6 @@
spring:
profiles:
active: prod
include: util-prod,common
active: dev
include: util-dev,common

50
util/src/main/java/com/ccsens/util/bean/message/common/InMessage.java

@ -1,16 +1,11 @@
package com.ccsens.util.bean.message.common;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.ccsens.util.JacksonUtil;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.ApiModel;
import lombok.Data;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
@ -41,7 +36,16 @@ public class InMessage {
/**
* 消息要发送到哪个域
*/
private MessageConstant.DomainType toDomain;
private MessageConstant.DomainType toDomain = MessageConstant.DomainType.User;
/**
* 调用者信息
* User netty,无需配置
* Queue 配置Queue Name
* Rest : 配置URL发送方式
* Server 无需配置
*/
private MessageConstant.InvokerMessage invokerMessage;
/**
* 接受者信息列表
*/
@ -62,6 +66,7 @@ public class InMessage {
*/
private String data;
public InMessage(){
this.time = DateUtil.currentSeconds();
}
@ -97,37 +102,4 @@ public class InMessage {
//TODO
//添加方便链式调用的构造方法,类似builder
@Data
@ApiModel("接收消息者")
public static class To{
private Long id;
public To() {
}
public To(Long id) {
this.id = id;
}
}
/**
* 将userids列表转成tos格式
* @param userIds
* @return
*/
public static Set<String> transTos(List<Long> userIds) {
Set<String> sets = new HashSet<>();
if (CollectionUtil.isEmpty(userIds)) {
return sets;
}
userIds.forEach(userId -> {
To to = new To(userId);
sets.add(JSONObject.toJSONString(to));
});
return sets;
}
}

105
util/src/main/java/com/ccsens/util/bean/message/common/MessageConstant.java

@ -1,24 +1,70 @@
package com.ccsens.util.bean.message.common;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
/**
* @author wei
* 消息相关常量
*/
public class MessageConstant {
/**
* redis参数
*/
public static class Redis{
/**mq集合key*/
public static final String KEY_MQ_COLLECTION = "mqCollection";
}
/**
* 寄存器地址
*/
public enum Register{
/**授权*/
Auth((byte) 0x00, (byte) 0x01),
/**心跳*/
Ping((byte) 0x00, (byte) 0x02),
;
public byte first;
public byte second;
Register(byte first, byte second){
this.first = first;
this.second = second;
}
public long getAddr(){
long addr = ((long)first << 8 | second) & 0xFFFF;
return addr;
}
public static Register valueOf(byte first, byte second) {
for(Register register : Register.values()){
if (register.first == first && register.second == second) {
return register;
}
}
return null;
}
}
public enum ClientMessageType{
//客户端心跳
Ping(0x00),
//客户端认证
Auth(0x01),
//客户端收到消息ACK
Ack(0x02),
//客户端收到消息ACK
HasRead(0x03),
//客户端请求连接状态
GetChannelStatus(0x04),
ModBus(0x05),
//不可预期的错误
UnExceptedError(0x21),
ClientIdleClosed(0x22),
@ -56,7 +102,9 @@ public class MessageConstant {
//客户端收到消息ACK
Ack(0x02),
//客户端请求连接状态
ChannelStatus(0x03);
ChannelStatus(0x03),
// 未认证
UnAuth(0x04);
//撤销某个消息
//DelMessage(0x04),
//客户端请求连接状态
@ -129,7 +177,10 @@ public class MessageConstant {
//认证超时断开连接
ChannelAuthTimeOut(1303,"连接断开:认证超时"),
//不可预期错误
UnExpectedError(1304,"不可预期异常");
UnExpectedError(1304,"不可预期异常"),
QUEUE_NEW_FAIL(1305, "队列创建失败"),
QUEUE_NAME_EMPTY(1306, "不予许消息队列名称为空"),
QUEUE_NOT_EXISTED(1307, "不予许消息队列名称为空");
public int code;
public String text;
@ -180,6 +231,21 @@ public class MessageConstant {
}
}
/**
* 被调用者信息
*/
@Data
public static class InvokerMessage{
/**地址,ws不必设置,mq为Queue name, rest为url*/
private String address;
/**发送方式,get请求...,主要用于rest类型*/
private SendMethod method;
}
public enum SendMethod{
GET, POST, POST_JSON;
}
public enum Status{
//未决状态(未完成)
Pending(0),
@ -215,37 +281,4 @@ public class MessageConstant {
}
}
public enum GameClientMessageType{
//客户端心跳
Heart(0x00),
//客户端认证
Auth(0x01),
//客户端收到消息ACK
Ack(0x02),
//滑动
Count(0x03),
//状态改变
ChangeStatus(0x04)
;
public int value;
GameClientMessageType(int value){
this.value = value;
}
/**
* 从int到enum的转换函数
* @param value 枚举int值
* @return 对应的枚举找不到则返回null
*/
public static GameClientMessageType valueOf(int value) {
for(GameClientMessageType type : values()){
if(type.value == value){
return type;
}
}
return null;
}
}
}

24
util/src/main/java/com/ccsens/util/message/BaseMessageDto.java

@ -51,16 +51,16 @@ public class BaseMessageDto {
private List<MessageUser> receivers;
// private Object data;
public Set<String> receiversTransTos() {
Set<String> tos = new HashSet<>();
if (CollectionUtil.isEmpty(receivers)) {
return tos;
}
receivers.forEach(receiver -> {
InMessage.To to = new InMessage.To(receiver.getUserId());
tos.add(JSONObject.toJSONString(to));
});
return tos;
}
// public Set<String> receiversTransTos() {
// Set<String> tos = new HashSet<>();
// if (CollectionUtil.isEmpty(receivers)) {
// return tos;
// }
// receivers.forEach(receiver -> {
// InMessage.To to = new InMessage.To(receiver.getUserId());
// tos.add(JSONObject.toJSONString(to));
// });
//
// return tos;
// }
}

Loading…
Cancel
Save