Netty 心跳机制及断线重连
1、心跳检测
心跳检测是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制。
为什么使用心跳检测?
- 假死:如果底层的TCP连接(socket连接)已经断开,但是服务端并没有正常关闭套接字,服务端认为这条TCP连接仍然是存在的。因为每个连接都会耗费CPU和内存资源,因此大量假死的连接会逐渐耗光服务器的资源,使得服务器越来越慢,IO处理效率越来越低,最终导致服务器崩溃。所以使用心跳检测处理这些假死的客户端
如何处理假死?
- 客户端定时进行心跳检测、服务端定时进行空闲检测。
空闲检测就是每隔一段时间检测子通道是否有数据读写,如果有,则子通道是正常的;如果没有,则子通道被判定为假死,关掉子通道。
1.1、Netty心跳检测及空闲检测
IdleStateHandler
添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类
实现userEventTriggered()
方法作为超时事件的逻辑处理;如果设定IdleStateHandler心跳检测
- 服务端:readerIdleTime每五秒进行一次读检测,
设定时间内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
- 客户端:writerIdleTime每五秒进行一次读检测,
设定时间内write()方法未被调用则触发一次userEventTrigger()方法
- 服务端:readerIdleTime每五秒进行一次读检测,
IdleStateHandler 构造方法参数
- readerIdleTime:为读超时时间
- writerIdleTime:为写超时时间
- allIdleTime:所有类型的超时时间
//IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
//会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
//实现userEventTriggered方法处理对应事件
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
Handler重写userEventTriggered方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()) {case READER_IDLE:eventType = "读空闲";readIdleTimes++; // 读空闲的计数加1break;case WRITER_IDLE:eventType = "写空闲";// 不处理break;case ALL_IDLE:eventType = "读写空闲";// 不处理break;}System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);if (readIdleTimes > 3) {System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");ctx.channel().writeAndFlush("idle close");ctx.channel().close();}
}
2、断线重连
netty 的服务端一般情况下不需要断线重连,应为服务端服务宕机就只能重新启动服务;所以今天我们研究的是客户端的断线重连;
断线重连是指由于发生网络故障而导致服务中断的情况,客户端就需要从重新连接服务端;
Netty客户端添加监听添加监听后如果连接中断会调用operationComplete方法
import com.example.netty.idle.HeartBeatClient;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import java.util.concurrent.TimeUnit;public class ConnectionListener implements ChannelFutureListener {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if (!channelFuture.isSuccess()) {final EventLoop loop = channelFuture.channel().eventLoop();loop.schedule(new Runnable() {@Overridepublic void run() {System.err.println("服务端链接不上,开始重连操作...");HeartBeatClient.Connection.connect();}}, 1L, TimeUnit.SECONDS);} else {System.err.println("服务端链接成功...");}}}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;//服务端代码
public class HeartBeatServer {public static void main(String[] args) throws Exception {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childOption(NioChannelOption.SO_KEEPALIVE,true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());//IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,//会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须//实现userEventTriggered方法处理对应事件pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HeartBeatServerHandler());}});System.out.println("netty server start。。");ChannelFuture future = bootstrap.bind(9000).sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {worker.shutdownGracefully();boss.shutdownGracefully();}}//服务端处理handlerpublic static class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {int readIdleTimes = 0;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {System.out.println(" ====== > [server] message received : " + s);if ("Heartbeat Packet".equals(s)) {ctx.channel().writeAndFlush("ok");} else {System.out.println(" 其他信息处理 ... ");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()) {case READER_IDLE:eventType = "读空闲";readIdleTimes++; // 读空闲的计数加1break;case WRITER_IDLE:eventType = "写空闲";// 不处理break;case ALL_IDLE:eventType = "读写空闲";// 不处理break;}System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);if (readIdleTimes > 3) {System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");ctx.channel().writeAndFlush("idle close");ctx.channel().close();}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");}}
}
import com.example.netty.config.ConnectionListener;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Random;
import java.util.concurrent.TimeUnit;//客户端代码
public class HeartBeatClient {public static void main(String[] args) throws Exception {Connection.connect();}public static class Connection{public static void connect(){EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new HeartBeatClientHandler());}});System.out.println("netty client start。。");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();Channel channel = channelFuture.channel();String text = "Heartbeat Packet";Random random = new Random();while (channel.isActive()) {int num = random.nextInt(10);Thread.sleep(num * 1000);channel.writeAndFlush(text);}// 添加监听后 如果连接中断会调用GenericFutureListener中operationComplete方法(子类实现)channelFuture.addListener(new ConnectionListener());} catch (Exception e) {e.printStackTrace();} finally {eventLoopGroup.shutdownGracefully();}}}static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" client received :" + msg);if (msg != null && msg.equals("idle close")) {System.out.println(" 服务端关闭连接,客户端也关闭");ctx.channel().closeFuture();}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.err.println("channelInactive 掉线了...");//使用过程中断线重连final EventLoop eventLoop = ctx.channel().eventLoop();eventLoop.schedule(new Runnable() {@Overridepublic void run() {HeartBeatClient.Connection.connect();}}, 1L, TimeUnit.SECONDS);super.channelInactive(ctx);}}
}
Netty 心跳机制及断线重连相关推荐
- Netty——心跳机制与断线重连
心跳机制与断线重连 心跳机制 IdleStateHandler 客户端 服务端 测试 正常情况 异常情况 总结 断线重连 为了保证系统的稳定性,心跳机制和断线重连可是必不可少的,而这两个在Netty中 ...
- 浅析 Netty 实现心跳机制与断线重连
基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可 ...
- Netty是如何实现TCP心跳机制与断线重连的
本文来说下Netty 是如何实现 TCP 心跳机制与断线重连的 文章目录 什么是心跳机制HeartBeat 如何实现心跳机制 Netty实现自定义的心跳机制 服务端 客户端 测试效果 客户端断线重连 ...
- WebSocket的心跳机制和断线重连
背景 在服务器重启或是弱网情况下,前端不能保证一直连接成功.因此在出现被动断开的情况下,需要有心跳机制和断线重连的功能. 心跳机制:客户端每隔一段时间向服务端发送一个特有的心跳消息,每次服务端收到消息 ...
- 面试官问:服务的心跳机制与断线重连,Netty底层是怎么实现的?懵了
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, ...
- Netty实现心跳机制与断线重连
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 来源:https://www.jianshu.com/p/ ...
- 四、Netty 实现心跳机制与断线重连
一.概述 何为心跳 顾名思义, 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不 ...
- 用Netty撸一个心跳机制和断线重连!
来源:www.jianshu.com/p/1a28e48edd92 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确 ...
- 服务的心跳机制与断线重连,Netty底层是怎么实现的?
作者:sprinkle_liz www.jianshu.com/p/1a28e48edd92 提醒:本篇适合有一定netty基础的读者阅读 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, 客 ...
最新文章
- [Jarvis OJ - PWN]——Tell Me Something
- mybatis mysql 配置文件详解_Mybatis的配置文件参数详解
- 2022 WebRTC发展趋势分析
- 容器编排技术 -- Kubernetes 给容器和Pod分配内存资源
- 安腾还是Power7——Unix服务器你该如何选?
- diag开关什么意思_1P空气开关便宜、好用,为什么电工师傅却要我们买2P空气开关?...
- PyTorch中为什么需要使用squeeze()和unsqueeze()操作?
- Spark技术互动问答分享
- 计算机二级考试先后顺序,(常文档排按照计算机二级考试试题顺序编写.doc
- Unity Shader 中获取屏幕坐标
- java jhat_java自带命令行工具jmap、jhat与jinfo的使用实例代码详解
- 为macbook pro安装内存条
- LSTM的优点和缺点
- android app 马甲包,关于Android多渠道打包和马甲包问题
- 前段听一个仁兄说jbpm4 改变了很多
- 代码分析之numpy.array
- ECIF OCRM ACRM关系
- 圣戈班集团2019年销售额426亿欧元,增长2.4%
- 相片打印机原理_喷墨打印机工作原理 喷墨打印机优缺点介绍【详解】
- 红杉资本:生成式AI 一个创造性的新世界