From f96b5f528317c8eba37ff9abfba5f3596337cc2b Mon Sep 17 00:00:00 2001 From: zhizhi wu <2377881365@qq.com> Date: Wed, 16 Dec 2020 20:34:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../netty/tcphexserver/ModbusDecoder.java | 6 +- .../ccmq/lowlevel/message/MessageHandler.java | 131 +++++++++--------- .../message/common/MessageConstant.java | 3 +- .../bean/dto/ccmodbus/CCModBusEntity.java | 4 +- 4 files changed, 76 insertions(+), 68 deletions(-) diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java index ecabe81..0c757a4 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java @@ -63,7 +63,7 @@ public class ModbusDecoder extends ByteToMessageDecoder { CCModBusEntity ccModBusEntity = new CCModBusEntity(in); CCModBusEntity.Error error = ccModBusEntity.valid(); - log.info("modbus数据检查:{}", error); + log.info("modBus数据检查:{}", error); switch (error) { case ERROR_FILTER_NOT_MATCH: case ERROR_LEN_EXCLUEE_MAX: @@ -82,6 +82,10 @@ public class ModbusDecoder extends ByteToMessageDecoder { //交给下个handler处理 discardNBytes(in, ccModBusEntity.getModbusLength()); + for (int i = 0; i < ccModBusEntity.getModbusData().length ; i++) { + log.info("modBus原始数据"+i + ":" + Integer.toHexString(ccModBusEntity.getModbusData()[i]) +"------" + ccModBusEntity.getModbusData()[i]); + } + InMessage message = ModbusConverter.convertCCModbusToMessage(ccModBusEntity); log.info("modBus封装成inMessage:{}", message); if (ObjectUtil.isNotNull(message)) { diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java index ec4172e..8607f6a 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java @@ -130,75 +130,79 @@ public class MessageHandler { List sendTimesUpLimitMessageList = null,expiredMessageList = null; while(true){ - logger.info("while true"); - //从redis中或取第一个待处理用户 - Object o = RedisUtil.sPop(RedisKeyManager.getPendingClientSetKey()); - logger.info("从redis中或取第一个待处理用户:{},{}", RedisKeyManager.getPendingClientSetKey(), o); - if(o != null && StrUtil.isNotEmpty(typeAndUserId = ackId = (String)o)){ - Console.log("RedisUtil.sPop: {}",o); - String []stringArray = CcMessageUtil.splitTypeAndUserId(typeAndUserId); - MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]); - String to = stringArray[1]; - logger.info("to:{},toDomain:{}", to, toDomain); - MessageConstant.InvokerMessage invokerMessage = null; - if (stringArray.length >= 3 && StrUtil.isNotEmpty(stringArray[2]) ) { - invokerMessage = JacksonUtil.jsonToBean(stringArray[2], MessageConstant.InvokerMessage.class); - } - //授权时保存的业务消息 - String authBusinessMessage = stringArray.length > 3 ? stringArray[3] : null; - //针对同一用户,在上一次ack还未收到/超时之前,不进行处理 - Object lockObj = ResourceLock.getLockObj(ackId); - logger.info("上锁:{}", lockObj); - synchronized (lockObj) { - //查找该用户是否有正在处理的消息 - if (RedisUtil.hasKey(RedisKeyManager.getAckSetKey(ackId))) { - logger.info("用户有消息正在处理"); - //将当前用户重新放回到待处理列表的最后 - RedisUtil.sSet(RedisKeyManager.getPendingClientSetKey(), o); - continue; - } - - //查找所有没有ack的消息 - messageList = getMessageDao().getClientPendingMessage(toDomain, to, MAX_MESSAGE_NUM); - logger.info("messageList:{}", messageList); - if (CollectionUtil.isEmpty(messageList)) { - logger.info("messageList是空"); - continue; + try { + logger.info("while true"); + //从redis中或取第一个待处理用户 + Object o = RedisUtil.sPop(RedisKeyManager.getPendingClientSetKey()); + logger.info("从redis中或取第一个待处理用户:{},{}", RedisKeyManager.getPendingClientSetKey(), o); + if(o != null && StrUtil.isNotEmpty(typeAndUserId = ackId = (String)o)){ + Console.log("RedisUtil.sPop: {}",o); + String []stringArray = CcMessageUtil.splitTypeAndUserId(typeAndUserId); + MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]); + String to = stringArray[1]; + logger.info("to:{},toDomain:{}", to, toDomain); + MessageConstant.InvokerMessage invokerMessage = null; + if (stringArray.length >= 3 && StrUtil.isNotEmpty(stringArray[2]) ) { + invokerMessage = JacksonUtil.jsonToBean(stringArray[2], MessageConstant.InvokerMessage.class); } + //授权时保存的业务消息 + String authBusinessMessage = stringArray.length > 3 ? stringArray[3] : null; + //针对同一用户,在上一次ack还未收到/超时之前,不进行处理 + Object lockObj = ResourceLock.getLockObj(ackId); + logger.info("上锁:{}", lockObj); + synchronized (lockObj) { + //查找该用户是否有正在处理的消息 + if (RedisUtil.hasKey(RedisKeyManager.getAckSetKey(ackId))) { + logger.info("用户有消息正在处理"); + //将当前用户重新放回到待处理列表的最后 + RedisUtil.sSet(RedisKeyManager.getPendingClientSetKey(), o); + continue; + } - //判断用户是否在线 - boolean clientOnLine = ClientManager.isUserOnline(toDomain,to); - logger.info("用户是否在线:{}", clientOnLine); - //发送和处理消息 - if(clientOnLine){ - //发送 - //1.手机所有待发送的messageId,以ackId为key放入redis - Set messageIdSet = new HashSet<>(messageList.size()); - OutMessageSet outMessageSet = OutMessageSet.newInstance(); - for(Message message : messageList) { - messageIdSet.add(message.getId()); - outMessageSet.add(new OutMessage(message, authBusinessMessage)); - getMessageDao().incrementSendTimes(message.getId(), DateUtil.currentSeconds()); + //查找所有没有ack的消息 + messageList = getMessageDao().getClientPendingMessage(toDomain, to, MAX_MESSAGE_NUM); + logger.info("messageList:{}", messageList); + if (CollectionUtil.isEmpty(messageList)) { + logger.info("messageList是空"); + continue; } - RedisUtil.sSetAndTime(RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray()); - logger.info("存储redis:{},{},{}",RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray()); - //2.构造outMessage并且发送 - outMessageSet.ackId(ackId); - - //发送给对应的接收者 - logger.info("给{}发送消息:{}", to, outMessageSet); - ClientManager.sendMessageToAuthedClient(toDomain,invokerMessage, to,outMessageSet); - }else{ - //不发送,根据规则检查所有“offLineDiscard”的消息设置为failed状态 - for(Message message : messageList){ - if(message.getRule().getOfflineDiscard() == 1){ - getMessageDao().updateMessageStatus(message.getId(),MessageConstant.Status.Failed); + + //判断用户是否在线 + boolean clientOnLine = ClientManager.isUserOnline(toDomain,to); + logger.info("用户是否在线:{}", clientOnLine); + //发送和处理消息 + if(clientOnLine){ + //发送 + //1.手机所有待发送的messageId,以ackId为key放入redis + Set messageIdSet = new HashSet<>(messageList.size()); + OutMessageSet outMessageSet = OutMessageSet.newInstance(); + for(Message message : messageList) { + messageIdSet.add(message.getId()); + outMessageSet.add(new OutMessage(message, authBusinessMessage)); + getMessageDao().incrementSendTimes(message.getId(), DateUtil.currentSeconds()); } + RedisUtil.sSetAndTime(RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray()); + logger.info("存储redis:{},{},{}",RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray()); + //2.构造outMessage并且发送 + outMessageSet.ackId(ackId); + + //发送给对应的接收者 + logger.info("给{}发送消息:{}", to, outMessageSet); + ClientManager.sendMessageToAuthedClient(toDomain,invokerMessage, to,outMessageSet); + }else{ + //不发送,根据规则检查所有“offLineDiscard”的消息设置为failed状态 + for(Message message : messageList){ + if(message.getRule().getOfflineDiscard() == 1){ + getMessageDao().updateMessageStatus(message.getId(),MessageConstant.Status.Failed); + } + } + logger.info("没有发送消息"); } - logger.info("没有发送消息"); } + ResourceLock.freeLockObj(ackId); } - ResourceLock.freeLockObj(ackId); + } catch (Exception e) { + logger.error("循环发送消息异常", e); } try { Thread.sleep(100); @@ -476,8 +480,9 @@ public class MessageHandler { // 寄存器将要修改的数据 long value = 0; for (int j = 0; j < CCModBusEntity.SIZE_DATA_SINGLE; j++) { + value <<= 8; - value += modBusData[CCModBusEntity.POSITION_DATA + CCModBusEntity.SIZE_DATA_SINGLE * i + j]; + value |= modBusData[CCModBusEntity.POSITION_DATA + CCModBusEntity.SIZE_DATA_SINGLE * i + j] & 0xFF; } if (origin == null || origin.longValue() != value) { register.put(String.valueOf(startAddr + i), value); diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java index 9da9dc9..34c6102 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java @@ -28,8 +28,7 @@ public class MessageConstant { } public long getAddr(){ - long addr = first << 8; - addr += second; + long addr = ((long)first << 8 | second) & 0xFFFF; return addr; } diff --git a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java index 5be0a32..58e4285 100644 --- a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java +++ b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java @@ -73,7 +73,7 @@ public class CCModBusEntity { int start = POSITION_REGISTER; for (int i = 0; i < SIZE_REGISTER_START; i++) { addr <<= 8; - addr += modBusData[start + i]; + addr |= modBusData[start + i] & 0xFF; } return addr; @@ -97,7 +97,7 @@ public class CCModBusEntity { int start = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START; for (int i = 0; i < SIZE_REGISTER_NUM; i++) { number <<= 8; - number += modBusData[start + i]; + number |= modBusData[start + i] & 0xFF; } return number;