Browse Source

20210511处理热成像数据

logistics
zy_Java 4 years ago
parent
commit
a654e1bca7
  1. 3
      cloudutil/src/main/resources/application-util-test.yml
  2. 8
      logistics/src/main/java/com/ccsens/logistics/LogisticsApplication.java
  3. 82
      logistics/src/main/java/com/ccsens/logistics/Netty/NettyClient.java
  4. 31
      logistics/src/main/java/com/ccsens/logistics/Netty/SimpleClientHandler.java
  5. 15
      logistics/src/main/java/com/ccsens/logistics/Util/Constant.java
  6. 2
      logistics/src/main/java/com/ccsens/logistics/bean/dto/CarRecordDto.java
  7. 33
      logistics/src/main/java/com/ccsens/logistics/bean/dto/ThermalImageryDto.java
  8. 4
      logistics/src/main/java/com/ccsens/logistics/service/INettyService.java
  9. 13
      logistics/src/main/java/com/ccsens/logistics/service/IThermalImageryService.java
  10. 13
      logistics/src/main/java/com/ccsens/logistics/service/NerryService.java
  11. 89
      logistics/src/main/java/com/ccsens/logistics/service/ThermalImageryService.java
  12. 2
      logistics/src/main/resources/application-test.yml
  13. 32
      util/src/test/java/com/ccsens/util/nattyTest.java

3
cloudutil/src/main/resources/application-util-test.yml

@ -85,7 +85,8 @@ eureka:
# defaultZone: http://admin:admin@49.232.6.143:7010/eureka/
# defaultZone: http://admin:admin@192.168.0.99:7010/eureka/
defaultZone: http://admin:admin@192.168.31.13:7010/eureka/
defaultZone: http://admin:admin@192.168.4.113:7010/eureka/
# defaultZone: http://admin:admin@192.168.31.13:7010/eureka/
# defaultZone: http://admin:admin@test.tall.wiki:7010/eureka/
instance:
# 是否注册IP到eureka server,如不指定或设为false,那就回注册主机名到eureka server

8
logistics/src/main/java/com/ccsens/logistics/LogisticsApplication.java

@ -1,6 +1,6 @@
package com.ccsens.logistics;
import com.ccsens.logistics.service.INettyService;
import com.ccsens.logistics.Netty.NettyClient;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
@ -10,8 +10,6 @@ import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
import javax.annotation.Resource;
@MapperScan(basePackages = {"com.ccsens.logistics.persist.*"})
@ServletComponentScan
@EnableAsync
@ -20,8 +18,6 @@ import javax.annotation.Resource;
@EnableFeignClients(basePackages = "com.ccsens.cloudutil.feign")
@SpringBootApplication(scanBasePackages = "com.ccsens")
public class LogisticsApplication implements CommandLineRunner {
@Resource
private INettyService nettyService;
public static void main(String[] args) {
SpringApplication.run(LogisticsApplication.class, args);
@ -30,6 +26,6 @@ public class LogisticsApplication implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
NettyClient.start();
}
}

82
logistics/src/main/java/com/ccsens/logistics/Netty/NettyClient.java

@ -0,0 +1,82 @@
package com.ccsens.logistics.Netty;
import com.ccsens.logistics.service.IThermalImageryService;
import com.ccsens.logistics.service.ThermalImageryService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringEncoder;
import javax.annotation.Resource;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class NettyClient {
@Resource
private IThermalImageryService thermalImageryService;
public static void start() throws Exception {
// 首先,netty通过ServerBootstrap启动服务端
Bootstrap client = new Bootstrap();
//第1步 定义线程组,处理读写和链接事件,没有了accept事件
EventLoopGroup group = new NioEventLoopGroup();
client.group(group );
//第2步 绑定客户端通道
client.channel(NioSocketChannel.class);
//第3步 给NIoSocketChannel初始化handler, 处理读写事件
//通道是NioSocketChannel
client.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//字符串编码器,一定要加在SimpleClientHandler 的上面
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(
Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
//找到他的管道 增加他的handler
ch.pipeline().addLast(new SimpleClientHandler());
}
});
//连接服务器
ChannelFuture future = client.connect("47.106.129.173", 15506).sync();
//请求热成像图
Runnable runnable = new Runnable() {
@Override
public void run() {
// thermalImageryService.getImagery();
future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005A2\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n");
future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005BD\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n");
}
};
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor();
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
service.scheduleAtFixedRate(runnable, 2, 5, TimeUnit.SECONDS);
// 创建并执行在给定延迟后启用的 ScheduledFuture。
// 参数:
// callable - 要执行的功能
// delay - 从现在开始延迟执行的时间
// unit - 延迟参数的时间单位
// 返回:
// 可用于提取结果或取消的 ScheduledFuture
// service.schedule(runnable, 5000, TimeUnit.MILLISECONDS);
// //当通道关闭了,就继续往下走
// future.channel().closeFuture().sync();
//接收服务端返回的数据
// AttributeKey<String> key = AttributeKey.valueOf("ServerData");
// Object result = future.channel().attr(key).get();
// System.out.println(result.toString());
}
}

