1、依赖

<!-- netty依赖-->
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.0.23.Final</version>
</dependency>

2、搭建 Netty 服务端

Netty服务器:NettyServer

public class NettyServer {public NettyServer(int port) {this.port = port;}public static void main(String[] args) throws Exception {new EchoServer(8888).start(); // 启动}public void start() {// 负责连接请求EventLoopGroup bossGroup = new NioEventLoopGroup();// 负责事件响应EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 服务器启动项ServerBootstrap serverBootstrap = new ServerBootstrap();// 临时存放已完成三次握手的请求的队列的最大长度。// 如果未设置或所设置的值小于1,Java将使用默认值50。// 如果大于队列的最大长度,请求会被拒绝serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);// 保持长连接状态serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//绑定线程池:handler是针对bossGroup,childHandler是针对workerHandlerserverBootstrap.group(bossGroup, workerGroup)// 选择nioChannel.channel(NioServerSocketChannel.class)// 绑定监听端口.localAddress(this.port)// 绑定客户端连接时候触发操作.childHandler(new ChannelInitializer<SocketChannel>() { @Overrideprotected void initChannel(SocketChannel ch) throws Exception {logger.info("新增客户端连接,IP:" + ch.localAddress().getHostName() + ",Port:" + ch.localAddress().getPort());ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 22, 2, 1, 0));ch.pipeline().addLast(new ByteArrayEncoder());// 编码(发送数据)ch.pipeline().addLast(new ByteArrayDecoder());// 解码(接受数据)ch.pipeline().addLast(new NettyServerHandler()); // 客户端触发操作}});// 服务器异步创建绑定定ChannelFuture channelFuture = serverBootstrap.bind().sync();//该方法进行阻塞,等待服务端链路关闭之后继续执行。//这种模式一般都是使用Netty模块主动向服务端发送请求,然后最后结束才使用channelFuture.channel().closeFuture().sync();} finally {// 释放线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

Netty处理器:NettyServerHandler

@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {// 是否登出操作private Boolean isLogout;//region 使用依赖注入private static NettyServerHandler nettyServerHandler;@AutowiredMyService myService;public NettyServerHandler() {}@PostConstructpublic void init() {nettyServerHandler = this;}//region /*** channel 通道 action 活跃* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {logger.info("客户端连接成功:" + ctx.channel().localAddress().toString() + " 通道已激活!");}/*** channel 通道 action 不活跃* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("客户端断开连接:" + ctx.channel().localAddress().toString() + " 通道不活跃!");// channel失效,从Map中移除NettyChannelMap.removeValue((SocketChannel) ctx.channel());// 关闭流ctx.close();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("服务器:收到客户端数据");try {byte[] msgByte = (byte[]) msg;// 将channel保存到内存NettyChannelMap.add(uuid, (SocketChannel) channelHandlerContext.channelHandlerContext.channel());//TODO其他操作nettyServerHandler.myService.analysis(msgByte);//返回应答NettyChannelMap.get(uuid).writeAndFlush(response_msg);} finally {ReferenceCountUtil.release(msg);}}/*** 读取完毕客户端发送过来的数据之后的操作*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {logger.info("服务端:接收数据处理完毕");// 登出:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。if (this.isLogout) {ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}}/*** 服务端发生异常的操作:可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();logger.error("服务端:发生异常,异常信息:" + cause.getMessage());}}

启动类:NettyApplication

@SpringBootApplication
public class NettyApplication{public static void main(String[] args) {SpringApplication springApplication = new SpringApplication(NettyApplication.class);springApplication.run(args);// netty服务器启动try {new NettyServer(8888).start(); // 启动} catch (Exception e) {logger.error("netty服务器启动异常", e);}}
}

测试工具:mqtt客户端客户端工具.rar(通信猫+MQTTBox)_mqtt客户端,mqttbox下载-其它文档类资源-CSDN下载

附录1:常用的TCP粘包拆包解决方案

通过在消息头中定义长度字段来标示消息的总长度。

大多数的协议在协议头中都会携带长度字段,用于标识消息体或则整包消息的长度。LengthFieldBasedFrameDecoder通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息,只要传入正确的参数,就可以轻松解决“读半包”的问题。

解码:LengthFieldBasedFrameDecoder

  • maxFrameLength:发送数据包的最大长度
  • lengthFieldOffset:长度域的偏移量。长度域位于整个数据包字节数组中的开始下标。
  • lengthFieldLength:长度域的字节数长度。
  • lengthAdjustment:长度域的偏移量矫正。如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。
  • initialBytesToStrip:丢弃的起始字节数。丢弃处于此索引值前面的字节。
ch.pipeline().addLast("framer", new LengthFieldBasedFrameDecoder(1024, 0, 4, 0,4));

编码:LengthFieldPrepender

  • lengthFieldLength:长度属性的字节长度
  • lengthIncludesLengthFieldLength:false,长度字节不算在总长度中; true,算到总长度中
ch.pipeline().addLast("addLength", new LengthFieldPrepender(4, false));

在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。

参考:[netty]--最通用TCP黏包解决方案:LengthFieldBasedFrameDecoder和LengthFieldPrepender_惜暮的博客-CSDN博客

Netty粘包/半包问题解析_AnEra的博客-CSDN博客

附录2:无法使用@Autowired注入

@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {//region 使用依赖注入private static NettyServerHandler nettyServerHandler;@AutowiredMyService myService;public NettyServerHandler() {}@PostConstructpublic void init() {nettyServerHandler = this;}//region
}

调用时:

nettyServerHandler.myService.analysis(msgByte);

参考:Netty handler无法使用@Autowired注入bean_远方丶丶的博客-CSDN博客

附录3:netty 解码器

  • 一次解码器:ByteToMessageDecoder

io.netty.buffer.ByteBuf (原始数据流)-> io.netty.buffer.ByteBuf (用户数据)

内置解码器

LineBasedFrameDecoder
是一个特殊的分隔符解码器,该解码器使用的分隔符为:windows的r\n和类linux的\n。

DelimiterBasedFrameDecoder
是更通用的分隔符解码器,可支持多个分隔符,每个分隔符可为一个或多个字符。

FixedLengthFrameDecoder
按照固定长度frameLength解码出消息帧

LengthFieldBasedFrameDecoder
基于长度字段的消息帧解码器,该解码器可根据数据包中的长度字段动态的解码出消息帧。

  • 二次解码器:MessageToMessageDecoder

io.netty.buffer.ByteBuf (用户数据)-> Java Object
常用的二次解码器,json、Protobuf、xml等

编解码字符串的StringEncoder和StringDecoder

编解码对象的ObjectEncoder和ObjectDecoder

编解码字节码的ByteArrayEncoder和ByteArrayDecoder

编解码Protobuf的ProtobufEncoder和ProtobufDecoder

参考:netty的编解码器介绍_惜暮的博客-CSDN博客

Netty笔记(六)之netty中的解码器_jannals的博客-CSDN博客

附录4:检测空闲连接和超时处理

服务端、客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理类实现userEventTriggered()方法作为超时事件的逻辑处理。

Netty服务器:

Netty的IdleStateHandler心跳机制主要是用来检测远端是否存活,如果不存活或活跃则对空闲连接进行处理避免资源的浪费。

server添加下面代码

//第一个参数设置未读时间,第二个参数设置为未写时间,第三个为都未进行操作的时间,单位秒
ch.pipeline().addLast(new IdleStateHandler(4, 8, 12));
//添加超时检查机制--事件消息捕获类
// 在处理器该userEventTriggered方法中去处理 IdleStateEvent(读 空闲,写空闲,读写空闲)
ch.pipeline().addLast(new HeartbeatServerHandle());

IdleStateHandler的构造器:

  • readerIdleTimeSeconds:读超时。即当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件。
  • writerIdleTimeSeconds:写超时。即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件。
  • allIdleTimeSeconds:读/写超时。即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件。

这三个参数默认的时间单位是

处理空闲事件处理器:HeartbeatServerHandle

@Slf4j
public class HeartbeatServerHandle extends SimpleChannelInboundHandler<String> {private static ConcurrentHashMap<String,Long> concurrentHashMap = new ConcurrentHashMap<>();@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 判断事件是否是IdleStateEventif (evt instanceof IdleStateEvent) {String evtState = null;String key = ctx.channel().id().asLongText();Long count = concurrentHashMap.getOrDefault(key, 0L);//将该事件消息强转为心跳事件IdleStateEvent idleStateHandler = (IdleStateEvent) evt;IdleState state = idleStateHandler.state();switch(state) {case READER_IDLE:evtState = "读空闲";break;case WRITER_IDLE:evtState = "写空闲";break;case ALL_IDLE:evtState = "读写空闲";count++;break;default:break;}log.info("userEventTriggered-evtState:{}", evtState);// 空闲计数达5次, 进行测试连接是否正常if (count > 2L){ctx.writeAndFlush("测试客户端是否能接收信息")// 发送失败时关闭通道, 在或者可以在达到空闲多少次后, 进行关闭通道.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  concurrentHashMap.remove(key);return;}concurrentHashMap.put(key,count);} else {// 事件不是一个 IdleStateEvent 的话,就将它传递给下一个处理程序super.userEventTriggered(ctx, evt);}}
}

Netty客户端

设定IdleStateHandler心跳检测每四秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息

server添加下面代码

ch.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
ch.pipeline().addLast(new HeartBeatClientHandler());

处理类

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){IdleStateEvent event = (IdleStateEvent)evt;switch (e.state()) {case WRITER_IDLE:if (curTime<beatTime){curTime++;ctx.writeAndFlush("biubiu");}default:break;}
}

参考:Netty网络框架学习笔记-7((心跳)检测空闲连接以及超时) - 懵懵懂懂的猫 - 博客园

netty的编解码、粘包拆包问题、心跳检测机制原理_知识分子_的博客-CSDN博客

Netty学习(五)—IdleStateHandler心跳机制_zhenyutu的博客-CSDN博客_idlestatehandler

附录5:Keepalive

keepalive机制

TCP保活机制,就是为了保证连接的有效性,探测连接的对端是否存活的作用,在间隔一定的时间发探测包,根据回复来确认该连接是否有效。

Netty中直接提供了ChannelOption.SO_KEEPALIVE选项,将其传给ServerBootstrap.childOption方法,即可开启TCP Keepalive功能,配置好相关内核参数后,剩下的交给内核搞定。

如果一方已经关闭或异常终止连接,而另一方却不知道,我们将这样的TCP连接称为半打开的。TCP通过保活定时器(KeepAlive)来检测半打开连接。在高并发的网络服务器中,经常会出现漏掉socket的情况,对应的结果有一种情况就是出现大量的CLOSE_WAIT状态的连接。这个时候,可以通过设置KEEPALIVE选项来解决这个问题。

SO_KEEPALIVE

默认值:

Netty默认关闭该功能,即值为:false 。

启用该功能时,TCP会主动探测空闲连接的有效性。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右(默认的心跳间隔是7200s即2小时)上层没有任何数据传输的情况下,这套机制才会被激活。

ServerBootstrap sb = new ServerBootstrap();
// 保持长连接状态
sb.childOption(ChannelOption.SO_KEEPALIVE, true);

设置SO_KEEPALIVE选项来开启KEEPALIVE,然后通过TCP_KEEPIDLE、TCP_KEEPINTVL和TCP_KEEPCNT设置keepalive的开始时间、间隔、次数等参数。

TCP_KEEPIDLE:在TCP保活打开的情况下,最后一次数据交换到TCP发送第一个保活探测包的间隔,即允许的持续空闲时长,或者说每次正常发送心跳的周期,默认值为7200s(2h)
TCP_KEEPCNT: 在发送第一个保活探测包之后,没有接收到对方确认,继续发送保活探测包次数,默认值为9(次)
TCP_KEEPINTVL:在发送保活探测包之后,没有接收到对方确认,继续发送保活探测包的发送频率,默认值为75s

调整Keepalive参数

环境:centOS7(linux2.4以后),JDK 11,netty-all-4.1.66.Final(4.1.36以后版本)

ServerBootstrap sb = new ServerBootstrap();
// 配置TCP Keepalive参数,将Keepalive空闲时间设为120秒
sb.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), 120);
sb.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPINTERVAL), 75);
sb.option(NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPCOUNT), 9);

