【Netty】利用Netty实现心跳检测和重连机制
一、前言
心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。
我们用到的很多框架都用到了心跳检测,比如服务注册到 Eureka Server 之后会维护一个心跳连接,告诉 Eureka Server 自己还活着。本文就是利用 Netty 来实现心跳检测,以及客户端重连。
二、设计思路
- 分为客户端和服务端
- 建立连接后,客户端先发送一个消息询问服务端是否可以进行通信了。
- 客户端收到服务端 Yes 的应答后,主动发送心跳消息,服务端接收到心跳消息后,返回心跳应答,周而复始。
- 心跳超时利用 Netty 的 ReadTimeOutHandler 机制,当一定周期内(默认值50s)没有读取到对方任何消息时,需要主动关闭链路。如果是客户端,重新发起连接。
- 为了避免出现粘/拆包问题,使用 DelimiterBasedFrameDecoder 和 StringDecoder 来处理消息。
三、编码
- 先编写客户端 NettyClient
public class NettyClient {private static final String HOST = "127.0.0.1";private static final int PORT = 9911;private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);EventLoopGroup group = new NioEventLoopGroup();private void connect(String host,int port){try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).remoteAddress(new InetSocketAddress(host,port)).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8);ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)).addLast(new StringDecoder())// 当一定周期内(默认50s)没有收到对方任何消息时,需要主动关闭链接.addLast("readTimeOutHandler",new ReadTimeoutHandler(50)).addLast("heartBeatHandler",new HeartBeatReqHandler());}});// 发起异步连接操作ChannelFuture future = b.connect().sync();future.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {// 所有资源释放完之后,清空资源,再次发起重连操作executor.execute(()->{try {TimeUnit.SECONDS.sleep(5);//发起重连操作connect(NettyClient.HOST,NettyClient.PORT);} catch (InterruptedException e) {e.printStackTrace();}});}}public static void main(String[] args) {new NettyClient().connect(NettyClient.HOST,NettyClient.PORT);}}
这里稍微复杂点的就是38行开始的重连部分。
2. 心跳消息发送类 HeartBeatReqHandler
package cn.sp.heartbeat;import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;/*** Created by 2YSP on 2019/5/23.*/
@ChannelHandler.Sharable
public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String> {private volatile ScheduledFuture<?> heartBeat;private static final String hello = "start notify with server$_";@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes()));System.out.println("================");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (heartBeat != null){heartBeat.cancel(true);heartBeat = null;}ctx.fireExceptionCaught(cause);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {if ("ok".equalsIgnoreCase(msg)){//服务端返回ok开始心跳heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS);}else {System.out.println("Client receive server heart beat message : --->"+msg);}}private class HeartBeatTask implements Runnable{private final ChannelHandlerContext ctx;public HeartBeatTask(ChannelHandlerContext ctx){this.ctx = ctx;}@Overridepublic void run() {String heartBeat = "I am ok";System.out.println("Client send heart beat message to server: ----->"+heartBeat);ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes()));}}
}
channelActive()方法在首次建立连接后向服务端问好,如果服务端返回了 “ok” 就创建一个线程每隔5秒发送一次心跳消息。如果发生了异常,就取消定时任务并将其设置为 null,等待 GC 回收。
3. 服务端 NettyServer
public class NettyServer {public static void main(String[] args) {new NettyServer().bind(9911);}private void bind(int port){EventLoopGroup group = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(group).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)).addLast(new StringDecoder()).addLast("readTimeOutHandler",new ReadTimeoutHandler(50)).addLast("HeartBeatHandler",new HeartBeatRespHandler());}});// 绑定端口,同步等待成功b.bind(port).sync();System.out.println("Netty Server start ok ....");}catch (Exception e){e.printStackTrace();}}
}
- 心跳响应类 HeartBeatRespHandler
package cn.sp.heartbeat;import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** Created by 2YSP on 2019/5/23.*/
@ChannelHandler.Sharable
public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String> {private static final String resp = "I have received successfully$_";@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {if (msg.equals("start notify with server")){ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes()));}else {//返回心跳应答信息System.out.println("Receive client heart beat message: ---->"+ msg);ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));}}}
第一次告诉客户端我已经准备好了,后面打印客户端发过来的信息并告诉客户端我已经收到你的消息了。
四、测试
启动服务端再启动客户端,可以看到心跳检测正常,如下图。
服务端控制台:
客户端控制台:
现在让服务端宕机一段时间,看客户端能否重连并开始正常工作。
关闭服务端后,客户端周期性的连接失败,控制台输出如图:
重新启动服务端,过一会儿就会发现重连成功了。
五、总结
总得来说,使用 Netty 实现心跳检测还是比较简单的,这里比较懒没有使用其他序列化协议(如 ProtoBuf 等),如果感兴趣的话大家可以自己试试。
代码地址,点击这里。
有篇SpringBoot 整合长连接心跳机制的文章写的也很不错,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/
【Netty】利用Netty实现心跳检测和重连机制相关推荐
- socket心跳机制图片_WebSocket心跳检测和重连机制
1. 心跳重连原由 心跳和重连的目的用一句话概括就是客户端和服务端保证彼此还活着,避免丢包发生. websocket连接断开有以下两种情况: 前端断开 在使用websocket过程中,可能会出现网络断 ...
- Websocket心跳检测、重连机制
前言 为了获取实时数据,前端需要和后端保持通信,HTTP 协议只能是客户端向服务器发出请求,服务器返回查询结果.这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦.我们只能使 ...
- Netty 4.0 实现心跳检测和断线重连
一 实现心跳检测 原理:当服务端每隔一段时间就会向客户端发送心跳包,客户端收到心跳包后同样也会回一个心跳包给服务端 一般情况下,客户端与服务端在指定时间内没有任何读写请求,就会认为连接是idle(空闲 ...
- netty自定义消息实现心跳检测与重连
netty的心跳发送的重连,主要在client端.前面有关于自定义协议的demo:https://blog.csdn.net/zc_ad/article/details/83829620 其实客户端心 ...
- Netty下的WebSocket心跳检测
什么是心跳检测 心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制. 在WebSocket中即判断套接字是否已经与服务器断开,无法使用,此时要清理服务器的该 ...
- Netty教程07:心跳检测机制
需求:编写一个 Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲 当服务器超过5秒没有写操作时,就提示写空闲 实现当服务器超过7秒没有读或者写操作时,就提示读写空闲 Server ...
- Netty(3)心跳检测、WebSocket、Protobuf
一 心跳检测 当连接数很大时,我们想要释放部分空闲连接:或者连接已经断了但服务端没检测到.我们都可以使用Netty自动的心跳检测类IdleStateHandler来实现 1.1 类介绍 IdleSta ...
- netty入门demo;心跳检测后自动关闭通道
jdk原生的socket实现nio太麻烦,而且有很多问题,netty经过了大厂认证,就选它了. netty server package com.chan.netty.service;import i ...
- socket心跳检测和重连小demo
转载自: http://blog.csdn.net/u011791526/article/details/53536403 有时候我们的程序要求socket一直保持连接,并且希望在socket断开以后 ...
最新文章
- taro 引入js_Taro跨端开发之多业务模块管理 React Native篇(终篇)
- python deque的内在实现 本质上就是双向链表所以用于stack、队列非常方便
- NVelocity模板引擎初学总结。[zhuan]
- Hadoop使用常见问题以及解决方法
- USACO training 2.4.5 Fractions to Decimals题解
- 4K超清,2500万人在线,猫晚直播技术全解读
- 【计算机网络】因特网结构
- 计算机应用类专业综合冲刺卷,计算机应用类专业(综合)二模试卷2011
- HCIE Security 全套笔记整理
- 十大经典排序算法与算法复杂度
- c语言 写高斯分布函数
- Unity学习笔记(一)—— 基础知识
- 【路径规划】基于狼群算法之三维路径规划matlab源码
- matlab 绘制四棱锥,素描教程:怎么绘制四棱锥
- 了解KVM切换器的四种类形
- android 虚拟按键源码流程分析
- POSE estimation,肢体估计HPE
- 23个经典营销创业案例,彻底颠覆你的营销思维
- Fractions to Decimals
- AI万物生成技术,颠覆内容传统创作模式 | iBrandUp 职位内推
热门文章
- 湖北沙洋中学2021年6月高考成绩查询,荆门2021年中考时间定于6月19日至22日!全市设7个考区、18个考点…...
- Clickhouse在头条火山引擎智能数据洞察的应用
- SOLIDWORKS PDM的智能报表自动生成工具
- 基于神经网络的图像人体轮廓提取(一些经验)
- 脑洞 | 横扫围棋界的AlphaGo竟然出纪录片了!介意剧透者慎点……
- 1、Dymola2018安装配置及实例演示
- android文本框左右加减按钮长按一直加减
- ArcGIS中进行shp矢量文件和EXCEL进行字段连接Field Join关联遇到:提示没有OID字段问题。(地理国情监测)解决步骤
- Linux 使用unzip解压时报错End-of-central
- 微信公众号网页授权----redirect_uri域名与后台配置不一致,错误码10003 错误