Browse Source

日志

master
zhizhi wu 5 years ago
parent
commit
0626c85568
  1. 4
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java
  2. 8
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java

4
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java

@ -215,12 +215,12 @@ public class ChannelManager {
* @param message 消息
*/
public static synchronized boolean sendTo(String userId,Object message){
logger.debug("Invoke sendTo({},{})",userId,message);
logger.info("Invoke sendTo({},{})",userId,message);
Set<WrapperedChannel> wChannelSet = authedChannels.get(userId);
if(CollectionUtil.isNotEmpty(wChannelSet)){
for(WrapperedChannel wChannel:wChannelSet){
wChannel.writeAndFlush(message);
logger.debug("Send message {} to channel {}",message,userId);
logger.info("Send message {} to channel {}",message,userId);
}
return true;
}else{

8
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java

@ -124,19 +124,22 @@ public class MessageHandler {
@Async("cc-msg-executor")
public void loopSendMessage() throws Exception {
logger.info("开始执行循环方法");
String typeAndUserId = null,ackId = null;
List<Message> messageList = null;
List<String> sendTimesUpLimitMessageList = null,expiredMessageList = null;
while(true){
logger.info("while true");
//从redis中或取第一个待处理用户
Object o = RedisUtil.sPop(RedisKeyManager.getPendingClientSetKey());
logger.info("从redis中或取第一个待处理用户:{},{}", RedisKeyManager.getPendingClientSetKey(), o);
if(o != null && StrUtil.isNotEmpty(typeAndUserId = ackId = (String)o)){
Console.log("RedisUtil.sPop: {}",o);
String []stringArray = CcMessageUtil.splitTypeAndUserId(typeAndUserId);
MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]);
String to = stringArray[1];
logger.info("to:{},toDomain:{}", to, toDomain);
MessageConstant.InvokerMessage invokerMessage = null;
if (stringArray.length >= 3 && StrUtil.isNotEmpty(stringArray[2]) ) {
invokerMessage = JacksonUtil.jsonToBean(stringArray[2], MessageConstant.InvokerMessage.class);
@ -157,6 +160,7 @@ public class MessageHandler {
//查找所有没有ack的消息
messageList = getMessageDao().getClientPendingMessage(toDomain, to, MAX_MESSAGE_NUM);
logger.info("messageList:{}", messageList);
if (CollectionUtil.isEmpty(messageList)) {
logger.info("messageList是空");
continue;
@ -177,7 +181,7 @@ public class MessageHandler {
getMessageDao().incrementSendTimes(message.getId(), DateUtil.currentSeconds());
}
RedisUtil.sSetAndTime(RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray());
logger.info("存储redis:{},{},{}",RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray());
//2.构造outMessage并且发送
outMessageSet.ackId(ackId);

Loading…
Cancel
Save