抓包结果:

抓包工具:Wireshark · Go Deep.

参考:

Netty——参数说明 - 曹伟雄 - 博客园

聊聊TCP Keepalive、Netty和Docker - eshizhan - 博客园

Netty有哪些配置TCP心跳参数的方法,都有什么区别? - 墨天轮

springboot整合netty实现tcp通信相关推荐

  1. Springboot整合Netty,实现Socket通信

    文章目录 *Springboot整合Netty,实现Socket通信* 1.模拟单客户端 2.模拟单服务端 总结 Springboot整合Netty,实现Socket通信 1.模拟单客户端 引入Net ...

  2. Springboot 整合 Netty 实战(附源码)

    作者:pjmike_pj juejin.im/post/5bd584bc518825292865395d 前言 这一篇文章主要介绍如何用Springboot 整合 Netty,由于本人尚处于学习Net ...

  3. SpringBoot 整合 Netty

    SpringBoot整合Netty 一.common工程 1.maven依赖 2.自定义Netty数据包类型(NettyPacketType) 3.自定义Netty数据包(NettyPacket) 4 ...

  4. 关于SpringBoot整合Netty客户端和服务端实现JT808协议

    关于SpringBoot整合Netty客户端和服务端实现JT808协议 最近做了一个使用netty实现交通部JT808协议的项目,对比了mina和netty两种框架的使用,先整理一下netty的实现过 ...

  5. springboot整合netty

    前面介绍了netty的基本使用以及和websocket的整合,下面就说说如何用springboot整合netty,毕竟我们是要把netty作为一个服务端的框架整合到我们的项目中去的,总不能用main函 ...

  6. 三分钟构建高性能 WebSocket 服务 | 超优雅的 SpringBoot 整合 Netty 方案

    前言 每当使用SpringBoot进行Weboscket开发时,最容易想到的就是spring-boot-starter-websocket(或spring-websocket).它可以让我们使用注解, ...

  7. Springboot整合netty实战

    本文来简单说下Springboot如何来整合netty 文章目录 概述 概述

  8. SpringBoot整合Netty+Nats

    Netty 是由 JBOSS 提供的一个 Java 开源框架.Netty 提供异步的.基于事件驱动的网络应用程序框架,用以快速开发高性能.高可靠性的网络 IO 程序,是目前最流行的 NIO 框架,Ne ...

  9. Springboot整合Netty注意事项

    2019独角兽企业重金招聘Python工程师标准>>> 1.一定要使用线程池来启动Netty 一般情况下,启动netty的类,我们也会写成一个组件,当然写成配置(@Configura ...

最新文章

  1. R Learnilng 十八讲7-12
  2. 我用 tensorflow 实现的“一个神经聊天模型”:一个基于深度学习的聊天机器人
  3. 利用JMF进行多媒体编程
  4. oracle 数据导入 数据和备注(comment)乱码问题解决办法
  5. (三)Controller接口控制器详解(二)
  6. 超值爆赞丨Java 程序员推荐的学习教程,刷爆了朋友圈...
  7. (STL,set)安迪的第一个字典
  8. Myeclipse8.5 反编译插件 jad 安装
  9. php header()的用法
  10. ASP.NET--邮件发送
  11. FileReader读取本地文件
  12. 2021-09-02编写一个 SQL 查询,获取 Employee 表中第二高的薪水(Salary) 。
  13. safari打不开cookies_从Mac和iOS上的safari阻止cookies的设置方法
  14. 230. Kth Smallest Element in a BSTs
  15. win7安装 - 避免产生100m系统保留分区的办法
  16. SQL注入案例演示与防范措施大全
  17. 新媒体运营教程:头条平台视频运营和分析
  18. php添加商品和显示商品的业务逻辑
  19. 高等数学-求曲线拐点
  20. 为什么我不再运营百家号了?这说出了我的心声

热门文章

  1. 《周易》六十四卦歌诀
  2. Yoga C930 NM-B741 EYG70 Ariel-SVT REV 1.0联想笔记本图纸
  3. 华为scp快充协议详解_华为5V/8A超级快充实测+拆解:用料良心
  4. 不同label样本画图——颜色分配plt.cm.Spectral
  5. python: plt.cm.Set1, Set2,Set3返回颜色
  6. 逆水寒商业脚本制作视频
  7. ANSYS|workbench输出梁的剪力图和弯矩图
  8. 专攻心脑疾病AI市场,数坤科技完成创世伙伴领投2亿元B轮融资
  9. 操作系统的fock和mmap
  10. Echarts英文版地图——强大的高德地图