48 changed files with 1738 additions and 40 deletions
@ -0,0 +1,32 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
@Data |
|||
public class AckMessageDto extends BaseMessageDto { |
|||
@Setter |
|||
@Getter |
|||
public static class Data { |
|||
Long msgId; |
|||
} |
|||
|
|||
private Data data; |
|||
|
|||
public AckMessageDto(){ |
|||
setType(WebConstant.Message_Type.Ack.phase); |
|||
setEvent(WebConstant.Message_Ack_Event.Ack.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
|
|||
public AckMessageDto(Long projectId, Long msgId){ |
|||
this(); |
|||
setProjectId(projectId); |
|||
|
|||
Data d = new Data(); |
|||
d.setMsgId(msgId); |
|||
setData(d); |
|||
} |
|||
} |
@ -0,0 +1,34 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
@Data |
|||
public class AuthMessageDto extends BaseMessageDto { |
|||
@Setter |
|||
@Getter |
|||
public static class Data{ |
|||
private Long id; |
|||
private String userId; |
|||
private String channelId; |
|||
private String token; |
|||
} |
|||
|
|||
private Data data; |
|||
|
|||
public AuthMessageDto(){ |
|||
setType(WebConstant.Message_Type.Auth.phase); |
|||
setEvent(WebConstant.Message_Auth_Event.Auth.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
|
|||
public AuthMessageDto(String userId, String channelId){ |
|||
this(); |
|||
Data d = new Data(); |
|||
d.setUserId(userId); |
|||
d.setChannelId(channelId); |
|||
setData(d); |
|||
} |
|||
} |
@ -0,0 +1,35 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.List; |
|||
|
|||
@Data |
|||
public class BaseMessageDto { |
|||
@Data |
|||
public static class MessageUser { |
|||
private Long id; |
|||
private Long userId; //本质上是authId //20190507 本质上是userId
|
|||
private String nickname; |
|||
private String avatarUrl; |
|||
private boolean hasRead; |
|||
public MessageUser(){ |
|||
hasRead = false; |
|||
} |
|||
public MessageUser(Long id,Long userId,String nickname,String avatarUrl){ |
|||
this(); |
|||
this.id = id; |
|||
this.userId = userId; |
|||
this.nickname = nickname; |
|||
this.avatarUrl = avatarUrl; |
|||
} |
|||
} |
|||
|
|||
private Long time; |
|||
private String type; |
|||
private String event; |
|||
private Long projectId; |
|||
private MessageUser sender; |
|||
private List<MessageUser> receivers; |
|||
// private Object data;
|
|||
} |
@ -0,0 +1,32 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
/** |
|||
* @author __zHangSan |
|||
*/ |
|||
@Data |
|||
public class ChromeMessageDto extends BaseMessageDto{ |
|||
@Setter |
|||
@Getter |
|||
public static class Data{ |
|||
private String url; |
|||
} |
|||
private Data data; |
|||
|
|||
public ChromeMessageDto(){ |
|||
setType(WebConstant.Message_Type.Chrome.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
|
|||
public ChromeMessageDto(String url){ |
|||
this(); |
|||
if(data == null){ |
|||
data = new Data(); |
|||
} |
|||
data.setUrl(url); |
|||
} |
|||
} |
@ -0,0 +1,5 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
public class GameMessageCountIn extends BaseMessageDto{ |
|||
|
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
@Data |
|||
public class GameMessageCountOut extends BaseMessageDto { |
|||
@Setter |
|||
@Getter |
|||
public static class Data{ |
|||
private int totalTimes; |
|||
private int totalScore; |
|||
} |
|||
|
|||
private Data data; |
|||
|
|||
public GameMessageCountOut(){ |
|||
setType(WebConstant.Message_Type.Game.phase); |
|||
setEvent(WebConstant.Message_Url_Event.Url.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
|
|||
public GameMessageCountOut(int totalTimes, int totalScore){ |
|||
this(); |
|||
Data d = new Data(); |
|||
d.setTotalTimes(totalTimes); |
|||
d.setTotalScore(totalScore); |
|||
setData(d); |
|||
} |
|||
} |
@ -0,0 +1,32 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
@Data |
|||
public class GameMessageWithGetUrlDto extends BaseMessageDto { |
|||
@Setter |
|||
@Getter |
|||
public static class Data{ |
|||
private Long recordId; |
|||
private String url; |
|||
} |
|||
|
|||
private Data data; |
|||
|
|||
public GameMessageWithGetUrlDto(){ |
|||
setType(WebConstant.Message_Type.Game.phase); |
|||
setEvent(WebConstant.Message_Url_Event.Url.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
|
|||
public GameMessageWithGetUrlDto(Long recordId, String url){ |
|||
this(); |
|||
Data d = new Data(); |
|||
d.setRecordId(recordId); |
|||
d.setUrl(url); |
|||
setData(d); |
|||
} |
|||
} |
@ -0,0 +1,23 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
@Data |
|||
public class HeartMessageDto extends BaseMessageDto{ |
|||
@Setter |
|||
@Getter |
|||
public static class Data{ |
|||
private int major; |
|||
private int minor; |
|||
} |
|||
private Data data; |
|||
|
|||
public HeartMessageDto(){ |
|||
setType(WebConstant.Message_Type.Heart.phase); |
|||
setEvent(WebConstant.Message_Heart_Event.Heart.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
} |
@ -0,0 +1,35 @@ |
|||
package com.ccsens.game.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
/** |
|||
* @author __zHangSan |
|||
*/ |
|||
@Data |
|||
public class PPTCtlMessageDto extends BaseMessageDto{ |
|||
@Setter |
|||
@Getter |
|||
public static class Data{ |
|||
/** |
|||
* Supported Operation: up,down,begin,end |
|||
*/ |
|||
private String oper; |
|||
} |
|||
private Data data; |
|||
|
|||
public PPTCtlMessageDto(){ |
|||
setType(WebConstant.Message_Type.PPTCtl.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
|
|||
public PPTCtlMessageDto(String oper){ |
|||
this(); |
|||
if(data == null){ |
|||
data = new Data(); |
|||
} |
|||
data.setOper(oper); |
|||
} |
|||
} |
@ -0,0 +1,362 @@ |
|||
package com.ccsens.game.netty; |
|||
|
|||
import cn.hutool.core.collection.CollectionUtil; |
|||
import io.netty.channel.Channel; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
|
|||
import java.util.*; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
|
|||
/** |
|||
* @author wei |
|||
*/ |
|||
public class ChannelManager { |
|||
private static Logger logger = LoggerFactory.getLogger(ChannelManager.class); |
|||
private static ThreadLocal<Channel> threadLocal = new ThreadLocal<>(); |
|||
|
|||
/** |
|||
* UserId,WrappedChannel authed channels; |
|||
*/ |
|||
private static Map<String,Set<WrapperedChannel>> authedChannels; |
|||
/** |
|||
* Channel,WrapperedChannel |
|||
*/ |
|||
private static Map<Channel,WrapperedChannel> rawChannels; |
|||
|
|||
static { |
|||
authedChannels = new ConcurrentHashMap<>(); |
|||
rawChannels = new ConcurrentHashMap<>(); |
|||
} |
|||
|
|||
/** |
|||
* 私有构造,不允许生成该类对象 |
|||
*/ |
|||
private ChannelManager(){} |
|||
|
|||
/** |
|||
* 设置当前正在错误的channel |
|||
* @param channel netty ws/tcp连接 |
|||
*/ |
|||
public static void setCurrentChannel(Channel channel){ |
|||
threadLocal.set(channel); |
|||
} |
|||
|
|||
/** |
|||
* 获取当前线程的channel对象 |
|||
* @return channel |
|||
*/ |
|||
public static Channel getCurrentChannel(){ |
|||
return threadLocal.get(); |
|||
} |
|||
|
|||
/** |
|||
* 删除当前线程的channel对象 |
|||
*/ |
|||
public static void removeCurrentChannel(){ |
|||
threadLocal.remove(); |
|||
} |
|||
|
|||
/** |
|||
* 添加一个新的连接(Channel) |
|||
* @param channel 新的连接 |
|||
* @param serverType 服务器类型 |
|||
*/ |
|||
public static synchronized void addChannel(Channel channel,String serverType){ |
|||
logger.debug("Invoke addChannel({},{})",channel,serverType); |
|||
if(null != channel) { |
|||
rawChannels.put(channel, new WrapperedChannel(channel, serverType)); |
|||
logger.debug("Add a new channel: {},{}",channel.id().asLongText(),serverType); |
|||
}else{ |
|||
logger.error("channel is null"); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 用户认证 |
|||
* @param channel 连接 |
|||
* @param userId 用户ID |
|||
* @param major 客户端主版本号 |
|||
* @param minor 客户端次版本号 |
|||
*/ |
|||
public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor){ |
|||
logger.debug("Invoke authedChannels({},{},{},{})",channel.id().asLongText(),userId,major,minor); |
|||
major = major != null ? major : 0; |
|||
minor = minor != null ? minor : 0; |
|||
|
|||
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
|||
if(wrapperedChannel != null){ |
|||
wrapperedChannel.whenAuthed(userId,major,minor); |
|||
Set<WrapperedChannel> authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>()); |
|||
authedWchannelSet.add(wrapperedChannel); |
|||
logger.debug("Authed channel {} with user {}", channel.id().asLongText(), userId); |
|||
}else{ |
|||
logger.error("Authed channel,but wrappedChannel is null."); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 移除一个连接(Channel) |
|||
* @param channel 要移除的连接 |
|||
*/ |
|||
public static synchronized void removeChannel(Channel channel){ |
|||
logger.debug("Invoke removeChannel({})",channel.id().asLongText()); |
|||
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
|||
if(wrapperedChannel != null){ |
|||
//从rawChannels集合中删除
|
|||
rawChannels.remove(channel); |
|||
logger.debug("Remove a channel from rawChannels: {}",channel.id().asLongText()); |
|||
if(wrapperedChannel.isAuthed()){ |
|||
Set<WrapperedChannel> authedChannelSet = authedChannels.get(wrapperedChannel.getUserId()); |
|||
//从authedChannel集合的value(set)中删除
|
|||
if(CollectionUtil.isNotEmpty(authedChannelSet)){ |
|||
authedChannelSet.remove(wrapperedChannel); |
|||
logger.debug("Remove a channel from authedChannelSet: {}, {}",wrapperedChannel.getUserId(),channel.id().asLongText()); |
|||
} |
|||
//从authedChannel中删除,此处不用else,因为if中语句执行完毕之后,authedChannelSet也可能变成空集合
|
|||
if(CollectionUtil.isEmpty(authedChannelSet)){ |
|||
authedChannels.remove(wrapperedChannel.getUserId()); |
|||
logger.debug("Remove a user from authedChannels: {}",wrapperedChannel.getUserId()); |
|||
} |
|||
} |
|||
}else{ |
|||
logger.error("Remove channel,but wrappedChannel is null."); |
|||
} |
|||
|
|||
if(channel.isOpen() || channel.isActive()){ |
|||
channel.close(); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 移除一个用户 |
|||
* @param userId 要移除的用户 |
|||
*/ |
|||
public static synchronized void removeUser(String userId){ |
|||
logger.debug("Invoke remove user : {}",userId); |
|||
Set<WrapperedChannel> wChannelSet = authedChannels.get(userId); |
|||
if(CollectionUtil.isNotEmpty(wChannelSet)){ |
|||
for(WrapperedChannel wChannel : wChannelSet){ |
|||
//从rawChannel中依次删除
|
|||
rawChannels.remove(wChannel.getChannel()); |
|||
logger.debug("Remove a channel from rawChannels: {}",wChannel.getChannel().id().asLongText()); |
|||
} |
|||
} |
|||
//从authedChannel中删除
|
|||
authedChannels.remove(userId); |
|||
logger.debug("Remove a user from authedChannels: {}",userId); |
|||
} |
|||
|
|||
/** |
|||
* 添加版本号 |
|||
* 只能给已认证的请求添加版本号 |
|||
* @param channel 连接 |
|||
* @param major 主版本号 |
|||
* @param minor 次版本号 |
|||
*/ |
|||
public static synchronized void versionChannel(Channel channel,int major,int minor){ |
|||
logger.debug("Invoke Version channel({},{},{}))",channel.id().asLongText(),major,minor); |
|||
WrapperedChannel wChannel = rawChannels.get(channel); |
|||
if(wChannel != null){ |
|||
wChannel.setVersion(major,minor); |
|||
logger.debug("Version Channel: {},{},{}",channel.id().asLongText(),major,minor); |
|||
}else{ |
|||
logger.error("Remove channel,but wrappedChannel is null."); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发送广播消息给所有已认证Channel |
|||
* @param message 消息 |
|||
*/ |
|||
public static synchronized void broadCastAuthed(Object message) { |
|||
logger.debug("Invoke broadCastAuthed({})",message); |
|||
for (Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()) { |
|||
WrapperedChannel wChannel = entry.getValue(); |
|||
if(wChannel.isAuthed()) { |
|||
wChannel.writeAndFlush(message); |
|||
logger.debug("Send Message {} to {},{}",message,wChannel.getUserId(),wChannel.getId()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发送广播消息 |
|||
* @param message 消息 |
|||
*/ |
|||
public static synchronized void broadCast(Object message) { |
|||
logger.debug("Invoke broadCast({})",message); |
|||
for (Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()) { |
|||
entry.getValue().writeAndFlush(message); |
|||
logger.debug("Send Message {} to {}",message,entry.getValue().getId()); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发送消息给某个连接 |
|||
* @param channel 连接 |
|||
* @param message 消息 |
|||
*/ |
|||
public static synchronized void sendTo(Channel channel,Object message){ |
|||
logger.debug("Invoke sendTo({},{})",channel.id().asLongText(),message); |
|||
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
|||
if(wrapperedChannel != null) { |
|||
wrapperedChannel.writeAndFlush(message); |
|||
logger.debug("Write message {} to Channel {}",message,channel); |
|||
}else{ |
|||
logger.error("can't find channel from rawChannels"); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发送消息给某个用户 |
|||
* @param userId 用户ID |
|||
* @param message 消息 |
|||
*/ |
|||
public static synchronized boolean sendTo(String userId,Object message){ |
|||
logger.debug("Invoke sendTo({},{})",userId,message); |
|||
Set<WrapperedChannel> wChannelSet = authedChannels.get(userId); |
|||
if(CollectionUtil.isNotEmpty(wChannelSet)){ |
|||
for(WrapperedChannel wChannel:wChannelSet){ |
|||
wChannel.writeAndFlush(message); |
|||
logger.debug("Send message {} to channel {}",message,userId); |
|||
} |
|||
return true; |
|||
}else{ |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 刷新最后一次接收数据时间 每次接收数据需要调用该函数 |
|||
* @param channel 接收数据的连接 |
|||
*/ |
|||
public static synchronized void flushReceiveTimestamp(Channel channel){ |
|||
logger.debug("Invoke flushReceiveTimestamp({})",channel.id().asLongText()); |
|||
WrapperedChannel wrapperedChannel = rawChannels.get(channel); |
|||
if(wrapperedChannel != null){ |
|||
wrapperedChannel.whenReceivedData(); |
|||
}else{ |
|||
logger.error("can find channel from rawChannels"); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 关闭所有未认证的连接 |
|||
* @param unAuthedChannelsMaxAliveTimeInSeconds 未认证的最大时长(s) |
|||
* @throws Exception |
|||
* @deprecated 已过期,该方法在多线程并发下可能出现问题,建议使用messageService中的同名方法 |
|||
*/ |
|||
@Deprecated |
|||
public static synchronized void closeUnAuthedChannels(Long unAuthedChannelsMaxAliveTimeInSeconds) throws Exception { |
|||
logger.debug("Inovke closeUnAuthedChannels({})",unAuthedChannelsMaxAliveTimeInSeconds); |
|||
Iterator<Map.Entry<Channel,WrapperedChannel>> it = rawChannels.entrySet().iterator(); |
|||
while(it.hasNext()){ |
|||
Map.Entry<Channel,WrapperedChannel> entry = it.next(); |
|||
WrapperedChannel wrapperedChannel = entry.getValue(); |
|||
if((!wrapperedChannel.isAuthed()) && (wrapperedChannel.getConnectedSeconds() > unAuthedChannelsMaxAliveTimeInSeconds)) { |
|||
it.remove(); |
|||
//关闭连接
|
|||
wrapperedChannel.getChannel().close(); |
|||
logger.debug("Remove a unAuthed Channel {}, which has connected {}s,maxOutTime is {}s", wrapperedChannel.getId(),wrapperedChannel.getConnectedSeconds() , |
|||
unAuthedChannelsMaxAliveTimeInSeconds ); |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 判断Channel是否存在 |
|||
* @param channel 连接 |
|||
* @return 是否存在 |
|||
*/ |
|||
public static synchronized boolean channelExist(Channel channel){ |
|||
return rawChannels.containsKey(channel); |
|||
} |
|||
|
|||
/** |
|||
* 判断Channel是否已经认证 |
|||
* @param channel 连接 |
|||
* @return 是否认证 |
|||
*/ |
|||
public static synchronized boolean channelAuthed(Channel channel){ |
|||
return rawChannels.containsKey(channel) && rawChannels.get(channel).isAuthed(); |
|||
} |
|||
|
|||
/** |
|||
* 判断用户是否在线 |
|||
* @param userId 用户ID |
|||
* @return 是否认证 |
|||
*/ |
|||
public static synchronized boolean isUserOnline(String userId){ |
|||
return authedChannels.containsKey(userId); |
|||
} |
|||
|
|||
/** |
|||
* 根据Channel获取对应的用户ID |
|||
* @param channel 连接 |
|||
* @return 用户ID |
|||
*/ |
|||
public static synchronized String getUserIdByChannel(Channel channel) { |
|||
return rawChannels.containsKey(channel) ? rawChannels.get(channel).getUserId() : null; |
|||
} |
|||
|
|||
/** |
|||
* 获取所有在线用户 |
|||
* @return 所有用户的集合列表 |
|||
*/ |
|||
public static synchronized Set<String> getOnlineUsers(){ |
|||
return authedChannels.keySet(); |
|||
} |
|||
|
|||
/** |
|||
* 获取某种类型的所有在线用户的连接 |
|||
* @param type 客户端类型(ws/tcp modebus/tcp text) |
|||
* @return 所有在西安channel的集合列表 |
|||
*/ |
|||
public static synchronized Set<Channel> getOnlineChannels(String type){ |
|||
Set<Channel> onLineChannels = CollectionUtil.newHashSet(); |
|||
for(Map.Entry<Channel,WrapperedChannel> entry: rawChannels.entrySet()){ |
|||
WrapperedChannel wrapperedChannel = entry.getValue(); |
|||
if(wrapperedChannel.isAuthed() && wrapperedChannel.getType().equals(type)){ |
|||
onLineChannels.add(entry.getKey()); |
|||
} |
|||
} |
|||
return onLineChannels; |
|||
} |
|||
|
|||
/** |
|||
* 获取一个rawChannel的副本 |
|||
* @return 副本 |
|||
*/ |
|||
public static Map<Channel, WrapperedChannel> getCopyOfAllChannels() { |
|||
Map<Channel,WrapperedChannel> copyMap = new HashMap<>(rawChannels.size()); |
|||
copyMap.putAll(rawChannels); |
|||
return copyMap; |
|||
} |
|||
|
|||
/** |
|||
* 根据Channel获取WrapperedChannel |
|||
* @param channel channel |
|||
* @return 对应的wrapperedChannel |
|||
*/ |
|||
public static WrapperedChannel getWrapperedChannelByChannel(Channel channel) { |
|||
return rawChannels.get(channel); |
|||
} |
|||
|
|||
public static synchronized void showAuthedChannels(){ |
|||
for(Map.Entry<String,Set<WrapperedChannel>> entry : authedChannels.entrySet()){ |
|||
for (WrapperedChannel channel:entry.getValue()){ |
|||
logger.debug("{}-->{}",entry.getKey(),channel.toString()); |
|||
} |
|||
} |
|||
} |
|||
public static synchronized void showAllChannels(){ |
|||
for(Map.Entry<Channel,WrapperedChannel> entry : rawChannels.entrySet()){ |
|||
logger.debug(entry.getValue().toString()); |
|||
} |
|||
} |
|||
|
|||
public static Set<String> getAllOnlineUsers() { |
|||
return authedChannels.keySet(); |
|||
} |
|||
} |
@ -0,0 +1,217 @@ |
|||
package com.ccsens.game.netty; |
|||
|
|||
import com.fasterxml.jackson.annotation.JsonFormat; |
|||
|
|||
/** |
|||
* @author wei |
|||
* 消息相关常量 |
|||
*/ |
|||
public class GameMessageConstant { |
|||
public enum ClientMessageType{ |
|||
//客户端心跳
|
|||
Ping(0x00), |
|||
//客户端认证
|
|||
Auth(0x01), |
|||
//客户端收到消息ACK
|
|||
Ack(0x02), |
|||
//客户端收到消息ACK
|
|||
HasRead(0x03), |
|||
//客户端请求连接状态
|
|||
GetChannelStatus(0x04), |
|||
|
|||
//不可预期的错误
|
|||
UnExceptedError(0x21), |
|||
ClientIdleClosed(0x22), |
|||
ClientAuthTimeOut(0x23), |
|||
|
|||
//设置消息状态消息(disable撤销消息,适用于,消息撤回),针对消息本身(eg:消息撤回)
|
|||
SetMsgSuccess(0x52), |
|||
SetMsgReverted(0x53), |
|||
SetMsgDeleted(0x54); |
|||
|
|||
public int value; |
|||
|
|||
ClientMessageType(int value){ |
|||
this.value = value; |
|||
} |
|||
|
|||
/** |
|||
* 从int到enum的转换函数 |
|||
* @param value 枚举int值 |
|||
* @return 对应的枚举,找不到则返回null |
|||
*/ |
|||
public static ClientMessageType valueOf(int value) { |
|||
for(ClientMessageType type : values()){ |
|||
if(type.value == value){ |
|||
return type; |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
public enum ServerMessageType{ |
|||
//客户端心跳
|
|||
Pong(0x00), |
|||
//客户端收到消息ACK
|
|||
Ack(0x02), |
|||
//客户端请求连接状态
|
|||
ChannelStatus(0x03); |
|||
//撤销某个消息
|
|||
//DelMessage(0x04),
|
|||
//客户端请求连接状态
|
|||
//QueueStatus(0x05);
|
|||
|
|||
public int value; |
|||
|
|||
ServerMessageType(int value){ |
|||
this.value = value; |
|||
} |
|||
|
|||
/** |
|||
* 从int到enum的转换函数 |
|||
* @param value 枚举int值 |
|||
* @return 对应的枚举,找不到则返回null |
|||
*/ |
|||
public static ServerMessageType valueOf(int value) { |
|||
for(ServerMessageType type : values()){ |
|||
if(type.value == value){ |
|||
return type; |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* JsonFormat是Json Serialize相关配置 |
|||
*/ |
|||
@JsonFormat(shape = JsonFormat.Shape.OBJECT) |
|||
public enum Error{ |
|||
//Ok
|
|||
Ok(200,"Ok"), |
|||
|
|||
/** |
|||
* 通用消息错误 |
|||
*/ |
|||
//消息无接收者错误
|
|||
MessageNoReceiversError(1001,"[用户]消息至少应该有一个接收者"), |
|||
//消息无数据错误
|
|||
MessageNoDataError(1002,"不允许消息内容为空"), |
|||
|
|||
/** |
|||
* server消息错误 |
|||
*/ |
|||
//server消息无type错误
|
|||
MessageNoTypeError(1003,"发送至Server域的消息必须有type字段"), |
|||
//Ack参数错误,没有找到对应的msgId
|
|||
AckParameterError(1103,"Ack参数错误,没有找到对应的msgId"), |
|||
//SetSuccess参数错误,没有找到对应的msgId
|
|||
SetSuccessParameterError(1104,"SetSuccess参数错误,没有找到对应的msgId"), |
|||
//SetStatusParameterError
|
|||
SetStatusParameterError(1105,"SetStatus参数错误,没有找到对应的msgId"), |
|||
//HasRead参数错误,没有找到对应的msgId
|
|||
HasReadParameterError(1106,"HasRead参数错误,没有找到对应的msgId"), |
|||
|
|||
/** |
|||
* 身份认证错误 |
|||
*/ |
|||
//无权限
|
|||
UnAuthed(1021, "未认证的用户"), |
|||
|
|||
/** |
|||
* 业务错误 |
|||
*/ |
|||
//认证失败
|
|||
AuthFailed(1301,"认证失败"), |
|||
//空闲断开连接(连续Ns没有收到数据)
|
|||
ChannelIdle(1302,"连接断开:连续N秒没有从收到客户端收到任何数据"), |
|||
//认证超时断开连接
|
|||
ChannelAuthTimeOut(1303,"连接断开:认证超时"), |
|||
//不可预期错误
|
|||
UnExpectedError(1304,"不可预期异常"); |
|||
|
|||
public int code; |
|||
public String text; |
|||
public String extra; |
|||
|
|||
Error(int code,String text) { |
|||
this.code = code; |
|||
this.text = text; |
|||
} |
|||
Error(int code,String text,String extra) { |
|||
this.code = code; |
|||
this.text = text; |
|||
this.extra = extra; |
|||
} |
|||
public Error joinExtra(String extra){ |
|||
this.extra = extra; |
|||
return this; |
|||
} |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 域(User、Queue、Rest、Server) |
|||
*/ |
|||
public enum DomainType{ |
|||
//Netty Client
|
|||
User(1), |
|||
//Queue Client
|
|||
Queue(2), |
|||
//Rest Client
|
|||
Rest(3), |
|||
//系统
|
|||
Server(4); |
|||
|
|||
public int value; |
|||
|
|||
DomainType(int value){ |
|||
this.value = value; |
|||
} |
|||
|
|||
public static DomainType valueOf(int value) { |
|||
for(DomainType domainType : values()){ |
|||
if(domainType.value == value){ |
|||
return domainType; |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
public enum Status{ |
|||
//未决状态(未完成)
|
|||
Pending(0), |
|||
//发送成功(投递成功)
|
|||
Succeed(1), |
|||
//发送失败(投递失败)
|
|||
Failed(2), |
|||
//消息过期
|
|||
Expired(3), |
|||
//消息被撤回
|
|||
Reverted(4), |
|||
//消息被删除
|
|||
Deleted(5), |
|||
//消息达到重试次数上限
|
|||
SendTimesUpLimit(6), |
|||
|
|||
//临时完成状态,当ackIsSuccess==0时,一旦ack设置消息为此状态
|
|||
TempSucceed(10); |
|||
|
|||
public int value; |
|||
|
|||
Status(int value){ |
|||
this.value = value; |
|||
} |
|||
|
|||
public static Status valueOf(int value){ |
|||
for(Status status : values()){ |
|||
if(status.value == value){ |
|||
return status; |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,200 @@ |
|||
package com.ccsens.game.netty; |
|||
|
|||
import cn.hutool.core.date.DateUtil; |
|||
import cn.hutool.core.util.ObjectUtil; |
|||
import io.netty.channel.Channel; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
import java.net.InetSocketAddress; |
|||
|
|||
/** |
|||
* @author wei |
|||
*/ |
|||
@Getter |
|||
@Setter |
|||
public class WrapperedChannel { |
|||
@Getter |
|||
@Setter |
|||
private static class ClientVersion{ |
|||
int major; |
|||
int minor; |
|||
public ClientVersion(int major,int minor){ |
|||
this.major = major; |
|||
this.minor = minor; |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* Netty's Channel |
|||
*/ |
|||
private Channel channel; |
|||
/** |
|||
* userId |
|||
*/ |
|||
private String userId; |
|||
/** |
|||
* 客户端版本号 |
|||
*/ |
|||
private ClientVersion version; |
|||
|
|||
/** |
|||
* 客户端类型(哪种服务器) |
|||
*/ |
|||
private String type; |
|||
/** |
|||
* 是否认证 |
|||
*/ |
|||
private boolean authed; |
|||
/** |
|||
* 连接建立时间 |
|||
*/ |
|||
private Long createdAtInSeconds; |
|||
/** |
|||
* 认证时间(s) |
|||
*/ |
|||
private Long authedAtInSeconds; |
|||
/** |
|||
* 最后一次接收有效数据时间(包括心跳包) |
|||
*/ |
|||
private Long lastDataReceivedInSeconds; |
|||
/** |
|||
* 最后一次发送有效数据时间(包括心跳包) |
|||
*/ |
|||
private Long lastDataSendInSeconds; |
|||
/** |
|||
* 接收到数据条数(每接收到一条协议加1,包括心跳) |
|||
*/ |
|||
private long dataReceivedCount; |
|||
/** |
|||
* 发送数据条数(每发送到一条协议加1,包括心跳) |
|||
*/ |
|||
private long dataSendCount; |
|||
|
|||
public WrapperedChannel(Channel channel){ |
|||
this.channel = channel; |
|||
this.createdAtInSeconds = DateUtil.currentSeconds(); |
|||
} |
|||
|
|||
public WrapperedChannel(Channel channel,String type){ |
|||
this(channel); |
|||
this.type = type; |
|||
} |
|||
|
|||
public WrapperedChannel(Channel channel,String type,int major,int minor){ |
|||
this(channel,type); |
|||
this.version = new ClientVersion(major,minor); |
|||
} |
|||
|
|||
public String getVersion(){ |
|||
if(ObjectUtil.isNotNull(version)){ |
|||
return "v" + version.getMajor() + "." + version.getMinor(); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
public void setVersion(int major,int minor){ |
|||
if (ObjectUtil.isNull(version)) { |
|||
version = new ClientVersion(major, minor); |
|||
} else { |
|||
version.setMajor(major); |
|||
version.setMinor(minor); |
|||
} |
|||
} |
|||
|
|||
public String getRemoteAddr(){ |
|||
if(ObjectUtil.isNotNull(channel)){ |
|||
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress(); |
|||
if(ObjectUtil.isNotNull(insocket)) { |
|||
return insocket.getAddress().getHostAddress(); |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
public Integer getRemotePort(){ |
|||
if(ObjectUtil.isNotNull(channel)){ |
|||
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress(); |
|||
if(ObjectUtil.isNotNull(insocket)) { |
|||
return insocket.getPort(); |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
public String getId(){ |
|||
if(ObjectUtil.isNotNull(channel)){ |
|||
return channel.id().asLongText(); |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
public void whenReceivedData(){ |
|||
lastDataReceivedInSeconds = DateUtil.currentSeconds(); |
|||
dataReceivedCount++; |
|||
} |
|||
|
|||
public void whenSendData(){ |
|||
lastDataSendInSeconds = DateUtil.currentSeconds(); |
|||
dataSendCount++; |
|||
} |
|||
|
|||
public void whenAuthed(){ |
|||
authed = true; |
|||
authedAtInSeconds = DateUtil.currentSeconds(); |
|||
} |
|||
|
|||
public void whenAuthed(String userId,int major,int minor){ |
|||
whenAuthed(); |
|||
setVersion(major,minor); |
|||
this.userId = userId; |
|||
} |
|||
|
|||
public void writeAndFlush(Object message){ |
|||
if(channel != null && channel.isActive()){ |
|||
channel.writeAndFlush(message); |
|||
whenSendData(); |
|||
} |
|||
} |
|||
|
|||
public Long getConnectedSeconds(){ |
|||
return createdAtInSeconds == null ? null : DateUtil.currentSeconds() - createdAtInSeconds; |
|||
} |
|||
|
|||
public Long getOnlineSeconds(){ |
|||
return authedAtInSeconds == null ? null : DateUtil.currentSeconds() - authedAtInSeconds ; |
|||
} |
|||
|
|||
@Override |
|||
public boolean equals(Object obj) { |
|||
if(ObjectUtil.isNull(obj)) { |
|||
return false; |
|||
} |
|||
if(this == obj) { |
|||
return true; |
|||
} |
|||
if(ObjectUtil.isNull(channel)) { |
|||
return false; |
|||
} |
|||
if(obj.getClass() == this.getClass()){ |
|||
WrapperedChannel other = (WrapperedChannel)obj; |
|||
return channel.equals(other.channel); |
|||
}else if(obj.getClass() == channel.getClass()){ |
|||
Channel other = (Channel)obj; |
|||
return channel.equals(other); |
|||
} |
|||
return false; |
|||
} |
|||
|
|||
@Override |
|||
public int hashCode() { |
|||
if(ObjectUtil.isNotNull(channel)){ |
|||
return channel.hashCode(); |
|||
} |
|||
return 0; |
|||
} |
|||
|
|||
@Override |
|||
public String toString() { |
|||
return String.format("id: %s, type: %s, authed: %b, userId: %s, version: %s",getId(),type,authed,userId,getVersion()); |
|||
} |
|||
} |
@ -0,0 +1,70 @@ |
|||
package com.ccsens.game.netty.wsserver; |
|||
|
|||
import io.netty.bootstrap.ServerBootstrap; |
|||
import io.netty.channel.*; |
|||
import io.netty.channel.nio.NioEventLoopGroup; |
|||
import io.netty.channel.socket.SocketChannel; |
|||
import io.netty.channel.socket.nio.NioServerSocketChannel; |
|||
import io.netty.handler.codec.http.HttpObjectAggregator; |
|||
import io.netty.handler.codec.http.HttpServerCodec; |
|||
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; |
|||
import io.netty.handler.logging.LogLevel; |
|||
import io.netty.handler.logging.LoggingHandler; |
|||
import io.netty.handler.stream.ChunkedWriteHandler; |
|||
import io.netty.handler.timeout.IdleStateHandler; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.scheduling.annotation.Async; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
/** |
|||
* @author wei |
|||
*/ |
|||
@Component |
|||
public class NettyWsServer { |
|||
private static final String URI = "/game/v3.0/ws"; |
|||
private static final short PORT = 7070; |
|||
|
|||
@Autowired |
|||
private WebSocketHandler webSocketHandler; |
|||
|
|||
@Async |
|||
public void start() { |
|||
// Configure the server.
|
|||
EventLoopGroup bossGroup = new NioEventLoopGroup(1); |
|||
EventLoopGroup workerGroup = new NioEventLoopGroup(); |
|||
try { |
|||
ServerBootstrap b = new ServerBootstrap(); |
|||
b.group(bossGroup, workerGroup) |
|||
.channel(NioServerSocketChannel.class) |
|||
.option(ChannelOption.SO_BACKLOG, 100) |
|||
.handler(new LoggingHandler(LogLevel.INFO)) |
|||
.childHandler(new ChannelInitializer<SocketChannel>() { |
|||
@Override |
|||
public void initChannel(SocketChannel ch) throws Exception { |
|||
ChannelPipeline p = ch.pipeline(); |
|||
p.addLast(new IdleStateHandler(40, |
|||
0, 0, TimeUnit.SECONDS)); |
|||
p.addLast(new HttpServerCodec()); |
|||
p.addLast(new HttpObjectAggregator(64 * 1024)); |
|||
p.addLast(new ChunkedWriteHandler()); |
|||
p.addLast(new WebSocketServerProtocolHandler(URI)); |
|||
p.addLast(new WebSocketDecoder()); |
|||
p.addLast(new WebSocketEncoder()); |
|||
p.addLast(webSocketHandler); |
|||
} |
|||
}); |
|||
// Start the server.
|
|||
ChannelFuture f = b.bind(PORT).sync(); |
|||
// Wait until the server socket is closed.
|
|||
f.channel().closeFuture().sync(); |
|||
} catch(Exception e){ |
|||
e.printStackTrace(); |
|||
}finally { |
|||
// Shut down all event loops to terminate all threads.
|
|||
bossGroup.shutdownGracefully(); |
|||
workerGroup.shutdownGracefully(); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,33 @@ |
|||
package com.ccsens.game.netty.wsserver; |
|||
|
|||
import com.ccsens.game.bean.dto.message.BaseMessageDto; |
|||
import com.ccsens.util.JacksonUtil; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.handler.codec.MessageToMessageDecoder; |
|||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* @author wei |
|||
*/ |
|||
public class WebSocketDecoder extends MessageToMessageDecoder<TextWebSocketFrame> { |
|||
private static Logger logger = LoggerFactory.getLogger(WebSocketDecoder.class); |
|||
|
|||
@Override |
|||
protected void decode(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg, List<Object> out) throws Exception { |
|||
String text = msg.text(); |
|||
logger.info("Websocket received: {}",text); |
|||
|
|||
try { |
|||
out.add(JacksonUtil.jsonToBean(text, BaseMessageDto.class)); |
|||
}catch(IOException e){ |
|||
e.printStackTrace(); |
|||
logger.error("Websocket Read Error: {}",text); |
|||
throw e; |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,25 @@ |
|||
package com.ccsens.game.netty.wsserver; |
|||
|
|||
import com.ccsens.game.bean.dto.message.BaseMessageDto; |
|||
import com.ccsens.util.JacksonUtil; |
|||
import io.netty.buffer.ByteBuf; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.handler.codec.MessageToByteEncoder; |
|||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
|
|||
/** |
|||
* @author wei |
|||
*/ |
|||
public class WebSocketEncoder extends MessageToByteEncoder<BaseMessageDto> { |
|||
private static Logger logger = LoggerFactory.getLogger(WebSocketEncoder.class); |
|||
|
|||
@Override |
|||
protected void encode(ChannelHandlerContext ctx, BaseMessageDto outMessageSet, ByteBuf out) throws Exception { |
|||
String msg = JacksonUtil.beanToJson(outMessageSet); |
|||
ctx.writeAndFlush(new TextWebSocketFrame(msg)); |
|||
|
|||
logger.info("Websocket send: {}",msg); |
|||
} |
|||
} |
@ -0,0 +1,135 @@ |
|||
package com.ccsens.game.netty.wsserver; |
|||
|
|||
|
|||
import com.ccsens.game.bean.dto.message.AckMessageDto; |
|||
import com.ccsens.game.bean.dto.message.AuthMessageDto; |
|||
import com.ccsens.game.bean.dto.message.BaseMessageDto; |
|||
import com.ccsens.game.bean.dto.message.HeartMessageDto; |
|||
import com.ccsens.game.netty.ChannelManager; |
|||
import com.ccsens.game.service.IClientService; |
|||
import com.ccsens.game.service.IMessageService; |
|||
import com.ccsens.util.WebConstant; |
|||
import com.ccsens.util.bean.message.common.MessageConstant; |
|||
import io.netty.channel.ChannelHandler; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.channel.SimpleChannelInboundHandler; |
|||
import io.netty.handler.timeout.IdleState; |
|||
import io.netty.handler.timeout.IdleStateEvent; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* @author wei |
|||
*/ |
|||
@ChannelHandler.Sharable |
|||
@Component |
|||
public class WebSocketHandler extends SimpleChannelInboundHandler<BaseMessageDto> { |
|||
private static final String TYPE = "netty_ws"; |
|||
|
|||
private static Logger logger = LoggerFactory.getLogger(WebSocketHandler.class); |
|||
|
|||
@Autowired |
|||
private IClientService clientService; |
|||
@Autowired |
|||
private IMessageService messageService; |
|||
|
|||
@Override |
|||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { |
|||
} |
|||
|
|||
@Override |
|||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { |
|||
} |
|||
|
|||
@Override |
|||
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
|||
ChannelManager.addChannel(ctx.channel(), TYPE); |
|||
} |
|||
|
|||
@Override |
|||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
|||
ChannelManager.removeChannel(ctx.channel()); |
|||
} |
|||
|
|||
@Override |
|||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
|||
|
|||
cause.printStackTrace(); |
|||
ctx.close(); |
|||
logger.error("Channel closed. Ws get a exception: {}", cause.getMessage()); |
|||
} |
|||
|
|||
|
|||
@Override |
|||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { |
|||
if (evt instanceof IdleStateEvent) { |
|||
IdleStateEvent idleStateEvent = (IdleStateEvent) evt; |
|||
if (idleStateEvent.state() == IdleState.READER_IDLE) { |
|||
ctx.channel().close(); |
|||
logger.error("Ws channel idle,closed."); |
|||
} |
|||
} else { |
|||
super.userEventTriggered(ctx, evt); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
protected void channelRead0(ChannelHandlerContext ctx, BaseMessageDto baseMessage) throws Exception { |
|||
MessageConstant.GameClientMessageType gameClientMessageType = MessageConstant.GameClientMessageType.valueOf(baseMessage.getType()); |
|||
|
|||
System.out.println(baseMessage); |
|||
try { |
|||
switch (gameClientMessageType) { |
|||
case Heart: { |
|||
HeartMessageDto message = (HeartMessageDto) baseMessage; |
|||
message.setTime(System.currentTimeMillis()); |
|||
if (message.getData() != null) { |
|||
ChannelManager.versionChannel( |
|||
ctx.channel(), message.getData().getMajor(), message.getData().getMinor()); |
|||
} |
|||
ChannelManager.sendTo(ctx.channel(), message); |
|||
break; |
|||
} |
|||
case Ack: { |
|||
AckMessageDto message = (AckMessageDto) baseMessage; |
|||
String userId = ChannelManager.getUserIdByChannel(ctx.channel()); |
|||
messageService.doAckMessageWithAck(userId, message); |
|||
break; |
|||
} |
|||
case Auth: |
|||
AuthMessageDto theMessage = (AuthMessageDto) baseMessage; |
|||
theMessage.getData().setChannelId(ctx.channel().id().asLongText()); |
|||
doAuthMessage(ctx, theMessage); |
|||
break; |
|||
case Count: |
|||
String userId = ChannelManager.getUserIdByChannel(ctx.channel()); |
|||
clientService.clientAddTimes(userId); |
|||
break; |
|||
case ChangeStatus: |
|||
break; |
|||
default: |
|||
break; |
|||
} |
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
logger.error("Websocket Process Message Failed: {},{}", e.getMessage(), baseMessage); |
|||
throw e; |
|||
} finally { |
|||
ChannelManager.removeCurrentChannel(); |
|||
} |
|||
} |
|||
|
|||
private void doAuthMessage(ChannelHandlerContext ctx, AuthMessageDto message) throws Exception { |
|||
WebConstant.Message_Auth_Event event = WebConstant.Message_Auth_Event.phaseOf(message.getEvent()); |
|||
switch (event) { |
|||
case Auth: |
|||
messageService.doAuthMessageWithAuth(ctx, message); |
|||
break; |
|||
default: |
|||
break; |
|||
} |
|||
} |
|||
} |
|||
|
@ -0,0 +1,18 @@ |
|||
package com.ccsens.game.service; |
|||
|
|||
import com.ccsens.game.bean.dto.message.AckMessageDto; |
|||
import com.ccsens.game.bean.dto.message.AuthMessageDto; |
|||
import com.ccsens.game.bean.dto.message.ChromeMessageDto; |
|||
import com.ccsens.game.bean.dto.message.GameMessageWithGetUrlDto; |
|||
import com.fasterxml.jackson.core.JsonProcessingException; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
|
|||
public interface IMessageService { |
|||
//获取路径后给每个人发送游戏路径消息
|
|||
void sendGameMessageWithGetUrl(ChromeMessageDto message) throws Exception; |
|||
|
|||
void doAckMessageWithAck(String userId, AckMessageDto message); |
|||
|
|||
void doAuthMessageWithAuth(ChannelHandlerContext ctx, AuthMessageDto message); |
|||
} |
|||
|
@ -0,0 +1,67 @@ |
|||
package com.ccsens.game.service; |
|||
|
|||
import com.ccsens.cloudutil.feign.TallFeignClient; |
|||
import com.ccsens.game.bean.dto.message.AckMessageDto; |
|||
import com.ccsens.game.bean.dto.message.AuthMessageDto; |
|||
import com.ccsens.game.bean.dto.message.ChromeMessageDto; |
|||
import com.ccsens.game.netty.ChannelManager; |
|||
import com.ccsens.util.JacksonUtil; |
|||
import com.ccsens.util.config.RabbitMQConfig; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import org.springframework.amqp.core.AmqpTemplate; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
@Service |
|||
public class MessageService implements IMessageService{ |
|||
@Autowired |
|||
private AmqpTemplate rabbitTemplate; |
|||
@Autowired |
|||
private TallFeignClient tallFeignClient; |
|||
|
|||
@Override |
|||
public void sendGameMessageWithGetUrl(ChromeMessageDto message) throws Exception { |
|||
System.out.println(JacksonUtil.beanToJson(message)); |
|||
rabbitTemplate.convertAndSend(RabbitMQConfig.RabbitMQ_QUEUE_NAME, |
|||
JacksonUtil.beanToJson(message)); |
|||
} |
|||
|
|||
@Override |
|||
public void doAckMessageWithAck(String userId, AckMessageDto message) { |
|||
// System.out.println(message);
|
|||
// //1.存储到MongoDB(ack消息不需要存储,修改响应消息的发送成功标志即可)
|
|||
//// Long userId = message.getSender().getUserId();
|
|||
// Long msgId = message.getData().getMsgId();
|
|||
// messageLogDao.updateMessageLogSendSuccess(userId,msgId,1);
|
|||
} |
|||
|
|||
@Override |
|||
public void doAuthMessageWithAuth(ChannelHandlerContext ctx, AuthMessageDto message) { |
|||
System.out.println(message); |
|||
//ws连接认证
|
|||
String userId = message.getData().getUserId(); |
|||
if(userId == null){ |
|||
String token = message.getData().getToken(); |
|||
userId = tallFeignClient.getUserId(token); |
|||
} |
|||
ChannelManager.authChannel(ctx.channel(),userId,1,1); |
|||
// if(userId == null){
|
|||
//// logger.warn(String.format("Auth failed: %s",userId,message.getData().getToken()));
|
|||
//// ChannelManager.removeChannel(ctx.channel());
|
|||
// AuthMessageWithAnswerDto messageWithAnswerDto = new AuthMessageWithAnswerDto(false,"未找到对应的用户信息");
|
|||
// sendTo(ctx.channel(),messageWithAnswerDto);
|
|||
// ctx.channel().close();
|
|||
// return;
|
|||
// }
|
|||
// String channelId = message.getData().getChannelId();
|
|||
// if(ChannelManager.authChannel(ctx.channel(),userId)){ //认证成功
|
|||
//查找所有该用户相关未收到的消息和未读消息依次发送给该用户
|
|||
// sendAllUnSendSuccessedMessagesByUserId(userId,0L);
|
|||
// AuthMessageWithAnswerDto messageWithAnswerDto = new AuthMessageWithAnswerDto(true,"ok");
|
|||
// sendTo(ctx.channel(),messageWithAnswerDto);
|
|||
// }else {
|
|||
//// AuthMessageWithAnswerDto messageWithAnswerDto = new AuthMessageWithAnswerDto(false,"认证失败");
|
|||
//// sendTo(ctx.channel(),messageWithAnswerDto);
|
|||
// }
|
|||
} |
|||
} |
@ -1,4 +1,4 @@ |
|||
spring: |
|||
profiles: |
|||
active: dev |
|||
include: common, util-dev |
|||
active: test |
|||
include: common, util-test |
@ -0,0 +1,32 @@ |
|||
package com.ccsens.tall.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
/** |
|||
* @author __zHangSan |
|||
*/ |
|||
@Data |
|||
public class ChromeMessageDto extends BaseMessageDto{ |
|||
@Setter |
|||
@Getter |
|||
public static class Data{ |
|||
private String url; |
|||
} |
|||
private Data data; |
|||
|
|||
public ChromeMessageDto(){ |
|||
setType(WebConstant.Message_Type.Chrome.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
|
|||
public ChromeMessageDto(String url){ |
|||
this(); |
|||
if(data == null){ |
|||
data = new Data(); |
|||
} |
|||
data.setUrl(url); |
|||
} |
|||
} |
@ -0,0 +1,35 @@ |
|||
package com.ccsens.tall.bean.dto.message; |
|||
|
|||
import com.ccsens.util.WebConstant; |
|||
import lombok.Data; |
|||
import lombok.Getter; |
|||
import lombok.Setter; |
|||
|
|||
/** |
|||
* @author __zHangSan |
|||
*/ |
|||
@Data |
|||
public class PPTCtlMessageDto extends BaseMessageDto{ |
|||
@Setter |
|||
@Getter |
|||
public static class Data{ |
|||
/** |
|||
* Supported Operation: up,down,begin,end |
|||
*/ |
|||
private String oper; |
|||
} |
|||
private Data data; |
|||
|
|||
public PPTCtlMessageDto(){ |
|||
setType(WebConstant.Message_Type.PPTCtl.phase); |
|||
setTime(System.currentTimeMillis()); |
|||
} |
|||
|
|||
public PPTCtlMessageDto(String oper){ |
|||
this(); |
|||
if(data == null){ |
|||
data = new Data(); |
|||
} |
|||
data.setOper(oper); |
|||
} |
|||
} |
@ -0,0 +1,8 @@ |
|||
package com.ccsens.tall.persist.dao; |
|||
|
|||
import com.ccsens.tall.persist.mapper.ProTaskMemberMapper; |
|||
import org.springframework.stereotype.Repository; |
|||
|
|||
@Repository |
|||
public interface ProTaskMemberDao extends ProTaskMemberMapper{ |
|||
} |
@ -1,7 +1,10 @@ |
|||
package com.ccsens.tall.service; |
|||
|
|||
import com.ccsens.tall.bean.po.ProTaskMember; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
|
|||
public interface ITaskMemberService { |
|||
|
|||
|
|||
void saveTaskMember(ProTaskMember proTaskMember); |
|||
} |
|||
|
Loading…
Reference in new issue