Browse Source

Merge branch 'master' of gitee.com:ccsens_s/common

master
zy_Java 5 years ago
parent
commit
73f172c3be
  1. 4
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java
  2. 6
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java
  3. 129
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java
  4. 3
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java
  5. 7
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/persist/MessageDao.java
  6. 4
      ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java
  7. 2
      ccmq/src/main/java/wiki/tall/ccmq/common/util/WebConstant.java

4
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java

@ -215,12 +215,12 @@ public class ChannelManager {
* @param message 消息
*/
public static synchronized boolean sendTo(String userId,Object message){
logger.debug("Invoke sendTo({},{})",userId,message);
logger.info("Invoke sendTo({},{})",userId,message);
Set<WrapperedChannel> wChannelSet = authedChannels.get(userId);
if(CollectionUtil.isNotEmpty(wChannelSet)){
for(WrapperedChannel wChannel:wChannelSet){
wChannel.writeAndFlush(message);
logger.debug("Send message {} to channel {}",message,userId);
logger.info("Send message {} to channel {}",message,userId);
}
return true;
}else{

6
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java

@ -63,7 +63,7 @@ public class ModbusDecoder extends ByteToMessageDecoder {
CCModBusEntity ccModBusEntity = new CCModBusEntity(in);
CCModBusEntity.Error error = ccModBusEntity.valid();
log.info("modbus数据检查:{}", error);
log.info("modBus数据检查:{}", error);
switch (error) {
case ERROR_FILTER_NOT_MATCH:
case ERROR_LEN_EXCLUEE_MAX:
@ -82,6 +82,10 @@ public class ModbusDecoder extends ByteToMessageDecoder {
//交给下个handler处理
discardNBytes(in, ccModBusEntity.getModbusLength());
for (int i = 0; i < ccModBusEntity.getModbusData().length ; i++) {
log.info("modBus原始数据"+i + ":" + Integer.toHexString(ccModBusEntity.getModbusData()[i]) +"------" + ccModBusEntity.getModbusData()[i]);
}
InMessage message = ModbusConverter.convertCCModbusToMessage(ccModBusEntity);
log.info("modBus封装成inMessage:{}", message);
if (ObjectUtil.isNotNull(message)) {

129
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java

@ -124,77 +124,85 @@ public class MessageHandler {
@Async("cc-msg-executor")
public void loopSendMessage() throws Exception {
logger.info("开始执行循环方法");
String typeAndUserId = null,ackId = null;
List<Message> messageList = null;
List<String> sendTimesUpLimitMessageList = null,expiredMessageList = null;
while(true){
//从redis中或取第一个待处理用户
Object o = RedisUtil.sPop(RedisKeyManager.getPendingClientSetKey());
if(o != null && StrUtil.isNotEmpty(typeAndUserId = ackId = (String)o)){
Console.log("RedisUtil.sPop: {}",o);
String []stringArray = CcMessageUtil.splitTypeAndUserId(typeAndUserId);
MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]);
String to = stringArray[1];
MessageConstant.InvokerMessage invokerMessage = null;
if (stringArray.length >= 3 && StrUtil.isNotEmpty(stringArray[2]) ) {
invokerMessage = JacksonUtil.jsonToBean(stringArray[2], MessageConstant.InvokerMessage.class);
}
//授权时保存的业务消息
String authBusinessMessage = stringArray.length > 3 ? stringArray[3] : null;
//针对同一用户,在上一次ack还未收到/超时之前,不进行处理
Object lockObj = ResourceLock.getLockObj(ackId);
logger.info("上锁:{}", lockObj);
synchronized (lockObj) {
//查找该用户是否有正在处理的消息
if (RedisUtil.hasKey(RedisKeyManager.getAckSetKey(ackId))) {
logger.info("用户有消息正在处理");
//将当前用户重新放回到待处理列表的最后
RedisUtil.sSet(RedisKeyManager.getPendingClientSetKey(), o);
continue;
}
//查找所有没有ack的消息
messageList = getMessageDao().getClientPendingMessage(toDomain, to, MAX_MESSAGE_NUM);
if (CollectionUtil.isEmpty(messageList)) {
logger.info("messageList是空");
continue;
try {
logger.info("while true");
//从redis中或取第一个待处理用户
Object o = RedisUtil.sPop(RedisKeyManager.getPendingClientSetKey());
logger.info("从redis中或取第一个待处理用户:{},{}", RedisKeyManager.getPendingClientSetKey(), o);
if(o != null && StrUtil.isNotEmpty(typeAndUserId = ackId = (String)o)){
Console.log("RedisUtil.sPop: {}",o);
String []stringArray = CcMessageUtil.splitTypeAndUserId(typeAndUserId);
MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]);
String to = stringArray[1];
logger.info("to:{},toDomain:{}", to, toDomain);
MessageConstant.InvokerMessage invokerMessage = null;
if (stringArray.length >= 3 && StrUtil.isNotEmpty(stringArray[2]) ) {
invokerMessage = JacksonUtil.jsonToBean(stringArray[2], MessageConstant.InvokerMessage.class);
}
//授权时保存的业务消息
String authBusinessMessage = stringArray.length > 3 ? stringArray[3] : null;
//针对同一用户,在上一次ack还未收到/超时之前,不进行处理
Object lockObj = ResourceLock.getLockObj(ackId);
logger.info("上锁:{}", lockObj);
synchronized (lockObj) {
//查找该用户是否有正在处理的消息
if (RedisUtil.hasKey(RedisKeyManager.getAckSetKey(ackId))) {
logger.info("用户有消息正在处理");
//将当前用户重新放回到待处理列表的最后
RedisUtil.sSet(RedisKeyManager.getPendingClientSetKey(), o);
continue;
}
//判断用户是否在线
boolean clientOnLine = ClientManager.isUserOnline(toDomain,to);
logger.info("用户是否在线:{}", clientOnLine);
//发送和处理消息
if(clientOnLine){
//发送
//1.手机所有待发送的messageId,以ackId为key放入redis
Set<String> messageIdSet = new HashSet<>(messageList.size());
OutMessageSet outMessageSet = OutMessageSet.newInstance();
for(Message message : messageList) {
messageIdSet.add(message.getId());
outMessageSet.add(new OutMessage(message, authBusinessMessage));
getMessageDao().incrementSendTimes(message.getId(), DateUtil.currentSeconds());
//查找所有没有ack的消息
messageList = getMessageDao().getClientPendingMessage(toDomain, to, MAX_MESSAGE_NUM);
logger.info("messageList:{}", messageList);
if (CollectionUtil.isEmpty(messageList)) {
logger.info("messageList是空");
continue;
}
RedisUtil.sSetAndTime(RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray());
//2.构造outMessage并且发送
outMessageSet.ackId(ackId);
//发送给对应的接收者
logger.info("给{}发送消息:{}", to, outMessageSet);
ClientManager.sendMessageToAuthedClient(toDomain,invokerMessage, to,outMessageSet);
}else{
//不发送,根据规则检查所有“offLineDiscard”的消息设置为failed状态
for(Message message : messageList){
if(message.getRule().getOfflineDiscard() == 1){
getMessageDao().updateMessageStatus(message.getId(),MessageConstant.Status.Failed);
//判断用户是否在线
boolean clientOnLine = ClientManager.isUserOnline(toDomain,to);
logger.info("用户是否在线:{}", clientOnLine);
//发送和处理消息
if(clientOnLine){
//发送
//1.手机所有待发送的messageId,以ackId为key放入redis
Set<String> messageIdSet = new HashSet<>(messageList.size());
OutMessageSet outMessageSet = OutMessageSet.newInstance();
for(Message message : messageList) {
messageIdSet.add(message.getId());
outMessageSet.add(new OutMessage(message, authBusinessMessage));
getMessageDao().incrementSendTimes(message.getId(), DateUtil.currentSeconds());
}
RedisUtil.sSetAndTime(RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray());
logger.info("存储redis:{},{},{}",RedisKeyManager.getAckSetKey(ackId),REDIS_ACK_EXPIRED_SECONDS,messageIdSet.toArray());
//2.构造outMessage并且发送
outMessageSet.ackId(ackId);
//发送给对应的接收者
logger.info("给{}发送消息:{}", to, outMessageSet);
ClientManager.sendMessageToAuthedClient(toDomain,invokerMessage, to,outMessageSet);
}else{
//不发送,根据规则检查所有“offLineDiscard”的消息设置为failed状态
for(Message message : messageList){
if(message.getRule().getOfflineDiscard() == 1){
getMessageDao().updateMessageStatus(message.getId(),MessageConstant.Status.Failed);
}
}
logger.info("没有发送消息");
}
logger.info("没有发送消息");
}
ResourceLock.freeLockObj(ackId);
}
ResourceLock.freeLockObj(ackId);
} catch (Exception e) {
logger.error("循环发送消息异常", e);
}
try {
Thread.sleep(100);
@ -472,8 +480,9 @@ public class MessageHandler {
// 寄存器将要修改的数据
long value = 0;
for (int j = 0; j < CCModBusEntity.SIZE_DATA_SINGLE; j++) {
value <<= 8;
value += modBusData[CCModBusEntity.POSITION_DATA + CCModBusEntity.SIZE_DATA_SINGLE * i + j];
value |= modBusData[CCModBusEntity.POSITION_DATA + CCModBusEntity.SIZE_DATA_SINGLE * i + j] & 0xFF;
}
if (origin == null || origin.longValue() != value) {
register.put(String.valueOf(startAddr + i), value);

3
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java

@ -28,8 +28,7 @@ public class MessageConstant {
}
public long getAddr(){
long addr = first << 8;
addr += second;
long addr = ((long)first << 8 | second) & 0xFFFF;
return addr;
}

7
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/persist/MessageDao.java

@ -3,8 +3,6 @@ package com.ccsens.ccmq.lowlevel.persist;
import com.ccsens.ccmq.lowlevel.message.common.InMessage;
import com.ccsens.ccmq.lowlevel.message.common.Message;
import com.ccsens.ccmq.lowlevel.message.common.MessageConstant;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
@ -14,6 +12,7 @@ import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Repository;
import wiki.tall.ccmq.common.util.DateUtil;
import wiki.tall.ccmq.common.util.WebConstant;
import java.util.List;
@ -141,7 +140,9 @@ public class MessageDao implements IMessageDao {
public List<Message> getClientPendingMessage(MessageConstant.DomainType toDomain, String to, Integer maxMessageNum) {
Query query = clientPendingMessageQuery(toDomain,to)
.with(Sort.by(Sort.Order.asc("time")));
if(maxMessageNum == null || maxMessageNum.intValue() == 0) {
if(maxMessageNum == null || maxMessageNum.intValue() <= 0) {
query.limit(WebConstant.PAGE_SIZE_MAX);
} else {
query.limit(maxMessageNum);
}
return mongoTemplate.find(query,Message.class,COLLECTION_MESSAGE);

4
ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java

@ -73,7 +73,7 @@ public class CCModBusEntity {
int start = POSITION_REGISTER;
for (int i = 0; i < SIZE_REGISTER_START; i++) {
addr <<= 8;
addr += modBusData[start + i];
addr |= modBusData[start + i] & 0xFF;
}
return addr;
@ -97,7 +97,7 @@ public class CCModBusEntity {
int start = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START;
for (int i = 0; i < SIZE_REGISTER_NUM; i++) {
number <<= 8;
number += modBusData[start + i];
number |= modBusData[start + i] & 0xFF;
}
return number;

2
ccmq/src/main/java/wiki/tall/ccmq/common/util/WebConstant.java

@ -42,6 +42,8 @@ public class WebConstant {
/**消息队列的queue name 对应的key*/
public static final String REDIS_QUEUE = "message_queue";
public static final int PAGE_SIZE_MAX = 50;
public enum QueueAdd{
FAIL(0,"失败"),SUCCESS(1,"成功"), EXISTED(2, "已存在");
public int value;

Loading…
Cancel
Save