Netty 中的心跳机制,还有谁不会?
上一篇:深夜看了张一鸣的微博,让我越想越后怕
作者:rickiyang
出处:www.cnblogs.com/rickiyang/p/11074231.html
我们知道在TCP长连接或者WebSocket长连接中一般我们都会使用心跳机制–即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。
那么心跳机制可以用来做什么呢?
我们知道网络的传输是不可靠的,当我们发起一个链接请求的过程之中会发生什么事情谁都无法预料,或者断电,服务器重启,断网线之类。
如果有这种情况的发生对方也无法判断你是否还在线。所以这时候我们引入心跳机制,在长链接中双方没有数据交互的时候互相发送数据(可能是空包,也可能是特殊数据),对方收到该数据之后也回复相应的数据用以确保双方都在线,这样就可以确保当前链接是有效的。
1. 如何实现心跳机制一般实现心跳机制由两种方式:
TCP协议自带的心跳机制来实现;
在应用层来实现。
但是TCP协议自带的心跳机制系统默认是设置的是2小时的心跳频率。它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。另外该心跳机制是与TCP协议绑定的,那如果我们要是使用UDP协议岂不是用不了?所以一般我们都不用。
而一般我们自己实现呢大致的策略是这样的:
Client启动一个定时器,不断发送心跳;
Server收到心跳后,做出回应;
Server启动一个定时器,判断Client是否存在,这里做判断有两种方法:时间差和简单标识。
时间差:
收到一个心跳包之后记录当前时间;
判断定时器到达时间,计算多久没收到心跳时间=当前时间-上次收到心跳时间。如果改时间大于设定值则认为超时。
简单标识:
收到心跳后设置连接标识为true;
判断定时器到达时间,如果未收到心跳则设置连接标识为false;
今天我们来看一下Netty的心跳机制的实现,在Netty中提供了IdleStateHandler类来进行心跳的处理,它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件。
该类可以对三种类型的超时做心跳机制检测:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
readerIdleTimeSeconds:设置读超时时间;
writerIdleTimeSeconds:设置写超时时间;
allIdleTimeSeconds:同时为读或写设置超时时间;
下面我们还是通过一个例子来讲解IdleStateHandler的使用。
服务端:
public class HeartBeatServer {private int port;public HeartBeatServer(int port) {this.port = port;}public void start(){EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new HeartBeatServerChannelInitializer());try {ChannelFuture future = server.bind(port).sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}public static void main(String[] args) {HeartBeatServer server = new HeartBeatServer(7788);server.start();}
}
服务端Initializer:
public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new HeartBeatServerHandler());}
}
在这里IdleStateHandler也是handler的一种,所以加入addLast。我们分别设置4个参数:读超时时间为3s,写超时和读写超时为0,然后加入时间控制单元。另外,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java 系列面试题和答案,非常齐全。
服务端handler:
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{private int loss_connect_time = 0;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if(evt instanceof IdleStateEvent){//服务端对应着读事件,当为READER_IDLE时触发IdleStateEvent event = (IdleStateEvent)evt;if(event.state() == IdleState.READER_IDLE){loss_connect_time++;System.out.println("接收消息超时");if(loss_connect_time > 2){System.out.println("关闭不活动的链接");ctx.channel().close();}}else{super.userEventTriggered(ctx,evt);}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
我们看到在handler中调用了userEventTriggered方法,IdleStateEvent的state()方法一个有三个值:READER_IDLE,WRITER_IDLE,ALL_IDLE。正好对应读事件写事件和读写事件。
再来写一下客户端:
public class HeartBeatsClient {private int port;private String address;public HeartBeatsClient(int port, String address) {this.port = port;this.address = address;}public void start(){EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new HeartBeatsClientChannelInitializer());try {ChannelFuture future = bootstrap.connect(address,port).sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally {group.shutdownGracefully();}}public static void main(String[] args) {HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");client.start();}
}
客户端Initializer:
public class HeartBeatsClientChannelInitializer extends ChannelInitializer<SocketChannel> {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new HeartBeatClientHandler());}
}
这里我们设置了IdleStateHandler的写超时为3秒,客户端执行的动作为写消息到服务端,服务端执行读动作。
客户端handler:
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",CharsetUtil.UTF_8));private static final int TRY_TIMES = 3;private int currentTime = 0;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("激活时间是:"+new Date());System.out.println("链接已经激活");ctx.fireChannelActive();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("停止时间是:"+new Date());System.out.println("关闭链接");}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {System.out.println("当前轮询时间:"+new Date());if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.WRITER_IDLE) {if(currentTime <= TRY_TIMES){System.out.println("currentTime:"+currentTime);currentTime++;ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());}}}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String message = (String) msg;System.out.println(message);if (message.equals("Heartbeat")) {ctx.write("has read message from server");ctx.flush();}ReferenceCountUtil.release(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
启动服务端和客户端我们看到输出为:
我们再来屡一下思路:
首先客户端激活channel,因为客户端中并没有发送消息所以会触发客户端的IdleStateHandler,它设置的写超时时间为3s;
然后触发客户端的事件机制进入userEventTriggered方法,在触发器中计数并向客户端发送消息;
服务端接收消息;
客户端触发器继续轮询发送消息,直到计数器满不再向服务端发送消息;
服务端在IdleStateHandler设置的读消息超时时间5s内未收到消息,触发了服务端中handler的userEventTriggered方法,于是关闭客户端的链接。
大体我们的简单心跳机制就是这样的思路,通过事件触发机制以及计数器的方式来实现,上面我们的案例中最后客户端没有发送消息的时候我们是强制断开了客户端的链接,那么既然可以关闭,我们是不是也可是重新链接客户端呢?因为万一客户端本身并不想关闭而是由于别的原因导致他无法与服务端通信。下面我们来说一下重连机制。
当我们的服务端在未读到客户端消息超时而关闭客户端的时候我们一般在客户端的finally块中方的是关闭客户端的代码,这时我们可以做一下修改的,finally是一定会被执行新的,所以我们可以在finally块中重新调用一下启动客户端的代码,这样就又重新启动了客户端了,上客户端代码:
/*** 本Client为测试netty重连机制* Server端代码都一样,所以不做修改* 只用在client端中做一下判断即可*/
public class HeartBeatsClient2 {private int port;private String address;ChannelFuture future;public HeartBeatsClient2(int port, String address) {this.port = port;this.address = address;}public void start(){EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new HeartBeatsClientChannelInitializer());try {future = bootstrap.connect(address,port).sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally {//group.shutdownGracefully();if (null != future) {if (future.channel() != null && future.channel().isOpen()) {future.channel().close();}}System.out.println("准备重连");start();System.out.println("重连成功");}}public static void main(String[] args) {HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1");client.start();}
}
其余部分的代码与上面的实例并无异同,只需改造客户端即可,我们再运行服务端和客户端会看到客户端虽然被关闭了,但是立马又被重启:
当然生产级别的代码应该不是这样实现的吧,哈哈。
感谢您的阅读,也欢迎您发表关于这篇文章的任何建议,关注我,技术不迷茫!小编到你上高速。
· END ·
最后,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java 系列面试题和答案,非常齐全。
正文结束
推荐阅读 ↓↓↓
1.不认命,从10年流水线工人,到谷歌上班的程序媛,一位湖南妹子的励志故事
2.如何才能成为优秀的架构师?
3.从零开始搭建创业公司后台技术栈
4.程序员一般可以从什么平台接私活?
5.37岁程序员被裁,120天没找到工作,无奈去小公司,结果懵了...
6.IntelliJ IDEA 2019.3 首个最新访问版本发布,新特性抢先看
7.漫画:程序员相亲图鉴,笑屎我了~
8.15张图看懂瞎忙和高效的区别!
一个人学习、工作很迷茫?
点击「阅读原文」加入我们的小圈子!
Netty 中的心跳机制,还有谁不会?相关推荐
- 这样讲 Netty 中的心跳机制,还有谁不会?
作者:永顺 segmentfault.com/a/1190000006931568 基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, ...
- Netty 中的心跳机制
何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可靠性, ...
- 用Netty撸一个心跳机制和断线重连!
来源:www.jianshu.com/p/1a28e48edd92 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确 ...
- Eureka中的心跳机制
前言 从以前的单体架构到现在的微服务分布式架构, 随着架构的演变, 所需要的技术越来越多, 要求的也越来越多了, 今天来谈一下微服务领域中的心跳机制 在微服务领域,心跳机制很常见了, ...
- netty 中的心跳检测机制
为什么要心跳检测机制 当服务端接收到客户端的连接以后,与客户端建立 NioSocketChannel 数据传输的双工通道,但是如果连接建立以后,客户端一直不给服务端发送消息,这种情况下是占用了资源,属 ...
- 基于Windows Socket 的网络通信中的心跳机制原理
引言 在采用TCP 连接的C/S 结构的系统中,当通信的一方正常关闭或退出时,另一方能收到相应的连接 断开的通知,然后进行必要的处理:但如果任意一方发生所谓的"非优雅断开",如:意 ...
- 聊聊分布式存储系统中的心跳机制以及主节点下发指令给从节点
心跳( heartbeat )是分布式系统中常用的技术.顾名思义,心跳就是以固定的频率向其他节点汇报当前节点状态的方式.收到心跳,一般可以认为发送心跳的这个节点在当前的网络中状态是良好的. 同时分布式 ...
- vue 心跳监控_【笔记】vue中websocket心跳机制
data () { return { ws: null,//建立的连接 lockReconnect: false,//是否真正建立连接 timeout: 28*1000,//30秒一次心跳 timeo ...
- 面试官问:服务的心跳机制与断线重连,Netty底层是怎么实现的?懵了
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, ...
- Netty实现心跳机制与断线重连
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 来源:https://www.jianshu.com/p/ ...
最新文章
- java中coverage怎么取消_别人家的ABM都是怎么成功的?
- python 整数最大_Python程序使用floor()方法查找最大整数
- C++高手总结的编程规律
- JVM优化系列-String对象在虚拟机中的实现
- Android上传文件至服务器
- The file contains a character that cannot be represented in the current code pag
- chrome浏览器隐藏地址栏_谷歌Chrome浏览器正在开发新功能:可直接复制粘贴隐藏密码...
- python itemgetter_Python operator.itemgetter
- vmware workstation 14 密钥
- LaTeX数学公式-详细教程
- mac上好用的压缩_Mac图片压缩工具(早晚用到,建议收藏)
- 内存核心频率、工作频率,等效频率、预读取技术详解
- OCR识别提取图片中文字原理
- win10删除U盘分区
- React从零到一Demo演练(上)
- 数据库查询的降序排列
- 幻立方解法之素数3阶幻立方
- 利用easyX图形库画迷宫问题的路径
- 达梦8在VMware虚拟机麒麟系统下命令安装
- Scala 模式匹配 match-case