Browse Source

modbus 功能码区分授权和心跳

master
zhizhi wu 5 years ago
parent
commit
9edfae0a5e
  1. 92
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java
  2. 39
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java
  3. 4
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java
  4. 4
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java
  5. 72
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java
  6. 15
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/UnAuthMessage.java
  7. 6
      ccmq/src/main/java/wiki/tall/ccmq/common/TallMessageApplication.java
  8. 31
      ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java
  9. 2
      ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java
  10. 6
      ccmq/src/main/java/wiki/tall/ccmq/common/config/RedisConfig.java
  11. 3
      ccmq/src/main/resources/application-dev.properties
  12. 25
      ccmq/src/main/resources/application-test.properties
  13. 2
      ccmq/src/main/resources/application.properties

92
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java

@ -27,24 +27,27 @@ public class ModbusConverter {
short register = (short) ccModBusEntity.getRegisterStartAddr();
byte oper = (byte) (ccModBusEntity.getOper() & 0xFF);
MessageConstant.Register type = MessageConstant.Register.valueOf(data[0], data[1] );
if (type == null) {
message = new InMessage();
message.setData(JacksonUtil.beanToJson(new ModBusMessage(register,oper,ccModBusEntity.getModbusData())));
message.setToDomain(MessageConstant.DomainType.Server);
return message;
}
switch (type) {
case Ping: {
message = toHeartMessage(register,oper,ccModBusEntity.getValue());
// MessageConstant.Register type = MessageConstant.Register.valueOf(data[0], data[1] );
// if (type == null) {
// message = new InMessage();
// message.setData(JacksonUtil.beanToJson(new ModBusMessage(register,oper,ccModBusEntity.getModbusData())));
// message.setToDomain(MessageConstant.DomainType.Server);
// return message;
// }
switch (oper) {
case CCModBusEntity.Oper.PING: {
message = toHeartMessage();
break;
}
case Auth:{
message = toAuthMessage(register,oper,ccModBusEntity.getValue());
case CCModBusEntity.Oper.AUTH:{
message = toAuthMessage(register,ccModBusEntity.getValue());
break;
}
default:{
message = null;
// 读写操作
message = new InMessage();
message.setData(JacksonUtil.beanToJson(new ModBusMessage(ccModBusEntity.getAddr(), register,oper,ccModBusEntity.getModbusData())));
message.setToDomain(MessageConstant.DomainType.Server);
}
}
return message;
@ -70,11 +73,10 @@ public class ModbusConverter {
// }
/**
* OriginaData ==> 0x00
* @param oper
* 心跳
* @return
*/
private static InMessage toHeartMessage(short addr, int oper, byte[] originData) throws JsonProcessingException {
private static InMessage toHeartMessage() throws JsonProcessingException {
InMessage message = new InMessage();
PingMessage serverMessage = new PingMessage();
message.setData(JacksonUtil.beanToJson(serverMessage));
@ -88,20 +90,25 @@ public class ModbusConverter {
/**
* OriginaData ==> 0x00 0x00 0x00 0x64
* @param oper
* 授权
* @param register 业务类型
* @param values 授权数据 主版本 次版本 授权ID
* @return
*/
private static InMessage toAuthMessage(short register, int oper, byte[] values) throws JsonProcessingException {
private static InMessage toAuthMessage(short register, byte[] values) throws JsonProcessingException {
if (values == null || values.length < 3) {
return null;
}
byte major = values[CCModBusEntity.SIZE_MAJOR];
byte minor = values[CCModBusEntity.SIZE_MINOR];
long userId = 0;
for(int i=0;i<CCModBusEntity.SIZE_DATA_SINGLE;i++){
for(int i=CCModBusEntity.SIZE_MAJOR+CCModBusEntity.SIZE_MINOR;i<values.length;i++){
userId <<= 8;
userId |= values[i] & 0xFF;
}
// TODO 此处如何传参? 参数仅为authid
AuthMessage message = new AuthMessage(userId);
message.setMessage("{\""+register+"\":" + userId + "}");
AuthMessage message = new AuthMessage(userId, major, minor);
InMessage inMessage = new InMessage();
inMessage.setData(JacksonUtil.beanToJson(message));
inMessage.setToDomain(MessageConstant.DomainType.Server);
@ -130,6 +137,11 @@ public class ModbusConverter {
ccModBusEntity = fromAuthMessage();
break;
}
// 没有权限(未认证)
case UnAuth:{
ccModBusEntity = fromUnAuthMessage();
break;
}
case Ack:{
ccModBusEntity = fromModBusMessage(JacksonUtil.jsonToBean(outMessage.getData(), ModBusAckMessage.class));
@ -179,21 +191,31 @@ public class ModbusConverter {
return ccModBusEntity;
}
/**
* 返回客户端的授权信息转成CCModBus协议
* @return CCModBusEntity
*/
private static CCModBusEntity fromAuthMessage() {
byte []originData = new byte[CCModBusEntity.SIZE_REGISTER_START + CCModBusEntity.SIZE_REGISTER_NUM];
// 寄存器起始地址
originData[0] = MessageConstant.Register.Auth.first;
originData[1] = MessageConstant.Register.Auth.second;
// 寄存器数量
originData[2] = 0x00;
originData[3] = 0x01;
byte []originData = {};
byte addr = CCModBusEntity.ADDRESS;
byte oper = CCModBusEntity.Oper.WRITE_MORE;
byte oper = CCModBusEntity.Oper.AUTH;
CCModBusEntity ccModBusEntity = new CCModBusEntity(addr,oper,originData);
return ccModBusEntity;
}
/**
* 返回客户端的授权信息转成CCModBus协议
* @return CCModBusEntity
*/
private static CCModBusEntity fromUnAuthMessage() {
byte []originData = {};
byte addr = CCModBusEntity.ADDRESS;
byte oper = CCModBusEntity.Oper.UN_AUTH;
CCModBusEntity ccModBusEntity = new CCModBusEntity(addr,oper,originData);
return ccModBusEntity;
}
/**
* OriginalData: 0x00
@ -201,16 +223,10 @@ public class ModbusConverter {
* @return
*/
private static CCModBusEntity fromHeartMessage(PongMessage message){
byte []originData = new byte[CCModBusEntity.SIZE_REGISTER_START + CCModBusEntity.SIZE_REGISTER_NUM];
// 寄存器起始地址
originData[0] = MessageConstant.Register.Ping.first;
originData[1] = MessageConstant.Register.Ping.second;
// 寄存器数量
originData[2] = 0x00;
originData[3] = 0x01;
byte []originData = {};
byte addr = CCModBusEntity.ADDRESS;
byte oper = CCModBusEntity.Oper.WRITE_MORE;
byte oper = CCModBusEntity.Oper.PONG;
CCModBusEntity ccModBusEntity = new CCModBusEntity(addr,oper,originData);
return ccModBusEntity;
}

39
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java

@ -16,7 +16,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -26,6 +25,7 @@ import wiki.tall.ccmq.common.config.SettingProps;
import wiki.tall.ccmq.common.util.*;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -46,7 +46,7 @@ public class MessageHandler {
*/
private static final Integer MAX_MESSAGE_NUM = 50;
@Autowired
@Resource
private AmqpTemplate rabbitTemplate;
private static IMessageDao getMessageDao(){
@ -299,6 +299,7 @@ public class MessageHandler {
authSuccess = true;
}
} else if (inSysData.getData().getAuthId() != null) {
// TODO 此处如何校验authid是否有效
//硬件授权
logger.info("authid授权");
// authId有值默认授权通过
@ -451,11 +452,23 @@ public class MessageHandler {
//1.按照功能码读写数据
ModBusMessage modBusMessage = JacksonUtil.jsonToBean(data, ModBusMessage.class);
byte oper = modBusMessage.getData().getOper();
WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel());
if (StrUtil.isEmpty(wrapperedChannel.getUserId())) {
// 未授权不允许读写
ClientManager.sendServerMessage(inMessage.getFromDomain(), inMessage.getInvokerMessage(),
OutMessageSet.newInstance().ackId(null).add(new OutMessage(JacksonUtil.beanToJson(new UnAuthMessage())))
);
return;
}
Map<String, Long> register = wrapperedChannel.getRegister();
// 授权数据
Long authId = Long.parseLong(wrapperedChannel.getUserId());
// 版本信息
String version = wrapperedChannel.getVersion();
switch (oper) {
case CCModBusEntity.Oper.READ : {
// TODO 读操作
WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel());
Object o = wrapperedChannel.getRegister().get(modBusMessage.getData().getRegister());
Object o = register.get(modBusMessage.getData().getRegister());
break;
}
case CCModBusEntity.Oper.WRITE : {
@ -468,11 +481,11 @@ public class MessageHandler {
byte[] modBusData = modBusMessage.getData().getModBusData();
// 读起始寄存器地址
int startAddr = CCModBusEntity.getRegisterStartAddr(modBusData);
// 写寄存器数量
int num = CCModBusEntity.getRegisterStartNum(modBusData);
WrapperedChannel wrapperedChannel = ChannelManager.getWrapperedChannelByChannel(ChannelManager.getCurrentChannel());
Map<String, Long> register = wrapperedChannel.getRegister();
// 授权数据
Long authId = register.get(String.valueOf(MessageConstant.Register.Auth.getAddr()));
// 接收数据的业务放
byte addr = modBusMessage.getData().getAddr();
OutMessageSet set = new OutMessageSet();
for (int i = 0; i < num; i++) {
// 寄存器的原始数据
@ -493,13 +506,17 @@ public class MessageHandler {
register1.setAuthId(authId);
register1.setAddr(startAddr+i);
register1.setValue(value);
register1.setVersion(version);
OutMessage outMessage1 = new OutMessage(JacksonUtil.beanToJson(register1));
set.add(outMessage1);
}
if (CollectionUtil.isNotEmpty(set.getMessageSet())) {
messageHandler.rabbitTemplate.convertAndSend("wisdom_car",
String mqName = (String)RedisUtil.hget(MessageConstant.Redis.KEY_MQ_COLLECTION, String.valueOf(addr));
logger.info("mqName:{}", mqName);
if (CollectionUtil.isNotEmpty(set.getMessageSet()) && StrUtil.isNotEmpty(mqName)) {
messageHandler.rabbitTemplate.convertAndSend(mqName,
JacksonUtil.beanToJson(set));
logger.info("modbus通知mq:{}", set);
logger.info("modbus通知mq({}):{}",mqName, set);
}
// 3. 封装返回值
outMessage = new OutMessage(JacksonUtil.beanToJson(new ModBusAckMessage(modBusData)));

4
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java

@ -40,9 +40,11 @@ public class AuthMessage extends ServerMessage {
data.setMinor(minor);
}
public AuthMessage(Long authId){
public AuthMessage(Long authId,int major,int minor){
this();
data = new Data();
data.setAuthId(authId);
data.setMajor(major);
data.setMinor(minor);
}
}

4
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/ModBusMessage.java

@ -15,6 +15,7 @@ public class ModBusMessage extends ServerMessage {
@lombok.Data
public static class Data{
private byte addr;
private short register;
private byte oper;
private byte[] modBusData;
@ -27,9 +28,10 @@ public class ModBusMessage extends ServerMessage {
public ModBusMessage(){
setType(MessageConstant.ClientMessageType.ModBus.name());
}
public ModBusMessage(short register, byte oper, byte[] modBusData) {
public ModBusMessage(byte addr, short register, byte oper, byte[] modBusData) {
this();
data = new Data();
data.addr = addr;
data.register = register;
data.oper = oper;
data.modBusData = modBusData;

72
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/common/MessageConstant.java

@ -8,39 +8,49 @@ import lombok.Data;
* 消息相关常量
*/
public class MessageConstant {
/**
* 寄存器地址
* redis参数
*/
public enum Register{
/**授权*/
Auth((byte) 0x00, (byte) 0x01),
/**心跳*/
Ping((byte) 0x00, (byte) 0x02),
;
public byte first;
public byte second;
Register(byte first, byte second){
this.first = first;
this.second = second;
}
public static class Redis{
/**mq集合key*/
public static final String KEY_MQ_COLLECTION = "mqCollection";
}
public long getAddr(){
long addr = ((long)first << 8 | second) & 0xFFFF;
return addr;
}
public static Register valueOf(byte first, byte second) {
for(Register register : Register.values()){
if (register.first == first && register.second == second) {
return register;
}
}
return null;
}
}
// /**
// * 寄存器地址
// */
// public enum Register{
// /**授权*/
// Auth((byte) 0x00, (byte) 0x01),
// /**心跳*/
// Ping((byte) 0x00, (byte) 0x02),
// ;
//
// public byte first;
// public byte second;
//
// Register(byte first, byte second){
// this.first = first;
// this.second = second;
//
// }
//
// public long getAddr(){
// long addr = ((long)first << 8 | second) & 0xFFFF;
// return addr;
// }
//
// public static Register valueOf(byte first, byte second) {
// for(Register register : Register.values()){
// if (register.first == first && register.second == second) {
// return register;
// }
// }
// return null;
// }
// }
public enum ClientMessageType{
//客户端心跳
@ -92,7 +102,9 @@ public class MessageConstant {
//客户端收到消息ACK
Ack(0x02),
//客户端请求连接状态
ChannelStatus(0x03);
ChannelStatus(0x03),
// 未认证
UnAuth(0x04);
//撤销某个消息
//DelMessage(0x04),
//客户端请求连接状态

15
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/server/UnAuthMessage.java

@ -0,0 +1,15 @@
package com.ccsens.ccmq.lowlevel.message.server;
import com.ccsens.ccmq.lowlevel.message.common.MessageConstant;
import com.ccsens.ccmq.lowlevel.message.common.ServerMessage;
import lombok.Data;
/**
* @author zhangsan
*/
@Data
public class UnAuthMessage extends ServerMessage {
public UnAuthMessage(){
setType(MessageConstant.ServerMessageType.UnAuth.name());
}
}

6
ccmq/src/main/java/wiki/tall/ccmq/common/TallMessageApplication.java

@ -58,8 +58,8 @@ public class TallMessageApplication implements CommandLineRunner {
nettyWsServer.start();
nettyMBServer.start();
nettyTextServer.start();
for(int i=0;i<1;i++) {
messageHandler.loopSendMessage();
}
// for(int i=0;i<1;i++) {
// messageHandler.loopSendMessage();
// }
}
}

31
ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java

@ -6,6 +6,7 @@ import io.netty.buffer.ByteBuf;
/**
* format: Filter(2) len(1) addr(1) oper(1) data(x) crc(2)
* 授权data: major(1) minor(1) authId(长度由设备决定消息系统以long接收)
* eg: FF FE 0A 10 93 00 00 00 00 00 00 94 2A
* Notice: crc is example ,not the real data
*/
@ -20,10 +21,18 @@ public class CCModBusEntity {
ERROR_NEED_MORE_DATA
}
/**
* 操作类型
*/
public static class Oper{
public static final byte AUTH = (byte) 0X90 ;
public static final byte UN_AUTH = (byte) 0X91 ;
public static final byte PING = (byte) 0X92;
public static final byte PONG = (byte) 0X93;
public final static byte READ = 0X03;
public final static byte WRITE = 0X06;
public final static byte WRITE_MORE = 0X10;
}
public static final int SIZE_FILTER = 2;
@ -37,8 +46,10 @@ public class CCModBusEntity {
public static final int SIZE_DATA_SINGLE = 2;
public static final int SIZE_DATA_MIN = 1;
public static final int SIZE_MAX = 255;
public static final int SIZE_MAJOR = 1;
public static final int SIZE_MINOR = 1;
public static final int SIZE_DATA_MAX = SIZE_MAX - SIZE_ADDR - SIZE_OPER - SIZE_CRC - SIZE_FILTER - SIZE_LEN;
public static final int SIZE_MIN = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_DATA_MIN + SIZE_CRC;
public static final int SIZE_MIN = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_CRC;
public static final byte ADDRESS = 0x14;
public static final int POSITION_DATA = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_REGISTER_START + SIZE_REGISTER_NUM + SIZE_COUNT_LENGTH;
public static final int POSITION_OPER = SIZE_FILTER + SIZE_LEN + SIZE_ADDR;
@ -168,14 +179,24 @@ public class CCModBusEntity {
return originData;
}
/**
* 获取写操作后面的内容
* @return
*/
public byte[] getValue(){
getOriginData();
byte[] values;
if (oper == Oper.WRITE_MORE) {
// TODO 写多个寄存器
byte[] values = null;
byte oper = getOper();
if (oper == Oper.AUTH) {
// 授权 major(1) minor(1) authId(长度由设备决定,消息系统以long接收)
int length = getLen() - SIZE_ADDR - SIZE_OPER - SIZE_CRC;
values = new byte[length];
System.arraycopy(originData, 0, values, 0, length);
} else if (oper == Oper.WRITE_MORE) {
// 写多个寄存器
values = new byte[originData[SIZE_REGISTER_START + SIZE_REGISTER_NUM]];
System.arraycopy(originData, SIZE_REGISTER_START + SIZE_REGISTER_NUM + CCModBusEntity.SIZE_COUNT_LENGTH, values, 0, values.length);
} else {
} else if (oper == Oper.WRITE){
// 写一个寄存器
values = new byte[SIZE_DATA_SINGLE];
System.arraycopy(originData, SIZE_REGISTER_START, values, 0, CCModBusEntity.SIZE_DATA_SINGLE);

2
ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/Register.java

@ -1,5 +1,6 @@
package wiki.tall.ccmq.common.bean.dto.ccmodbus;
import com.rabbitmq.client.impl.ClientVersion;
import lombok.Data;
/**
@ -14,6 +15,7 @@ public class Register {
* 平车编号
*/
private Long authId;
private String version;
/**
* 寄存器地址
*/

6
ccmq/src/main/java/wiki/tall/ccmq/common/config/RedisConfig.java

@ -31,8 +31,10 @@ public class RedisConfig {
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
// // hash的value序列化方式采用jackson
// template.setHashValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用String
template.setHashValueSerializer(stringRedisSerializer);
template.afterPropertiesSet();
return template;
}

3
ccmq/src/main/resources/application-dev.properties

@ -21,7 +21,8 @@ spring.redis.jedis.pool.min-idle=0
# RabbitMQ配置信息
#spring.rabbitmq.host=49.233.89.188
spring.rabbitmq.host=192.168.0.99
#spring.rabbitmq.host=192.168.0.99
spring.rabbitmq.host=www.tall.wiki
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111

25
ccmq/src/main/resources/application-test.properties

@ -28,9 +28,30 @@ spring.rabbitmq.password=111111
# MongoDB配置信息
spring.data.mongodb.uri=mongodb://wei:111111@49.233.89.188:27017/test
# 最大连接数,每一台服务器
spring.data.mongodb.option.max-connection-per-host=100
# 可阻塞线程队列容量
spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=10
# 每一台服务器的最小连接数
spring.data.mongodb.option.min-connection-per-host=1
#连接超时时间 1分钟
spring.data.mongodb.option.connect-timeout=60000
#等待时间 120000 2 * 60 * 1000
spring.data.mongodb.option.max-wait-time=120000
#Socket超时时间
spring.data.mongodb.option.socket-timeout=10000
#保持连接
spring.data.mongodb.option.socket-keep-alive=true
spring.data.mongodb.option.socket-timeout=5000
spring.data.mongodb.option.max-connection-idle-time=10000
## 连接空闲时间8小时,否则连接太过频繁
spring.data.mongodb.option.max-connection-idle-time=28800000
spring.data.mongodb.option.max-connection-life-time=0
#心跳
spring.data.mongodb.option.heartbeat-socket-timeout=10000
spring.data.mongodb.option.heartbeat-connect-timeout=15000
spring.data.mongodb.option.min-heartbeat-frequency=5000
spring.data.mongodb.option.heartbeat-frequency=100000
# Settings
setting.snowflake.workerId=1

2
ccmq/src/main/resources/application.properties

@ -1,5 +1,5 @@
# 选择开发环境{dev|test|prod}
spring.profiles.active=prod
spring.profiles.active=test
# 设置应用名
spring.application.name=tall-message

Loading…
Cancel
Save