31
logistics/src/main/java/com/ccsens/logistics/Netty/SimpleClientHandler.java

@ -0,0 +1,31 @@
package com.ccsens.logistics.Netty;
import com.ccsens.logistics.service.IThermalImageryService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import org.hibernate.validator.constraints.pl.REGON;
import javax.annotation.Resource;
import java.nio.charset.Charset;
public class SimpleClientHandler extends ChannelInboundHandlerAdapter{
@Resource
private IThermalImageryService thermalImageryService;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
String value = ((ByteBuf) msg).toString(Charset.defaultCharset());
System.out.println("服务器端返回的数据:" + value);
thermalImageryService.disposeMessage(value);
}
// AttributeKey<String> key = AttributeKey.valueOf("ServerData");
// ctx.channel().attr(key).set("客户端处理完毕");
//把客户端的通道关闭
// ctx.channel().close();
}
}

15
logistics/src/main/java/com/ccsens/logistics/Util/Constant.java

@ -0,0 +1,15 @@
package com.ccsens.logistics.Util;
/**
* @author
*/
public class Constant {
/**
* 心跳信息
*/
public static final String HEARTBEAT = "SN4";
/**
* 热成像图片
*/
public static final String THERMAL_IMAGERY = "SN4Image";
}

2
logistics/src/main/java/com/ccsens/logistics/bean/dto/CarRecordDto.java

@ -25,4 +25,6 @@ public class CarRecordDto {
private Long carWeight;
}
}

33
logistics/src/main/java/com/ccsens/logistics/bean/dto/ThermalImageryDto.java

@ -0,0 +1,33 @@
package com.ccsens.logistics.bean.dto;
import com.netflix.discovery.converters.Auto;
import lombok.Data;
@Data
public class ThermalImageryDto {
/**
* 去服务端请求热成像图
*/
@Data
public static class RequestThermalImagery{
private int service_id = 123;
private String type = "READSN4";
private String thing_id;
private String data;
}
@Data
public static class RequestThermalImageryDate{
private String GetType = "Auto";
private int Highest = 70;
private int Lowest = 30;
}
@Data
public static class GetThermalImageryDate{
private String maxT;
private String maxTx;
private String maxTy;
private String imageData;
}
}

4
logistics/src/main/java/com/ccsens/logistics/service/INettyService.java

@ -1,4 +0,0 @@
package com.ccsens.logistics.service;
public interface INettyService {
}

13
logistics/src/main/java/com/ccsens/logistics/service/IThermalImageryService.java

@ -0,0 +1,13 @@
package com.ccsens.logistics.service;
import java.util.List;
public interface IThermalImageryService {
List<String> getImagery();
/**
* 处理返回的数据
* @param value 数据json
*/
void disposeMessage(String value);
}

13
logistics/src/main/java/com/ccsens/logistics/service/NerryService.java

@ -1,13 +0,0 @@
package com.ccsens.logistics.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@Transactional(propagation = Propagation.REQUIRED,rollbackFor = Exception.class)
public class NerryService implements INettyService{
}

89
logistics/src/main/java/com/ccsens/logistics/service/ThermalImageryService.java

