消息通知系统详解1—通讯方式
消息通知系统详解2—后端设计
消息通知系统详解3—Netty
消息通知系统详解4—整合Netty和WebSocket

目录

  • 整体设计
    • 上线登录后向系统索取
    • 在线时系统向接收者主动推送
  • Rabbitmq搭建
  • IO编程
    • 传统IO编程
    • NIO编程

上个小节,我们讲到前后端通讯方式选型,那这节我们介绍下后端架构如何去设计?

整体设计

用户获取新的消息通知有两种模式

  • 上线登录后向系统主动索取
  • 在线时系统向接收者主动推送新消息

设想下,用户的通知消息和新通知提醒数据都放在数据库中,数据库的读写操作频繁。如果消息量大,DB压力较大,可能出现数据瓶颈。

这边按照上述两种模式,拆分下设计:

上线登录后向系统索取

此模式是接受者请求系统,系统将新的消息通知返回给接收者的模式,流程如下:

  1. 接收者向服务端netty请求
  2. WebSocket连接Netty服务把连接放到自己的连接池中
  3. Netty根据接受者信息向RabbitMQ查询消息
  4. 如果有新消息,返回新消息通知
  5. 使用WebSocket连接向,接收者返回新消息的数量

在线时系统向接收者主动推送

此模式是系统将新的消息通知返回给接收者的模式,流程如下:

  1. RabbitMQ将新消息数据推送给Netty
  2. Netty从连接池中取出接收者的WebSocket连接
  3. Netty通过接收者的WebSocket连接返回新消息的数量

Rabbitmq搭建

在虚拟机中启动RabbitMQ

docker run -id --name=tensquare_rabbit -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15672:15672 -p 25672:25672 rabbitmq:management

访问地址:http://192.168.200.128:15672

登录账号: guest

登录密码: guest

IO编程

上面提到了Netty,在开始了解Netty之前,先来实现一个客户端与服务端通信的程序,使用传统的IO编程和使用NIO编程有什么不一样。

传统IO编程

每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求。阻塞I/O的通信模型示意图如下:

业务场景:客户端每隔两秒发送字符串给服务端,服务端收到之后打印到控制台。

