点击?蓝色“ 深入原理”,关注并“设为星标”

技术干货,第一时间推送

消息无序产生的原因

消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端(Consumer),尽管消费端在拉取消息时是有序的,但各个消息由于网络等方面原因无法保证在各个消费端中处理时有序。

1 业务角度分析与解决

场景分析

先后两次修改了商品信息,消息A和消息B先后同步写入MySQL,接着异步写入消息队列中发送消息,此时消息队列生产端(Producer)按时序先后发出了A和B两条消息(消息A先发出,消息B后发出)。按业务逻辑,商品信息的最终状态需要以消息A和消息B综合为准。

看似一个比较常见的同步写数据库,异步发送消息的场景,但实际上需要保证消息的有序消费。

  • 假设1:消息A只包含修改的商品名称,消息B只包含修改的商品重量,此时消息队列的消费端实际上不需要关注消息时序,消息队列消费端(Consumer)只管消费即可。

  • 假设2:消息A包含修改的商品名称、重量,消息B包含修改的商品名称,此时消费端首先接收到消息B,后接收到消息A,那么消息B的修改就会被覆盖。此时消息队列的消费端实际上又需要关注消息时序

可见,你无法保证消息中包含什么信息,此时必须保证消息的有序消费。

业务角度如何保证消息有序消费

  • 生产端在发送消息时,始终保证消息是全量信息。

  • 消费端在接收消息时,通过缓存时间戳的方式,消费消息时判断消息产生的时间是否最新,如果不是则丢弃,如果是则执行下一步。

下面通过伪代码的方式描述:

生产端伪代码

insertWare(ware); #插入数据到数据库,通常在插入数据库时我们只会update修改的字段,而不会全量插入

ware = selectWareById(ware.getId); #获取商品的全量信息(此时是最新的),用于将它放入到消息队列中

syncMq(ware); #异步发送mq消息A

消费端伪代码

ware = fetchWare(); #获取消息

if (isLasted(ware)) #通过商品的修改时间戳判断是否是最新的修改

TODO #执行下一步业务逻辑

else

return #丢弃该消息

重点在于消费端如何判断该消息是否是最新的修改也就是isLasted方法。

isLasted方法

Long modified = getCacheById(ware.getId); #获取缓存中该条商品的最新修改时间

If (ware.getModified > modified) { #如果消息中商品修改时间大于缓存中的时间,说明是最新操作

setCacheById(ware); #将该条消息的商品修改时间戳写入到缓存中

return true;
} else #如果消息中的商品修改时间小于缓存中的时间,说明该条消息属于“历史操作”,不对其更新

return false;

以上就是通过伪代码的方式,描述如何通过业务手段保证消息有序消费,重点在于全量发送信息和缓存时间戳。在其中还有一些技术实现细节。

例如:消费端消费消息B,执行到获取时间戳缓存之后,并在重新设置新的缓存之前,此时另一个消费端恰好也正在消费B它也正执行到获取时间戳缓存,由于消息A此时并没有更新缓存,消息A拿到的缓存仍然是旧的缓存,这时就会存在两个消费端都认为自己所消费的消息时最新的,造成该丢弃的消息没丢。

显然,这是分布式线程安全问题,分布式锁通常使用Redis或者ZooKeeper,加锁后的执行时序如下图所示。

这是从业务角度保证消息在消费端有序消费。通过在消息发送端全量发送消息以及在消息消费端缓存时间戳就可以保证消息的有序消费。

在上述场景中是先同步写入MySQL,再获取商品全量数据,接着再异步发送消息。这一系列的步骤可以通过接MySQL的binlog实现,在同步写入MySQL后,MySQL发送binlog变更,通过阿里巴巴Canal中间件接收MySQL的binlog变更再发送消息到消息队列。

2 消息中间件角度分析与解决

  • RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。

  • Kafka:比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。

    消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

解决方案

RabbitMQ

拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer ,按顺序分发给底层不同的 worker 来处理。

Kafka

  • 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。

  • 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

其实两种消息中间件的第二种解决方式都是利用了Reactor模式。采用一个接收线程,然后根据消息的业务特性(比如根据某字段),相同特性的消息进入同一个队列,然后使用工作线程来依次处理。

3 Reactor模式

单线程Reactor模型

单线程的Reactor模式对于客户端的所有请求使用一个专门的线程去处理,这个线程无限循环地监听是否有客户端的请求抵达,一旦收到客户端的请求,就将其分发给响应处理程序进行处理。

采用基于事件驱动的设计,当有事件触发时才会调用处理器进行数据处理。使用Reactor模式可以对线程的数量进行控制,可以使用一个线程去处理大量的事件。

-Reactor 负责响应IO事件,当检测到一个新的事件会将其发送给相应的处理程序去处理。

  • Handler 负责处理非阻塞的行为,标识系统管理的资源,同时将处理程序与事件绑定。

