diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java index 0f48db1..8bf2dad 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java @@ -27,24 +27,34 @@ public class ModbusConverter { short register = (short) ccModBusEntity.getRegisterStartAddr(); byte oper = (byte) (ccModBusEntity.getOper() & 0xFF); - MessageConstant.Register type = MessageConstant.Register.valueOf(data[0], data[1] ); - if (type == null) { - message = new InMessage(); - message.setData(JacksonUtil.beanToJson(new ModBusMessage(register,oper,ccModBusEntity.getModbusData()))); - message.setToDomain(MessageConstant.DomainType.Server); - return message; - } - switch (type) { - case Ping: { - message = toHeartMessage(register,oper,ccModBusEntity.getValue()); +// MessageConstant.Register type = MessageConstant.Register.valueOf(data[0], data[1] ); +// if (type == null) { +// message = new InMessage(); +// message.setData(JacksonUtil.beanToJson(new ModBusMessage(register,oper,ccModBusEntity.getModbusData()))); +// message.setToDomain(MessageConstant.DomainType.Server); +// return message; +// } + switch (oper) { + case CCModBusEntity.Oper.PING: { + message = toHeartMessage(); break; } - case Auth:{ - message = toAuthMessage(register,oper,ccModBusEntity.getValue()); + case CCModBusEntity.Oper.AUTH:{ + message = toAuthMessage(register,ccModBusEntity.getValue()); break; } default:{ - message = null; + // TODO 兼容智慧平车旧版本的授权操作,智慧平车修改后,即可删除此if语句 + MessageConstant.Register type = MessageConstant.Register.valueOf(data[0], data[1] ); + if (type == MessageConstant.Register.Auth) { + message = toOldAuthMessage(register,ccModBusEntity.getValue()); + return message; + } + + // 读写操作 + message = new InMessage(); + message.setData(JacksonUtil.beanToJson(new ModBusMessage(ccModBusEntity.getAddr(), register,oper,ccModBusEntity.getModbusData()))); + message.setToDomain(MessageConstant.DomainType.Server); } } return message; @@ -70,11 +80,10 @@ public class ModbusConverter { // } /** - * OriginaData ==> 0x00 - * @param oper + * 心跳 * @return */ - private static InMessage toHeartMessage(short addr, int oper, byte[] originData) throws JsonProcessingException { + private static InMessage toHeartMessage() throws JsonProcessingException { InMessage message = new InMessage(); PingMessage serverMessage = new PingMessage(); message.setData(JacksonUtil.beanToJson(serverMessage)); @@ -88,20 +97,50 @@ public class ModbusConverter { /** * OriginaData ==> 0x00 0x00 0x00 0x64 - * @param oper + * 授权 + * @param register 业务类型 + * @param values 授权数据 主版本 次版本 授权ID * @return */ - private static InMessage toAuthMessage(short register, int oper, byte[] values) throws JsonProcessingException { + private static InMessage toAuthMessage(short register, byte[] values) throws JsonProcessingException { + if (values == null || values.length < 3) { + return null; + } + byte major = values[CCModBusEntity.SIZE_MAJOR]; + byte minor = values[CCModBusEntity.SIZE_MINOR]; long userId = 0; - for(int i=0;i 0x00 0x00 0x00 0x64 + * 授权 + * @param register 业务类型 + * @param values 授权数据 主版本 次版本 授权ID + * @return + */ + private static InMessage toOldAuthMessage(short register, byte[] values) throws JsonProcessingException { + + if (values == null || values.length < 1) { + return null; + } + long userId = 0; + for(int i=0;i register = wrapperedChannel.getRegister(); + // 授权数据 + Long authId = Long.parseLong(wrapperedChannel.getUserId()); + // 版本信息 + String version = wrapperedChannel.getVersion(); switch (oper) { case CCModBusEntity.Oper.READ : { // TODO 读操作 - WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel()); - Object o = wrapperedChannel.getRegister().get(modBusMessage.getData().getRegister()); + Object o = register.get(modBusMessage.getData().getRegister()); break; } case CCModBusEntity.Oper.WRITE : { @@ -468,11 +481,11 @@ public class MessageHandler { byte[] modBusData = modBusMessage.getData().getModBusData(); // 读起始寄存器地址 int startAddr = CCModBusEntity.getRegisterStartAddr(modBusData); + // 写寄存器数量 int num = CCModBusEntity.getRegisterStartNum(modBusData); - WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel()); - Map register = wrapperedChannel.getRegister(); - // 授权数据 - Long authId = register.get(String.valueOf(MessageConstant.Register.Auth.getAddr())); + // 接收数据的业务放 + byte addr = modBusMessage.getData().getAddr(); + OutMessageSet set = new OutMessageSet(); for (int i = 0; i < num; i++) { // 寄存器的原始数据 @@ -493,13 +506,17 @@ public class MessageHandler { register1.setAuthId(authId); register1.setAddr(startAddr+i); register1.setValue(value); + register1.setVersion(version); OutMessage outMessage1 = new OutMessage(JacksonUtil.beanToJson(register1)); set.add(outMessage1); } - if (CollectionUtil.isNotEmpty(set.getMessageSet())) { - messageHandler.rabbitTemplate.convertAndSend("wisdom_car", + String mqName = (String)RedisUtil.hget(MessageConstant.Redis.KEY_MQ_COLLECTION, String.valueOf(addr)); + logger.info("mqName:{}", mqName); + if (CollectionUtil.isNotEmpty(set.getMessageSet()) && StrUtil.isNotEmpty(mqName)) { + + messageHandler.rabbitTemplate.convertAndSend(mqName, JacksonUtil.beanToJson(set)); - logger.info("modbus通知mq:{}", set); + logger.info("modbus通知mq({}):{}",mqName, set); } // 3. 封装返回值 outMessage = new OutMessage(JacksonUtil.beanToJson(new ModBusAckMessage(modBusData))); diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java index 06f4420..cfbab79 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java @@ -40,6 +40,14 @@ public class AuthMessage extends ServerMessage { data.setMinor(minor); } + public AuthMessage(Long authId,int major,int minor){ + this(); + data = new Data(); + data.setAuthId(authId); + data.setMajor(major); + data.setMinor(minor); + } + public AuthMessage(Long authId){ this(); data = new Data(); diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java index f42c439..cfa20a3 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java @@ -15,6 +15,7 @@ public class ModBusMessage extends ServerMessage { @lombok.Data public static class Data{ + private byte addr; private short register; private byte oper; private byte[] modBusData; @@ -27,9 +28,10 @@ public class ModBusMessage extends ServerMessage { public ModBusMessage(){ setType(MessageConstant.ClientMessageType.ModBus.name()); } - public ModBusMessage(short register, byte oper, byte[] modBusData) { + public ModBusMessage(byte addr, short register, byte oper, byte[] modBusData) { this(); data = new Data(); + data.addr = addr; data.register = register; data.oper = oper; data.modBusData = modBusData; diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java index 34c6102..937c05b 100644 --- a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java @@ -8,6 +8,16 @@ import lombok.Data; * 消息相关常量 */ public class MessageConstant { + + /** + * redis参数 + */ + public static class Redis{ + /**mq集合key*/ + public static final String KEY_MQ_COLLECTION = "mqCollection"; + } + + /** * 寄存器地址 */ @@ -92,7 +102,9 @@ public class MessageConstant { //客户端收到消息ACK Ack(0x02), //客户端请求连接状态 - ChannelStatus(0x03); + ChannelStatus(0x03), + // 未认证 + UnAuth(0x04); //撤销某个消息 //DelMessage(0x04), //客户端请求连接状态 diff --git a/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/UnAuthMessage.java b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/UnAuthMessage.java new file mode 100644 index 0000000..2df971b --- /dev/null +++ b/ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/UnAuthMessage.java @@ -0,0 +1,15 @@ +package com.ccsens.ccmq.lowlevel.message.server; + +import com.ccsens.ccmq.lowlevel.message.common.MessageConstant; +import com.ccsens.ccmq.lowlevel.message.common.ServerMessage; +import lombok.Data; + +/** + * @author zhangsan + */ +@Data +public class UnAuthMessage extends ServerMessage { + public UnAuthMessage(){ + setType(MessageConstant.ServerMessageType.UnAuth.name()); + } +} diff --git a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java index 58e4285..92cecd0 100644 --- a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java +++ b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java @@ -6,6 +6,7 @@ import io.netty.buffer.ByteBuf; /** * format: Filter(2) len(1) addr(1) oper(1) data(x) crc(2) + * 授权data: major(1) minor(1) authId(长度由设备决定,消息系统以long接收) * eg: FF FE 0A 10 93 00 00 00 00 00 00 94 2A * Notice: crc is example ,not the real data */ @@ -20,10 +21,18 @@ public class CCModBusEntity { ERROR_NEED_MORE_DATA } + /** + * 操作类型 + */ public static class Oper{ + public static final byte AUTH = (byte) 0X90 ; + public static final byte UN_AUTH = (byte) 0X91 ; + public static final byte PING = (byte) 0X92; + public static final byte PONG = (byte) 0X93; public final static byte READ = 0X03; public final static byte WRITE = 0X06; public final static byte WRITE_MORE = 0X10; + } public static final int SIZE_FILTER = 2; @@ -37,8 +46,10 @@ public class CCModBusEntity { public static final int SIZE_DATA_SINGLE = 2; public static final int SIZE_DATA_MIN = 1; public static final int SIZE_MAX = 255; + public static final int SIZE_MAJOR = 1; + public static final int SIZE_MINOR = 1; public static final int SIZE_DATA_MAX = SIZE_MAX - SIZE_ADDR - SIZE_OPER - SIZE_CRC - SIZE_FILTER - SIZE_LEN; - public static final int SIZE_MIN = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_DATA_MIN + SIZE_CRC; + public static final int SIZE_MIN = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_CRC; public static final byte ADDRESS = 0x14; public static final int POSITION_DATA = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START + SIZE_REGISTER_NUM + SIZE_COUNT_LENGTH; public static final int POSITION_OPER = SIZE_FILTER + SIZE_LEN + SIZE_ADDR; @@ -168,14 +179,24 @@ public class CCModBusEntity { return originData; } + /** + * 获取写操作后面的内容 + * @return + */ public byte[] getValue(){ getOriginData(); - byte[] values; - if (oper == Oper.WRITE_MORE) { - // TODO 写多个寄存器 + byte[] values = null; + byte oper = getOper(); + if (oper == Oper.AUTH) { + // 授权 major(1) minor(1) authId(长度由设备决定,消息系统以long接收) + int length = getLen() - SIZE_ADDR - SIZE_OPER - SIZE_CRC; + values = new byte[length]; + System.arraycopy(originData, 0, values, 0, length); + } else if (oper == Oper.WRITE_MORE) { + // 写多个寄存器 values = new byte[originData[SIZE_REGISTER_START + SIZE_REGISTER_NUM]]; System.arraycopy(originData, SIZE_REGISTER_START + SIZE_REGISTER_NUM + CCModBusEntity.SIZE_COUNT_LENGTH, values, 0, values.length); - } else { + } else if (oper == Oper.WRITE){ // 写一个寄存器 values = new byte[SIZE_DATA_SINGLE]; System.arraycopy(originData, SIZE_REGISTER_START, values, 0, CCModBusEntity.SIZE_DATA_SINGLE); diff --git a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java index bc653a8..cf693b2 100644 --- a/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java +++ b/ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java @@ -1,5 +1,6 @@ package wiki.tall.ccmq.common.bean.dto.ccmodbus; +import com.rabbitmq.client.impl.ClientVersion; import lombok.Data; /** @@ -14,6 +15,7 @@ public class Register { * 平车编号 */ private Long authId; + private String version; /** * 寄存器地址 */ diff --git a/ccmq/src/main/java/wiki/tall/ccmq/common/config/RedisConfig.java b/ccmq/src/main/java/wiki/tall/ccmq/common/config/RedisConfig.java index 6ee0f00..c929252 100644 --- a/ccmq/src/main/java/wiki/tall/ccmq/common/config/RedisConfig.java +++ b/ccmq/src/main/java/wiki/tall/ccmq/common/config/RedisConfig.java @@ -31,8 +31,10 @@ public class RedisConfig { template.setHashKeySerializer(stringRedisSerializer); // value序列化方式采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); - // hash的value序列化方式采用jackson - template.setHashValueSerializer(jackson2JsonRedisSerializer); +// // hash的value序列化方式采用jackson +// template.setHashValueSerializer(jackson2JsonRedisSerializer); + // hash的value序列化方式采用String + template.setHashValueSerializer(stringRedisSerializer); template.afterPropertiesSet(); return template; } diff --git a/ccmq/src/main/resources/application-dev.properties b/ccmq/src/main/resources/application-dev.properties index 535de6d..c12dc54 100644 --- a/ccmq/src/main/resources/application-dev.properties +++ b/ccmq/src/main/resources/application-dev.properties @@ -21,7 +21,8 @@ spring.redis.jedis.pool.min-idle=0 # RabbitMQϢ #spring.rabbitmq.host=49.233.89.188 -spring.rabbitmq.host=192.168.0.99 +#spring.rabbitmq.host=192.168.0.99 +spring.rabbitmq.host=www.tall.wiki spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=111111 diff --git a/ccmq/src/main/resources/application-prod.properties b/ccmq/src/main/resources/application-prod.properties index e343dbc..df81c1f 100644 --- a/ccmq/src/main/resources/application-prod.properties +++ b/ccmq/src/main/resources/application-prod.properties @@ -30,8 +30,35 @@ spring.rabbitmq.password=111111 # MongoDBϢ spring.data.mongodb.uri=mongodb://wei:111111@127.0.0.1:27017/test -spring.data.mongodb.option.socket-timeout=5000 -spring.data.mongodb.option.max-connection-idle-time=10000 + +# ÿһ̨ +spring.data.mongodb.option.max-connection-per-host=100 +# ̶߳ +spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=10 +# ÿһ̨С +spring.data.mongodb.option.min-connection-per-host=1 + +#ӳʱʱ 1 +spring.data.mongodb.option.connect-timeout=60000 +#ȴʱ 120000 2 * 60 * 1000 +spring.data.mongodb.option.max-wait-time=120000 +#Socketʱʱ +spring.data.mongodb.option.socket-timeout=10000 +# +spring.data.mongodb.option.socket-keep-alive=true + +## ӿʱ8Сʱ̫Ƶ +spring.data.mongodb.option.max-connection-idle-time=28800000 +spring.data.mongodb.option.max-connection-life-time=0 +# +spring.data.mongodb.option.heartbeat-socket-timeout=10000 +spring.data.mongodb.option.heartbeat-connect-timeout=15000 +spring.data.mongodb.option.min-heartbeat-frequency=5000 +spring.data.mongodb.option.heartbeat-frequency=100000 + + + + # Settings setting.snowflake.workerId = 1 setting.snowflake.datacenterId = 1 diff --git a/ccmq/src/main/resources/application-test.properties b/ccmq/src/main/resources/application-test.properties index 5c41288..455198f 100644 --- a/ccmq/src/main/resources/application-test.properties +++ b/ccmq/src/main/resources/application-test.properties @@ -28,9 +28,30 @@ spring.rabbitmq.password=111111 # MongoDBϢ spring.data.mongodb.uri=mongodb://wei:111111@49.233.89.188:27017/test +# ÿһ̨ +spring.data.mongodb.option.max-connection-per-host=100 +# ̶߳ +spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=10 +# ÿһ̨С +spring.data.mongodb.option.min-connection-per-host=1 + +#ӳʱʱ 1 +spring.data.mongodb.option.connect-timeout=60000 +#ȴʱ 120000 2 * 60 * 1000 +spring.data.mongodb.option.max-wait-time=120000 +#Socketʱʱ +spring.data.mongodb.option.socket-timeout=10000 +# spring.data.mongodb.option.socket-keep-alive=true -spring.data.mongodb.option.socket-timeout=5000 -spring.data.mongodb.option.max-connection-idle-time=10000 + +## ӿʱ8Сʱ̫Ƶ +spring.data.mongodb.option.max-connection-idle-time=28800000 +spring.data.mongodb.option.max-connection-life-time=0 +# +spring.data.mongodb.option.heartbeat-socket-timeout=10000 +spring.data.mongodb.option.heartbeat-connect-timeout=15000 +spring.data.mongodb.option.min-heartbeat-frequency=5000 +spring.data.mongodb.option.heartbeat-frequency=100000 # Settings setting.snowflake.workerId=1 diff --git a/ccmq/src/main/resources/application.properties b/ccmq/src/main/resources/application.properties index 97e42eb..dd8b329 100644 --- a/ccmq/src/main/resources/application.properties +++ b/ccmq/src/main/resources/application.properties @@ -1,5 +1,5 @@ # ѡ񿪷{dev|test|prod} -spring.profiles.active=prod +spring.profiles.active=test # Ӧ spring.application.name=tall-message diff --git a/ccmq/src/main/resources/logback-spring.xml b/ccmq/src/main/resources/logback-spring.xml index c3f03e2..1207337 100644 --- a/ccmq/src/main/resources/logback-spring.xml +++ b/ccmq/src/main/resources/logback-spring.xml @@ -1,22 +1,23 @@ cc-message - + - + - + - + + @@ -28,19 +29,19 @@ - + + <!– 日志保存周期 –> 30 - + <!– 总大小 –> 100MB %d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n - + --> @@ -136,7 +137,7 @@ - + \ No newline at end of file