public class IOServer {public static void main(String[] args) throws Exception {​ServerSocket serverSocket = new ServerSocket(8000);
​while (true) {// (1) 阻塞方法获取新的连接Socket socket = serverSocket.accept();
​new Thread() {@Overridepublic void run() {String name = Thread.currentThread().getName();
​try {// (2) 每一个新的连接都创建一个线程,负责读取数据byte[] data = new byte[1024];InputStream inputStream = socket.getInputStream();while (true) {int len;// (3) 按字节流方式读取数据while ((len = inputStream.read(data)) != -1) {System.out.println("线程" + name + ":" + new String(data, 0, len));}}} catch (Exception e) {}}}.start();}}
}

客户端实现:

public class MyClient {​public static void main(String[] args) {//测试使用不同的线程数进行访问for (int i = 0; i < 5; i++) {new ClientDemo().start();}}
​static class ClientDemo extends Thread {@Overridepublic void run() {try {Socket socket = new Socket("127.0.0.1", 8000);while (true) {socket.getOutputStream().write(("测试数据").getBytes());socket.getOutputStream().flush();Thread.sleep(2000);}} catch (Exception e) {}}}
}

从服务端代码中我们可以看到,在传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环。

如果在用户数量较少的情况下运行是没有问题的,但是对于用户数量比较多的业务来说,服务端可能需要支撑成千上万的连接,IO模型可能就不太合适了。
如果有1万个连接就对应1万个线程,继而1万个while死循环,这种模型存在以下问题:

  • 当客户端越多,就会创建越多的处理线程。线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费。并且如果务器遭遇洪峰流量冲击,例如双十一活动,线程池会瞬间被耗尽,导致服务器瘫痪。
  • 因为是阻塞式通信,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
  • IO编程中数据读写是以字节流为单位,效率不高。

NIO编程

NIO,也叫做new-IO或者non-blocking-IO,可理解为非阻塞IO。NIO编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,我们用一幅图来对比一下IO与NIO:

如上图所示,IO模型中,一个连接都会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可以读。但是在大多数情况下,1万个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环都白白浪费掉了,因为没有数据。

而在NIO模型中,可以把这么多的while死循环变成一个死循环,这个死循环由一个线程控制。这就是NIO模型中选择器(Selector)的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到选择器上,通过检查这个选择器,就可以批量监测出有数据可读的连接,进而读取数据。

NIO的三大核心组件:通道(Channel)、缓冲(Buffer)、选择器(Selector)

通道(Channel)

是传统IO中的Stream(流)的升级版。Stream是单向的、读写分离(inputstream和outputstream),Channel是双向的,既可以进行读操作,又可以进行写操作。

缓冲(Buffer)

Buffer可以理解为一块内存区域,可以写入数据,并且在之后读取它。

选择器(Selector)

选择器(Selector)可以实现一个单独的线程来监控多个注册在她上面的信道(Channel),通过一定的选择机制,实现多路复用的效果。

NIO相对于IO的优势:

IO是面向流的,每次都是从操作系统底层一个字节一个字节地读取数据,并且数据只能从一端读取到另一端,不能前后移动流中的数据。NIO则是面向缓冲区的,每次可以从这个缓冲区里面读取一块的数据,并且可以在需要时在缓冲区中前后移动。
IO是阻塞的,这意味着,当一个线程读取数据或写数据时,该线程被阻塞,直到有一些数据被读取,或数据完全写入,在此期间该线程不能干其他任何事情。而NIO是非阻塞的,不需要一直等待操作完成才能干其他事情,而是在等待的过程中可以同时去做别的事情,所以能最大限度地使用服务器的资源。
NIO引入了IO多路复用器selector。selector是一个提供channel注册服务的线程,可以同时对接多个Channel,并在线程池中为channel适配、选择合适的线程来处理channel。由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高。
和前面一样的场景,使用NIO实现(复制代码演示效果即可):

public class NIOServer {public static void main(String[] args) throws IOException {// 负责轮询是否有新的连接Selector serverSelector = Selector.open();// 负责轮询处理连接中的数据Selector clientSelector = Selector.open();
​new Thread() {@Overridepublic void run() {try {// 对应IO编程中服务端启动ServerSocketChannel listenerChannel = ServerSocketChannel.open();listenerChannel.socket().bind(new InetSocketAddress(8000));listenerChannel.configureBlocking(false);// OP_ACCEPT表示服务器监听到了客户连接,服务器可以接收这个连接了listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
​while (true) {// 监测是否有新的连接,这里的1指的是阻塞的时间为1msif (serverSelector.select(1) > 0) {Set<SelectionKey> set = serverSelector.selectedKeys();Iterator<SelectionKey> keyIterator = set.iterator();
​while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();
​if (key.isAcceptable()) {try {// (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelectorSocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();clientChannel.configureBlocking(false);// OP_READ表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)clientChannel.register(clientSelector, SelectionKey.OP_READ);} finally {keyIterator.remove();}}}}}} catch (IOException ignored) {}}}.start();
​
​new Thread() {@Overridepublic void run() {String name = Thread.currentThread().getName();try {while (true) {// (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1msif (clientSelector.select(1) > 0) {Set<SelectionKey> set = clientSelector.selectedKeys();Iterator<SelectionKey> keyIterator = set.iterator();
​while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();
​if (key.isReadable()) {try {SocketChannel clientChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);// (3) 读取数据以块为单位批量读取clientChannel.read(byteBuffer);byteBuffer.flip();System.out.println("线程" + name + ":" + Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());} finally {keyIterator.remove();key.interestOps(SelectionKey.OP_READ);}}}}}} catch (IOException ignored) {}}}.start();}
}

消息通知系统详解2---后端设计相关推荐

  1. 消息通知系统详解1---通讯方式

    消息通知系统详解1-通讯方式 消息通知系统详解2-后端设计 消息通知系统详解3-Netty 消息通知系统详解4-整合Netty和WebSocket 目录 什么是消息通知系统 系统特性 通讯方式 短连接 ...

  2. UE4 输入系统详解一、 UE4如何获取win系统输入消息

    UE4 输入系统详解一. UE4如何获取win系统输入消息 UE4版本:4.253 按键输入 1.当我们按下键盘时输入时,FEngineLoop::Tick()里的每个tick执行的PumpMessa ...

  3. 如何设计一个公司级别的消息通知系统?

    实际场景 早上买早点,扫码下单,用户在微信中会收到下单成功的服务通知. 扫码出地铁后,手机会收到APP支付通知. 微信.支付宝.刷卡消费后,手机会收到短信通知. 在海底捞吃完火锅,扫结账小票上的开票二 ...

  4. Web 实时消息推送详解

    title: Web 实时消息推送详解 category: 系统设计 head: meta name: keywords content: 消息推送,短轮询,长轮询,SSE,Websocket,MQT ...

  5. DELPHI 中 Window 消息大全使用详解

    Window 消息大全使用详解 导读: Delphi是Borland公司的一种面向对象的可视化软件开发工具. Delphi集中了Visual C++和Visual Basic两者的优点:容易上手.功能 ...

  6. Window 消息大全使用详解(无聊没事做)

    Window 消息大全使用详解(无聊没事做) 楼主zhangqu_980371(能坚持一辈子的东西太少)2004-12-19 16:35:23 在 VC/MFC / 基础类 提问     消息,就是指 ...

  7. 消息队列超详解(以RabbitMQ和Kafka为例,为何使用消息队列、优缺点、高可用性、问题解决)

    消息队列超详解(以RabbitMQ和Kafka为例) 为什么要用消息队列这个东西? 先说一下消息队列的常见使用场景吧,其实场景有很多,但是比较核心的有3个:解耦.异步.削峰. 解耦:现场画个图来说明一 ...

  8. Redis数据库教程——系统详解学习Redis全过程

    Redis数据库教程--系统详解学习Redis全过程 Redis快速入门:Key-Value存储系统简介 Key-Value存储系统:     Key-Value Store是当下比较流行的话题,尤其 ...

  9. ASP.NET MVC Controller激活系统详解:默认实现

    Controller激活系统最终通过注册的ControllerFactory创建相应的Conroller对象,如果没有对ControllerFactory类型或者类型进行显式注册(通过调用当前Cont ...

最新文章

  1. d3d导致cairo不正常
  2. 深入浅出SQL Server Replication第一篇:走近Replication(上)
  3. 【caffe】windows下caffe+vs2013+python2.7+cuda8.0+cmake3.8编译与配置
  4. 既可输入又可选择的组件
  5. 电子工程可以报考二建_毕业证上财务管理专业,可以报考二建吗?
  6. 今天网络又出问题了,现在的问题变成原IP地址不可用
  7. 和gdi绘图效率比较_堪称效率神器!5款日常插件分享,错过哪一个都无比遗憾...
  8. 《嵌入式C编程:PIC单片机和C编程技术与应用》一1.2 注释
  9. mysql timestamp _mysql之TIMESTAMP(时间戳)用法详解
  10. C# static变量、const变量、readonly变量
  11. 分布式文件系统_新一代分布式文件系统XGFS揭秘——元数据服务
  12. Win10系统设置为英文
  13. 关于举办“2019 年全国传智杯 IT 技能大赛”的通知
  14. 游戏战斗音效制作技巧
  15. RL(Chapter 5): Monte Carlo Methods (MC) (蒙特卡洛方法)
  16. jmeter性能测试各个方法介绍
  17. Matlab用三种格式来表示日期与时间
  18. 解决 tabby 登录远程时报错:Handshake failed: no matching key exchange algorithm
  19. 【C++】迭代器、反向迭代器详解
  20. APP地推心得:可复制的APP地推方案

热门文章

  1. ViewPager2和Fragment的组合使用
  2. 微信小程序-从零开始制作一个跑步微信小程序 1
  3. 幸运盒子幸运砸金蛋微信盲盒游戏源码
  4. 利用D3.js快速绘制力导向图
  5. 谷歌翻译 google translation API github开源 实践
  6. AT24C02软件设计与应用
  7. Statistical Phrase-Based Translation_2003_Koehn【SMT】
  8. 小米手机 已安装了存在签名冲突的应用
  9. 小米电视6和小米电视6至尊版区别
  10. 什么是通达信接口函数