Reactor是单个线程,需要处理accept连接,同时发送请求到处理器中。由于只是单个线程,所以处理器中的业务需要能够快速处理完毕。

单线程的Reactor与NIO流程类似,只是将消息相关处理独立到Handler中。虽然NIO中一个线程可以支持所有的IO处理,但瓶颈也是显而易见的。如果某个客户端多次进行请求时在Handler中的处理速度较慢,那么后续的客户端请求都会被积压,导致响应变慢。所以需要引入Reactor多线程模型。

单线程的Reactor的特点是只有一个Reactor线程,也就是说只有一个Selector事件通知器,因此字节的读取I/O和后续的业务处理process()均由Reactor线程来做,很显然业务的处理影响后续事件的分发,所以引出多线程版本进行优化。

从性能角度来看,单线程的Reactor没有过多的提升空间,因为IO和CPU的速度严重不匹配。

单线程的Reactor模式并没有解决IO和CPU处理速度不匹配问题,所以多线程的Reactor模式引入了线程池的概念,将耗时的IO操作交由线程池处理,处理完毕后再同步到selectionkey中。

多线程Reactor模型

考虑到工作线程的复用,可以将工作线程设计线程池。将处理器的执行放入线程池,并使用多线程处理业务逻辑,Reactor仍然是单个线程。

Reactor读线程模型是将Handler中的IO操作和非IO操作分开,操作IO的线程称为IO线程,非IO操作的线程称为工作线程。客户端的请求会被直接丢到线程池中,因此不会发生堵塞。

多线程的Reactor的特点是一个Reactor线程和多个处理线程,将业务处理即process交给线程池进行了分离,Reactor线程只关注事件分发和字节的发送和读取。需要注意的是,实际的发送和读取还是由Reactor来处理。当在高并发环境下,有可能会出现连接来不及接收。

当用户进一步增加时Reactor也会出现瓶颈,因为Reactor既要处理IO操作请求也要响应连接请求。为了分担Reactor的负担,可以引入主从Reactor模型。

主从Reactor模型

对于多个CPU的机器,为了充分利用系统资源会将Reactor拆分为两部分。

  • Main Reactor 负责监听连接,将accept连接交给Sub Reactor处理,主Reactor用于响应连接请求。

  • Sub Reactor 处理accept连接,从Reactor用于处理IO操作请求。

现在主流通信框架中的 NIO 通信框架都是基于主从 Reactor 线程模型来实现的。在这个模型中,Acceptor 不再是一个单独的 NIO 线程,而是一个线程池。Acceptor 接收到客户端的 TCP 连接请求,建立连接之后,后续的 I/O 操作将交给 Worker I/O 线程

为什么需要单独拆分一个Reactor来处理监听呢?

因为像TCP这样需要经过3次握手才能建立连接,这个建立的过程也是需要消耗时间和资源的,单独拆分一个Reactor来处理,可以提高性能。

优缺点

Reactor模式的核心是解决多请求问题,如果有特别多的请求同时发生,不会因为线程池被短时间占满而拒绝服务。一般实现多请求的模块,会采用线程池的实现方案,这种方案对于并发量不是特别大的场景是足够用的,比如单机TPS 1000以下都是够用的。

线程池方案的最大缺点是:如果瞬间有大并发,则会一下子耗满线程,整个服务将会陷入阻塞中,后续请求无法介入。基于Reactor模式实现的方案,会有一个Dispatcher先接收事件event,然后快速分发给相应的耗时eventHandler处理器去处理,这样就不会阻塞请求的接收。

Reactor模式的优点是什么呢?

  • 响应快,不为单个同步时间所阻塞,虽然Reactor自身依然是同步的。

  • 编程相对简单,可以最大程度的避免复杂的多线程以及同步问题和多线程以及多进程的切换开销。

  • 可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源。

  • 可复用性, Reactor框架本身与具体事件处理逻辑无关,具有很高的复用性。

Reactor模式的缺点是什么呢?

  • 相比传统的模型,Reactor增加了一定的复杂性,因而具有一定的门槛,并且不易于调试。

  • Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持。

  • Reactor模式在IO读写数据时会在同一线程中实现,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间。例如在大文件传输时,IO操作会影响其他客户端的时间,因而对于这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或者采用Proactor模式。

结构

Reactor中的核心组件有哪些呢?

  • Reactor
    IO事件的派发者,相当于有分发功能的Selector

  • Acceptor
    接收客户端连接并建立对应客户端的Handler,向Reactor注册此Handler。相当于NIO中建立连接的那个判断分支。

  • Handler
    和一个客户端通讯的实体,一般在基础的Handler上会有更进一步的层次划分,用来抽象诸如decodeprocessencode这些过程。相当于消息读写处理等操作类。

在Reactor模式中有五个关键的参与者:描述符handle、同步事件分离器demultiplexer、事件处理器接口event handler、具体的事件处理器、Reactor管理器

