Browse Source

modbus

master
zhizhi wu 5 years ago
parent
commit
bf5dc81965
  1. 8
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/ClientManager.java
  2. 6
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java
  3. 28
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/WrapperedChannel.java
  4. 206
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java
  5. 65
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java
  6. 22
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusEncoder.java
  7. 2
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/NettyMBServer.java
  8. 100
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java
  9. 7
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java
  10. 37
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java
  11. 38
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java
  12. 31
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/ModBusAckMessage.java
  13. 108
      ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java
  14. 115
      ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/ModBusEntity.java
  15. 29
      ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java
  16. 2
      ccmq/src/main/java/wiki/tall/ccmq/common/util/CRCUtil.java
  17. 2
      ccmq/src/main/resources/application.properties

8
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/ClientManager.java

@ -2,14 +2,14 @@ package com.ccsens.ccmq.lowlevel.client;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.ccsens.ccmq.lowlevel.client.netty.ChannelManager;
import com.ccsens.ccmq.lowlevel.client.netty.WrapperedChannel;
import com.ccsens.ccmq.lowlevel.client.rabbitmq.QueueManager;
import com.ccsens.ccmq.lowlevel.client.restful.RestManager;
import com.ccsens.ccmq.lowlevel.message.MessageHandler;
import com.ccsens.ccmq.lowlevel.message.client.ClientAuthTimeOutMessage;
import com.ccsens.ccmq.lowlevel.message.common.*;
import com.ccsens.ccmq.lowlevel.message.server.ServerAckMessage;
import com.ccsens.ccmq.lowlevel.client.netty.ChannelManager;
import com.ccsens.ccmq.lowlevel.client.rabbitmq.QueueManager;
import com.ccsens.ccmq.lowlevel.client.restful.RestManager;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.channel.Channel;
import org.slf4j.Logger;
@ -24,8 +24,6 @@ import wiki.tall.ccmq.common.util.WebConstant;
import java.util.Iterator;
import java.util.Map;
import static com.ccsens.ccmq.lowlevel.message.MessageHandler.handleMessage;
/**
* @author __zHangSan
*/

6
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java

@ -1,17 +1,13 @@
package com.ccsens.ccmq.lowlevel.client.netty;
import cn.hutool.core.collection.CollectionUtil;
import com.ccsens.ccmq.lowlevel.message.client.ClientAuthTimeOutMessage;
import com.ccsens.ccmq.lowlevel.message.common.InMessage;
import com.ccsens.ccmq.lowlevel.message.common.MessageConstant;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static com.ccsens.ccmq.lowlevel.message.MessageHandler.handleMessage;
/**
* @author wei
*/

28
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/WrapperedChannel.java

