@ -2,8 +2,8 @@ package com.ccsens.game.netty;
import cn.hutool.core.collection.CollectionUtil ;
import io.netty.channel.Channel ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import lombok.extern.slf4j.Slf4j ;
import java.util.* ;
import java.util.concurrent.ConcurrentHashMap ;
@ -11,8 +11,8 @@ import java.util.concurrent.ConcurrentHashMap;
/ * *
* @author wei
* /
@Slf4j
public class ChannelManager {
private static Logger logger = LoggerFactory . getLogger ( ChannelManager . class ) ;
private static ThreadLocal < Channel > threadLocal = new ThreadLocal < > ( ) ;
/ * *
@ -63,12 +63,12 @@ public class ChannelManager {
* @param serverType 服务器类型
* /
public static synchronized void addChannel ( Channel channel , String serverType ) {
logger . debug ( "Invoke addChannel({},{})" , channel , serverType ) ;
log . info ( "Invoke addChannel({},{})" , channel , serverType ) ;
if ( null ! = channel ) {
rawChannels . put ( channel , new WrapperedChannel ( channel , serverType ) ) ;
logger . debug ( "Add a new channel: {},{}" , channel . id ( ) . asLongText ( ) , serverType ) ;
log . info ( "Add a new channel: {},{}" , channel . id ( ) . asLongText ( ) , serverType ) ;
} else {
logger . error ( "channel is null" ) ;
log . error ( "channel is null" ) ;
}
}
@ -80,7 +80,7 @@ public class ChannelManager {
* @param minor 客户端次版本号
* /
public static synchronized void authChannel ( Channel channel , String userId , Integer major , Integer minor ) {
logger . debug ( "Invoke authedChannels({},{},{},{})" , channel . id ( ) . asLongText ( ) , userId , major , minor ) ;
log . info ( "Invoke authedChannels({},{},{},{})" , channel . id ( ) . asLongText ( ) , userId , major , minor ) ;
major = major ! = null ? major : 0 ;
minor = minor ! = null ? minor : 0 ;
@ -89,9 +89,9 @@ public class ChannelManager {
wrapperedChannel . whenAuthed ( userId , major , minor ) ;
Set < WrapperedChannel > authedWchannelSet = authedChannels . computeIfAbsent ( userId , k - > new HashSet < > ( ) ) ;
authedWchannelSet . add ( wrapperedChannel ) ;
logger . debug ( "Authed channel {} with user {}" , channel . id ( ) . asLongText ( ) , userId ) ;
log . info ( "Authed channel {} with user {}" , channel . id ( ) . asLongText ( ) , userId ) ;
} else {
logger . error ( "Authed channel,but wrappedChannel is null." ) ;
log . error ( "Authed channel,but wrappedChannel is null." ) ;
}
}
@ -104,7 +104,7 @@ public class ChannelManager {
* @param minor 用户参加的游戏id
* /
public static synchronized void authChannel ( Channel channel , String userId , Integer major , Integer minor , String recordId ) {
logger . debug ( "Invoke authedChannels({},{},{},{})" , channel . id ( ) . asLongText ( ) , userId , major , minor ) ;
log . info ( "Invoke authedChannels({},{},{},{})" , channel . id ( ) . asLongText ( ) , userId , major , minor ) ;
major = major ! = null ? major : 0 ;
minor = minor ! = null ? minor : 0 ;
@ -113,9 +113,9 @@ public class ChannelManager {
wrapperedChannel . whenAuthed ( userId , major , minor , recordId ) ;
Set < WrapperedChannel > authedWchannelSet = authedChannels . computeIfAbsent ( userId , k - > new HashSet < > ( ) ) ;
authedWchannelSet . add ( wrapperedChannel ) ;
logger . debug ( "Authed channel {} with user {}" , channel . id ( ) . asLongText ( ) , userId ) ;
log . info ( "Authed channel {} with user {}" , channel . id ( ) . asLongText ( ) , userId ) ;
} else {
logger . error ( "Authed channel,but wrappedChannel is null." ) ;
log . error ( "Authed channel,but wrappedChannel is null." ) ;
}
}
@ -124,27 +124,27 @@ public class ChannelManager {
* @param channel 要移除的连接
* /
public static synchronized void removeChannel ( Channel channel ) {
logger . debug ( "Invoke removeChannel({})" , channel . id ( ) . asLongText ( ) ) ;
log . info ( "Invoke removeChannel({})" , channel . id ( ) . asLongText ( ) ) ;
WrapperedChannel wrapperedChannel = rawChannels . get ( channel ) ;
if ( wrapperedChannel ! = null ) {
//从rawChannels集合中删除
rawChannels . remove ( channel ) ;
logger . debug ( "Remove a channel from rawChannels: {}" , channel . id ( ) . asLongText ( ) ) ;
log . info ( "Remove a channel from rawChannels: {}" , channel . id ( ) . asLongText ( ) ) ;
if ( wrapperedChannel . isAuthed ( ) ) {
Set < WrapperedChannel > authedChannelSet = authedChannels . get ( wrapperedChannel . getUserId ( ) ) ;
//从authedChannel集合的value(set)中删除
if ( CollectionUtil . isNotEmpty ( authedChannelSet ) ) {
authedChannelSet . remove ( wrapperedChannel ) ;
logger . debug ( "Remove a channel from authedChannelSet: {}, {}" , wrapperedChannel . getUserId ( ) , channel . id ( ) . asLongText ( ) ) ;
log . info ( "Remove a channel from authedChannelSet: {}, {}" , wrapperedChannel . getUserId ( ) , channel . id ( ) . asLongText ( ) ) ;
}
//从authedChannel中删除,此处不用else,因为if中语句执行完毕之后,authedChannelSet也可能变成空集合
if ( CollectionUtil . isEmpty ( authedChannelSet ) ) {
authedChannels . remove ( wrapperedChannel . getUserId ( ) ) ;
logger . debug ( "Remove a user from authedChannels: {}" , wrapperedChannel . getUserId ( ) ) ;
log . info ( "Remove a user from authedChannels: {}" , wrapperedChannel . getUserId ( ) ) ;
}
}
} else {
logger . error ( "Remove channel,but wrappedChannel is null." ) ;
log . error ( "Remove channel,but wrappedChannel is null." ) ;
}
if ( channel . isOpen ( ) | | channel . isActive ( ) ) {
@ -157,18 +157,18 @@ public class ChannelManager {
* @param userId 要移除的用户
* /
public static synchronized void removeUser ( String userId ) {
logger . debug ( "Invoke remove user : {}" , userId ) ;
log . info ( "Invoke remove user : {}" , userId ) ;
Set < WrapperedChannel > wChannelSet = authedChannels . get ( userId ) ;
if ( CollectionUtil . isNotEmpty ( wChannelSet ) ) {
for ( WrapperedChannel wChannel : wChannelSet ) {
//从rawChannel中依次删除
rawChannels . remove ( wChannel . getChannel ( ) ) ;
logger . debug ( "Remove a channel from rawChannels: {}" , wChannel . getChannel ( ) . id ( ) . asLongText ( ) ) ;
log . info ( "Remove a channel from rawChannels: {}" , wChannel . getChannel ( ) . id ( ) . asLongText ( ) ) ;
}
}
//从authedChannel中删除
authedChannels . remove ( userId ) ;
logger . debug ( "Remove a user from authedChannels: {}" , userId ) ;
log . info ( "Remove a user from authedChannels: {}" , userId ) ;
}
/ * *
@ -179,13 +179,13 @@ public class ChannelManager {
* @param minor 次版本号
* /
public static synchronized void versionChannel ( Channel channel , int major , int minor ) {
logger . debug ( "Invoke Version channel({},{},{}))" , channel . id ( ) . asLongText ( ) , major , minor ) ;
log . info ( "Invoke Version channel({},{},{}))" , channel . id ( ) . asLongText ( ) , major , minor ) ;
WrapperedChannel wChannel = rawChannels . get ( channel ) ;
if ( wChannel ! = null ) {
wChannel . setVersion ( major , minor ) ;
logger . debug ( "Version Channel: {},{},{}" , channel . id ( ) . asLongText ( ) , major , minor ) ;
log . info ( "Version Channel: {},{},{}" , channel . id ( ) . asLongText ( ) , major , minor ) ;
} else {
logger . error ( "Remove channel,but wrappedChannel is null." ) ;
log . error ( "Remove channel,but wrappedChannel is null." ) ;
}
}
@ -194,12 +194,12 @@ public class ChannelManager {
* @param message 消息
* /
public static synchronized void broadCastAuthed ( Object message ) {
logger . debug ( "Invoke broadCastAuthed({})" , message ) ;
log . info ( "Invoke broadCastAuthed({})" , message ) ;
for ( Map . Entry < Channel , WrapperedChannel > entry : rawChannels . entrySet ( ) ) {
WrapperedChannel wChannel = entry . getValue ( ) ;
if ( wChannel . isAuthed ( ) ) {
wChannel . writeAndFlush ( message ) ;
logger . debug ( "Send Message {} to {},{}" , message , wChannel . getUserId ( ) , wChannel . getId ( ) ) ;
log . info ( "Send Message {} to {},{}" , message , wChannel . getUserId ( ) , wChannel . getId ( ) ) ;
}
}
}
@ -209,10 +209,10 @@ public class ChannelManager {
* @param message 消息
* /
public static synchronized void broadCast ( Object message ) {
logger . debug ( "Invoke broadCast({})" , message ) ;
log . info ( "Invoke broadCast({})" , message ) ;
for ( Map . Entry < Channel , WrapperedChannel > entry : rawChannels . entrySet ( ) ) {
entry . getValue ( ) . writeAndFlush ( message ) ;
logger . debug ( "Send Message {} to {}" , message , entry . getValue ( ) . getId ( ) ) ;
log . info ( "Send Message {} to {}" , message , entry . getValue ( ) . getId ( ) ) ;
}
}
@ -222,13 +222,13 @@ public class ChannelManager {
* @param message 消息
* /
public static synchronized void sendTo ( Channel channel , Object message ) {
logger . debug ( "Invoke sendTo({},{})" , channel . id ( ) . asLongText ( ) , message ) ;
log . info ( "Invoke sendTo({},{})" , channel . id ( ) . asLongText ( ) , message ) ;
WrapperedChannel wrapperedChannel = rawChannels . get ( channel ) ;
if ( wrapperedChannel ! = null ) {
wrapperedChannel . writeAndFlush ( message ) ;
logger . debug ( "Write message {} to Channel {}" , message , channel ) ;
log . info ( "Write message {} to Channel {}" , message , channel ) ;
} else {
logger . error ( "can't find channel from rawChannels" ) ;
log . error ( "can't find channel from rawChannels" ) ;
}
}
@ -238,12 +238,13 @@ public class ChannelManager {
* @param message 消息
* /
public static synchronized boolean sendTo ( String userId , Object message ) {
logger . debug ( "Invoke sendTo({},{})" , userId , message ) ;
log . info ( "Invoke sendTo({},{})" , userId , message ) ;
Set < WrapperedChannel > wChannelSet = authedChannels . get ( userId ) ;
log . info ( "wChannelSet sendTo({},{})" , userId , wChannelSet ) ;
if ( CollectionUtil . isNotEmpty ( wChannelSet ) ) {
for ( WrapperedChannel wChannel : wChannelSet ) {
wChannel . writeAndFlush ( message ) ;
logger . debug ( "Send message {} to channel {}" , message , userId ) ;
log . info ( "Send message {} to channel {}" , message , userId ) ;
}
return true ;
} else {
@ -256,12 +257,12 @@ public class ChannelManager {
* @param channel 接收数据的连接
* /
public static synchronized void flushReceiveTimestamp ( Channel channel ) {
logger . debug ( "Invoke flushReceiveTimestamp({})" , channel . id ( ) . asLongText ( ) ) ;
log . info ( "Invoke flushReceiveTimestamp({})" , channel . id ( ) . asLongText ( ) ) ;
WrapperedChannel wrapperedChannel = rawChannels . get ( channel ) ;
if ( wrapperedChannel ! = null ) {
wrapperedChannel . whenReceivedData ( ) ;
} else {
logger . error ( "can find channel from rawChannels" ) ;
log . error ( "can find channel from rawChannels" ) ;
}
}
@ -273,7 +274,7 @@ public class ChannelManager {
* /
@Deprecated
public static synchronized void closeUnAuthedChannels ( Long unAuthedChannelsMaxAliveTimeInSeconds ) throws Exception {
logger . debug ( "Inovke closeUnAuthedChannels({})" , unAuthedChannelsMaxAliveTimeInSeconds ) ;
log . info ( "Inovke closeUnAuthedChannels({})" , unAuthedChannelsMaxAliveTimeInSeconds ) ;
Iterator < Map . Entry < Channel , WrapperedChannel > > it = rawChannels . entrySet ( ) . iterator ( ) ;
while ( it . hasNext ( ) ) {
Map . Entry < Channel , WrapperedChannel > entry = it . next ( ) ;
@ -282,7 +283,7 @@ public class ChannelManager {
it . remove ( ) ;
//关闭连接
wrapperedChannel . getChannel ( ) . close ( ) ;
logger . debug ( "Remove a unAuthed Channel {}, which has connected {}s,maxOutTime is {}s" , wrapperedChannel . getId ( ) , wrapperedChannel . getConnectedSeconds ( ) ,
log . info ( "Remove a unAuthed Channel {}, which has connected {}s,maxOutTime is {}s" , wrapperedChannel . getId ( ) , wrapperedChannel . getConnectedSeconds ( ) ,
unAuthedChannelsMaxAliveTimeInSeconds ) ;
}
}
@ -370,13 +371,13 @@ public class ChannelManager {
public static synchronized void showAuthedChannels ( ) {
for ( Map . Entry < String , Set < WrapperedChannel > > entry : authedChannels . entrySet ( ) ) {
for ( WrapperedChannel channel : entry . getValue ( ) ) {
logger . debug ( "{}-->{}" , entry . getKey ( ) , channel . toString ( ) ) ;
log . info ( "{}-->{}" , entry . getKey ( ) , channel . toString ( ) ) ;
}
}
}
public static synchronized void showAllChannels ( ) {
for ( Map . Entry < Channel , WrapperedChannel > entry : rawChannels . entrySet ( ) ) {
logger . debug ( entry . getValue ( ) . toString ( ) ) ;
log . info ( entry . getValue ( ) . toString ( ) ) ;
}
}