diff --git a/cloudutil/src/main/resources/application-util-test.yml b/cloudutil/src/main/resources/application-util-test.yml index 6b1cab01..dc540800 100644 --- a/cloudutil/src/main/resources/application-util-test.yml +++ b/cloudutil/src/main/resources/application-util-test.yml @@ -85,7 +85,8 @@ eureka: # defaultZone: http://admin:admin@49.232.6.143:7010/eureka/ # defaultZone: http://admin:admin@192.168.0.99:7010/eureka/ - defaultZone: http://admin:admin@192.168.31.13:7010/eureka/ + defaultZone: http://admin:admin@192.168.4.113:7010/eureka/ +# defaultZone: http://admin:admin@192.168.31.13:7010/eureka/ # defaultZone: http://admin:admin@test.tall.wiki:7010/eureka/ instance: # 是否注册IP到eureka server,如不指定或设为false,那就回注册主机名到eureka server diff --git a/logistics/src/main/java/com/ccsens/logistics/LogisticsApplication.java b/logistics/src/main/java/com/ccsens/logistics/LogisticsApplication.java index fed962d7..b4fd7974 100644 --- a/logistics/src/main/java/com/ccsens/logistics/LogisticsApplication.java +++ b/logistics/src/main/java/com/ccsens/logistics/LogisticsApplication.java @@ -1,6 +1,6 @@ package com.ccsens.logistics; -import com.ccsens.logistics.service.INettyService; +import com.ccsens.logistics.Netty.NettyClient; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -10,8 +10,6 @@ import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.scheduling.annotation.EnableAsync; -import javax.annotation.Resource; - @MapperScan(basePackages = {"com.ccsens.logistics.persist.*"}) @ServletComponentScan @EnableAsync @@ -20,8 +18,6 @@ import javax.annotation.Resource; @EnableFeignClients(basePackages = "com.ccsens.cloudutil.feign") @SpringBootApplication(scanBasePackages = "com.ccsens") public class LogisticsApplication implements CommandLineRunner { - @Resource - private INettyService nettyService; public static void main(String[] args) { SpringApplication.run(LogisticsApplication.class, args); @@ -30,6 +26,6 @@ public class LogisticsApplication implements CommandLineRunner { @Override public void run(String... args) throws Exception { - + NettyClient.start(); } } diff --git a/logistics/src/main/java/com/ccsens/logistics/Netty/NettyClient.java b/logistics/src/main/java/com/ccsens/logistics/Netty/NettyClient.java new file mode 100644 index 00000000..302d6e70 --- /dev/null +++ b/logistics/src/main/java/com/ccsens/logistics/Netty/NettyClient.java @@ -0,0 +1,82 @@ +package com.ccsens.logistics.Netty; + +import com.ccsens.logistics.service.IThermalImageryService; +import com.ccsens.logistics.service.ThermalImageryService; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.Delimiters; +import io.netty.handler.codec.string.StringEncoder; + +import javax.annotation.Resource; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class NettyClient { + @Resource + private IThermalImageryService thermalImageryService; + + public static void start() throws Exception { + // 首先,netty通过ServerBootstrap启动服务端 + Bootstrap client = new Bootstrap(); + + //第1步 定义线程组,处理读写和链接事件,没有了accept事件 + EventLoopGroup group = new NioEventLoopGroup(); + client.group(group ); + + //第2步 绑定客户端通道 + client.channel(NioSocketChannel.class); + + //第3步 给NIoSocketChannel初始化handler, 处理读写事件 + //通道是NioSocketChannel + client.handler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel ch) throws Exception { + //字符串编码器,一定要加在SimpleClientHandler 的上面 + ch.pipeline().addLast(new StringEncoder()); + ch.pipeline().addLast(new DelimiterBasedFrameDecoder( + Integer.MAX_VALUE, Delimiters.lineDelimiter()[0])); + //找到他的管道 增加他的handler + ch.pipeline().addLast(new SimpleClientHandler()); + } + }); + + //连接服务器 + ChannelFuture future = client.connect("47.106.129.173", 15506).sync(); + //请求热成像图 + Runnable runnable = new Runnable() { + @Override + public void run() { +// thermalImageryService.getImagery(); + future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005A2\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n"); + future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005BD\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n"); + } + }; + ScheduledExecutorService service = Executors + .newSingleThreadScheduledExecutor(); + // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间 + service.scheduleAtFixedRate(runnable, 2, 5, TimeUnit.SECONDS); +// 创建并执行在给定延迟后启用的 ScheduledFuture。 +// 参数: +// callable - 要执行的功能 +// delay - 从现在开始延迟执行的时间 +// unit - 延迟参数的时间单位 +// 返回: +// 可用于提取结果或取消的 ScheduledFuture +// service.schedule(runnable, 5000, TimeUnit.MILLISECONDS); + +// //当通道关闭了,就继续往下走 +// future.channel().closeFuture().sync(); + + //接收服务端返回的数据 +// AttributeKey key = AttributeKey.valueOf("ServerData"); +// Object result = future.channel().attr(key).get(); +// System.out.println(result.toString()); + } + +} diff --git a/logistics/src/main/java/com/ccsens/logistics/Netty/SimpleClientHandler.java b/logistics/src/main/java/com/ccsens/logistics/Netty/SimpleClientHandler.java new file mode 100644 index 00000000..e18f05cc --- /dev/null +++ b/logistics/src/main/java/com/ccsens/logistics/Netty/SimpleClientHandler.java @@ -0,0 +1,31 @@ +package com.ccsens.logistics.Netty; + +import com.ccsens.logistics.service.IThermalImageryService; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.AttributeKey; +import org.hibernate.validator.constraints.pl.REGON; + +import javax.annotation.Resource; +import java.nio.charset.Charset; + +public class SimpleClientHandler extends ChannelInboundHandlerAdapter{ + @Resource + private IThermalImageryService thermalImageryService; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + String value = ((ByteBuf) msg).toString(Charset.defaultCharset()); + System.out.println("服务器端返回的数据:" + value); + thermalImageryService.disposeMessage(value); + } + +// AttributeKey key = AttributeKey.valueOf("ServerData"); +// ctx.channel().attr(key).set("客户端处理完毕"); + + //把客户端的通道关闭 +// ctx.channel().close(); + } +} diff --git a/logistics/src/main/java/com/ccsens/logistics/Util/Constant.java b/logistics/src/main/java/com/ccsens/logistics/Util/Constant.java new file mode 100644 index 00000000..e60eeba2 --- /dev/null +++ b/logistics/src/main/java/com/ccsens/logistics/Util/Constant.java @@ -0,0 +1,15 @@ +package com.ccsens.logistics.Util; + +/** + * @author 逗 + */ +public class Constant { + /** + * 心跳信息 + */ + public static final String HEARTBEAT = "SN4"; + /** + * 热成像图片 + */ + public static final String THERMAL_IMAGERY = "SN4Image"; +} diff --git a/logistics/src/main/java/com/ccsens/logistics/bean/dto/CarRecordDto.java b/logistics/src/main/java/com/ccsens/logistics/bean/dto/CarRecordDto.java index 29c67f28..91ab08b0 100644 --- a/logistics/src/main/java/com/ccsens/logistics/bean/dto/CarRecordDto.java +++ b/logistics/src/main/java/com/ccsens/logistics/bean/dto/CarRecordDto.java @@ -25,4 +25,6 @@ public class CarRecordDto { private Long carWeight; } + + } diff --git a/logistics/src/main/java/com/ccsens/logistics/bean/dto/ThermalImageryDto.java b/logistics/src/main/java/com/ccsens/logistics/bean/dto/ThermalImageryDto.java new file mode 100644 index 00000000..7513f849 --- /dev/null +++ b/logistics/src/main/java/com/ccsens/logistics/bean/dto/ThermalImageryDto.java @@ -0,0 +1,33 @@ +package com.ccsens.logistics.bean.dto; + +import com.netflix.discovery.converters.Auto; +import lombok.Data; + +@Data +public class ThermalImageryDto { + /** + * 去服务端请求热成像图 + */ + @Data + public static class RequestThermalImagery{ + private int service_id = 123; + private String type = "READSN4"; + private String thing_id; + private String data; + } + + @Data + public static class RequestThermalImageryDate{ + private String GetType = "Auto"; + private int Highest = 70; + private int Lowest = 30; + } + + @Data + public static class GetThermalImageryDate{ + private String maxT; + private String maxTx; + private String maxTy; + private String imageData; + } +} diff --git a/logistics/src/main/java/com/ccsens/logistics/service/INettyService.java b/logistics/src/main/java/com/ccsens/logistics/service/INettyService.java deleted file mode 100644 index 27e57dd1..00000000 --- a/logistics/src/main/java/com/ccsens/logistics/service/INettyService.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.ccsens.logistics.service; - -public interface INettyService { -} diff --git a/logistics/src/main/java/com/ccsens/logistics/service/IThermalImageryService.java b/logistics/src/main/java/com/ccsens/logistics/service/IThermalImageryService.java new file mode 100644 index 00000000..50a8ae81 --- /dev/null +++ b/logistics/src/main/java/com/ccsens/logistics/service/IThermalImageryService.java @@ -0,0 +1,13 @@ +package com.ccsens.logistics.service; + +import java.util.List; + +public interface IThermalImageryService { + List getImagery(); + + /** + * 处理返回的数据 + * @param value 数据json + */ + void disposeMessage(String value); +} diff --git a/logistics/src/main/java/com/ccsens/logistics/service/NerryService.java b/logistics/src/main/java/com/ccsens/logistics/service/NerryService.java deleted file mode 100644 index 27629198..00000000 --- a/logistics/src/main/java/com/ccsens/logistics/service/NerryService.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.ccsens.logistics.service; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; - -@Slf4j -@Service -@Transactional(propagation = Propagation.REQUIRED,rollbackFor = Exception.class) -public class NerryService implements INettyService{ - -} diff --git a/logistics/src/main/java/com/ccsens/logistics/service/ThermalImageryService.java b/logistics/src/main/java/com/ccsens/logistics/service/ThermalImageryService.java new file mode 100644 index 00000000..2b41a86f --- /dev/null +++ b/logistics/src/main/java/com/ccsens/logistics/service/ThermalImageryService.java @@ -0,0 +1,89 @@ +package com.ccsens.logistics.service; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.lang.Snowflake; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; +import com.ccsens.logistics.Util.Constant; +import com.ccsens.logistics.bean.dto.ThermalImageryDto; +import com.ccsens.logistics.bean.po.LogisticsEquipment; +import com.ccsens.logistics.bean.po.LogisticsEquipmentExample; +import com.ccsens.logistics.bean.po.LogisticsHeatImagingRecord; +import com.ccsens.logistics.persist.mapper.LogisticsEquipmentMapper; +import com.ccsens.logistics.persist.mapper.LogisticsHeatImagingRecordMapper; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.Resource; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Service +@Transactional(propagation = Propagation.REQUIRED,rollbackFor = Exception.class) +public class ThermalImageryService implements IThermalImageryService{ + + @Resource + private LogisticsEquipmentMapper logisticsEquipmentMapper; + @Resource + private LogisticsHeatImagingRecordMapper heatImagingRecordMapper; + @Resource + private Snowflake snowflake; + + @Override + public List getImagery(){ + List str = new ArrayList<>(); + //查询redis内的热成像摄像头的编号 + + //没有则查找数据库 + LogisticsEquipmentExample equipmentExample = new LogisticsEquipmentExample(); + equipmentExample.createCriteria().andEquipmentTypeEqualTo((byte) 4); + List logisticsEquipments = logisticsEquipmentMapper.selectByExample(equipmentExample); + if(CollectionUtil.isNotEmpty(logisticsEquipments)){ + for (LogisticsEquipment equipment:logisticsEquipments){ + ThermalImageryDto.RequestThermalImagery thermalImagery = new ThermalImageryDto.RequestThermalImagery(); + thermalImagery.setThing_id(equipment.getEquipmentNumber()); + String s = JSONObject.toJSONString(thermalImagery); + str.add(s); + } + } + //编写请求对象 + return str; + } + + @Override + public void disposeMessage(String value) { + //解析字符串 + try { + ThermalImageryDto.RequestThermalImagery thermalImagery = JSONObject.parseObject(value,ThermalImageryDto.RequestThermalImagery.class); + switch (thermalImagery.getType()){ + case Constant.HEARTBEAT: + //TODO 心跳不处理 + break; + case Constant.THERMAL_IMAGERY: + if(StrUtil.isNotEmpty(thermalImagery.getData())) { + ThermalImageryDto.GetThermalImageryDate date = JSONObject.parseObject(thermalImagery.getData(), ThermalImageryDto.GetThermalImageryDate.class); + LogisticsHeatImagingRecord heatImagingRecord = new LogisticsHeatImagingRecord(); + heatImagingRecord.setId(snowflake.nextId()); + heatImagingRecord.setMaxT(new BigDecimal(date.getMaxT())); + heatImagingRecord.setMaxTx(date.getMaxTx()); + heatImagingRecord.setMaxTy(date.getMaxTy()); + heatImagingRecord.setImageData(date.getImageData()); + heatImagingRecordMapper.insertSelective(heatImagingRecord); + } + break; + default: + break; + } + thermalImagery.getType(); + }catch (Exception e){ + log.info("解析热成像返回数据异常 {}",e); + } + + } +} diff --git a/logistics/src/main/resources/application-test.yml b/logistics/src/main/resources/application-test.yml index 853aa5dc..0e5dc51d 100644 --- a/logistics/src/main/resources/application-test.yml +++ b/logistics/src/main/resources/application-test.yml @@ -28,7 +28,7 @@ swagger: enable: true eureka: instance: - ip-address: 192.168.31.13 + ip-address: 127.0.0.1 file: path: /home/cloud/logistics/uploads/ signUpUrl: https://test.tall.wiki/compete/ diff --git a/util/src/test/java/com/ccsens/util/nattyTest.java b/util/src/test/java/com/ccsens/util/nattyTest.java index ce2cf250..e3dfd1c2 100644 --- a/util/src/test/java/com/ccsens/util/nattyTest.java +++ b/util/src/test/java/com/ccsens/util/nattyTest.java @@ -10,6 +10,10 @@ import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringEncoder; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + public class nattyTest { @@ -41,13 +45,33 @@ public class nattyTest { ChannelFuture future = client.connect("47.106.129.173", 15506).sync(); - for(int i=0;i<5;i++){ - future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005A2\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n"); - System.out.println("请求"); - Thread.sleep(5000); - } - - +// for(int i=0;i<5;i++){ +// future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005A2\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n"); +// System.out.println("请求"); +// Thread.sleep(5000); +// } + Long time = System.currentTimeMillis(); + Runnable runnable = new Runnable() { + @Override + public void run() { + // task to run goes here + future.channel().writeAndFlush("{\"service_id\":\"123\",\"type\":\"READSN4\",\"thing_id\":\"050005A2\",\"data\":{\"GetType\":\"Auto\",\"Highest\":70,\"Lowest\":30}}"+"\r\n"); + System.out.println("请求"); + System.out.println(System.currentTimeMillis() - time); + } + }; + ScheduledExecutorService service = Executors + .newSingleThreadScheduledExecutor(); + // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间 + service.scheduleAtFixedRate(runnable, 1, 3, TimeUnit.SECONDS); +// 创建并执行在给定延迟后启用的 ScheduledFuture。 +// 参数: +// callable - 要执行的功能 +// delay - 从现在开始延迟执行的时间 +// unit - 延迟参数的时间单位 +// 返回: +// 可用于提取结果或取消的 ScheduledFuture +// service.schedule(runnable, 5000, TimeUnit.MILLISECONDS); // //当通道关闭了,就继续往下走 // future.channel().closeFuture().sync();