11 changed files with 240 additions and 9 deletions
@ -0,0 +1,5 @@ |
|||
package com.ccsens.pelma_analyse.service; |
|||
|
|||
public interface PelmaDataAccept { |
|||
public void handle(String data); |
|||
} |
@ -0,0 +1,16 @@ |
|||
package com.ccsens.pelma_analyse.service.impl; |
|||
|
|||
import com.ccsens.pelma_analyse.service.PelmaDataAccept; |
|||
import com.ccsens.pelma_analyse.util.GetServiceUrlAnalyseUtil; |
|||
import org.springframework.web.client.RestTemplate; |
|||
|
|||
public class PelmaDataAcceptImpl implements PelmaDataAccept { |
|||
@Override |
|||
public void handle(String accept_str) { |
|||
String pelma_url = GetServiceUrlAnalyseUtil.getUrl(); |
|||
|
|||
RestTemplate restTemplate = new RestTemplate(); |
|||
String url = pelma_url + "/store?pelma=" + accept_str; |
|||
restTemplate.getForEntity(url, String.class); |
|||
} |
|||
} |
@ -0,0 +1,21 @@ |
|||
package com.ccsens.pelma_analyse.util; |
|||
|
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Component |
|||
public class GetServiceUrlAnalyseUtil { |
|||
|
|||
private static String url; |
|||
|
|||
public static String getUrl() |
|||
{ |
|||
return url; |
|||
} |
|||
|
|||
@Value("${service_url_analyse}") |
|||
public void setUrl(String service_url_analyse) |
|||
{ |
|||
url = service_url_analyse; |
|||
} |
|||
} |
@ -0,0 +1,22 @@ |
|||
package com.ccsens.pelma_analyse.util.netty; |
|||
|
|||
import org.springframework.boot.ApplicationArguments; |
|||
import org.springframework.boot.ApplicationRunner; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
/** |
|||
* 监听Spring容器启动完成,完成后启动Netty服务器 |
|||
* @author Gjing |
|||
**/ |
|||
@Component |
|||
public class NettyStartListener implements ApplicationRunner { |
|||
@Resource |
|||
private SocketServer socketServer; |
|||
|
|||
@Override |
|||
public void run(ApplicationArguments args) throws Exception { |
|||
this.socketServer.start(); |
|||
} |
|||
} |
@ -0,0 +1,66 @@ |
|||
package com.ccsens.pelma_analyse.util.netty; |
|||
|
|||
import com.ccsens.pelma_analyse.service.PelmaDataAccept; |
|||
import com.ccsens.pelma_analyse.service.impl.PelmaDataAcceptImpl; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.channel.ChannelInboundHandlerAdapter; |
|||
import io.netty.channel.group.ChannelGroup; |
|||
import io.netty.channel.group.DefaultChannelGroup; |
|||
import io.netty.util.concurrent.GlobalEventExecutor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* Socket拦截器,用于处理客户端的行为 |
|||
* |
|||
* @author Gjing |
|||
**/ |
|||
@Slf4j |
|||
@Component |
|||
public class SocketHandler extends ChannelInboundHandlerAdapter { |
|||
|
|||
public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
|||
|
|||
/** |
|||
* 读取到客户端发来的消息 |
|||
* |
|||
* @param ctx ChannelHandlerContext |
|||
* @param msg msg |
|||
* @throws Exception e |
|||
*/ |
|||
@Override |
|||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
|||
// 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组
|
|||
byte[] data = (byte[]) msg; |
|||
String accept_str = new String(data); |
|||
log.info("收到消息: " + accept_str); |
|||
|
|||
PelmaDataAccept pelmaDataAccept = new PelmaDataAcceptImpl(); |
|||
pelmaDataAccept.handle(accept_str); |
|||
// 给其他人转发消息
|
|||
// for (Channel client : clients) {
|
|||
// if (!client.equals(ctx.channel())) {
|
|||
// client.writeAndFlush(data);
|
|||
// }
|
|||
// }
|
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { |
|||
log.info("新的客户端链接:" + ctx.channel().id().asShortText()); |
|||
clients.add(ctx.channel()); |
|||
} |
|||
|
|||
@Override |
|||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { |
|||
clients.remove(ctx.channel()); |
|||
} |
|||
|
|||
@Override |
|||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
|||
cause.printStackTrace(); |
|||
ctx.channel().close(); |
|||
clients.remove(ctx.channel()); |
|||
} |
|||
} |
@ -0,0 +1,25 @@ |
|||
package com.ccsens.pelma_analyse.util.netty; |
|||
|
|||
import io.netty.channel.ChannelInitializer; |
|||
import io.netty.channel.ChannelPipeline; |
|||
import io.netty.channel.socket.SocketChannel; |
|||
import io.netty.handler.codec.bytes.ByteArrayDecoder; |
|||
import io.netty.handler.codec.bytes.ByteArrayEncoder; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
/** |
|||
* Socket 初始化器,每一个Channel进来都会调用这里的 InitChannel 方法 |
|||
* @author Gjing |
|||
**/ |
|||
@Component |
|||
public class SocketInitializer extends ChannelInitializer<SocketChannel> { |
|||
@Override |
|||
protected void initChannel(SocketChannel socketChannel) throws Exception { |
|||
ChannelPipeline pipeline = socketChannel.pipeline(); |
|||
// 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择
|
|||
pipeline.addLast(new ByteArrayDecoder()); |
|||
pipeline.addLast(new ByteArrayEncoder()); |
|||
// 添加上自己的处理器
|
|||
pipeline.addLast(new SocketHandler()); |
|||
} |
|||
} |
@ -0,0 +1,58 @@ |
|||
package com.ccsens.pelma_analyse.util.netty; |
|||
|
|||
import io.netty.bootstrap.ServerBootstrap; |
|||
import io.netty.channel.nio.NioEventLoopGroup; |
|||
import io.netty.channel.socket.nio.NioServerSocketChannel; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
/** |
|||
* @author Gjing |
|||
**/ |
|||
@Slf4j |
|||
@Component |
|||
public class SocketServer { |
|||
@Resource |
|||
private SocketInitializer socketInitializer; |
|||
|
|||
@Getter |
|||
private ServerBootstrap serverBootstrap; |
|||
|
|||
/** |
|||
* netty服务监听端口 |
|||
*/ |
|||
@Value("${netty.port:8088}") |
|||
private int port; |
|||
/** |
|||
* 主线程组数量 |
|||
*/ |
|||
@Value("${netty.bossThread:1}") |
|||
private int bossThread; |
|||
|
|||
/** |
|||
* 启动netty服务器 |
|||
*/ |
|||
public void start() { |
|||
this.init(); |
|||
this.serverBootstrap.bind(this.port); |
|||
log.info("Netty started on port: {} (TCP) with boss thread {}", this.port, this.bossThread); |
|||
} |
|||
|
|||
/** |
|||
* 初始化netty配置 |
|||
*/ |
|||
private void init() { |
|||
// 创建两个线程组,bossGroup为接收请求的线程组,一般1-2个就行
|
|||
NioEventLoopGroup bossGroup = new NioEventLoopGroup(this.bossThread); |
|||
// 实际工作的线程组
|
|||
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); |
|||
this.serverBootstrap = new ServerBootstrap(); |
|||
this.serverBootstrap.group(bossGroup, workerGroup) // 两个线程组加入进来
|
|||
.channel(NioServerSocketChannel.class) // 配置为nio类型
|
|||
.childHandler(this.socketInitializer); // 加入自己的初始化器
|
|||
} |
|||
} |
Loading…
Reference in new issue