Netty实战:设计一个IM框架就这么简单!
bitchat 是一个基于 Netty 的 IM 即时通讯框架
项目地址:https://github.com/all4you/bitchat
快速开始
bitchat-example 模块提供了一个服务端与客户端的实现示例,可以参照该示例进行自己的业务实现。
启动服务端
要启动服务端,需要获取一个 Server 的实例,可以通过 ServerFactory 来获取。
目前只实现了单机模式下的 Server ,通过 SimpleServerFactory 只需要定义一个端口即可获取一个单机的 Server 实例,如下所示:
public class StandaloneServerApplication {public static void main(String[] args) {Server server = SimpleServerFactory.getInstance().newServer(8864);server.start();}
}
服务端启动成功后,将显示如下信息:
启动客户端
目前只实现了直连服务器的客户端,通过 SimpleClientFactory 只需要指定一个 ServerAttr 即可获取一个客户端,然后进行客户端与服务端的连接,如下所示:
public class DirectConnectServerClientApplication {public static void main(String[] args) {Client client = SimpleClientFactory.getInstance().newClient(ServerAttr.getLocalServer(8864));client.connect();doClientBiz(client);}
}
客户端连接上服务端后,将显示如下信息:
体验客户端的功能
目前客户端提供了三种 Func,分别是:登录,查看在线用户列表,发送单聊消息,每种 Func 有不同的命令格式。
登录
通过在客户端中执行以下命令 -lo houyi 123456
即可实现登录,目前用户中心还未实现,通过 Mock 的方式实现一个假的用户服务,所以输入任何的用户名密码都会登录成功,并且会为用户创建一个用户id。
登录成功后,显示如下:
查看在线用户
再启动一个客户端,并且也执行登录,登录成功后,可以执行 -lu
命令,获取在线用户列表,目前用户是保存在内存中,获取的结果如下所示:
发送单聊信息
用 gris 这个用户向 houyi 这个用户发送单聊信息,只要执行 -pc 1 hello,houyi
命令即可
其中第二个参数数要发送消息给那个用户的用户id,第三个参数是消息内容
消息发送方,发送完消息:
消息接收方,接收到消息:
客户端断线重连
客户端和服务端之间维持着心跳,双方都会检查连接是否可用,客户端每隔5s会向服务端发送一个 PingPacket,而服务端接收到这个 PingPacket 之后,会回复一个 PongPacket,这样表示双方都是健康的。
当因为某种原因,服务端没有收到客户端发送的消息,服务端将会把该客户端的连接断开,同样的客户端也会做这样的检查。
当客户端与服务端之间的连接断开之后,将会触发客户端 HealthyChecker 的 channelInactive 方法,从而进行客户端的断线重连。
整体架构
单机版
单机版的架构只涉及到服务端、客户端,另外有两者之间的协议层,如下图所示:
除了服务端和客户端之外,还有三大中心:消息中心,用户中心,链接中心。
消息中心:主要负责消息的存储与历史、离线消息的查询
用户中心:主要负责用户和群组相关的服务
链接中心:主要负责保存客户端的链接,服务端从链接中心获取客户端的链接,向其推送消息
集群版
单机版无法做到高可用,性能与可服务的用户数也有一定的限制,所以需要有可扩展的集群版,集群版在单机版的基础上增加了一个路由层,客户端通过路由层来获得可用的服务端地址,然后与服务端进行通讯,如下图所示:
客户端发送消息给另一个用户,服务端接收到这个请求后,从 Connection中心中获取目标用户“挂”在哪个服务端下,如果在自己名下,那最简单直接将消息推送给目标用户即可,如果在其他服务端,则需要将该请求转交给目标服务端,让目标服务端将消息推送给目标用户。
自定义协议
通过一个自定义协议来实现服务端与客户端之间的通讯,协议中有如下几个字段:
*
* <p>
* The structure of a Packet is like blow:
* +----------+----------+----------------------------+
* | size | value | intro |
* +----------+----------+----------------------------+
* | 1 bytes | 0xBC | magic number |
* | 1 bytes | | serialize algorithm |
* | 4 bytes | | packet symbol |
* | 4 bytes | | content length |
* | ? bytes | | the content |
* +----------+----------+----------------------------+
* </p>
*
每个字段的含义
所占字节 | 用途 |
---|---|
1 | 魔数,默认为 0xBC |
1 | 序列化的算法 |
4 | Packet 的类型 |
4 | Packet 的内容长度 |
? | Packet 的内容 |
序列化算法将会决定该 Packet 在编解码时,使用何种序列化方式。
Packet 的类型将会决定到达服务端的字节流将被反序列化为何种 Packet,也决定了该 Packet 将会被哪个 PacketHandler 进行处理。
内容长度将会解决 Packet 的拆包与粘包问题,服务端在解析字节流时,将会等到字节的长度达到内容的长度时,才进行字节的读取。
除此之外,Packet 中还会存储一个 sync 字段,该字段将指定服务端在处理该 Packet 的数据时是否需要使用异步的业务线程池来处理。
健康检查
服务端与客户端各自维护了一个健康检查的服务,即 Netty 为我们提供的 IdleStateHandler,通过继承该类,并且实现 channelIdle 方法即可实现连接 “空闲” 时的逻辑处理,当出现空闲时,目前我们只关心读空闲,我们既可以认为这条链接出现问题了。
那么只需要在链接出现问题时,将这条链接关闭即可,如下所示:
public class IdleStateChecker extends IdleStateHandler {private static final int DEFAULT_READER_IDLE_TIME = 15;private int readerTime;public IdleStateChecker(int readerIdleTime) {super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS);readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime;}@Overrideprotected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {log.warn("[{}] Hasn't read data after {} seconds, will close the channel:{}", IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel());ctx.channel().close();}}
另外,客户端需要额外再维护一个健康检查器,正常情况下他负责定时向服务端发送心跳,当链接的状态变成 inActive 时,该检查器将负责进行重连,如下所示:
public class HealthyChecker extends ChannelInboundHandlerAdapter {private static final int DEFAULT_PING_INTERVAL = 5;private Client client;private int pingInterval;public HealthyChecker(Client client, int pingInterval) {Assert.notNull(client, "client can not be null");this.client = client;this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval;}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);schedulePing(ctx);}private void schedulePing(ChannelHandlerContext ctx) {ctx.executor().schedule(() -> {Channel channel = ctx.channel();if (channel.isActive()) {log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName());channel.writeAndFlush(new PingPacket());schedulePing(ctx);}}, pingInterval, TimeUnit.SECONDS);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.executor().schedule(() -> {log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName());client.connect();}, 5, TimeUnit.SECONDS);ctx.fireChannelInactive();}}
业务线程池
我们知道,Netty 中维护着两个 IO 线程池,一个 boss 主要负责链接的建立,另外一个 worker 主要负责链接上的数据读写,我们不应该使用 IO 线程来处理我们的业务,因为这样很可能会对 IO 线程造成阻塞,导致新链接无法及时建立或者数据无法及时读写。
为了解决这个问题,我们需要在业务线程池中来处理我们的业务逻辑,但是这并不是绝对的,如果我们要执行的逻辑很简单,不会造成太大的阻塞,则可以直接在 IO 线程中处理,比如客户端发送一个 Ping 服务端回复一个 Pong,这种情况是没有必要在业务线程池中进行处理的,因为处理完了最终还是要交给 IO 线程去写数据。但是如果一个业务逻辑需要查询数据库或者读取文件,这种操作往往比较耗时间,所以就需要将这些操作封装起来交给业务线程池去处理。
服务端允许客户端在传输的 Packet 中指定采用何种方式进行业务的处理,服务端在将字节流解码成 Packet 之后,会根据 Packet 中的 sync 字段的值,确定怎样对该 Packet 进行处理,如下所示:
public class ServerPacketDispatcher extends SimpleChannelInboundHandler<Packet> {@Overridepublic void channelRead0(ChannelHandlerContext ctx, Packet request) {// if the packet should be handled asyncif (request.getAsync() == AsyncHandle.ASYNC) {EventExecutor channelExecutor = ctx.executor();// create a promisePromise<Packet> promise = new DefaultPromise<>(channelExecutor);// async execute and get a futureFuture<Packet> future = executor.asyncExecute(promise, ctx, request);future.addListener(new GenericFutureListener<Future<Packet>>() {@Overridepublic void operationComplete(Future<Packet> f) throws Exception {if (f.isSuccess()) {Packet response = f.get();writeResponse(ctx, response);}}});} else {// sync execute and get the response packetPacket response = executor.execute(ctx, request);writeResponse(ctx, response);}}
}
不止是IM框架
bitchat 除了可以作为 IM 框架之外,还可以作为一个通用的通讯框架。
Packet 作为通讯的载体,通过继承 AbstractPacket 即可快速实现自己的业务,搭配 PacketHandler 作为数据处理器即可实现客户端与服务端的通讯。
Netty实战:设计一个IM框架就这么简单!相关推荐
- 常用RPC框架及如何设计一个RPC框架
一天学会的Java基础课程,整整300集 拿出来分享给大家!拿走不谢!手把手教学,学会轻松就业_哔哩哔哩_bilibili 常用RPC远程调用框架有哪些? httpclient.grpc.dubbo. ...
- 框架有几层_如何设计一个自动化框架
对于如何设计一个自动化框架之前,首先得清楚什么是自动框架,设计时有哪些是需要注意的,然后该怎么去做? 什么是自动化测试框架? 1.什么是框架? 特指为解决一个开放性问题而设计的具有一定约束性的支撑结构 ...
- 如何设计一个RPC框架?
首先,我们需要知道什么是RPC框架? RPC(Remote Procedure Call)叫作远程过程调用,它是利用网络从远程计算机上请求服务,可以理解为把程序的一部分放在其他远程计算机上执行.通过网 ...
- netty框架_Netty实战:设计一个IM框架
从事Java已经5年,目前在某互联网公司做就Java系统架构师,每天都会写一些技术文章,感兴趣的同事请关注我,谢谢.(需要架构资料私信我) bitchat 是一个基于 Netty 的 IM 即时通讯框 ...
- java im 框架_Netty实战:设计一个IM框架
来源:逅弈逐码 bitchat 是一个基于 Netty 的 IM 即时通讯框架 项目地址: https://github.com/all4you/bitchat 快速开始 bitchat-exampl ...
- 《Netty实战-写一个RPC应用》
一.服务端 1.1.定义rpc服务代理注解 用于标识需要作为远程调用的接口,应用于类上 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUN ...
- 设计一款日历java_用JAVA设计一个表示2013年的简单日历系统。
展开全部 我也 想要 这个 程序...我找了一个,你可62616964757a686964616fe78988e69d8331333332623864以借鉴下, //下面是一个带界面的java日历.可 ...
- 设计一个对银行账户余额操作的简单程序(Java)
假设 账号:123456789 密码:5211314 余额:10000000 进入程序输出选择操作选项: 1.存款 2取款 3.查询余额 要求使用Scanner交互式操作. 存款,取款,查询均需要输入 ...
- Dubbo面试 - 如何自己设计一个类似 Dubbo 的 RPC 框架?
Dubbo面试 - 如何自己设计一个类似 Dubbo 的 RPC 框架? 面试题 如何自己设计一个类似 Dubbo 的 RPC 框架? 面试官心理分析 说实话,就这问题,其实就跟问你如何自己设计一个 ...
- JAVA面试题:你怎么设计一个消息队列?
1 面试题 写一个消息队列,你如何进行架构设计,说一下你的思路! 2 考点分析 一般面试官要考察两块: (1)你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个mq的架构原理 ...
最新文章
- 计算机教室网络安全应急预案,北京科技大学计算机与通信工程学院-计算机与通信工程学院实验室安全应急预案...
- MT6573驱动开发日志之touchpanel .
- 【深度学习】NetAug(网络增强)—Dropout的反面
- linux cache 内核参数,Linux内核中drop_caches参数
- SSH putty Disconnected: Server protocol violation: unexpected SSH2_MSG_UNIMPLEMENTED packet
- 【linux】linux命令如何查看文件、文件夹的属性,包括大小、修改时间、谁修改的...
- 【前端框架-Vue-基础】$attr及$listeners实现跨多级组件的通信
- python集合的元素可以是_Python集合的元素中,为什么不可以是包含嵌套列表的元组?...
- ucharts 折线 点_ucharts图表引入的两种方式
- java怎么实现查找n功能_java 实现微信搜索附近人功能
- Alibaba秋招前端测试题
- cdr圆形渐变填充怎么设置_适用于平面设计的软件cdr!
- vbScript实现开机后的开心网自动登陆
- 计算时间差 html,计算时间差的公式
- python networkx 边权重_Python/NetworkX:动态计算边权重
- 详解ENet | CPU可以实时的道路分割网络
- C99中的restrict关键字
- 孪生素数 所谓孪生素数指的就是间隔为 2 的相邻素数,它们之间的距离已经近得不能再近了
- Python程序员别秃了,护发防脱发小妙招,收藏吧
- 牛客网刷题之SQL篇:非技术快速入门39T