Browse Source

日志

master
zhizhi wu 5 years ago
parent
commit
f96b5f5283
  1. 6
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java
  2. 131
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java
  3. 3
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java
  4. 4
      ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java

6
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java

@ -63,7 +63,7 @@ public class ModbusDecoder extends ByteToMessageDecoder {
CCModBusEntity ccModBusEntity = new CCModBusEntity(in); CCModBusEntity ccModBusEntity = new CCModBusEntity(in);
CCModBusEntity.Error error = ccModBusEntity.valid(); CCModBusEntity.Error error = ccModBusEntity.valid();
log.info("modbus数据检查:{}", error); log.info("modBus数据检查:{}", error);
switch (error) { switch (error) {
case ERROR_FILTER_NOT_MATCH: case ERROR_FILTER_NOT_MATCH:
case ERROR_LEN_EXCLUEE_MAX: case ERROR_LEN_EXCLUEE_MAX:
@ -82,6 +82,10 @@ public class ModbusDecoder extends ByteToMessageDecoder {
//交给下个handler处理 //交给下个handler处理
discardNBytes(in, ccModBusEntity.getModbusLength()); discardNBytes(in, ccModBusEntity.getModbusLength());
for (int i = 0; i < ccModBusEntity.getModbusData().length ; i++) {
log.info("modBus原始数据"+i + ":" + Integer.toHexString(ccModBusEntity.getModbusData()[i]) +"------" + ccModBusEntity.getModbusData()[i]);
}
InMessage message = ModbusConverter.convertCCModbusToMessage(ccModBusEntity); InMessage message = ModbusConverter.convertCCModbusToMessage(ccModBusEntity);
log.info("modBus封装成inMessage:{}", message); log.info("modBus封装成inMessage:{}", message);
if (ObjectUtil.isNotNull(message)) { if (ObjectUtil.isNotNull(message)) {

131
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java

@ -130,75 +130,79 @@ public class MessageHandler {
List<String> sendTimesUpLimitMessageList = null,expiredMessageList = null; List<String> sendTimesUpLimitMessageList = null,expiredMessageList = null;
while(true){ while(true){
logger.info("while true"); try {
//从redis中或取第一个待处理用户 logger.info("while true");
Object o = RedisUtil.sPop(RedisKeyManager.getPendingClientSetKey()); //从redis中或取第一个待处理用户
logger.info("从redis中或取第一个待处理用户:{},{}", RedisKeyManager.getPendingClientSetKey(), o); Object o = RedisUtil.sPop(RedisKeyManager.getPendingClientSetKey());
if(o != null && StrUtil.isNotEmpty(typeAndUserId = ackId = (String)o)){ logger.info("从redis中或取第一个待处理用户:{},{}", RedisKeyManager.getPendingClientSetKey(), o);
Console.log("RedisUtil.sPop: {}",o); if(o != null && StrUtil.isNotEmpty(typeAndUserId = ackId = (String)o)){
String []stringArray = CcMessageUtil.splitTypeAndUserId(typeAndUserId); Console.log("RedisUtil.sPop: {}",o);
MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]); String []stringArray = CcMessageUtil.splitTypeAndUserId(typeAndUserId);
String to = stringArray[1]; MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]);
logger.info("to:{},toDomain:{}", to, toDomain); String to = stringArray[1];
MessageConstant.InvokerMessage invokerMessage = null; logger.info("to:{},toDomain:{}", to, toDomain);
if (stringArray.length >= 3 && StrUtil.isNotEmpty(stringArray[2]) ) { MessageConstant.InvokerMessage invokerMessage = null;
invokerMessage = JacksonUtil.jsonToBean(stringArray[2], MessageConstant.InvokerMessage.class); if (stringArray.length >= 3 && StrUtil.isNotEmpty(stringArray[2]) ) {
} invokerMessage = JacksonUtil.jsonToBean(stringArray[2], MessageConstant.InvokerMessage.class);
//授权时保存的业务消息
String authBusinessMessage = stringArray.length > 3 ? stringArray[3] : null;
//针对同一用户,在上一次ack还未收到/超时之前,不进行处理
Object lockObj = ResourceLock.getLockObj(ackId);
logger.info("上锁:{}", lockObj);
synchronized (lockObj) {
//查找该用户是否有正在处理的消息
if (RedisUtil.hasKey(RedisKeyManager.getAckSetKey(ackId))) {
logger.info("用户有消息正在处理");
//将当前用户重新放回到待处理列表的最后
RedisUtil.sSet(RedisKeyManager.getPendingClientSetKey(), o);
continue;
}
//查找所有没有ack的消息
messageList = getMessageDao().getClientPendingMessage(toDomain, to, MAX_MESSAGE_NUM);
logger.info("messageList:{}", messageList);
if (CollectionUtil.isEmpty(messageList)) {
logger.info("messageList是空");
continue;
} }
//授权时保存的业务消息
String authBusinessMessage = stringArray.length > 3 ? stringArray[3] : null;
//针对同一用户,在上一次ack还未收到/超时之前,不进行处理
Object lockObj = ResourceLock.getLockObj(ackId);
logger.info("上锁:{}", lockObj);
synchronized (lockObj) {
//查找该用户是否有正在处理的消息
if (RedisUtil.hasKey(RedisKeyManager.getAckSetKey(ackId))) {
logger.info("用户有消息正在处理");
//将当前用户重新放回到待处理列表的最后
RedisUtil.sSet(RedisKeyManager.getPendingClientSetKey(), o);
continue;
}
//判断用户是否在线 //查找所有没有ack的消息
boolean clientOnLine = ClientManager.isUserOnline(toDomain,to); messageList = getMessageDao().getClientPendingMessage(toDomain, to, MAX_MESSAGE_NUM);
logger.info("用户是否在线:{}", clientOnLine); logger.info("messageList:{}", messageList);
//发送和处理消息 if (CollectionUtil.isEmpty(messageList)) {
if(clientOnLine){ logger.info("messageList是空");
//发送 continue;
//1.手机所有待发送的messageId,以ackId为key放入redis
Set<String> messageIdSet = new HashSet<>(messageList.size());
OutMessageSet outMessageSet = OutMessageSet.newInstance();
for(Message message : messageList) {
messageIdSet.add(message.getId());
outMessageSet.add(new OutMessage(message, authBusinessMessage));
getMessageDao().incrementSendTimes(message.getId(), DateUtil.currentSeconds());
} }
RedisUtil.sSetAndTime(RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray());
logger.info("存储redis:{},{},{}",RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray()); //判断用户是否在线
//2.构造outMessage并且发送 boolean clientOnLine = ClientManager.isUserOnline(toDomain,to);
outMessageSet.ackId(ackId); logger.info("用户是否在线:{}", clientOnLine);
//发送和处理消息
//发送给对应的接收者 if(clientOnLine){
logger.info("给{}发送消息:{}", to, outMessageSet); //发送
ClientManager.sendMessageToAuthedClient(toDomain,invokerMessage, to,outMessageSet); //1.手机所有待发送的messageId,以ackId为key放入redis
}else{ Set<String> messageIdSet = new HashSet<>(messageList.size());
//不发送,根据规则检查所有“offLineDiscard”的消息设置为failed状态 OutMessageSet outMessageSet = OutMessageSet.newInstance();
for(Message message : messageList){ for(Message message : messageList) {
if(message.getRule().getOfflineDiscard() == 1){ messageIdSet.add(message.getId());
getMessageDao().updateMessageStatus(message.getId(),MessageConstant.Status.Failed); outMessageSet.add(new OutMessage(message, authBusinessMessage));
getMessageDao().incrementSendTimes(message.getId(), DateUtil.currentSeconds());
} }
RedisUtil.sSetAndTime(RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray());
logger.info("存储redis:{},{},{}",RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray());
//2.构造outMessage并且发送
outMessageSet.ackId(ackId);
//发送给对应的接收者
logger.info("给{}发送消息:{}", to, outMessageSet);
ClientManager.sendMessageToAuthedClient(toDomain,invokerMessage, to,outMessageSet);
}else{
//不发送,根据规则检查所有“offLineDiscard”的消息设置为failed状态
for(Message message : messageList){
if(message.getRule().getOfflineDiscard() == 1){
getMessageDao().updateMessageStatus(message.getId(),MessageConstant.Status.Failed);
}
}
logger.info("没有发送消息");
} }
logger.info("没有发送消息");
} }
ResourceLock.freeLockObj(ackId);
} }
ResourceLock.freeLockObj(ackId); } catch (Exception e) {
logger.error("循环发送消息异常", e);
} }
try { try {
Thread.sleep(100); Thread.sleep(100);
@ -476,8 +480,9 @@ public class MessageHandler {
// 寄存器将要修改的数据 // 寄存器将要修改的数据
long value = 0; long value = 0;
for (int j = 0; j < CCModBusEntity.SIZE_DATA_SINGLE; j++) { for (int j = 0; j < CCModBusEntity.SIZE_DATA_SINGLE; j++) {
value <<= 8; value <<= 8;
value += modBusData[CCModBusEntity.POSITION_DATA + CCModBusEntity.SIZE_DATA_SINGLE * i + j]; value |= modBusData[CCModBusEntity.POSITION_DATA + CCModBusEntity.SIZE_DATA_SINGLE * i + j] & 0xFF;
} }
if (origin == null || origin.longValue() != value) { if (origin == null || origin.longValue() != value) {
register.put(String.valueOf(startAddr + i), value); register.put(String.valueOf(startAddr + i), value);

3
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java

@ -28,8 +28,7 @@ public class MessageConstant {
} }
public long getAddr(){ public long getAddr(){
long addr = first << 8; long addr = ((long)first << 8 | second) & 0xFFFF;
addr += second;
return addr; return addr;
} }

4
ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java

@ -73,7 +73,7 @@ public class CCModBusEntity {
int start = POSITION_REGISTER; int start = POSITION_REGISTER;
for (int i = 0; i < SIZE_REGISTER_START; i++) { for (int i = 0; i < SIZE_REGISTER_START; i++) {
addr <<= 8; addr <<= 8;
addr += modBusData[start + i]; addr |= modBusData[start + i] & 0xFF;
} }
return addr; return addr;
@ -97,7 +97,7 @@ public class CCModBusEntity {
int start = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START; int start = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START;
for (int i = 0; i < SIZE_REGISTER_NUM; i++) { for (int i = 0; i < SIZE_REGISTER_NUM; i++) {
number <<= 8; number <<= 8;
number += modBusData[start + i]; number |= modBusData[start + i] & 0xFF;
} }
return number; return number;

Loading…
Cancel
Save