@ -1,16 +1,23 @@
package com.ccsens.ccmq.lowlevel.client.netty;
import cn.hutool.core.util.ObjectUtil;
import wiki.tall.ccmq.common.util.DateUtil;
import cn.hutool.core.util.StrUtil;
import io.netty.channel.Channel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import wiki.tall.ccmq.common.util.DateUtil;
import wiki.tall.ccmq.common.util.JacksonUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* @author wei
*/
@Slf4j
@Getter
@Setter
public class WrapperedChannel {
@ -72,6 +79,8 @@ public class WrapperedChannel {
private long dataSendCount;
/**业务相关参数*/
private String message;
/**寄存器 硬件系统*/
private Map<String, Long> register = new HashMap<>();
public WrapperedChannel(Channel channel){
this.channel = channel;
@ -150,6 +159,23 @@ public class WrapperedChannel {
setVersion(major,minor);
this.userId = userId;
this.message = message;
if (StrUtil.isNotEmpty(message)) {
try {
Map<String, Object> map = JacksonUtil.jsonToMap(message);
for (String key: map.keySet()) {
Object o = map.get(key);
if (o instanceof Integer) {
register.put(key, (long)(Integer) map.get(key));
} else {
register.put(key, (long) map.get(key));
}
}
} catch (IOException e) {
log.error("message转map异常", e);
}
}
}
public void writeAndFlush(Object message){

206
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java

@ -1,7 +1,55 @@
package com.ccsens.ccmq.lowlevel.client.netty.tcphexserver;
import cn.hutool.core.util.StrUtil;
import com.ccsens.ccmq.lowlevel.message.client.AuthMessage;
import com.ccsens.ccmq.lowlevel.message.client.ModBusMessage;
import com.ccsens.ccmq.lowlevel.message.client.PingMessage;
import com.ccsens.ccmq.lowlevel.message.common.InMessage;
import com.ccsens.ccmq.lowlevel.message.common.MessageConstant;
import com.ccsens.ccmq.lowlevel.message.common.OutMessage;
import com.ccsens.ccmq.lowlevel.message.server.ModBusAckMessage;
import com.ccsens.ccmq.lowlevel.message.server.PongMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import wiki.tall.ccmq.common.bean.dto.ccmodbus.CCModBusEntity;
import wiki.tall.ccmq.common.util.JacksonUtil;
import java.io.IOException;
@Slf4j
public class ModbusConverter {
public static InMessage convertCCModbusToMessage(CCModBusEntity ccModBusEntity) throws JsonProcessingException {
InMessage message;
// int addr = ccModBusEntity.getAddr() & 0xFF;
byte[] data = ccModBusEntity.getOriginData();
short register = (short) ccModBusEntity.getRegisterStartAddr();
byte oper = (byte) (ccModBusEntity.getOper() & 0xFF);
MessageConstant.Register type = MessageConstant.Register.valueOf(data[0], data[1] );
if (type == null) {
message = new InMessage();
message.setData(JacksonUtil.beanToJson(new ModBusMessage(register,oper,ccModBusEntity.getModbusData())));
message.setToDomain(MessageConstant.DomainType.Server);
return message;
}
switch (type) {
case Ping: {
message = toHeartMessage(register,oper,ccModBusEntity.getValue());
break;
}
case Auth:{
message = toAuthMessage(register,oper,ccModBusEntity.getValue());
break;
}
default:{
message = null;
}
}
return message;
}
// public static BaseMessageDto convertCCModbusToMessage(CCModBusEntity ccModBusEntity) {
// BaseMessageDto message = null;
// int addr = ccModBusEntity.getAddr() & 0xFF;
@ -20,22 +68,154 @@ public class ModbusConverter {
// }
// return message;
// }
//
// /**
// * OriginaData ==> 0x00
// * @param oper
// * @return
// */
/**
* OriginaData ==> 0x00
* @param oper
* @return
*/
private static InMessage toHeartMessage(short addr, int oper, byte[] originData) throws JsonProcessingException {
InMessage message = new InMessage();
PingMessage serverMessage = new PingMessage();
message.setData(JacksonUtil.beanToJson(serverMessage));
message.setToDomain(MessageConstant.DomainType.Server);
return message;
}
// private static BaseMessageDto toHeartMessage(int addr, int oper, byte[] originData){
// HeartMessageDto message = new HeartMessageDto();
// return message;
// }
//
// /**
// * OriginaData ==> 0x00 0x00 0x00 0x64
// * @param oper
// * @return
// */
/**
* OriginaData ==> 0x00 0x00 0x00 0x64
* @param oper
* @return
*/
private static InMessage toAuthMessage(short register, int oper, byte[] values) throws JsonProcessingException {
long userId = 0;
for(int i=0;i<CCModBusEntity.SIZE_DATA_SINGLE;i++){
userId <<= 8;
userId |= values[i] & 0xFF;
}
// TODO 此处如何传参? 参数仅为authid
AuthMessage message = new AuthMessage(userId);
message.setMessage("{\""+register+"\":" + userId + "}");
InMessage inMessage = new InMessage();
inMessage.setData(JacksonUtil.beanToJson(message));
inMessage.setToDomain(MessageConstant.DomainType.Server);
return inMessage;
}
public static CCModBusEntity convertCommonProtocolToCCModbus(OutMessage outMessage) throws IOException {
CCModBusEntity ccModBusEntity = null;
JsonNode typeNode = JacksonUtil.getJsonProperty(outMessage.getData(), "type");
if (typeNode == null || StrUtil.isEmpty(typeNode.textValue())) {
ccModBusEntity = fromModBusMessage(JacksonUtil.jsonToBean(outMessage.getData(), ModBusAckMessage.class));
log.info("普通的读写操作,返回结果:{}", ccModBusEntity);
return ccModBusEntity;
}
MessageConstant.ServerMessageType type = MessageConstant.ServerMessageType.valueOf(typeNode.textValue());
switch (type) {
// 心跳
case Pong:{
ccModBusEntity = fromHeartMessage(JacksonUtil.jsonToBean(outMessage.getData(), PongMessage.class));
break;
}
// 认证
case ChannelStatus:{
ccModBusEntity = fromAuthMessage();
break;
}
case Ack:{
ccModBusEntity = fromModBusMessage(JacksonUtil.jsonToBean(outMessage.getData(), ModBusAckMessage.class));
log.info("普通的读写操作,返回结果:{}", ccModBusEntity);
break;
}
default:
}
return ccModBusEntity;
}
/**
* modbus返回
* @param modBusAckMessage modBusData
* @return modBus返回
*/
private static CCModBusEntity fromModBusMessage(ModBusAckMessage modBusAckMessage) {
byte []originData = new byte[CCModBusEntity.SIZE_REGISTER_START + CCModBusEntity.SIZE_REGISTER_NUM];
byte[] modBusData = modBusAckMessage.getData().getModBusData();
byte oper = modBusData[CCModBusEntity.POSITION_OPER];
switch (oper) {
case CCModBusEntity.Oper.READ :
case CCModBusEntity.Oper.WRITE : {
// TODO
break;
}
case CCModBusEntity.Oper.WRITE_MORE : {
// 寄存器起始地址
int i = 0;
for (int j = 0; j < CCModBusEntity.SIZE_REGISTER_START; j++) {
originData[i++] = modBusData[CCModBusEntity.POSITION_REGISTER + j];
}
// 寄存器数量
for (int j = 0; j < CCModBusEntity.SIZE_REGISTER_NUM; j++) {
originData[i++] = modBusData[CCModBusEntity.POSITION_REGISTER_NUM + j];
}
}
default:break;
}
CCModBusEntity ccModBusEntity = new CCModBusEntity(CCModBusEntity.ADDRESS,oper,originData);
return ccModBusEntity;
}
private static CCModBusEntity fromAuthMessage() {
byte []originData = new byte[CCModBusEntity.SIZE_REGISTER_START + CCModBusEntity.SIZE_REGISTER_NUM];
// 寄存器起始地址
originData[0] = MessageConstant.Register.Auth.first;
originData[1] = MessageConstant.Register.Auth.second;
// 寄存器数量
originData[2] = 0x00;
originData[3] = 0x01;
byte addr = CCModBusEntity.ADDRESS;
byte oper = CCModBusEntity.Oper.WRITE_MORE;
CCModBusEntity ccModBusEntity = new CCModBusEntity(addr,oper,originData);
return ccModBusEntity;
}
/**
* OriginalData: 0x00
* @param message
* @return
*/
private static CCModBusEntity fromHeartMessage(PongMessage message){
byte []originData = new byte[CCModBusEntity.SIZE_REGISTER_START + CCModBusEntity.SIZE_REGISTER_NUM];
// 寄存器起始地址
originData[0] = MessageConstant.Register.Ping.first;
originData[1] = MessageConstant.Register.Ping.second;
// 寄存器数量
originData[2] = 0x00;
originData[3] = 0x01;
byte addr = CCModBusEntity.ADDRESS;
byte oper = CCModBusEntity.Oper.WRITE_MORE;
CCModBusEntity ccModBusEntity = new CCModBusEntity(addr,oper,originData);
return ccModBusEntity;
}
// private static BaseMessageDto toAuthMessage(int addr, int oper, byte[] originData){
// Long userId = 0L;
// for(int i=0;i<originData.length;i++){
@ -45,7 +225,7 @@ public class ModbusConverter {
// AuthMessageDto message = new AuthMessageDto(userId,null);
// return message;
// }
//
// public static CCModBusEntity convertCommonProtocolToCCModbus(BaseMessageDto message) {
// CCModBusEntity ccModBusEntity = null;
// WebConstant.Message_Type type = WebConstant.Message_Type.phaseOf(message.getType());

65
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusDecoder.java

@ -1,11 +1,16 @@
package com.ccsens.ccmq.lowlevel.client.netty.tcphexserver;
import cn.hutool.core.util.ObjectUtil;
import com.ccsens.ccmq.lowlevel.message.common.InMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import wiki.tall.ccmq.common.bean.dto.ccmodbus.CCModBusEntity;
import java.util.List;
@Slf4j
public class ModbusDecoder extends ByteToMessageDecoder {
private void discardNBytes(ByteBuf in, int length) {
@ -14,32 +19,74 @@ public class ModbusDecoder extends ByteToMessageDecoder {
}
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
// if (in.readableBytes() < CCModBusEntity.SIZE_MIN) {
// @Override
// protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
// if (in.readableBytes() < ModBusEntity.SIZE_WRITE_MORE_MIN) {
// //长度小于协议的最小长度,继续读下一次
// return;
// }
// ModBusEntity modBusEntity = new ModBusEntity(in);
// ModBusEntity.Error error = modBusEntity.valid();
//
// CCModBusEntity ccModBusEntity = new CCModBusEntity(in);
// CCModBusEntity.Error error = ccModBusEntity.valid();
// switch (error) {
// case ERROR_FILTER_NOT_MATCH:
// case ERROR_LEN_EXCLUEE_MAX:
// case ERROR_CRC_INVALID: //丢弃一个字节,继续读取
// //丢弃一个字节,继续读取
// case ERROR_CRC_INVALID:
// discardNBytes(in, 1);
// return;
// case ERROR_NEED_MORE_DATA: //继续读取
// //继续读取
// case ERROR_NEED_MORE_DATA:
// return;
// case ERROR_NONE:
// default:
// break;
// }
//
// //交给下个handler处理
// discardNBytes(in, ccModBusEntity.getModbusLength());
// BaseMessageDto message = ModbusConverter.convertCCModbusToMessage(ccModBusEntity);
// discardNBytes(in, modBusEntity.getModBusLength());
// InMessage message = ModbusConverter.convertCCModbusToMessage(modBusEntity);
// if(ObjectUtil.isNotNull(message)) {
// out.add(message);
// }
//
// }
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
while(true) {
if (in.readableBytes() < CCModBusEntity.SIZE_MIN) {
//长度小于协议的最小长度,继续读下一次
return;
}
CCModBusEntity ccModBusEntity = new CCModBusEntity(in);
CCModBusEntity.Error error = ccModBusEntity.valid();
log.info("modbus数据检查:{}", error);
switch (error) {
case ERROR_FILTER_NOT_MATCH:
case ERROR_LEN_EXCLUEE_MAX:
//丢弃一个字节,继续读取
case ERROR_CRC_INVALID:
discardNBytes(in, 1);
return;
//继续读取
case ERROR_NEED_MORE_DATA:
return;
case ERROR_NONE:
default:
break;
}
//交给下个handler处理
discardNBytes(in, ccModBusEntity.getModbusLength());
InMessage message = ModbusConverter.convertCCModbusToMessage(ccModBusEntity);
log.info("modBus封装成inMessage:{}", message);
if (ObjectUtil.isNotNull(message)) {
out.add(message);
}
}
}
}

22
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusEncoder.java

@ -1,14 +1,34 @@
package com.ccsens.ccmq.lowlevel.client.netty.tcphexserver;
import cn.hutool.core.collection.CollectionUtil;
import com.ccsens.ccmq.lowlevel.message.common.OutMessage;
import com.ccsens.ccmq.lowlevel.message.common.OutMessageSet;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
import wiki.tall.ccmq.common.bean.dto.ccmodbus.CCModBusEntity;
@Slf4j
public class ModbusEncoder extends MessageToByteEncoder<OutMessageSet> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, OutMessageSet message, ByteBuf out) throws Exception {
//
log.info("modBus返回数据:{}", message);
if (message == null || CollectionUtil.isEmpty(message.getMessageSet())) {
return;
}
for(OutMessage outMessage : message.getMessageSet()){
CCModBusEntity ccModBusEntity = ModbusConverter.convertCommonProtocolToCCModbus(outMessage);
log.info("modBus数据转化:{}", ccModBusEntity);
if(ccModBusEntity != null){
byte[] modbusData = ccModBusEntity.getModbusData();
out.writeBytes(modbusData);
}
}
// //1.处理Start->Timer[Countdown]
// if(message != null && message instanceof SyncMessageWithStartDto){
// SyncMessageWithStartDto startMessage = (SyncMessageWithStartDto)message;

2
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/NettyMBServer.java

@ -33,6 +33,8 @@ public class NettyMBServer {
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
// 超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override

100
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java

@ -15,14 +15,20 @@ import com.ccsens.ccmq.lowlevel.service.IUserService;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import wiki.tall.ccmq.common.bean.dto.ccmodbus.CCModBusEntity;
import wiki.tall.ccmq.common.bean.dto.ccmodbus.Register;
import wiki.tall.ccmq.common.config.SettingProps;
import wiki.tall.ccmq.common.util.*;
import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -40,6 +46,9 @@ public class MessageHandler {
*/
private static final Integer MAX_MESSAGE_NUM = 50;
@Autowired
private AmqpTemplate rabbitTemplate;
private static IMessageDao getMessageDao(){
return SpringContextUtils.getBean(IMessageDao.class);
}
@ -52,6 +61,13 @@ public class MessageHandler {
return SpringContextUtils.getBean(IUserService.class);
}
private static MessageHandler messageHandler;
@PostConstruct
public void init(){
messageHandler = this;
messageHandler.rabbitTemplate = this.rabbitTemplate;
}
/**
* 每个20s同步一次有未决消息的用户到pendingClientSet中
* @throws Exception
@ -240,6 +256,7 @@ public class MessageHandler {
}
private static void handlerServerMessage(String type, InMessage inMessage) throws Exception {
logger.info("处理Server消息:{},{}", type, inMessage);
MessageConstant.ClientMessageType clientMessageType = MessageConstant.ClientMessageType.valueOf(type);
String data = inMessage.getData();
OutMessage outMessage = null;
@ -255,16 +272,27 @@ public class MessageHandler {
break;
}
case Auth: {
logger.info("授权开始");
boolean authSuccess = false;
AuthMessage inSysData = JacksonUtil.jsonToBean(data, AuthMessage.class);
if(null != inSysData.getData()){
if(StrUtil.isNotEmpty(inSysData.getData().getToken())) {
logger.info("token授权");
String userId = getUserService().getUserIdByToken(inSysData.getData().getToken());
if(StrUtil.isNotEmpty(userId)){
ChannelManager.authChannel(ChannelManager.getCurrentChannel(),userId,inSysData.getData().getMajor(),inSysData.getData().getMinor(), inSysData.getMessage());
onClientOnLine(MessageConstant.DomainType.User,userId);
authSuccess = true;
}
} else if (inSysData.getData().getAuthId() != null) {
logger.info("authid授权");
// authId有值默认授权通过
ChannelManager.authChannel(ChannelManager.getCurrentChannel(),String.valueOf(inSysData.getData().getAuthId()),inSysData.getData().getMajor(),inSysData.getData().getMinor(), inSysData.getMessage());
logger.info("授权完成");
onClientOnLine(MessageConstant.DomainType.User,String.valueOf(inSysData.getData().getAuthId()));
logger.info("设置在线");
authSuccess = true;
}
}
if(!authSuccess){
@ -272,10 +300,12 @@ public class MessageHandler {
new ChannelStatusMessage(false,0L,MessageConstant.Error.AuthFailed))
);
}else{
outMessage = new OutMessage(JacksonUtil.beanToJson(
new ChannelStatusMessage(true,0L,MessageConstant.Error.Ok))
);
}
logger.info("授权结束");
break;
}
case GetChannelStatus: {
@ -402,6 +432,68 @@ public class MessageHandler {
)
);
break;
}
case ModBus:{
//1.按照功能码读写数据
ModBusMessage modBusMessage = JacksonUtil.jsonToBean(data, ModBusMessage.class);
byte oper = modBusMessage.getData().getOper();
switch (oper) {
case CCModBusEntity.Oper.READ : {
// TODO 读操作
WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel());
Object o = wrapperedChannel.getRegister().get(modBusMessage.getData().getRegister());
break;
}
case CCModBusEntity.Oper.WRITE : {
// TODO 写单个寄存器
break;
}
case CCModBusEntity.Oper.WRITE_MORE : {
// 写多个寄存器
// 2. 如果是写操作且值发生改变,放到mq里
byte[] modBusData = modBusMessage.getData().getModBusData();
// 读起始寄存器地址
int startAddr = CCModBusEntity.getRegisterStartAddr(modBusData);
int num = CCModBusEntity.getRegisterStartNum(modBusData);
WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel());
Map<String, Long> register = wrapperedChannel.getRegister();
// 授权数据
Long authId = register.get(String.valueOf(MessageConstant.Register.Auth.getAddr()));
for (int i = 0; i < num; i++) {
// 寄存器的原始数据
Long origin = register.get(String.valueOf(startAddr));
// 寄存器将要修改的数据
long value = 0;
for (int j = 0; j < CCModBusEntity.SIZE_DATA_SINGLE; j++) {
value <<= 8;
value += modBusData[CCModBusEntity.POSITION_DATA + CCModBusEntity.SIZE_DATA_SINGLE * i + j];
}
if (origin == null || origin.longValue() != value) {
register.put(String.valueOf(startAddr), value);
logger.info("寄存器数据:{}", register);
// 数据不一致,放到消息队列里
Register register1 = new Register();
register1.setAuthId(authId);
register1.setAddr(startAddr+i);
register1.setValue(value);
OutMessage outMessage1 = new OutMessage(JacksonUtil.beanToJson(register1));
OutMessageSet set = new OutMessageSet();
set.add(outMessage1);
messageHandler.rabbitTemplate.convertAndSend("wisdom_car",
JacksonUtil.beanToJson(set));
logger.info("modbus通知mq:{}", set);
}
}
// 3. 封装返回值
outMessage = new OutMessage(JacksonUtil.beanToJson(new ModBusAckMessage(modBusData)));
break;
}
default:break;
}
}
default: break;
}
@ -416,7 +508,9 @@ public class MessageHandler {
private static void onClientOnLine(MessageConstant.DomainType domain,String userId){
getMessageDao().updateMessageStatusToPending(domain,userId);
logger.info("修改mongodb");
addClientToRedisUnPendingSet(CcMessageUtil.getDomainTypeAndUserIdString(domain,userId));
logger.info("添加redisUnPending");
}
private static void updateMessageAckStatus(String ackId) throws Exception {
@ -477,16 +571,22 @@ public class MessageHandler {
String []stringArray = CcMessageUtil.splitTypeAndUserId(domainTypeAndUserId);
MessageConstant.DomainType toDomain = MessageConstant.DomainType.valueOf(stringArray[0]);
String to = stringArray[1];
logger.info("查询ack");
//1.1 当前用户是否在pendingClients列表中
if (!RedisUtil.sHas(redisUnPendingClientSetKey,domainTypeAndUserId)) {
logger.info("当前用户不在pendingClients列表中");
//1.2 当前用户是否在redis的等待ack列表中
if(!RedisUtil.hasKey(redisWaitAckSetKey)){
logger.info("当前用户不在redis的等待ack列表中");
//1.3 当前用户是否有pending消息
if(getMessageDao().countClientPendingMessage(toDomain,to) > 0){
logger.info("当前用户有pending消息");
//1.4 将当前用户添加到redisUnPendingClientSet列表中排队处理
RedisUtil.sSet(redisUnPendingClientSetKey,domainTypeAndUserId);
logger.info("将当前用户添加到redisUnPendingClientSet列表中排队处理");
}
}
}
logger.info("添加ack");
}
}

7
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java

@ -16,6 +16,7 @@ public class AuthMessage extends ServerMessage {
@Getter
public static class Data{
private String token;
private Long authId;
// 主版本号
private int major;
// 此版本号
@ -37,4 +38,10 @@ public class AuthMessage extends ServerMessage {
data.setMajor(major);
data.setMinor(minor);
}
public AuthMessage(Long authId){
this();
data = new Data();
data.setAuthId(authId);
}
}

37
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java

@ -0,0 +1,37 @@
package com.ccsens.ccmq.lowlevel.message.client;
import com.ccsens.ccmq.lowlevel.message.common.MessageConstant;
import com.ccsens.ccmq.lowlevel.message.common.ServerMessage;
import lombok.Data;
/**
* @description:
* @author: whj
* @time: 2020/12/14 22:32
*/
@Data
public class ModBusMessage extends ServerMessage {
@lombok.Data
public static class Data{
private short register;
private byte oper;
private byte[] modBusData;
}
private Data data;
/**业务相关参数,由业务具体确定是什么类型,什么参数*/
private String message;
public ModBusMessage(){
setType(MessageConstant.ClientMessageType.ModBus.name());
}
public ModBusMessage(short register, byte oper, byte[] modBusData) {
this();
data = new Data();
data.register = register;
data.oper = oper;
data.modBusData = modBusData;
}
}

38
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java

@ -8,18 +8,54 @@ import lombok.Data;
* 消息相关常量
*/
public class MessageConstant {
/**
* 寄存器地址
*/
public enum Register{
/**授权*/
Auth((byte) 0x00, (byte) 0x01),
/**心跳*/
Ping((byte) 0x00, (byte) 0x02),
;
public byte first;
public byte second;
Register(byte first, byte second){
this.first = first;
this.second = second;
}
public long getAddr(){
long addr = first << 8;
addr += second;
return addr;
}
public static Register valueOf(byte first, byte second) {
for(Register register : Register.values()){
if (register.first == first && register.second == second) {
return register;
}
}
return null;
}
}
public enum ClientMessageType{
//客户端心跳
Ping(0x00),
//客户端认证
Auth(0x01),
//客户端收到消息ACK
Ack(0x02),
//客户端收到消息ACK
HasRead(0x03),
//客户端请求连接状态
GetChannelStatus(0x04),
ModBus(0x05),
//不可预期的错误
UnExceptedError(0x21),
ClientIdleClosed(0x22),

31
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/ModBusAckMessage.java

@ -0,0 +1,31 @@
package com.ccsens.ccmq.lowlevel.message.server;
import com.ccsens.ccmq.lowlevel.message.common.ServerMessage;
import lombok.Data;
/**
* @author zhangsan
*/
@Data
public class ModBusAckMessage extends ServerMessage {
public ModBusAckMessage(){
}
public ModBusAckMessage(byte[] modBusData){
data = new Data(modBusData);
}
@lombok.Data
public static class Data{
private byte[] modBusData;
public Data(){
}
public Data(byte[] modBusData){
this.modBusData = modBusData;
}
}
private Data data;
}

108
ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java

@ -1,5 +1,6 @@
package wiki.tall.ccmq.common.bean.dto.ccmodbus;
import lombok.Data;
import wiki.tall.ccmq.common.util.CRCUtil;
import io.netty.buffer.ByteBuf;
@ -8,6 +9,8 @@ import io.netty.buffer.ByteBuf;
* eg: FF FE 0A 10 93 00 00 00 00 00 00 94 2A
* Notice: crc is example ,not the real data
*/
@Data
public class CCModBusEntity {
public enum Error{
ERROR_NONE,
@ -17,19 +20,41 @@ public class CCModBusEntity {
ERROR_NEED_MORE_DATA
}
public static class Oper{
public final static byte READ = 0X03;
public final static byte WRITE = 0X06;
public final static byte WRITE_MORE = 0X10;
}
public static final int SIZE_FILTER = 2;
public static final int SIZE_LEN = 1;
public static final int SIZE_ADDR = 1;
public static final int SIZE_OPER = 1;
public static final int SIZE_CRC = 2;
public static final int SIZE_REGISTER_START = 2;
public static final int SIZE_REGISTER_NUM = 2;
public static final int SIZE_COUNT_LENGTH = 1;
public static final int SIZE_DATA_SINGLE = 2;
public static final int SIZE_DATA_MIN = 1;
public static final int SIZE_DATA_MAX = 255 - SIZE_ADDR - SIZE_OPER - SIZE_CRC;
public static final int SIZE_MAX = 255;
public static final int SIZE_DATA_MAX = SIZE_MAX - SIZE_ADDR - SIZE_OPER - SIZE_CRC - SIZE_FILTER - SIZE_LEN;
public static final int SIZE_MIN = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_DATA_MIN + SIZE_CRC;
public static final int SIZE_MAX = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_DATA_MAX + SIZE_CRC;
public static final byte ADDRESS = 0x14;
public static final int POSITION_DATA = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START + SIZE_REGISTER_NUM + SIZE_COUNT_LENGTH;
public static final int POSITION_OPER = SIZE_FILTER + SIZE_LEN + SIZE_ADDR;
public static final int POSITION_REGISTER = POSITION_OPER + SIZE_OPER;
public static final int POSITION_REGISTER_NUM = POSITION_REGISTER + SIZE_REGISTER_START;
// public static final int SIZE_MAX = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_DATA_MAX + SIZE_CRC;
public static final byte[] FILTER = {(byte)0xFF,(byte)0xFE};
// public static final byte[] TAILER = {0x0d,0x0a};
public static final byte[] TAILER = {};
private byte modbusData[];
private byte filter[];
private int len;
@ -38,6 +63,46 @@ public class CCModBusEntity {
private byte originData[];
private byte crc[];
/**
* 获取寄存器起始地址
* @param modBusData modBusData
* @return 寄存器起始地址
*/
public static int getRegisterStartAddr(byte[] modBusData){
int addr = 0;
int start = POSITION_REGISTER;
for (int i = 0; i < SIZE_REGISTER_START; i++) {
addr <<= 8;
addr += modBusData[start + i];
}
return addr;
}
/**
* 获取寄存器起始地址
* @return 寄存器起始地址
*/
public int getRegisterStartAddr(){
return getRegisterStartAddr(modbusData);
}
/**
* 获取写几个寄存器写多个寄存器
* @param modBusData modBusData
* @return 寄存器个数
*/
public static int getRegisterStartNum(byte[] modBusData){
int number = 0;
int start = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START;
for (int i = 0; i < SIZE_REGISTER_NUM; i++) {
number <<= 8;
number += modBusData[start + i];
}
return number;
}
public CCModBusEntity(byte addr,byte oper,byte[] originData){
modbusData = new byte[SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + originData.length + SIZE_CRC + TAILER.length];
int index = 0;
@ -103,9 +168,25 @@ public class CCModBusEntity {
return originData;
}
public byte[] getValue(){
getOriginData();
byte[] values;
if (oper == Oper.WRITE_MORE) {
// TODO 写多个寄存器
values = new byte[originData[SIZE_REGISTER_START + SIZE_REGISTER_NUM]];
System.arraycopy(originData, SIZE_REGISTER_START + SIZE_REGISTER_NUM + CCModBusEntity.SIZE_COUNT_LENGTH, values, 0, values.length);
} else {
// 写一个寄存器
values = new byte[SIZE_DATA_SINGLE];
System.arraycopy(originData, SIZE_REGISTER_START, values, 0, CCModBusEntity.SIZE_DATA_SINGLE);
}
return values;
}
public byte[] getCrc(){
if(crc == null || crc.length != SIZE_CRC)
if(crc == null || crc.length != SIZE_CRC) {
crc = new byte[SIZE_CRC];
}
int index = SIZE_FILTER + SIZE_LEN + getLen() - SIZE_CRC;
System.arraycopy(modbusData,index,crc,0,SIZE_CRC);
return crc;
@ -116,15 +197,12 @@ public class CCModBusEntity {
}
public boolean crcValid() {
// byte[] thisCRC = getCrc();
// byte crc[] = {0,0};
// int index = SIZE_FILTER + SIZE_LEN;
// int length = getLen() - SIZE_CRC;
// ProtocolUtil.crc16(crc, this.modbusData,index,length);
// if(crc[1] == thisCRC[0] && crc[0]== thisCRC[1])
// return true;
// return false;
return true;
byte[] thisCRC = getCrc();
byte[] crc = new byte[SIZE_CRC];
// CRCUtil.crc16(crc,modbusData,SIZE_FILTER+SIZE_LEN,modbusData[SIZE_FILTER] - SIZE_CRC - TAILER.length);
CRCUtil.crc16(crc,modbusData,SIZE_FILTER+SIZE_LEN,getLen() - SIZE_CRC);
return crc[0] == thisCRC[0] && crc[1]== thisCRC[1];
}
public void print(){
@ -138,14 +216,16 @@ public class CCModBusEntity {
//1.filter
filter = getFilter();
for(int i=0;i<SIZE_FILTER;i++){
if(filter[i] != FILTER[i])
if(filter[i] != FILTER[i]) {
return Error.ERROR_FILTER_NOT_MATCH;
}
}
//2.len
len = getLen();
if(len > SIZE_DATA_MAX)
if(len > SIZE_DATA_MAX) {
return Error.ERROR_LEN_EXCLUEE_MAX;
}
//3.data 完整性
if(this.modbusData.length < (SIZE_FILTER + SIZE_LEN + len)) {

115
ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/ModBusEntity.java

@ -0,0 +1,115 @@
package wiki.tall.ccmq.common.bean.dto.ccmodbus;
import io.netty.buffer.ByteBuf;
/**
* @description:
* @author: whj
* @time: 2020/12/14 15:02
*/
public class ModBusEntity {
public enum Error{
ERROR_NONE,
ERROR_FILTER_NOT_MATCH,
ERROR_LEN_EXCLUEE_MAX,
ERROR_CRC_INVALID,
ERROR_NEED_MORE_DATA
}
public enum FunctionCode{
READ((byte)3),
WRITE((byte)6),
WRITE_MORE((byte)10),
;
private byte code;
FunctionCode(byte code) {
this.code = code;
}
public static FunctionCode getEnum(byte code) {
for ( FunctionCode functionCode: FunctionCode.values() ) {
if (functionCode.code == code) {
return functionCode;
}
}
return null;
}
}
/**传感器地址长度*/
public static final int SIZE_IP = 1;
/**功能码长度*/
public static final int SIZE_FUNCTION_CODE = 1;
/**寄存器起始地址长度*/
public static final int SIZE_REGISTER_START = 2;
/**寄存器数量长度*/
public static final int SIZE_REGISTER_NUM = 2;
/**计数字节长度*/
public static final int SIZE_COUNT_BYTES = 1;
public static final int SIZE_DATA_MIN = 2;
public static final int SIZE_DATA_MAX = 240;
/**单个预置值的长度*/
public static final int SIZE_SINGLE_VALUE = 2;
/**CR校验长度*/
public static final int SIZE_CR = 2;
/**预置多个寄存器最小长度*/
public static final int SIZE_WRITE_MORE_MIN = SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START
+ SIZE_REGISTER_NUM + SIZE_COUNT_BYTES + SIZE_DATA_MIN + SIZE_CR;
/**预置多个寄存器最大长度*/
public static final int SIZE_WRITE_MORE_MAX = SIZE_WRITE_MORE_MIN - SIZE_DATA_MIN + SIZE_DATA_MAX;
private byte[] modBusData;
public ModBusEntity(ByteBuf in){
this.modBusData = new byte[in.readableBytes()];
in.getBytes(in.readerIndex(),this.modBusData);
}
/**
* 协议校验
* @return
*/
public Error valid() {
// TODO cr校验
return Error.ERROR_NONE;
}
/**
* 获取预置值的大小
* @return
*/
public int getPreValueLen(){
int index = SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START;
return (modBusData[index] & 0xFF) * SIZE_SINGLE_VALUE;
}
/**
* 获取对象的长度
* @return
*/
public int getModBusLength() {
byte functionCode = this.modBusData[SIZE_IP];
switch (FunctionCode.getEnum(functionCode)) {
case READ :
return SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START + SIZE_REGISTER_NUM + SIZE_CR;
case WRITE :
return SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START + SIZE_SINGLE_VALUE + SIZE_CR;
case WRITE_MORE:
int length = SIZE_IP + SIZE_FUNCTION_CODE + SIZE_REGISTER_START + SIZE_REGISTER_NUM + SIZE_COUNT_BYTES + SIZE_CR;
default:break;
}
return 0;
}
}

29
ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java

@ -0,0 +1,29 @@
package wiki.tall.ccmq.common.bean.dto.ccmodbus;
import lombok.Data;
/**
* @description:
* @author: whj
* @time: 2020/12/15 19:18
*/
@Data
public class Register {
/**
* 平车编号
*/
private Long authId;
/**
* 寄存器地址
*/
private int addr;
/**
* 寄存器的值
*/
private Long value;
/**
* 时间
*/
private Long time = System.currentTimeMillis();
}

2
ccmq/src/main/java/wiki/tall/ccmq/common/util/CRCUtil.java

@ -27,4 +27,6 @@ public class CRCUtil {
crc[0] = (byte)(wcrc & 0xFF); // crc的低八位
crc[1] = (byte)((wcrc >> 8) & 0xFF); // crc的高八位
}
}

2
ccmq/src/main/resources/application.properties

@ -1,5 +1,5 @@
# 选择开发环境{dev|test|prod}
spring.profiles.active=prod
spring.profiles.active=test
# 设置应用名
spring.application.name=tall-message

Loading…
Cancel
Save