Reactor的结构

Reactor网路编程设计模式

Reactor模式要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,如果有的话立即将该事件通知给工作线程(逻辑单元)。除此之外,主线程不做任何其它实质性的工作。读写数据、接收新连接、处理客户端请求均在工作线程中完成。

参考

https://www.jianshu.com/p/458e4b276607https://www.cnblogs.com/yulinfeng/p/11254925.html

-深入原理-

   知其然并知其所以然    

videojs如何获取请求消息_消息队列中,如何保证消息的顺序性?相关推荐

  1. videojs如何获取请求消息_中通消息平台 Kafka 顺序消费线程模型的实践与优化

    各类消息中间件对顺序消息实现的做法是将具有顺序性的一类消息发往相同的主题分区中,只需要将这类消息设置相同的 Key 即可,而 Kafka 会在任意时刻保证一个消费组同时只能有一个消费者监听消费,因此可 ...

  2. 消息队列面试 - 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

    消息队列面试 - 如何保证消息不被重复消费? 面试题 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性? 面试官心理分析 其实这是很常见的一个问题,这俩问题基本可以连起来问.既然是消费消息, ...

  3. 消息队列面试 - 如何保证消息的顺序性?

    消息队列面试 - 如何保证消息的顺序性? 面试题 如何保证消息的顺序性? 面试官心理分析 其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的 ...

  4. 消息队列面试 - 如何保证消息的可靠性传输?

    消息队列面试 - 如何保证消息的可靠性传输? 面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条 ...

  5. linux查看消息队列的状态,linux – 如何知道某个时间点在消息队列中收到的消息数...

    我有一个实现,其中硬件每秒通过DMA传输将300个数据包发送到主机应用程序,然后发送到主机应用程序的消息队列. 当数据包以高速率发送到应用程序时,我看到应用程序没有收到这样的一个或两个数据包.当我使用 ...

  6. linux qos mq,Pika + RabbitMQ:将basic_qos设置为prefetch = 1仍然会消耗队列中的所有消息...

    我有一个python工作者客户端,它可以让10个工人分别挂接到一个RabbitMQ队列中.有点像这样: #!/usr/bin/python worker_count=10 def mqworker(q ...

  7. IM系统中如何保证消息的可靠投递(即QoS机制)(转)

    消息的可靠性,即消息的不丢失和不重复,是im系统中的一个难点.当初qq在技术上(当时叫oicq)因为以下两点原因才打败了icq: 1)qq的消息投递可靠(消息不丢失,不重复) 2)qq的垃圾消息少(它 ...

  8. 如何保证消息不被重复消费~~~~~(如何保证消息队列的幂等性)

    分析:这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题.换句话来说,是在考察你的设计能力,这个问题的回答可以根据具体的业务场景来答,没有固定的答案. 回答 ...

  9. 公众号 接收规则 消息_微信公众平台 发送模板消息(Java接口开发)

    前言:最近一直再弄微信扫码推送图文消息和模板消息发送,感觉学习到了不少东西.今天先总结一下微信公众平台模板消息的发送.因为这个自己弄了很久,开始很多地方不明白,所以今天好好总结一下. 微信公众平台技术 ...

最新文章

  1. RS232与RS485的功能与区别!
  2. Idea使用Lombok简化实体类代码
  3. Tensorflow 2.0.0-alpha 安装 Linux系统
  4. Web 设计中的 5 个最具争议性的话题
  5. Machine Learning - Andrew Ng on Coursera (Week 6)
  6. std的find和reverse_iterator联合使用
  7. c语言中语句作用,学习C语言的用途~
  8. HDU - 6183 Color it 2017广西邀请赛(线段树)
  9. docker stop
  10. dell 重装linux系统_U盘装系统开机按哪个键
  11. js 读取json文件_JavaScript 项目中常见配置文件介绍
  12. Eclipse快捷键之搜索
  13. STM32程序中使用printf打印中文字符乱码
  14. linux运维故障案列,linux 运维故障排查思路
  15. UG NX 12 内部草图和外部草图的区别
  16. 办公室多显示器共享主机解决方案
  17. Word删除表格后空白页的方法
  18. 什么叫结构化程序设计?它的主要内容是什么
  19. 有管网气体消防系统小知识来啦
  20. 斐波那契数列_详解(C语言)

热门文章

  1. java发送接收UDP数据包:字符串,byte[]字节数组,文件等
  2. c++字符串string操作全解
  3. 沈逸老师PHP魔鬼特训笔记(3)
  4. scp实现mac与linux服务器之间文件传输
  5. Scrum Meeting day 2
  6. elasticsearch集群搭建实例
  7. Visual C++中的异常处理浅析[轉]
  8. [转载] python异常和错误有什么区别_python的错误和异常
  9. [转载] java中final,finally,finalize三者的作用和区别
  10. Spring浅入浅出——不吹牛逼不装逼