21 changed files with 1219 additions and 106 deletions
@ -1,4 +1,4 @@ |
|||||
spring: |
spring: |
||||
profiles: |
profiles: |
||||
active: prod |
active: dev |
||||
include: common, util-prod |
include: common, util-dev |
||||
@ -0,0 +1,29 @@ |
|||||
|
package com.ccsens.recovery.bean.message; |
||||
|
|
||||
|
import com.ccsens.util.WebConstant; |
||||
|
import lombok.Data; |
||||
|
|
||||
|
@Data |
||||
|
public class AckMessageDto extends BaseMessageDto { |
||||
|
@lombok.Data |
||||
|
public static class Data { |
||||
|
Long msgId; |
||||
|
} |
||||
|
|
||||
|
private Data data; |
||||
|
|
||||
|
public AckMessageDto(){ |
||||
|
setType(WebConstant.Message_Type.Ack.phase); |
||||
|
setEvent(WebConstant.Message_Ack_Event.Ack.phase); |
||||
|
setTime(System.currentTimeMillis()); |
||||
|
} |
||||
|
|
||||
|
public AckMessageDto(Long projectId, Long msgId){ |
||||
|
this(); |
||||
|
setProjectId(projectId); |
||||
|
|
||||
|
Data d = new Data(); |
||||
|
d.setMsgId(msgId); |
||||
|
setData(d); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,30 @@ |
|||||
|
package com.ccsens.recovery.bean.message; |
||||
|
|
||||
|
import com.ccsens.util.WebConstant; |
||||
|
import lombok.Data; |
||||
|
|
||||
|
@Data |
||||
|
public class AuthMessageDto extends BaseMessageDto { |
||||
|
@lombok.Data |
||||
|
public static class Data{ |
||||
|
private String userId; |
||||
|
private String channelId; |
||||
|
private String token; |
||||
|
} |
||||
|
|
||||
|
private Data data; |
||||
|
|
||||
|
public AuthMessageDto(){ |
||||
|
setType(WebConstant.Message_Type.Auth.phase); |
||||
|
setEvent(WebConstant.Message_Auth_Event.Auth.phase); |
||||
|
setTime(System.currentTimeMillis()); |
||||
|
} |
||||
|
|
||||
|
public AuthMessageDto(String userId, String channelId){ |
||||
|
this(); |
||||
|
Data d = new Data(); |
||||
|
d.setUserId(userId); |
||||
|
d.setChannelId(channelId); |
||||
|
setData(d); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,29 @@ |
|||||
|
package com.ccsens.recovery.bean.message; |
||||
|
|
||||
|
import com.ccsens.util.WebConstant; |
||||
|
import lombok.Data; |
||||
|
|
||||
|
@Data |
||||
|
public class AuthMessageWithAnswerDto extends BaseMessageDto{ |
||||
|
@lombok.Data |
||||
|
public static class Data{ |
||||
|
private Boolean success; |
||||
|
private String phase; |
||||
|
} |
||||
|
|
||||
|
private Data data; |
||||
|
|
||||
|
public AuthMessageWithAnswerDto(){ |
||||
|
setType(WebConstant.Message_Type.Auth.phase); |
||||
|
setEvent(WebConstant.Message_Auth_Event.Answer.phase); |
||||
|
setTime(System.currentTimeMillis()); |
||||
|
} |
||||
|
|
||||
|
public AuthMessageWithAnswerDto(Boolean success, String phase){ |
||||
|
this(); |
||||
|
Data d = new Data(); |
||||
|
d.setSuccess(success); |
||||
|
d.setPhase(phase); |
||||
|
setData(d); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,20 @@ |
|||||
|
package com.ccsens.recovery.bean.message; |
||||
|
|
||||
|
import com.ccsens.util.WebConstant; |
||||
|
import lombok.Data; |
||||
|
|
||||
|
@Data |
||||
|
public class HeartMessageDto extends BaseMessageDto{ |
||||
|
@lombok.Data |
||||
|
public static class Data{ |
||||
|
private int major; |
||||
|
private int minor; |
||||
|
} |
||||
|
private Data data; |
||||
|
|
||||
|
public HeartMessageDto(){ |
||||
|
setType(WebConstant.Message_Type.Heart.phase); |
||||
|
setEvent(WebConstant.Message_Heart_Event.Heart.phase); |
||||
|
setTime(System.currentTimeMillis()); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,388 @@ |
|||||
|
package com.ccsens.recovery.netty; |
||||
|
|
||||
|
import cn.hutool.core.collection.CollectionUtil; |
||||
|
import io.netty.channel.Channel; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.*; |
||||
|
import java.util.concurrent.ConcurrentHashMap; |
||||
|
|
||||
|
/** |
||||
|
* @author wei |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class ChannelManager { |
||||
|
private static Logger logger = LoggerFactory.getLogger(ChannelManager.class); |
||||
|
private static ThreadLocal<Channel> threadLocal = new ThreadLocal<>(); |
||||
|
|
||||
|
/** |
||||
|
* UserId,WrappedChannel authed channels; |
||||
|
*/ |
||||
|
private static Map<String,Set<WrapperedChannel>> authedChannels; |
||||
|
/** |
||||
|
* Channel,WrapperedChannel |
||||
|
*/ |
||||
|
private static Map<Channel,WrapperedChannel> rawChannels; |
||||
|
|
||||
|
static { |
||||
|
authedChannels = new ConcurrentHashMap<>(); |
||||
|
rawChannels = new ConcurrentHashMap<>(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 私有构造,不允许生成该类对象 |
||||
|
*/ |
||||
|
private ChannelManager(){} |
||||
|
|
||||
|
/** |
||||
|
* 设置当前正在错误的channel |
||||
|
* @param channel netty ws/tcp连接 |
||||
|
*/ |
||||
|
public static void setCurrentChannel(Channel channel){ |
||||
|
threadLocal.set(channel); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 获取当前线程的channel对象 |
||||
|
* @return channel |
||||
|
*/ |
||||
|
public static Channel getCurrentChannel(){ |
||||
|
return threadLocal.get(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 删除当前线程的channel对象 |
||||
|
*/ |
||||
|
public static void removeCurrentChannel(){ |
||||
|
threadLocal.remove(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 添加一个新的连接(Channel) |
||||
|
* @param channel 新的连接 |
||||
|
* @param serverType 服务器类型 |
||||
|
*/ |
||||
|
public static synchronized void addChannel(Channel channel,String serverType){ |
||||
|
logger.debug("Invoke addChannel({},{})",channel,serverType); |
||||
|
if(null != channel) { |
||||
|
rawChannels.put(channel, new WrapperedChannel(channel, serverType)); |
||||
|
logger.debug("Add a new channel: {},{}",channel.id().asLongText(),serverType); |
||||
|
}else{ |
||||
|
logger.error("channel is null"); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 用户认证 |
||||
|
* @param channel 连接 |
||||
|
* @param userId 用户ID |
||||
|
* @param major 客户端主版本号 |
||||
|
* @param minor 客户端次版本号 |
||||
|
* @param message 业务相关参数 |
||||
|
* @param mqType 硬件设备消息转发mq类型 |
||||
|
*/ |
||||
|
public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor, String message, Byte mqType){ |
||||
|
logger.debug("Invoke authedChannels({},{},{},{})",channel.id().asLongText(),userId,major,minor); |
||||
|
major = major != null ? major : 0; |
||||
|
minor = minor != null ? minor : 0; |
||||
|
|
||||
|
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
||||
|
if(wrapperedChannel != null){ |
||||
|
wrapperedChannel.whenAuthed(userId,major,minor, message, mqType); |
||||
|
Set<WrapperedChannel> authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>()); |
||||
|
authedWchannelSet.add(wrapperedChannel); |
||||
|
logger.debug("Authed channel {} with user {}", channel.id().asLongText(), userId); |
||||
|
}else{ |
||||
|
logger.error("Authed channel,but wrappedChannel is null."); |
||||
|
} |
||||
|
} |
||||
|
/** |
||||
|
* 用户认证 |
||||
|
* @param channel 连接 |
||||
|
* @param userId 用户ID |
||||
|
* @param major 客户端主版本号 |
||||
|
* @param minor 客户端次版本号 |
||||
|
*/ |
||||
|
public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor){ |
||||
|
major = major != null ? major : 0; |
||||
|
minor = minor != null ? minor : 0; |
||||
|
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
||||
|
if(wrapperedChannel != null){ |
||||
|
wrapperedChannel.whenAuthed(userId,major,minor); |
||||
|
Set<WrapperedChannel> authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>()); |
||||
|
|
||||
|
authedWchannelSet.add(wrapperedChannel); |
||||
|
}else{ |
||||
|
log.error("Authed channel,but wrappedChannel is null."); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 移除一个连接(Channel) |
||||
|
* @param channel 要移除的连接 |
||||
|
*/ |
||||
|
public static synchronized void removeChannel(Channel channel){ |
||||
|
logger.debug("Invoke removeChannel({})",channel.id().asLongText()); |
||||
|
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
||||
|
if(wrapperedChannel != null){ |
||||
|
//从rawChannels集合中删除
|
||||
|
rawChannels.remove(channel); |
||||
|
logger.debug("Remove a channel from rawChannels: {}",channel.id().asLongText()); |
||||
|
if(wrapperedChannel.isAuthed()){ |
||||
|
Set<WrapperedChannel> authedChannelSet = authedChannels.get(wrapperedChannel.getUserId()); |
||||
|
//从authedChannel集合的value(set)中删除
|
||||
|
if(CollectionUtil.isNotEmpty(authedChannelSet)){ |
||||
|
authedChannelSet.remove(wrapperedChannel); |
||||
|
logger.debug("Remove a channel from authedChannelSet: {}, {}",wrapperedChannel.getUserId(),channel.id().asLongText()); |
||||
|
} |
||||
|
//从authedChannel中删除,此处不用else,因为if中语句执行完毕之后,authedChannelSet也可能变成空集合
|
||||
|
if(CollectionUtil.isEmpty(authedChannelSet)){ |
||||
|
authedChannels.remove(wrapperedChannel.getUserId()); |
||||
|
logger.debug("Remove a user from authedChannels: {}",wrapperedChannel.getUserId()); |
||||
|
} |
||||
|
} |
||||
|
}else{ |
||||
|
logger.error("Remove channel,but wrappedChannel is null."); |
||||
|
} |
||||
|
|
||||
|
if(channel.isOpen() || channel.isActive()){ |
||||
|
channel.close(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 移除一个用户 |
||||
|
* @param userId 要移除的用户 |
||||
|
*/ |
||||
|
public static synchronized void removeUser(String userId){ |
||||
|
logger.debug("Invoke remove user : {}",userId); |
||||
|
Set<WrapperedChannel> wChannelSet = authedChannels.get(userId); |
||||
|
if(CollectionUtil.isNotEmpty(wChannelSet)){ |
||||
|
for(WrapperedChannel wChannel : wChannelSet){ |
||||
|
//从rawChannel中依次删除
|
||||
|
rawChannels.remove(wChannel.getChannel()); |
||||
|
logger.debug("Remove a channel from rawChannels: {}",wChannel.getChannel().id().asLongText()); |
||||
|
} |
||||
|
} |
||||
|
//从authedChannel中删除
|
||||
|
authedChannels.remove(userId); |
||||
|
logger.debug("Remove a user from authedChannels: {}",userId); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 添加版本号 |
||||
|
* 只能给已认证的请求添加版本号 |
||||
|
* @param channel 连接 |
||||
|
* @param major 主版本号 |
||||
|
* @param minor 次版本号 |
||||
|
*/ |
||||
|
public static synchronized void versionChannel(Channel channel,int major,int minor){ |
||||
|
logger.debug("Invoke Version channel({},{},{}))",channel.id().asLongText(),major,minor); |
||||
|
WrapperedChannel wChannel = rawChannels.get(channel); |
||||
|
if(wChannel != null){ |
||||
|
wChannel.setVersion(major,minor); |
||||
|
logger.debug("Version Channel: {},{},{}",channel.id().asLongText(),major,minor); |
||||
|
}else{ |
||||
|
logger.error("Remove channel,but wrappedChannel is null."); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送广播消息给所有已认证Channel |
||||
|
* @param message 消息 |
||||
|
*/ |
||||
|
public static synchronized void broadCastAuthed(Object message) { |
||||
|
logger.debug("Invoke broadCastAuthed({})",message); |
||||
|
for (Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()) { |
||||
|
WrapperedChannel wChannel = entry.getValue(); |
||||
|
if(wChannel.isAuthed()) { |
||||
|
wChannel.writeAndFlush(message); |
||||
|
logger.debug("Send Message {} to {},{}",message,wChannel.getUserId(),wChannel.getId()); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送广播消息 |
||||
|
* @param message 消息 |
||||
|
*/ |
||||
|
public static synchronized void broadCast(Object message) { |
||||
|
logger.debug("Invoke broadCast({})",message); |
||||
|
for (Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()) { |
||||
|
entry.getValue().writeAndFlush(message); |
||||
|
logger.debug("Send Message {} to {}",message,entry.getValue().getId()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送消息给某个连接 |
||||
|
* @param channel 连接 |
||||
|
* @param message 消息 |
||||
|
*/ |
||||
|
public static synchronized void sendTo(Channel channel,Object message){ |
||||
|
log.info("Invoke sendTo({},{})",channel.id().asLongText(),message); |
||||
|
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
||||
|
if(wrapperedChannel != null) { |
||||
|
wrapperedChannel.writeAndFlush(message); |
||||
|
log.info("Write message {} to Channel {}",message,channel); |
||||
|
}else{ |
||||
|
log.info("can't find channel from rawChannels"); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 发送消息给某个用户 |
||||
|
* @param userId 用户ID |
||||
|
* @param message 消息 |
||||
|
*/ |
||||
|
public static synchronized boolean sendTo(String userId,Object message){ |
||||
|
logger.info("Invoke sendTo({},{})",userId,message); |
||||
|
Set<WrapperedChannel> wChannelSet = authedChannels.get(userId); |
||||
|
if(CollectionUtil.isNotEmpty(wChannelSet)){ |
||||
|
for(WrapperedChannel wChannel:wChannelSet){ |
||||
|
wChannel.writeAndFlush(message); |
||||
|
logger.info("Send message {} to channel {}",message,userId); |
||||
|
} |
||||
|
return true; |
||||
|
}else{ |
||||
|
return false; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 刷新最后一次接收数据时间 每次接收数据需要调用该函数 |
||||
|
* @param channel 接收数据的连接 |
||||
|
*/ |
||||
|
public static synchronized void flushReceiveTimestamp(Channel channel){ |
||||
|
logger.debug("Invoke flushReceiveTimestamp({})",channel.id().asLongText()); |
||||
|
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
||||
|
if(wrapperedChannel != null){ |
||||
|
wrapperedChannel.whenReceivedData(); |
||||
|
}else{ |
||||
|
logger.error("can find channel from rawChannels"); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 关闭所有未认证的连接 |
||||
|
* @param unAuthedChannelsMaxAliveTimeInSeconds 未认证的最大时长(s) |
||||
|
* @throws Exception |
||||
|
* @deprecated 已过期,该方法在多线程并发下可能出现问题,建议使用messageService中的同名方法 |
||||
|
*/ |
||||
|
@Deprecated |
||||
|
public static synchronized void closeUnAuthedChannels(Long unAuthedChannelsMaxAliveTimeInSeconds) throws Exception { |
||||
|
logger.debug("Inovke closeUnAuthedChannels({})",unAuthedChannelsMaxAliveTimeInSeconds); |
||||
|
Iterator<Map.Entry<Channel,WrapperedChannel>> it = rawChannels.entrySet().iterator(); |
||||
|
while(it.hasNext()){ |
||||
|
Map.Entry<Channel,WrapperedChannel> entry = it.next(); |
||||
|
WrapperedChannel wrapperedChannel = entry.getValue(); |
||||
|
if((!wrapperedChannel.isAuthed()) && (wrapperedChannel.getConnectedSeconds() > unAuthedChannelsMaxAliveTimeInSeconds)) { |
||||
|
it.remove(); |
||||
|
//关闭连接
|
||||
|
wrapperedChannel.getChannel().close(); |
||||
|
logger.debug("Remove a unAuthed Channel {}, which has connected {}s,maxOutTime is {}s", wrapperedChannel.getId(),wrapperedChannel.getConnectedSeconds() , |
||||
|
unAuthedChannelsMaxAliveTimeInSeconds ); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 判断Channel是否存在 |
||||
|
* @param channel 连接 |
||||
|
* @return 是否存在 |
||||
|
*/ |
||||
|
public static synchronized boolean channelExist(Channel channel){ |
||||
|
return rawChannels.containsKey(channel); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 判断Channel是否已经认证 |
||||
|
* @param channel 连接 |
||||
|
* @return 是否认证 |
||||
|
*/ |
||||
|
public static synchronized boolean channelAuthed(Channel channel){ |
||||
|
return rawChannels.containsKey(channel) && rawChannels.get(channel).isAuthed(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 判断用户是否在线 |
||||
|
* @param userId 用户ID |
||||
|
* @return 是否认证 |
||||
|
*/ |
||||
|
public static synchronized boolean isUserOnline(String userId){ |
||||
|
return authedChannels.containsKey(userId); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 根据Channel获取对应的用户ID |
||||
|
* @param channel 连接 |
||||
|
* @return 用户ID |
||||
|
*/ |
||||
|
public static synchronized String getUserIdByChannel(Channel channel) { |
||||
|
return rawChannels.containsKey(channel) ? rawChannels.get(channel).getUserId() : null; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 获取所有在线用户 |
||||
|
* @return 所有用户的集合列表 |
||||
|
*/ |
||||
|
public static synchronized Set<String> getOnlineUsers(){ |
||||
|
return authedChannels.keySet(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 获取某种类型的所有在线用户的连接 |
||||
|
* @param type 客户端类型(ws/tcp modebus/tcp text) |
||||
|
* @return 所有在西安channel的集合列表 |
||||
|
*/ |
||||
|
public static synchronized Set<Channel> getOnlineChannels(String type){ |
||||
|
Set<Channel> onLineChannels = CollectionUtil.newHashSet(); |
||||
|
for(Map.Entry<Channel,WrapperedChannel> entry: rawChannels.entrySet()){ |
||||
|
WrapperedChannel wrapperedChannel = entry.getValue(); |
||||
|
if(wrapperedChannel.isAuthed() && wrapperedChannel.getType().equals(type)){ |
||||
|
onLineChannels.add(entry.getKey()); |
||||
|
} |
||||
|
} |
||||
|
return onLineChannels; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 获取一个rawChannel的副本 |
||||
|
* @return 副本 |
||||
|
*/ |
||||
|
public static Map<Channel, WrapperedChannel> getCopyOfAllChannels() { |
||||
|
Map<Channel,WrapperedChannel> copyMap = new HashMap<>(rawChannels.size()); |
||||
|
copyMap.putAll(rawChannels); |
||||
|
return copyMap; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 根据Channel获取WrapperedChannel |
||||
|
* @param channel channel |
||||
|
* @return 对应的wrapperedChannel |
||||
|
*/ |
||||
|
public static WrapperedChannel getWrapperedChannelByChannel(Channel channel) { |
||||
|
return rawChannels.get(channel); |
||||
|
} |
||||
|
|
||||
|
public static synchronized void showAuthedChannels(){ |
||||
|
for(Map.Entry<String,Set<WrapperedChannel>> entry : authedChannels.entrySet()){ |
||||
|
for (WrapperedChannel channel:entry.getValue()){ |
||||
|
logger.debug("{}-->{}",entry.getKey(),channel.toString()); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
public static synchronized void showAllChannels(){ |
||||
|
for(Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()){ |
||||
|
logger.debug(entry.getValue().toString()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public static Set<String> getAllOnlineUsers() { |
||||
|
return authedChannels.keySet(); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,239 @@ |
|||||
|
package com.ccsens.recovery.netty; |
||||
|
|
||||
|
import cn.hutool.core.util.ObjectUtil; |
||||
|
import cn.hutool.core.util.StrUtil; |
||||
|
import com.ccsens.util.DateUtil; |
||||
|
import com.ccsens.util.JacksonUtil; |
||||
|
import io.netty.channel.Channel; |
||||
|
import lombok.Getter; |
||||
|
import lombok.Setter; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.net.InetSocketAddress; |
||||
|
import java.util.HashMap; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
/** |
||||
|
* @author wei |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Getter |
||||
|
@Setter |
||||
|
public class WrapperedChannel { |
||||
|
@Getter |
||||
|
@Setter |
||||
|
private static class ClientVersion{ |
||||
|
int major; |
||||
|
int minor; |
||||
|
public ClientVersion(int major,int minor){ |
||||
|
this.major = major; |
||||
|
this.minor = minor; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Netty's Channel |
||||
|
*/ |
||||
|
private Channel channel; |
||||
|
/** |
||||
|
* userId |
||||
|
*/ |
||||
|
private String userId; |
||||
|
/** |
||||
|
* 客户端版本号 |
||||
|
*/ |
||||
|
private ClientVersion version; |
||||
|
|
||||
|
/** |
||||
|
* 客户端类型(哪种服务器) |
||||
|
*/ |
||||
|
private String type; |
||||
|
/** |
||||
|
* 是否认证 |
||||
|
*/ |
||||
|
private boolean authed; |
||||
|
/** |
||||
|
* 连接建立时间 |
||||
|
*/ |
||||
|
private Long createdAtInSeconds; |
||||
|
/** |
||||
|
* 认证时间(s) |
||||
|
*/ |
||||
|
private Long authedAtInSeconds; |
||||
|
/** |
||||
|
* 最后一次接收有效数据时间(包括心跳包) |
||||
|
*/ |
||||
|
private Long lastDataReceivedInSeconds; |
||||
|
/** |
||||
|
* 最后一次发送有效数据时间(包括心跳包) |
||||
|
*/ |
||||
|
private Long lastDataSendInSeconds; |
||||
|
/** |
||||
|
* 接收到数据条数(每接收到一条协议加1,包括心跳) |
||||
|
*/ |
||||
|
private long dataReceivedCount; |
||||
|
/** |
||||
|
* 发送数据条数(每发送到一条协议加1,包括心跳) |
||||
|
*/ |
||||
|
private long dataSendCount; |
||||
|
|
||||
|
/** |
||||
|
* mqType,仅用于硬件设备认证时指定消息转发给哪个mq |
||||
|
*/ |
||||
|
private Byte mqType; |
||||
|
/**业务相关参数*/ |
||||
|
private String message; |
||||
|
/**寄存器 硬件系统*/ |
||||
|
private Map<String, Long> register = new HashMap<>(); |
||||
|
|
||||
|
public WrapperedChannel(Channel channel){ |
||||
|
this.channel = channel; |
||||
|
this.createdAtInSeconds = DateUtil.currentSeconds(); |
||||
|
} |
||||
|
|
||||
|
public WrapperedChannel(Channel channel,String type){ |
||||
|
this(channel); |
||||
|
this.type = type; |
||||
|
} |
||||
|
|
||||
|
public WrapperedChannel(Channel channel,String type,int major,int minor){ |
||||
|
this(channel,type); |
||||
|
this.version = new ClientVersion(major,minor); |
||||
|
} |
||||
|
|
||||
|
public String getVersion(){ |
||||
|
if(ObjectUtil.isNotNull(version)){ |
||||
|
return "v" + version.getMajor() + "." + version.getMinor(); |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
public void setVersion(int major,int minor){ |
||||
|
if (ObjectUtil.isNull(version)) { |
||||
|
version = new ClientVersion(major, minor); |
||||
|
} else { |
||||
|
version.setMajor(major); |
||||
|
version.setMinor(minor); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public String getRemoteAddr(){ |
||||
|
if(ObjectUtil.isNotNull(channel)){ |
||||
|
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress(); |
||||
|
if(ObjectUtil.isNotNull(insocket)) { |
||||
|
return insocket.getAddress().getHostAddress(); |
||||
|
} |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
public Integer getRemotePort(){ |
||||
|
if(ObjectUtil.isNotNull(channel)){ |
||||
|
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress(); |
||||
|
if(ObjectUtil.isNotNull(insocket)) { |
||||
|
return insocket.getPort(); |
||||
|
} |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
public String getId(){ |
||||
|
if(ObjectUtil.isNotNull(channel)){ |
||||
|
return channel.id().asLongText(); |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
public void whenReceivedData(){ |
||||
|
lastDataReceivedInSeconds = DateUtil.currentSeconds(); |
||||
|
dataReceivedCount++; |
||||
|
} |
||||
|
|
||||
|
public void whenSendData(){ |
||||
|
lastDataSendInSeconds = DateUtil.currentSeconds(); |
||||
|
dataSendCount++; |
||||
|
} |
||||
|
|
||||
|
public void whenAuthed(){ |
||||
|
authed = true; |
||||
|
authedAtInSeconds = DateUtil.currentSeconds(); |
||||
|
} |
||||
|
public void whenAuthed(String userId,int major,int minor){ |
||||
|
whenAuthed(); |
||||
|
setVersion(major,minor); |
||||
|
this.userId = userId; |
||||
|
} |
||||
|
public void whenAuthed(String userId,int major,int minor, String message, Byte mqType){ |
||||
|
whenAuthed(); |
||||
|
setVersion(major,minor); |
||||
|
this.userId = userId; |
||||
|
this.message = message; |
||||
|
this.mqType = mqType; |
||||
|
if (StrUtil.isNotEmpty(message)) { |
||||
|
try { |
||||
|
Map<String, Object> map = JacksonUtil.jsonToMap(message); |
||||
|
for (String key: map.keySet()) { |
||||
|
Object o = map.get(key); |
||||
|
if (o instanceof Integer) { |
||||
|
register.put(key, (long)(Integer) map.get(key)); |
||||
|
} else { |
||||
|
register.put(key, (long) map.get(key)); |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
} catch (IOException e) { |
||||
|
log.error("message转map异常", e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
public void writeAndFlush(Object message){ |
||||
|
if(channel != null && channel.isActive()){ |
||||
|
channel.writeAndFlush(message); |
||||
|
whenSendData(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public Long getConnectedSeconds(){ |
||||
|
return createdAtInSeconds == null ? null : DateUtil.currentSeconds() - createdAtInSeconds; |
||||
|
} |
||||
|
|
||||
|
public Long getOnlineSeconds(){ |
||||
|
return authedAtInSeconds == null ? null : DateUtil.currentSeconds() - authedAtInSeconds ; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public boolean equals(Object obj) { |
||||
|
if(ObjectUtil.isNull(obj)) { |
||||
|
return false; |
||||
|
} |
||||
|
if(this == obj) { |
||||
|
return true; |
||||
|
} |
||||
|
if(ObjectUtil.isNull(channel)) { |
||||
|
return false; |
||||
|
} |
||||
|
if(obj.getClass() == this.getClass()){ |
||||
|
WrapperedChannel other = (WrapperedChannel)obj; |
||||
|
return channel.equals(other.channel); |
||||
|
}else if(obj.getClass() == channel.getClass()){ |
||||
|
Channel other = (Channel)obj; |
||||
|
return channel.equals(other); |
||||
|
} |
||||
|
return false; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public int hashCode() { |
||||
|
if(ObjectUtil.isNotNull(channel)){ |
||||
|
return channel.hashCode(); |
||||
|
} |
||||
|
return 0; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return String.format("id: %s, type: %s, authed: %b, userId: %s, version: %s",getId(),type,authed,userId,getVersion()); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,76 @@ |
|||||
|
package com.ccsens.recovery.netty.wsserver; |
||||
|
|
||||
|
import io.netty.bootstrap.ServerBootstrap; |
||||
|
import io.netty.channel.*; |
||||
|
import io.netty.channel.nio.NioEventLoopGroup; |
||||
|
import io.netty.channel.socket.SocketChannel; |
||||
|
import io.netty.channel.socket.nio.NioServerSocketChannel; |
||||
|
import io.netty.handler.codec.http.HttpObjectAggregator; |
||||
|
import io.netty.handler.codec.http.HttpServerCodec; |
||||
|
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; |
||||
|
import io.netty.handler.logging.LogLevel; |
||||
|
import io.netty.handler.logging.LoggingHandler; |
||||
|
import io.netty.handler.stream.ChunkedWriteHandler; |
||||
|
import io.netty.handler.timeout.IdleStateHandler; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.scheduling.annotation.Async; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
|
||||
|
/** |
||||
|
* @author wei |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class NettyWsServer { |
||||
|
private static final String URI = "/recovery/ws"; |
||||
|
private static final short PORT = 7191; |
||||
|
|
||||
|
@Autowired |
||||
|
private WebSocketHandler webSocketHandler; |
||||
|
|
||||
|
@Async |
||||
|
public void start() { |
||||
|
// Configure the server.
|
||||
|
EventLoopGroup bossGroup = new NioEventLoopGroup(1); |
||||
|
EventLoopGroup workerGroup = new NioEventLoopGroup(); |
||||
|
try { |
||||
|
ServerBootstrap b = new ServerBootstrap(); |
||||
|
b.group(bossGroup, workerGroup) |
||||
|
.channel(NioServerSocketChannel.class) |
||||
|
.option(ChannelOption.SO_BACKLOG, 100) |
||||
|
.handler(new LoggingHandler(LogLevel.INFO)) |
||||
|
.childHandler(new ChannelInitializer<SocketChannel>() { |
||||
|
@Override |
||||
|
public void initChannel(SocketChannel ch) throws Exception { |
||||
|
ChannelPipeline p = ch.pipeline(); |
||||
|
p.addLast(new IdleStateHandler(40, |
||||
|
0, 0, TimeUnit.SECONDS)); |
||||
|
p.addLast(new HttpServerCodec()); |
||||
|
p.addLast(new HttpObjectAggregator(64 * 1024)); |
||||
|
p.addLast(new ChunkedWriteHandler()); |
||||
|
p.addLast(new WebSocketServerProtocolHandler(URI)); |
||||
|
p.addLast(new WebSocketDecoder()); |
||||
|
p.addLast(new WebSocketEncoder()); |
||||
|
p.addLast(webSocketHandler); |
||||
|
} |
||||
|
}); |
||||
|
// Start the server.
|
||||
|
ChannelFuture f = b.bind(PORT).sync(); |
||||
|
log.info("WS端口号:{}",PORT); |
||||
|
// Wait until the server socket is closed.
|
||||
|
f.channel().closeFuture().sync(); |
||||
|
log.info("WS端口号:{}",PORT); |
||||
|
} catch(Exception e){ |
||||
|
log.error("ss",e); |
||||
|
e.printStackTrace(); |
||||
|
}finally { |
||||
|
// Shut down all event loops to terminate all threads.
|
||||
|
log.info("WS端口号:{}",PORT); |
||||
|
bossGroup.shutdownGracefully(); |
||||
|
workerGroup.shutdownGracefully(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,33 @@ |
|||||
|
package com.ccsens.recovery.netty.wsserver; |
||||
|
|
||||
|
import com.ccsens.util.JacksonUtil; |
||||
|
import com.ccsens.util.bean.message.common.InMessage; |
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
import io.netty.handler.codec.MessageToMessageDecoder; |
||||
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* @author wei |
||||
|
*/ |
||||
|
public class WebSocketDecoder extends MessageToMessageDecoder<TextWebSocketFrame> { |
||||
|
private static Logger logger = LoggerFactory.getLogger(WebSocketDecoder.class); |
||||
|
|
||||
|
@Override |
||||
|
protected void decode(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg, List<Object> out) throws Exception { |
||||
|
String text = msg.text(); |
||||
|
logger.info("Websocket received: {}",text); |
||||
|
|
||||
|
try { |
||||
|
out.add(JacksonUtil.jsonToBean(text, InMessage.class)); |
||||
|
}catch(IOException e){ |
||||
|
e.printStackTrace(); |
||||
|
logger.error("Websocket Read Error: {}",text); |
||||
|
throw e; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,25 @@ |
|||||
|
package com.ccsens.recovery.netty.wsserver; |
||||
|
|
||||
|
import com.ccsens.util.JacksonUtil; |
||||
|
import com.ccsens.util.bean.message.common.OutMessageSet; |
||||
|
import io.netty.buffer.ByteBuf; |
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
import io.netty.handler.codec.MessageToByteEncoder; |
||||
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
|
||||
|
/** |
||||
|
* @author wei |
||||
|
*/ |
||||
|
public class WebSocketEncoder extends MessageToByteEncoder<OutMessageSet> { |
||||
|
private static Logger logger = LoggerFactory.getLogger(WebSocketEncoder.class); |
||||
|
|
||||
|
@Override |
||||
|
protected void encode(ChannelHandlerContext ctx, OutMessageSet outMessageSet, ByteBuf out) throws Exception { |
||||
|
String msg = JacksonUtil.beanToJson(outMessageSet); |
||||
|
ctx.writeAndFlush(new TextWebSocketFrame(msg)); |
||||
|
|
||||
|
logger.info("Websocket send: {}",msg); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,151 @@ |
|||||
|
package com.ccsens.recovery.netty.wsserver; |
||||
|
|
||||
|
|
||||
|
import cn.hutool.core.util.StrUtil; |
||||
|
import com.ccsens.recovery.bean.message.AckMessageDto; |
||||
|
import com.ccsens.recovery.bean.message.AuthMessageDto; |
||||
|
import com.ccsens.recovery.bean.message.BaseMessageDto; |
||||
|
import com.ccsens.recovery.netty.ChannelManager; |
||||
|
import com.ccsens.recovery.service.IMessageService; |
||||
|
import com.ccsens.util.JacksonUtil; |
||||
|
import com.ccsens.util.WebConstant; |
||||
|
import com.ccsens.util.bean.message.client.PingMessage; |
||||
|
import com.ccsens.util.bean.message.common.InMessage; |
||||
|
import com.ccsens.util.bean.message.common.MessageConstant; |
||||
|
import com.ccsens.util.bean.message.common.OutMessage; |
||||
|
import com.ccsens.util.bean.message.common.OutMessageSet; |
||||
|
import com.ccsens.util.bean.message.server.PongMessage; |
||||
|
import io.netty.channel.ChannelHandler; |
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
import io.netty.channel.SimpleChannelInboundHandler; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import javax.annotation.Resource; |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* @author wei |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@ChannelHandler.Sharable |
||||
|
@Component |
||||
|
public class WebSocketHandler extends SimpleChannelInboundHandler<InMessage> { |
||||
|
private static final String TYPE = "netty_ws"; |
||||
|
|
||||
|
@Resource |
||||
|
private IMessageService messageService; |
||||
|
|
||||
|
@Override |
||||
|
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
||||
|
ChannelManager.addChannel(ctx.channel(), TYPE); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
||||
|
ChannelManager.removeChannel(ctx.channel()); |
||||
|
} |
||||
|
|
||||
|
// @Override
|
||||
|
// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
|
// try {
|
||||
|
// ChannelManager.setCurrentChannel(ctx.channel());
|
||||
|
// MessageHandler.handleMessage(
|
||||
|
// InMessage.newToServerMessage(MessageConstant.DomainType.User,new UnExceptedErrorMessage(cause.getMessage())));
|
||||
|
// }catch(Exception e){
|
||||
|
// e.printStackTrace();
|
||||
|
// log.error("Ws exceptionCaught handler error: {}",e.getMessage());
|
||||
|
// }finally {
|
||||
|
// ChannelManager.removeCurrentChannel();
|
||||
|
// }
|
||||
|
// cause.printStackTrace();
|
||||
|
// ctx.close();
|
||||
|
// log.error("Channel closed. Ws get a exception: {}", cause.getMessage());
|
||||
|
// }
|
||||
|
//
|
||||
|
// @Override
|
||||
|
// public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
|
// if (evt instanceof IdleStateEvent) {
|
||||
|
// IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
|
||||
|
// if (idleStateEvent.state() == IdleState.READER_IDLE) {
|
||||
|
// try {
|
||||
|
// ChannelManager.setCurrentChannel(ctx.channel());
|
||||
|
// MessageHandler.handleMessage(
|
||||
|
// InMessage.newToServerMessage(MessageConstant.DomainType.User,new ClientIdleClosedMessage()));
|
||||
|
// }catch(Exception e){
|
||||
|
// e.printStackTrace();
|
||||
|
// log.error("Ws exceptionCaught handler error: {}",e.getMessage());
|
||||
|
// }finally {
|
||||
|
// ChannelManager.removeCurrentChannel();
|
||||
|
// }
|
||||
|
// ctx.channel().close();
|
||||
|
// log.error("Ws channel idle,closed.");
|
||||
|
// }
|
||||
|
// } else {
|
||||
|
// super.userEventTriggered(ctx, evt);
|
||||
|
// }
|
||||
|
// }
|
||||
|
|
||||
|
@Override |
||||
|
protected void channelRead0(ChannelHandlerContext ctx, InMessage msg) throws Exception { |
||||
|
log.info("接受到消息的时间+++++++++++++++++++++++{}:{}", msg, System.currentTimeMillis()); |
||||
|
if (StrUtil.isBlank(msg.getData())) { |
||||
|
return; |
||||
|
} |
||||
|
try { |
||||
|
OutMessage outMessage = null; |
||||
|
BaseMessageDto baseMessage = JacksonUtil.jsonToBean(msg.getData(), BaseMessageDto.class); |
||||
|
MessageConstant.ClientMessageType gameClientMessageType = MessageConstant.ClientMessageType.valueOf(baseMessage.getType()); |
||||
|
switch (gameClientMessageType) { |
||||
|
case Ping: { |
||||
|
|
||||
|
PingMessage inSysData = JacksonUtil.jsonToBean(msg.getData(), PingMessage.class); |
||||
|
if (null != inSysData.getData()) { |
||||
|
ChannelManager.versionChannel(ChannelManager.getCurrentChannel(), inSysData.getData().getMajor(), inSysData.getData().getMinor()); |
||||
|
} |
||||
|
outMessage = new OutMessage(JacksonUtil.beanToJson(new PongMessage())); |
||||
|
|
||||
|
break; |
||||
|
} |
||||
|
case Ack: { |
||||
|
AckMessageDto message = JacksonUtil.jsonToBean(msg.getData(), AckMessageDto.class); |
||||
|
String userId = ChannelManager.getUserIdByChannel(ctx.channel()); |
||||
|
messageService.doAckMessageWithAck(userId, message); |
||||
|
break; |
||||
|
} |
||||
|
case Auth: |
||||
|
log.info("认证:-------------{}", baseMessage); |
||||
|
AuthMessageDto theMessage = JacksonUtil.jsonToBean(msg.getData(), AuthMessageDto.class); |
||||
|
theMessage.getData().setChannelId(ctx.channel().id().asLongText()); |
||||
|
outMessage = doAuthMessage(ctx, theMessage); |
||||
|
break; |
||||
|
default: |
||||
|
break; |
||||
|
} |
||||
|
//2.结果应答
|
||||
|
if (null != outMessage) { |
||||
|
ChannelManager.sendTo(ctx.channel(), OutMessageSet.newInstance().ackId(null).add(outMessage)); |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
e.printStackTrace(); |
||||
|
log.error("Websocket Process Message Failed: {}", e.getMessage()); |
||||
|
throw e; |
||||
|
} finally { |
||||
|
ChannelManager.removeCurrentChannel(); |
||||
|
|
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
private OutMessage doAuthMessage(ChannelHandlerContext ctx, AuthMessageDto message) throws Exception { |
||||
|
WebConstant.Message_Auth_Event event = WebConstant.Message_Auth_Event.phaseOf(message.getEvent()); |
||||
|
switch (event) { |
||||
|
case Auth: |
||||
|
return messageService.doAuthMessageWithAuth(ctx, message); |
||||
|
default: |
||||
|
break; |
||||
|
} |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
} |
||||
@ -0,0 +1,19 @@ |
|||||
|
package com.ccsens.recovery.service; |
||||
|
|
||||
|
import com.ccsens.recovery.bean.message.AckMessageDto; |
||||
|
import com.ccsens.recovery.bean.message.AuthMessageDto; |
||||
|
import com.ccsens.util.bean.message.common.OutMessage; |
||||
|
import com.fasterxml.jackson.core.JsonProcessingException; |
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
|
||||
|
import java.util.List; |
||||
|
import java.util.Set; |
||||
|
|
||||
|
public interface IMessageService { |
||||
|
|
||||
|
void doAckMessageWithAck(String userId, AckMessageDto message); |
||||
|
|
||||
|
OutMessage doAuthMessageWithAuth(ChannelHandlerContext ctx, AuthMessageDto message) throws JsonProcessingException, Exception; |
||||
|
|
||||
|
} |
||||
|
|
||||
@ -0,0 +1,60 @@ |
|||||
|
package com.ccsens.recovery.service; |
||||
|
|
||||
|
import com.ccsens.cloudutil.feign.TallFeignClient; |
||||
|
import com.ccsens.recovery.bean.message.AckMessageDto; |
||||
|
import com.ccsens.recovery.bean.message.AuthMessageDto; |
||||
|
import com.ccsens.recovery.bean.message.AuthMessageWithAnswerDto; |
||||
|
import com.ccsens.recovery.netty.ChannelManager; |
||||
|
import com.ccsens.util.JacksonUtil; |
||||
|
import com.ccsens.util.bean.message.common.MessageConstant; |
||||
|
import com.ccsens.util.bean.message.common.OutMessage; |
||||
|
import com.ccsens.util.bean.message.server.ChannelStatusMessage; |
||||
|
import com.fasterxml.jackson.core.JsonProcessingException; |
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.amqp.core.AmqpTemplate; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
|
||||
|
|
||||
|
import static com.ccsens.recovery.netty.ChannelManager.sendTo; |
||||
|
|
||||
|
@Slf4j |
||||
|
@Service |
||||
|
public class MessageService implements IMessageService { |
||||
|
@Autowired |
||||
|
private AmqpTemplate rabbitTemplate; |
||||
|
@Autowired |
||||
|
private TallFeignClient tallFeignClient; |
||||
|
|
||||
|
|
||||
|
@Override |
||||
|
public void doAckMessageWithAck(String userId, AckMessageDto message) { |
||||
|
// System.out.println(message);
|
||||
|
// //1.存储到MongoDB(ack消息不需要存储,修改响应消息的发送成功标志即可)
|
||||
|
//// Long userId = message.getSender().getUserId();
|
||||
|
// Long msgId = message.getData().getMsgId();
|
||||
|
// messageLogDao.updateMessageLogSendSuccess(userId,msgId,1);
|
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public OutMessage doAuthMessageWithAuth(ChannelHandlerContext ctx, AuthMessageDto message) throws Exception { |
||||
|
log.info("{}",message); |
||||
|
//ws连接认证
|
||||
|
String userId = message.getData().getUserId(); |
||||
|
if (userId == null) { |
||||
|
String token = message.getData().getToken(); |
||||
|
userId = tallFeignClient.getUserId(token); |
||||
|
} |
||||
|
ChannelManager.authChannel(ctx.channel(), userId, 1, 1); |
||||
|
AuthMessageWithAnswerDto messageWithAnswerDto = new AuthMessageWithAnswerDto(true, "ok"); |
||||
|
OutMessage outMessage = new OutMessage(JacksonUtil.beanToJson( |
||||
|
new ChannelStatusMessage(true,0L, MessageConstant.Error.Ok)) |
||||
|
); |
||||
|
// sendTo(ctx.channel(), messageWithAnswerDto);
|
||||
|
return outMessage; |
||||
|
|
||||
|
} |
||||
|
|
||||
|
|
||||
|
} |
||||
@ -1,6 +1,6 @@ |
|||||
spring: |
spring: |
||||
profiles: |
profiles: |
||||
active: prod |
active: dev |
||||
include: util-prod,common |
include: util-dev,common |
||||
|
|
||||
|
|
||||
|
|||||
Loading…
Reference in new issue