From bf5dc819657e88eee6ce42cd240bb135ffea3ef6 Mon Sep 17 00:00:00 2001 From: zhizhi wu <2377881365@qq.com> Date: Tue, 15 Dec 2020 22:06:14 +0800 Subject: [PATCH] modbus --- .../ccmq/lowlevel/client/ClientManager.java | 8 +- .../lowlevel/client/netty/ChannelManager.java | 6 +- .../client/netty/WrapperedChannel.java | 28 ++- .../netty/tcphexserver/ModbusConverter.java | 206 ++++++++++++++++-- .../netty/tcphexserver/ModbusDecoder.java | 65 +++++- .../netty/tcphexserver/ModbusEncoder.java | 22 +- .../netty/tcphexserver/NettyMBServer.java | 2 + .../ccmq/lowlevel/message/MessageHandler.java | 100 +++++++++ .../lowlevel/message/client/AuthMessage.java | 7 + .../message/client/ModBusMessage.java | 37 ++++ .../message/common/MessageConstant.java | 38 +++- .../message/server/ModBusAckMessage.java | 31 +++ .../bean/dto/ccmodbus/CCModBusEntity.java | 108 +++++++-- .../bean/dto/ccmodbus/ModBusEntity.java | 115 ++++++++++ .../common/bean/dto/ccmodbus/Register.java | 29 +++ .../wiki/tall/ccmq/common/util/CRCUtil.java | 2 + .../src/main/resources/application.properties | 2 +- 17 files changed, 756 insertions(+), 50 deletions(-) create mode 100644 ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java create mode 100644 ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/ModBusAckMessage.java create mode 100644 ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/ModBusEntity.java create mode 100644 ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/ClientManager.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/ClientManager.java index 8346992..99f5a28 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/ClientManager.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/ClientManager.java @@ -2,14 +2,14 @@ package com.ccsens.ccmq.lowlevel.client; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.StrUtil; +import com.ccsens.ccmq.lowlevel.client.netty.ChannelManager; import com.ccsens.ccmq.lowlevel.client.netty.WrapperedChannel; +import com.ccsens.ccmq.lowlevel.client.rabbitmq.QueueManager; +import com.ccsens.ccmq.lowlevel.client.restful.RestManager; import com.ccsens.ccmq.lowlevel.message.MessageHandler; import com.ccsens.ccmq.lowlevel.message.client.ClientAuthTimeOutMessage; import com.ccsens.ccmq.lowlevel.message.common.*; import com.ccsens.ccmq.lowlevel.message.server.ServerAckMessage; -import com.ccsens.ccmq.lowlevel.client.netty.ChannelManager; -import com.ccsens.ccmq.lowlevel.client.rabbitmq.QueueManager; -import com.ccsens.ccmq.lowlevel.client.restful.RestManager; import com.fasterxml.jackson.core.JsonProcessingException; import io.netty.channel.Channel; import org.slf4j.Logger; @@ -24,8 +24,6 @@ import wiki.tall.ccmq.common.util.WebConstant; import java.util.Iterator; import java.util.Map; -import static com.ccsens.ccmq.lowlevel.message.MessageHandler.handleMessage; - /** * @author __zHangSan */ diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java index 5deeccb..2f667bf 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java @@ -1,17 +1,13 @@ package com.ccsens.ccmq.lowlevel.client.netty; import cn.hutool.core.collection.CollectionUtil; -import com.ccsens.ccmq.lowlevel.message.client.ClientAuthTimeOutMessage; -import com.ccsens.ccmq.lowlevel.message.common.InMessage; -import com.ccsens.ccmq.lowlevel.message.common.MessageConstant; import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import static com.ccsens.ccmq.lowlevel.message.MessageHandler.handleMessage; - /** * @author wei */ diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/WrapperedChannel.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/WrapperedChannel.java index cafcd7a..26a49a2 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/WrapperedChannel.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/WrapperedChannel.java @@ -1,16 +1,23 @@ package com.ccsens.ccmq.lowlevel.client.netty; import cn.hutool.core.util.ObjectUtil; -import wiki.tall.ccmq.common.util.DateUtil; +import cn.hutool.core.util.StrUtil; import io.netty.channel.Channel; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import wiki.tall.ccmq.common.util.DateUtil; +import wiki.tall.ccmq.common.util.JacksonUtil; +import java.io.IOException; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; /** * @author wei */ +@Slf4j @Getter @Setter public class WrapperedChannel { @@ -72,6 +79,8 @@ public class WrapperedChannel { private long dataSendCount; /**业务相关参数*/ private String message; + /**寄存器 硬件系统*/ + private Map register = new HashMap<>(); public WrapperedChannel(Channel channel){ this.channel = channel; @@ -150,6 +159,23 @@ public class WrapperedChannel { setVersion(major,minor); this.userId = userId; this.message = message; + if (StrUtil.isNotEmpty(message)) { + try { + Map map = JacksonUtil.jsonToMap(message); + for (String key: map.keySet()) { + Object o = map.get(key); + if (o instanceof Integer) { + register.put(key, (long)(Integer) map.get(key)); + } else { + register.put(key, (long) map.get(key)); + } + + } + } catch (IOException e) { + log.error("message转map异常", e); + } + } + } public void writeAndFlush(Object message){ diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java index 50a375b..0f48db1 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java @@ -1,7 +1,55 @@ package com.ccsens.ccmq.lowlevel.client.netty.tcphexserver; +import cn.hutool.core.util.StrUtil; +import com.ccsens.ccmq.lowlevel.message.client.AuthMessage; +import com.ccsens.ccmq.lowlevel.message.client.ModBusMessage; +import com.ccsens.ccmq.lowlevel.message.client.PingMessage; +import com.ccsens.ccmq.lowlevel.message.common.InMessage; +import com.ccsens.ccmq.lowlevel.message.common.MessageConstant; +import com.ccsens.ccmq.lowlevel.message.common.OutMessage; +import com.ccsens.ccmq.lowlevel.message.server.ModBusAckMessage; +import com.ccsens.ccmq.lowlevel.message.server.PongMessage; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.extern.slf4j.Slf4j; +import wiki.tall.ccmq.common.bean.dto.ccmodbus.CCModBusEntity; +import wiki.tall.ccmq.common.util.JacksonUtil; + +import java.io.IOException; + +@Slf4j public class ModbusConverter { + public static InMessage convertCCModbusToMessage(CCModBusEntity ccModBusEntity) throws JsonProcessingException { + InMessage message; +// int addr = ccModBusEntity.getAddr() & 0xFF; + byte[] data = ccModBusEntity.getOriginData(); + short register = (short) ccModBusEntity.getRegisterStartAddr(); + + byte oper = (byte) (ccModBusEntity.getOper() & 0xFF); + MessageConstant.Register type = MessageConstant.Register.valueOf(data[0], data[1] ); + if (type == null) { + message = new InMessage(); + message.setData(JacksonUtil.beanToJson(new ModBusMessage(register,oper,ccModBusEntity.getModbusData()))); + message.setToDomain(MessageConstant.DomainType.Server); + return message; + } + switch (type) { + case Ping: { + message = toHeartMessage(register,oper,ccModBusEntity.getValue()); + break; + } + case Auth:{ + message = toAuthMessage(register,oper,ccModBusEntity.getValue()); + break; + } + default:{ + message = null; + } + } + return message; + } + // public static BaseMessageDto convertCCModbusToMessage(CCModBusEntity ccModBusEntity) { // BaseMessageDto message = null; // int addr = ccModBusEntity.getAddr() & 0xFF; @@ -20,22 +68,154 @@ public class ModbusConverter { // } // return message; // } -// -// /** -// * OriginaData ==> 0x00 -// * @param oper -// * @return -// */ + + /** + * OriginaData ==> 0x00 + * @param oper + * @return + */ + private static InMessage toHeartMessage(short addr, int oper, byte[] originData) throws JsonProcessingException { + InMessage message = new InMessage(); + PingMessage serverMessage = new PingMessage(); + message.setData(JacksonUtil.beanToJson(serverMessage)); + message.setToDomain(MessageConstant.DomainType.Server); + return message; + } // private static BaseMessageDto toHeartMessage(int addr, int oper, byte[] originData){ // HeartMessageDto message = new HeartMessageDto(); // return message; // } -// -// /** -// * OriginaData ==> 0x00 0x00 0x00 0x64 -// * @param oper -// * @return -// */ + + /** + * OriginaData ==> 0x00 0x00 0x00 0x64 + * @param oper + * @return + */ + private static InMessage toAuthMessage(short register, int oper, byte[] values) throws JsonProcessingException { + + long userId = 0; + for(int i=0;i out) throws Exception { -// if (in.readableBytes() < CCModBusEntity.SIZE_MIN) { +// @Override +// protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List out) throws Exception { +// if (in.readableBytes() < ModBusEntity.SIZE_WRITE_MORE_MIN) { // //长度小于协议的最小长度,继续读下一次 // return; // } +// ModBusEntity modBusEntity = new ModBusEntity(in); +// ModBusEntity.Error error = modBusEntity.valid(); // -// CCModBusEntity ccModBusEntity = new CCModBusEntity(in); -// CCModBusEntity.Error error = ccModBusEntity.valid(); // switch (error) { // case ERROR_FILTER_NOT_MATCH: // case ERROR_LEN_EXCLUEE_MAX: -// case ERROR_CRC_INVALID: //丢弃一个字节,继续读取 +// //丢弃一个字节,继续读取 +// case ERROR_CRC_INVALID: // discardNBytes(in, 1); // return; -// case ERROR_NEED_MORE_DATA: //继续读取 +// //继续读取 +// case ERROR_NEED_MORE_DATA: // return; // case ERROR_NONE: +// default: // break; // } // // //交给下个handler处理 -// discardNBytes(in, ccModBusEntity.getModbusLength()); -// BaseMessageDto message = ModbusConverter.convertCCModbusToMessage(ccModBusEntity); +// discardNBytes(in, modBusEntity.getModBusLength()); +// InMessage message = ModbusConverter.convertCCModbusToMessage(modBusEntity); // if(ObjectUtil.isNotNull(message)) { // out.add(message); // } +// +// } + + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List out) throws Exception { + while(true) { + if (in.readableBytes() < CCModBusEntity.SIZE_MIN) { + //长度小于协议的最小长度,继续读下一次 + return; + } + + CCModBusEntity ccModBusEntity = new CCModBusEntity(in); + CCModBusEntity.Error error = ccModBusEntity.valid(); + log.info("modbus数据检查:{}", error); + switch (error) { + case ERROR_FILTER_NOT_MATCH: + case ERROR_LEN_EXCLUEE_MAX: + //丢弃一个字节,继续读取 + case ERROR_CRC_INVALID: + discardNBytes(in, 1); + return; + //继续读取 + case ERROR_NEED_MORE_DATA: + return; + case ERROR_NONE: + default: + break; + } + + //交给下个handler处理 + discardNBytes(in, ccModBusEntity.getModbusLength()); + + InMessage message = ModbusConverter.convertCCModbusToMessage(ccModBusEntity); + log.info("modBus封装成inMessage:{}", message); + if (ObjectUtil.isNotNull(message)) { + out.add(message); + } + } } } diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusEncoder.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusEncoder.java index 0662fc7..e5ad136 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusEncoder.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusEncoder.java @@ -1,14 +1,34 @@ package com.ccsens.ccmq.lowlevel.client.netty.tcphexserver; +import cn.hutool.core.collection.CollectionUtil; +import com.ccsens.ccmq.lowlevel.message.common.OutMessage; import com.ccsens.ccmq.lowlevel.message.common.OutMessageSet; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import lombok.extern.slf4j.Slf4j; +import wiki.tall.ccmq.common.bean.dto.ccmodbus.CCModBusEntity; +@Slf4j public class ModbusEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext channelHandlerContext, OutMessageSet message, ByteBuf out) throws Exception { -// + log.info("modBus返回数据:{}", message); + if (message == null || CollectionUtil.isEmpty(message.getMessageSet())) { + return; + } + for(OutMessage outMessage : message.getMessageSet()){ + + CCModBusEntity ccModBusEntity = ModbusConverter.convertCommonProtocolToCCModbus(outMessage); + log.info("modBus数据转化:{}", ccModBusEntity); + if(ccModBusEntity != null){ + byte[] modbusData = ccModBusEntity.getModbusData(); + out.writeBytes(modbusData); + } + } + + + // //1.处理Start->Timer[Countdown] // if(message != null && message instanceof SyncMessageWithStartDto){ // SyncMessageWithStartDto startMessage = (SyncMessageWithStartDto)message; diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/NettyMBServer.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/NettyMBServer.java index 747d085..41e4244 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/NettyMBServer.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/NettyMBServer.java @@ -33,6 +33,8 @@ public class NettyMBServer { b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) + // 超时时间 + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java index 89899bd..3854b8a 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java @@ -15,14 +15,20 @@ import com.ccsens.ccmq.lowlevel.service.IUserService; import com.fasterxml.jackson.databind.JsonNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import wiki.tall.ccmq.common.bean.dto.ccmodbus.CCModBusEntity; +import wiki.tall.ccmq.common.bean.dto.ccmodbus.Register; import wiki.tall.ccmq.common.config.SettingProps; import wiki.tall.ccmq.common.util.*; +import javax.annotation.PostConstruct; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -40,6 +46,9 @@ public class MessageHandler { */ private static final Integer MAX_MESSAGE_NUM = 50; + @Autowired + private AmqpTemplate rabbitTemplate; + private static IMessageDao getMessageDao(){ return SpringContextUtils.getBean(IMessageDao.class); } @@ -52,6 +61,13 @@ public class MessageHandler { return SpringContextUtils.getBean(IUserService.class); } + private static MessageHandler messageHandler; + @PostConstruct + public void init(){ + messageHandler = this; + messageHandler.rabbitTemplate = this.rabbitTemplate; + } + /** * 每个20s同步一次有未决消息的用户到pendingClientSet中 * @throws Exception @@ -240,6 +256,7 @@ public class MessageHandler { } private static void handlerServerMessage(String type, InMessage inMessage) throws Exception { + logger.info("处理Server消息:{},{}", type, inMessage); MessageConstant.ClientMessageType clientMessageType = MessageConstant.ClientMessageType.valueOf(type); String data = inMessage.getData(); OutMessage outMessage = null; @@ -255,16 +272,27 @@ public class MessageHandler { break; } case Auth: { + logger.info("授权开始"); boolean authSuccess = false; AuthMessage inSysData = JacksonUtil.jsonToBean(data, AuthMessage.class); if(null != inSysData.getData()){ + if(StrUtil.isNotEmpty(inSysData.getData().getToken())) { + logger.info("token授权"); String userId = getUserService().getUserIdByToken(inSysData.getData().getToken()); if(StrUtil.isNotEmpty(userId)){ ChannelManager.authChannel(ChannelManager.getCurrentChannel(),userId,inSysData.getData().getMajor(),inSysData.getData().getMinor(), inSysData.getMessage()); onClientOnLine(MessageConstant.DomainType.User,userId); authSuccess = true; } + } else if (inSysData.getData().getAuthId() != null) { + logger.info("authid授权"); + // authId有值默认授权通过 + ChannelManager.authChannel(ChannelManager.getCurrentChannel(),String.valueOf(inSysData.getData().getAuthId()),inSysData.getData().getMajor(),inSysData.getData().getMinor(), inSysData.getMessage()); + logger.info("授权完成"); + onClientOnLine(MessageConstant.DomainType.User,String.valueOf(inSysData.getData().getAuthId())); + logger.info("设置在线"); + authSuccess = true; } } if(!authSuccess){ @@ -272,10 +300,12 @@ public class MessageHandler { new ChannelStatusMessage(false,0L,MessageConstant.Error.AuthFailed)) ); }else{ + outMessage = new OutMessage(JacksonUtil.beanToJson( new ChannelStatusMessage(true,0L,MessageConstant.Error.Ok)) ); } + logger.info("授权结束"); break; } case GetChannelStatus: { @@ -402,6 +432,68 @@ public class MessageHandler { ) ); break; + } + case ModBus:{ + //1.按照功能码读写数据 + ModBusMessage modBusMessage = JacksonUtil.jsonToBean(data, ModBusMessage.class); + byte oper = modBusMessage.getData().getOper(); + switch (oper) { + case CCModBusEntity.Oper.READ : { + // TODO 读操作 + WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel()); + Object o = wrapperedChannel.getRegister().get(modBusMessage.getData().getRegister()); + break; + } + case CCModBusEntity.Oper.WRITE : { + // TODO 写单个寄存器 + break; + } + case CCModBusEntity.Oper.WRITE_MORE : { + // 写多个寄存器 + // 2. 如果是写操作且值发生改变,放到mq里 + byte[] modBusData = modBusMessage.getData().getModBusData(); + // 读起始寄存器地址 + int startAddr = CCModBusEntity.getRegisterStartAddr(modBusData); + int num = CCModBusEntity.getRegisterStartNum(modBusData); + WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel()); + Map register = wrapperedChannel.getRegister(); + // 授权数据 + Long authId = register.get(String.valueOf(MessageConstant.Register.Auth.getAddr())); + for (int i = 0; i < num; i++) { + // 寄存器的原始数据 + Long origin = register.get(String.valueOf(startAddr)); + // 寄存器将要修改的数据 + long value = 0; + for (int j = 0; j < CCModBusEntity.SIZE_DATA_SINGLE; j++) { + value <<= 8; + value += modBusData[CCModBusEntity.POSITION_DATA + CCModBusEntity.SIZE_DATA_SINGLE * i + j]; + } + if (origin == null || origin.longValue() != value) { + register.put(String.valueOf(startAddr), value); + logger.info("寄存器数据:{}", register); + // 数据不一致,放到消息队列里 + Register register1 = new Register(); + register1.setAuthId(authId); + register1.setAddr(startAddr+i); + register1.setValue(value); + OutMessage outMessage1 = new OutMessage(JacksonUtil.beanToJson(register1)); + OutMessageSet set = new OutMessageSet(); + set.add(outMessage1); + messageHandler.rabbitTemplate.convertAndSend("wisdom_car", + JacksonUtil.beanToJson(set)); + logger.info("modbus通知mq:{}", set); + + } + + } + // 3. 封装返回值 + outMessage = new OutMessage(JacksonUtil.beanToJson(new ModBusAckMessage(modBusData))); + break; + } + default:break; + } + + } default: break; } @@ -416,7 +508,9 @@ public class MessageHandler { private static void onClientOnLine(MessageConstant.DomainType domain,String userId){ getMessageDao().updateMessageStatusToPending(domain,userId); + logger.info("修改mongodb"); addClientToRedisUnPendingSet(CcMessageUtil.getDomainTypeAndUserIdString(domain,userId)); + logger.info("添加redisUnPending"); } private static void updateMessageAckStatus(String ackId) throws Exception { @@ -477,16 +571,22 @@ public class MessageHandler { String []stringArray = CcMessageUtil.splitTypeAndUserId(domainTypeAndUserId); MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]); String to = stringArray[1]; + logger.info("查询ack"); //1.1 当前用户是否在pendingClients列表中 if (!RedisUtil.sHas(redisUnPendingClientSetKey,domainTypeAndUserId)) { + logger.info("当前用户不在pendingClients列表中"); //1.2 当前用户是否在redis的等待ack列表中 if(!RedisUtil.hasKey(redisWaitAckSetKey)){ + logger.info("当前用户不在redis的等待ack列表中"); //1.3 当前用户是否有pending消息 if(getMessageDao().countClientPendingMessage(toDomain,to) > 0){ + logger.info("当前用户有pending消息"); //1.4 将当前用户添加到redisUnPendingClientSet列表中排队处理 RedisUtil.sSet(redisUnPendingClientSetKey,domainTypeAndUserId); + logger.info("将当前用户添加到redisUnPendingClientSet列表中排队处理"); } } } + logger.info("添加ack"); } } diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java index 9a25cdc..8765350 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java @@ -16,6 +16,7 @@ public class AuthMessage extends ServerMessage { @Getter public static class Data{ private String token; + private Long authId; // 主版本号 private int major; // 此版本号 @@ -37,4 +38,10 @@ public class AuthMessage extends ServerMessage { data.setMajor(major); data.setMinor(minor); } + + public AuthMessage(Long authId){ + this(); + data = new Data(); + data.setAuthId(authId); + } } diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java new file mode 100644 index 0000000..f42c439 --- /dev/null +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java @@ -0,0 +1,37 @@ +package com.ccsens.ccmq.lowlevel.message.client; + +import com.ccsens.ccmq.lowlevel.message.common.MessageConstant; +import com.ccsens.ccmq.lowlevel.message.common.ServerMessage; +import lombok.Data; + +/** + * @description: + * @author: whj + * @time: 2020/12/14 22:32 + */ +@Data +public class ModBusMessage extends ServerMessage { + + + @lombok.Data + public static class Data{ + private short register; + private byte oper; + private byte[] modBusData; + } + + private Data data; + /**业务相关参数,由业务具体确定是什么类型,什么参数*/ + private String message; + + public ModBusMessage(){ + setType(MessageConstant.ClientMessageType.ModBus.name()); + } + public ModBusMessage(short register, byte oper, byte[] modBusData) { + this(); + data = new Data(); + data.register = register; + data.oper = oper; + data.modBusData = modBusData; + } +} diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java index ad40010..9da9dc9 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java @@ -8,18 +8,54 @@ import lombok.Data; * 消息相关常量 */ public class MessageConstant { + /** + * 寄存器地址 + */ + public enum Register{ + /**授权*/ + Auth((byte) 0x00, (byte) 0x01), + /**心跳*/ + Ping((byte) 0x00, (byte) 0x02), + ; + + public byte first; + public byte second; + + Register(byte first, byte second){ + this.first = first; + this.second = second; + + } + + public long getAddr(){ + long addr = first << 8; + addr += second; + return addr; + } + + public static Register valueOf(byte first, byte second) { + for(Register register : Register.values()){ + if (register.first == first && register.second == second) { + return register; + } + } + return null; + } + } + public enum ClientMessageType{ //客户端心跳 Ping(0x00), //客户端认证 Auth(0x01), + //客户端收到消息ACK Ack(0x02), //客户端收到消息ACK HasRead(0x03), //客户端请求连接状态 GetChannelStatus(0x04), - + ModBus(0x05), //不可预期的错误 UnExceptedError(0x21), ClientIdleClosed(0x22), diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/ModBusAckMessage.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/ModBusAckMessage.java new file mode 100644 index 0000000..05038bd --- /dev/null +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/ModBusAckMessage.java @@ -0,0 +1,31 @@ +package com.ccsens.ccmq.lowlevel.message.server; + +import com.ccsens.ccmq.lowlevel.message.common.ServerMessage; +import lombok.Data; + +/** + * @author zhangsan + */ +@Data +public class ModBusAckMessage extends ServerMessage { + public ModBusAckMessage(){ + + } + + public ModBusAckMessage(byte[] modBusData){ + data = new Data(modBusData); + } + + @lombok.Data + public static class Data{ + private byte[] modBusData; + public Data(){ + + } + public Data(byte[] modBusData){ + this.modBusData = modBusData; + } + } + + private Data data; +} diff --git a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java index 8b82d0d..5be0a32 100644 --- a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java +++ b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java @@ -1,5 +1,6 @@ package wiki.tall.ccmq.common.bean.dto.ccmodbus; +import lombok.Data; import wiki.tall.ccmq.common.util.CRCUtil; import io.netty.buffer.ByteBuf; @@ -8,6 +9,8 @@ import io.netty.buffer.ByteBuf; * eg: FF FE 0A 10 93 00 00 00 00 00 00 94 2A * Notice: crc is example ,not the real data */ + +@Data public class CCModBusEntity { public enum Error{ ERROR_NONE, @@ -17,19 +20,41 @@ public class CCModBusEntity { ERROR_NEED_MORE_DATA } + public static class Oper{ + public final static byte READ = 0X03; + public final static byte WRITE = 0X06; + public final static byte WRITE_MORE = 0X10; + } + public static final int SIZE_FILTER = 2; public static final int SIZE_LEN = 1; public static final int SIZE_ADDR = 1; public static final int SIZE_OPER = 1; public static final int SIZE_CRC = 2; + public static final int SIZE_REGISTER_START = 2; + public static final int SIZE_REGISTER_NUM = 2; + public static final int SIZE_COUNT_LENGTH = 1; + public static final int SIZE_DATA_SINGLE = 2; public static final int SIZE_DATA_MIN = 1; - public static final int SIZE_DATA_MAX = 255 - SIZE_ADDR - SIZE_OPER - SIZE_CRC; + public static final int SIZE_MAX = 255; + public static final int SIZE_DATA_MAX = SIZE_MAX - SIZE_ADDR - SIZE_OPER - SIZE_CRC - SIZE_FILTER - SIZE_LEN; public static final int SIZE_MIN = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_DATA_MIN + SIZE_CRC; - public static final int SIZE_MAX = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_DATA_MAX + SIZE_CRC; + public static final byte ADDRESS = 0x14; + public static final int POSITION_DATA = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START + SIZE_REGISTER_NUM + SIZE_COUNT_LENGTH; + public static final int POSITION_OPER = SIZE_FILTER + SIZE_LEN + SIZE_ADDR; + public static final int POSITION_REGISTER = POSITION_OPER + SIZE_OPER; + public static final int POSITION_REGISTER_NUM = POSITION_REGISTER + SIZE_REGISTER_START; + + + + +// public static final int SIZE_MAX = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_DATA_MAX + SIZE_CRC; + public static final byte[] FILTER = {(byte)0xFF,(byte)0xFE}; // public static final byte[] TAILER = {0x0d,0x0a}; public static final byte[] TAILER = {}; + private byte modbusData[]; private byte filter[]; private int len; @@ -38,6 +63,46 @@ public class CCModBusEntity { private byte originData[]; private byte crc[]; + /** + * 获取寄存器起始地址 + * @param modBusData modBusData + * @return 寄存器起始地址 + */ + public static int getRegisterStartAddr(byte[] modBusData){ + int addr = 0; + int start = POSITION_REGISTER; + for (int i = 0; i < SIZE_REGISTER_START; i++) { + addr <<= 8; + addr += modBusData[start + i]; + } + + return addr; + } + + /** + * 获取寄存器起始地址 + * @return 寄存器起始地址 + */ + public int getRegisterStartAddr(){ + return getRegisterStartAddr(modbusData); + } + + /** + * 获取写几个寄存器(写多个寄存器) + * @param modBusData modBusData + * @return 寄存器个数 + */ + public static int getRegisterStartNum(byte[] modBusData){ + int number = 0; + int start = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START; + for (int i = 0; i < SIZE_REGISTER_NUM; i++) { + number <<= 8; + number += modBusData[start + i]; + } + + return number; + } + public CCModBusEntity(byte addr,byte oper,byte[] originData){ modbusData = new byte[SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + originData.length + SIZE_CRC + TAILER.length]; int index = 0; @@ -103,9 +168,25 @@ public class CCModBusEntity { return originData; } + public byte[] getValue(){ + getOriginData(); + byte[] values; + if (oper == Oper.WRITE_MORE) { + // TODO 写多个寄存器 + values = new byte[originData[SIZE_REGISTER_START + SIZE_REGISTER_NUM]]; + System.arraycopy(originData, SIZE_REGISTER_START + SIZE_REGISTER_NUM + CCModBusEntity.SIZE_COUNT_LENGTH, values, 0, values.length); + } else { + // 写一个寄存器 + values = new byte[SIZE_DATA_SINGLE]; + System.arraycopy(originData, SIZE_REGISTER_START, values, 0, CCModBusEntity.SIZE_DATA_SINGLE); + } + return values; + } + public byte[] getCrc(){ - if(crc == null || crc.length != SIZE_CRC) + if(crc == null || crc.length != SIZE_CRC) { crc = new byte[SIZE_CRC]; + } int index = SIZE_FILTER + SIZE_LEN + getLen() - SIZE_CRC; System.arraycopy(modbusData,index,crc,0,SIZE_CRC); return crc; @@ -116,15 +197,12 @@ public class CCModBusEntity { } public boolean crcValid() { -// byte[] thisCRC = getCrc(); -// byte crc[] = {0,0}; -// int index = SIZE_FILTER + SIZE_LEN; -// int length = getLen() - SIZE_CRC; -// ProtocolUtil.crc16(crc, this.modbusData,index,length); -// if(crc[1] == thisCRC[0] && crc[0]== thisCRC[1]) -// return true; -// return false; - return true; + byte[] thisCRC = getCrc(); + byte[] crc = new byte[SIZE_CRC]; +// CRCUtil.crc16(crc,modbusData,SIZE_FILTER+SIZE_LEN,modbusData[SIZE_FILTER] - SIZE_CRC - TAILER.length); + CRCUtil.crc16(crc,modbusData,SIZE_FILTER+SIZE_LEN,getLen() - SIZE_CRC); + return crc[0] == thisCRC[0] && crc[1]== thisCRC[1]; + } public void print(){ @@ -138,14 +216,16 @@ public class CCModBusEntity { //1.filter filter = getFilter(); for(int i=0;i SIZE_DATA_MAX) + if(len > SIZE_DATA_MAX) { return Error.ERROR_LEN_EXCLUEE_MAX; + } //3.data 完整性 if(this.modbusData.length < (SIZE_FILTER + SIZE_LEN + len)) { diff --git a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/ModBusEntity.java b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/ModBusEntity.java new file mode 100644 index 0000000..1711994 --- /dev/null +++ b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/ModBusEntity.java @@ -0,0 +1,115 @@ +package wiki.tall.ccmq.common.bean.dto.ccmodbus; + +import io.netty.buffer.ByteBuf; + +/** + * @description: + * @author: whj + * @time: 2020/12/14 15:02 + */ +public class ModBusEntity { + + + + public enum Error{ + ERROR_NONE, + ERROR_FILTER_NOT_MATCH, + ERROR_LEN_EXCLUEE_MAX, + ERROR_CRC_INVALID, + ERROR_NEED_MORE_DATA + } + public enum FunctionCode{ + READ((byte)3), + WRITE((byte)6), + WRITE_MORE((byte)10), + ; + + private byte code; + FunctionCode(byte code) { + this.code = code; + } + public static FunctionCode getEnum(byte code) { + for ( FunctionCode functionCode: FunctionCode.values() ) { + if (functionCode.code == code) { + return functionCode; + } + } + return null; + } + } + + /**传感器地址长度*/ + public static final int SIZE_IP = 1; + /**功能码长度*/ + public static final int SIZE_FUNCTION_CODE = 1; + /**寄存器起始地址长度*/ + public static final int SIZE_REGISTER_START = 2; + /**寄存器数量长度*/ + public static final int SIZE_REGISTER_NUM = 2; + /**计数字节长度*/ + public static final int SIZE_COUNT_BYTES = 1; + public static final int SIZE_DATA_MIN = 2; + public static final int SIZE_DATA_MAX = 240; + /**单个预置值的长度*/ + public static final int SIZE_SINGLE_VALUE = 2; + /**CR校验长度*/ + public static final int SIZE_CR = 2; + + + /**预置多个寄存器最小长度*/ + public static final int SIZE_WRITE_MORE_MIN = SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START + + SIZE_REGISTER_NUM + SIZE_COUNT_BYTES + SIZE_DATA_MIN + SIZE_CR; + /**预置多个寄存器最大长度*/ + public static final int SIZE_WRITE_MORE_MAX = SIZE_WRITE_MORE_MIN - SIZE_DATA_MIN + SIZE_DATA_MAX; + + + + private byte[] modBusData; + + public ModBusEntity(ByteBuf in){ + this.modBusData = new byte[in.readableBytes()]; + in.getBytes(in.readerIndex(),this.modBusData); + } + + + /** + * 协议校验 + * @return + */ + public Error valid() { + // TODO cr校验 + + + return Error.ERROR_NONE; + } + + /** + * 获取预置值的大小 + * @return + */ + public int getPreValueLen(){ + int index = SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START; + + return (modBusData[index] & 0xFF) * SIZE_SINGLE_VALUE; + } + + /** + * 获取对象的长度 + * @return + */ + public int getModBusLength() { + byte functionCode = this.modBusData[SIZE_IP]; + switch (FunctionCode.getEnum(functionCode)) { + case READ : + return SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START + SIZE_REGISTER_NUM + SIZE_CR; + case WRITE : + return SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START + SIZE_SINGLE_VALUE + SIZE_CR; + case WRITE_MORE: + int length = SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START + SIZE_REGISTER_NUM + SIZE_COUNT_BYTES + SIZE_CR; + + + default:break; + } + return 0; + } +} diff --git a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java new file mode 100644 index 0000000..bc653a8 --- /dev/null +++ b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java @@ -0,0 +1,29 @@ +package wiki.tall.ccmq.common.bean.dto.ccmodbus; + +import lombok.Data; + +/** + * @description: + * @author: whj + * @time: 2020/12/15 19:18 + */ +@Data +public class Register { + + /** + * 平车编号 + */ + private Long authId; + /** + * 寄存器地址 + */ + private int addr; + /** + * 寄存器的值 + */ + private Long value; + /** + * 时间 + */ + private Long time = System.currentTimeMillis(); +} diff --git a/ccmq/src/main/java/wiki/tall/ccmq/common/util/CRCUtil.java b/ccmq/src/main/java/wiki/tall/ccmq/common/util/CRCUtil.java index e1a6725..45cf3e8 100644 --- a/ccmq/src/main/java/wiki/tall/ccmq/common/util/CRCUtil.java +++ b/ccmq/src/main/java/wiki/tall/ccmq/common/util/CRCUtil.java @@ -27,4 +27,6 @@ public class CRCUtil { crc[0] = (byte)(wcrc & 0xFF); // crc的低八位 crc[1] = (byte)((wcrc >> 8) & 0xFF); // crc的高八位 } + + } diff --git a/ccmq/src/main/resources/application.properties b/ccmq/src/main/resources/application.properties index 97e42eb..dd8b329 100644 --- a/ccmq/src/main/resources/application.properties +++ b/ccmq/src/main/resources/application.properties @@ -1,5 +1,5 @@ # ѡ񿪷{dev|test|prod} -spring.profiles.active=prod +spring.profiles.active=test # Ӧ spring.application.name=tall-message