Browse Source

20210303tcp认证添加mq类型

master
zy_Java 4 years ago
parent
commit
c52bf174c7
  1. 5
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java
  2. 8
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/WrapperedChannel.java
  3. 19
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java
  4. 11
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java
  5. 4
      ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java
  6. 1
      ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java
  7. 3
      ccmq/src/main/java/wiki/tall/ccmq/common/util/WebConstant.java
  8. 60
      ccmq/src/main/resources/application-wiki.properties
  9. 2
      ccmq/src/main/resources/application.properties
  10. 46
      ccmq/src/main/resources/druid-wiki.properties
  11. 27
      ccmq/src/main/resources/setting-wiki.properties
  12. 2
      pom.xml

5
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/ChannelManager.java

@ -79,15 +79,16 @@ public class ChannelManager {
* @param major 客户端主版本号
* @param minor 客户端次版本号
* @param message 业务相关参数
* @param mqType 硬件设备消息转发mq类型
*/
public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor, String message){
public static synchronized void authChannel(Channel channel,String userId,Integer major,Integer minor, String message, Byte mqType){
logger.debug("Invoke authedChannels({},{},{},{})",channel.id().asLongText(),userId,major,minor);
major = major != null ? major : 0;
minor = minor != null ? minor : 0;
WrapperedChannel wrapperedChannel = rawChannels.get(channel);
if(wrapperedChannel != null){
wrapperedChannel.whenAuthed(userId,major,minor, message);
wrapperedChannel.whenAuthed(userId,major,minor, message, mqType);
Set<WrapperedChannel> authedWchannelSet = authedChannels.computeIfAbsent(userId, k -> new HashSet<>());
authedWchannelSet.add(wrapperedChannel);
logger.debug("Authed channel {} with user {}", channel.id().asLongText(), userId);

8
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/WrapperedChannel.java

@ -77,6 +77,11 @@ public class WrapperedChannel {
* 发送数据条数(每发送到一条协议加1包括心跳)
*/
private long dataSendCount;
/**
* mqType,仅用于硬件设备认证时指定消息转发给哪个mq
*/
private Byte mqType;
/**业务相关参数*/
private String message;
/**寄存器 硬件系统*/
@ -154,11 +159,12 @@ public class WrapperedChannel {
authedAtInSeconds = DateUtil.currentSeconds();
}
public void whenAuthed(String userId,int major,int minor, String message){
public void whenAuthed(String userId,int major,int minor, String message, Byte mqType){
whenAuthed();
setVersion(major,minor);
this.userId = userId;
this.message = message;
this.mqType = mqType;
if (StrUtil.isNotEmpty(message)) {
try {
Map<String, Object> map = JacksonUtil.jsonToMap(message);

19
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/client/netty/tcphexserver/ModbusConverter.java

@ -102,19 +102,26 @@ public class ModbusConverter {
* @return
*/
private static InMessage toAuthMessage(short register, byte[] values) throws JsonProcessingException {
if (values == null || values.length < 3) {
int minLength = 4;
if (values == null || values.length < minLength) {
return null;
}
byte major = values[CCModBusEntity.SIZE_MAJOR];
byte minor = values[CCModBusEntity.SIZE_MINOR];
int majorIndex = CCModBusEntity.SIZE_MAJOR - 1;
int minorIndex = majorIndex + CCModBusEntity.SIZE_MINOR;
int mqIndex = minorIndex + CCModBusEntity.SIZE_MQ;
byte major = values[majorIndex];
byte minor = values[minorIndex];
byte mq = values[mqIndex];
long userId = 0;
for(int i=CCModBusEntity.SIZE_MAJOR+CCModBusEntity.SIZE_MINOR;i<values.length;i++){
for(int i=mqIndex + 1;i<values.length;i++){
userId <<= 8;
userId |= values[i] & 0xFF;
}
AuthMessage message = new AuthMessage(userId, major, minor);
AuthMessage message = new AuthMessage(userId, major, minor, mq);
InMessage inMessage = new InMessage();
inMessage.setData(JacksonUtil.beanToJson(message));
inMessage.setToDomain(MessageConstant.DomainType.Server);

11
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/MessageHandler.java

@ -294,7 +294,7 @@ public class MessageHandler {
userId = getUserService().getUserIdByToken(inSysData.getData().getToken());
}
if(StrUtil.isNotEmpty(userId)){
ChannelManager.authChannel(ChannelManager.getCurrentChannel(),userId,inSysData.getData().getMajor(),inSysData.getData().getMinor(), inSysData.getMessage());
ChannelManager.authChannel(ChannelManager.getCurrentChannel(),userId,inSysData.getData().getMajor(),inSysData.getData().getMinor(), inSysData.getMessage(), null);
onClientOnLine(MessageConstant.DomainType.User,userId);
authSuccess = true;
}
@ -303,7 +303,7 @@ public class MessageHandler {
//硬件授权
logger.info("authid授权");
// authId有值默认授权通过
ChannelManager.authChannel(ChannelManager.getCurrentChannel(),String.valueOf(inSysData.getData().getAuthId()),inSysData.getData().getMajor(),inSysData.getData().getMinor(), inSysData.getMessage());
ChannelManager.authChannel(ChannelManager.getCurrentChannel(),String.valueOf(inSysData.getData().getAuthId()),inSysData.getData().getMajor(),inSysData.getData().getMinor(), inSysData.getMessage(), inSysData.getData().getMq());
logger.info("授权完成");
onClientOnLine(MessageConstant.DomainType.User,String.valueOf(inSysData.getData().getAuthId()));
logger.info("设置在线");
@ -463,6 +463,8 @@ public class MessageHandler {
Map<String, Long> register = wrapperedChannel.getRegister();
// 授权数据
Long authId = Long.parseLong(wrapperedChannel.getUserId());
// 接收数据的业务放
byte mqType = wrapperedChannel.getMqType();
// 版本信息
String version = wrapperedChannel.getVersion();
switch (oper) {
@ -483,8 +485,7 @@ public class MessageHandler {
int startAddr = CCModBusEntity.getRegisterStartAddr(modBusData);
// 写寄存器数量
int num = CCModBusEntity.getRegisterStartNum(modBusData);
// 接收数据的业务放
byte addr = modBusMessage.getData().getAddr();
OutMessageSet set = new OutMessageSet();
for (int i = 0; i < num; i++) {
@ -510,7 +511,7 @@ public class MessageHandler {
OutMessage outMessage1 = new OutMessage(JacksonUtil.beanToJson(register1));
set.add(outMessage1);
}
String mqName = (String)RedisUtil.hget(MessageConstant.Redis.KEY_MQ_COLLECTION, String.valueOf(addr));
String mqName = (String)RedisUtil.hget(MessageConstant.Redis.KEY_MQ_COLLECTION, String.valueOf(mqType));
logger.info("mqName:{}", mqName);
if (CollectionUtil.isNotEmpty(set.getMessageSet()) && StrUtil.isNotEmpty(mqName)) {

4
ccmq/src/main/java/com/ccsens/ccmq/lowlevel/message/client/AuthMessage.java

@ -18,6 +18,7 @@ public class AuthMessage extends ServerMessage {
private String token;
private Long authId;
private String userId;
private byte mq;
// 主版本号
private int major;
// 此版本号
@ -40,12 +41,13 @@ public class AuthMessage extends ServerMessage {
data.setMinor(minor);
}
public AuthMessage(Long authId,int major,int minor){
public AuthMessage(Long authId,int major,int minor, byte mq){
this();
data = new Data();
data.setAuthId(authId);
data.setMajor(major);
data.setMinor(minor);
data.mq = mq;
}
public AuthMessage(Long authId){

1
ccmq/src/main/java/wiki/tall/ccmq/common/bean/dto/ccmodbus/CCModBusEntity.java

@ -48,6 +48,7 @@ public class CCModBusEntity {
public static final int SIZE_MAX = 255;
public static final int SIZE_MAJOR = 1;
public static final int SIZE_MINOR = 1;
public static final int SIZE_MQ = 1;
public static final int SIZE_DATA_MAX = SIZE_MAX - SIZE_ADDR - SIZE_OPER - SIZE_CRC - SIZE_FILTER - SIZE_LEN;
public static final int SIZE_MIN = SIZE_FILTER + SIZE_LEN + SIZE_ADDR + SIZE_OPER + SIZE_CRC;
public static final byte ADDRESS = 0x14;

3
ccmq/src/main/java/wiki/tall/ccmq/common/util/WebConstant.java

@ -9,7 +9,8 @@ public class WebConstant {
/***
* TCP协议是作为从设备时的ID
*/
public static final byte SLAVE_ID = 1;
public static final byte SLAVE_ID = 20;
// public static final byte SLAVE_ID = 1;
public static final String DYNAMIC_DATASOURCE_SCHEMA_KEY = "${schema}";
public static final String JWT_ACCESS_TOKEN_SECERT = Base64.encode("v%R5jNdmouCN?");

60
ccmq/src/main/resources/application-wiki.properties

@ -0,0 +1,60 @@
# eureka注册:禁用
eureka.client.register-with-eureka = false
euraka.client.fetch-registry = false
# File Upload
spring.servlet.multipart.max-file-size=50MB
spring.servlet.multipart.max-request-size=100MB
# Redis配置信息(spring有默认配置,不配置默认连接127.0.0.1)
# Redis数据库索引(默认为0)
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=1000ms
spring.redis.jedis.pool.max-active=200
spring.redis.jedis.pool.max-wait=-1ms
spring.redis.jedis.pool.max-idle=10
spring.redis.jedis.pool.min-idle=0
# RabbitMQ配置信息
#spring.rabbitmq.host=api.ccsens.com
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=yu
spring.rabbitmq.password=sd12345
# spring.rabbitmq.listener.simple.acknowledge-mode=manual
# MongoDB配置信息
spring.data.mongodb.uri=mongodb://yulei:sxwiki_mongo#@49.232.6.143:27017/test
# 最大连接数,每一台服务器
spring.data.mongodb.option.max-connection-per-host=100
# 可阻塞线程队列容量
spring.data.mongodb.option.threads-allowed-to-block-for-connection-multiplier=10
# 每一台服务器的最小连接数
spring.data.mongodb.option.min-connection-per-host=1
#连接超时时间 1分钟
spring.data.mongodb.option.connect-timeout=60000
#等待时间 120000 2 * 60 * 1000
spring.data.mongodb.option.max-wait-time=120000
#Socket超时时间
spring.data.mongodb.option.socket-timeout=10000
#保持连接
spring.data.mongodb.option.socket-keep-alive=true
## 连接空闲时间8小时,否则连接太过频繁
spring.data.mongodb.option.max-connection-idle-time=28800000
spring.data.mongodb.option.max-connection-life-time=0
#心跳
spring.data.mongodb.option.heartbeat-socket-timeout=10000
spring.data.mongodb.option.heartbeat-connect-timeout=15000
spring.data.mongodb.option.min-heartbeat-frequency=5000
spring.data.mongodb.option.heartbeat-frequency=100000
# Settings
setting.snowflake.workerId=1
setting.snowflake.datacenterId=1

2
ccmq/src/main/resources/application.properties

@ -1,5 +1,5 @@
# 选择开发环境{dev|test|prod}
spring.profiles.active=test
spring.profiles.active=dev
# 设置应用名
spring.application.name=tall-message

46
ccmq/src/main/resources/druid-wiki.properties

@ -0,0 +1,46 @@
spring.datasource.druid.dynamicUrl=jdbc:mysql://localhost:3306/${schema}
#spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/db_encryptmoneypacket?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
#spring.datasource.druid.username=root
#spring.datasource.druid.password=111111
spring.datasource.druid.url=jdbc:mysql://49.232.6.143/tall?useUnicode=true&characterEncoding=UTF-8
spring.datasource.druid.username=root
spring.datasource.druid.password=po3OynBO[M3579p6L7)o
spring.datasource.druid.driverClassName=com.mysql.jdbc.Driver
# 连接池的配置信息
# 初始化大小,最小,最大
spring.datasource.druid.initialSize=5
spring.datasource.druid.minIdle=5
spring.datasource.druid.maxActive=20
# 配置获取连接等待超时的时间
spring.datasource.druid.maxWait=60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
spring.datasource.druid.timeBetweenEvictionRunsMillis=60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
spring.datasource.druid.minEvictableIdleTimeMillis=300000
spring.datasource.druid.validationQuery=SELECT 1 FROM DUAL
spring.datasource.druid.testWhileIdle=true
spring.datasource.druid.testOnBorrow=false
spring.datasource.druid.testOnReturn=false
# 打开PSCache,并且指定每个连接上PSCache的大小
spring.datasource.druid.poolPreparedStatements=true
spring.datasource.druid.maxPoolPreparedStatementPerConnectionSize=20
# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
#spring.datasource.druid.filters=stat,wall,log4j
spring.datasource.druid.filters=stat,wall
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# Druid访问信息
spring.datasource.druid.servletName=druidServlet
spring.datasource.druid.servletUrlMapping=/druid/*
spring.datasource.druid.servletLoginUsername=druid
spring.datasource.druid.servletLoginPassword=111111
spring.datasource.druid.servletLogSlowSql=true
spring.datasource.druid.servletResetEnable=true
spring.datasource.druid.filterName=druidFilter
spring.datasource.druid.filterUrlPattern=/*
spring.datasource.druid.filterExclusions=*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*
spring.datasource.druid.filterProfileEnable=true

27
ccmq/src/main/resources/setting-wiki.properties

@ -0,0 +1,27 @@
=## SnowFlake Setting
setting.snowflake.workerId=1
setting.snowflake.dataCenterId=1
## RabbitMQ Setting
setting.mq.inName = tall_message_1
setting.mq.outName = tall_message_2
## Ack Setting (never | error | always)
setting.server.name = server_default
setting.server.acktype = error
setting.client.acktype = always
## Heart Setting (s)
setting.keepAlive.enable = true
setting.keepAlive.maxIdleSeconds = 40
## Netty Server Setting
setting.netty.ws.port = 8196
setting.netty.ws.uri = /message/v4.0/ws
setting.netty.ws.type = netty_ws
setting.netty.tcptext.port = 8199
setting.netty.tcptext.type = netty_tcp_text
setting.netty.tcphex.port = 8195
setting.netty.tcphex.type = netty_tcp_hex

2
pom.xml

@ -9,7 +9,7 @@
<module>eureka</module>
<module>ccmq</module>
<module>histrixmonitor</module>
<module>scheduler</module>
<!-- <module>scheduler</module>-->
</modules>
<groupId>com.ccsens</groupId>
<artifactId>common</artifactId>

Loading…
Cancel
Save