@ -0,0 +1,89 @@
package com.ccsens.logistics.service;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.ccsens.logistics.Util.Constant;
import com.ccsens.logistics.bean.dto.ThermalImageryDto;
import com.ccsens.logistics.bean.po.LogisticsEquipment;
import com.ccsens.logistics.bean.po.LogisticsEquipmentExample;
import com.ccsens.logistics.bean.po.LogisticsHeatImagingRecord;
import com.ccsens.logistics.persist.mapper.LogisticsEquipmentMapper;
import com.ccsens.logistics.persist.mapper.LogisticsHeatImagingRecordMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Service
@Transactional(propagation = Propagation.REQUIRED,rollbackFor = Exception.class)
public class ThermalImageryService implements IThermalImageryService{
@Resource
private LogisticsEquipmentMapper logisticsEquipmentMapper;
@Resource
private LogisticsHeatImagingRecordMapper heatImagingRecordMapper;
@Resource
private Snowflake snowflake;
@Override
public List<String> getImagery(){
List<String> str = new ArrayList<>();
//查询redis内的热成像摄像头的编号
//没有则查找数据库
LogisticsEquipmentExample equipmentExample = new LogisticsEquipmentExample();
equipmentExample.createCriteria().andEquipmentTypeEqualTo((byte) 4);
List<LogisticsEquipment> logisticsEquipments = logisticsEquipmentMapper.selectByExample(equipmentExample);
if(CollectionUtil.isNotEmpty(logisticsEquipments)){
for (LogisticsEquipment equipment:logisticsEquipments){
ThermalImageryDto.RequestThermalImagery thermalImagery = new ThermalImageryDto.RequestThermalImagery();
thermalImagery.setThing_id(equipment.getEquipmentNumber());
String s = JSONObject.toJSONString(thermalImagery);
str.add(s);
}
}
//编写请求对象
return str;
}
@Override
public void disposeMessage(String value) {
//解析字符串
try {
ThermalImageryDto.RequestThermalImagery thermalImagery = JSONObject.parseObject(value,ThermalImageryDto.RequestThermalImagery.class);
switch (thermalImagery.getType()){
case Constant.HEARTBEAT:
//TODO 心跳不处理
break;
case Constant.THERMAL_IMAGERY:
if(StrUtil.isNotEmpty(thermalImagery.getData())) {
ThermalImageryDto.GetThermalImageryDate date = JSONObject.parseObject(thermalImagery.getData(), ThermalImageryDto.GetThermalImageryDate.class);
LogisticsHeatImagingRecord heatImagingRecord = new LogisticsHeatImagingRecord();
heatImagingRecord.setId(snowflake.nextId());
heatImagingRecord.setMaxT(new BigDecimal(date.getMaxT()));
heatImagingRecord.setMaxTx(date.getMaxTx());
heatImagingRecord.setMaxTy(date.getMaxTy());
heatImagingRecord.setImageData(date.getImageData());
heatImagingRecordMapper.insertSelective(heatImagingRecord);
}
break;
default:
break;
}
thermalImagery.getType();
}catch (Exception e){
log.info("解析热成像返回数据异常 {}",e);
}
}
}

2
logistics/src/main/resources/application-test.yml

@ -28,7 +28,7 @@ swagger:
enable: true
eureka:
instance:
ip-address: 192.168.31.13
ip-address: 127.0.0.1
file:
path: /home/cloud/logistics/uploads/
signUpUrl: https://test.tall.wiki/compete/

32
util/src/test/java/com/ccsens/util/nattyTest.java

@ -10,6 +10,10 @@ import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringEncoder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class nattyTest {
@ -41,13 +45,33 @@ public class nattyTest {
ChannelFuture future = client.connect("47.106.129.173", 15506).sync();
for(int i=0;i<5;i++){
// for(int i=0;i<5;i++){
// future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005A2\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n");
// System.out.println("请求");
// Thread.sleep(5000);
// }
Long time = System.currentTimeMillis();
Runnable runnable = new Runnable() {
@Override
public void run() {
// task to run goes here
future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005A2\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n");
System.out.println("请求");
Thread.sleep(5000);
System.out.println(System.currentTimeMillis() - time);
}
};
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor();
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
service.scheduleAtFixedRate(runnable, 1, 3, TimeUnit.SECONDS);
// 创建并执行在给定延迟后启用的 ScheduledFuture。
// 参数:
// callable - 要执行的功能
// delay - 从现在开始延迟执行的时间
// unit - 延迟参数的时间单位
// 返回:
// 可用于提取结果或取消的 ScheduledFuture
// service.schedule(runnable, 5000, TimeUnit.MILLISECONDS);
// //当通道关闭了,就继续往下走
// future.channel().closeFuture().sync();

Loading…
Cancel
Save