使用MQ的时候,经常会有按顺序消费的需求,比如大数据团队为了做数据分析,会把数据库里数据同步到其他系统做一些数据统计分析。同步MySQL的时候,为了保证数据同步的实时性,会在中间加一个MQ,多个线程来消费MQ里的数据。

这种同步一般是读取binlog数据,你在MySQL里增改删了数据,对应出来就是3条增改删binlog日志发送到MQ里面,消费的时候肯定必须要按照增改删的顺序执行。如果你换成删除、修改、增加,就导致数据乱套了。

图1 binlog同步

我们以kafka举例,看下哪些环节会出现数据顺序不一致情况,又怎么解决。

假设kafka分配了3个partition,kafka的一个特性就是,能保证写入一个partition中的数据一定是有顺序的。

生产者写的时候,可以指定一个key,比如是订单id作为key,这个id对应的数据一定会写到同一个partition中去,而且这个partition中的数据都是有顺序的。

图2 kafka partition

kafka的消费者开始消费partition中的数据,一个消费消费一个partition,一个partition只能被一个消费者消费,不会出现一个消费者同时消费多个partition的情况。假如现在有3个partition,你启动4个消费者,那么就会有一个消费者消费不到数据。

图3 一个消费者消费一个partition

到目前为止,每个消费者消费到的数据都是有顺序性的。但消费者内部如果是单线程的,效率就会比较低,如果生产者写入kafka的数据量比较大,消费不及时,就会出现消息堆积的情况,所以消费者需要多线程的方式运行。

假如消费者里启动了3个线程,并发的来消费数据,线程之间如果不做同步控制,还是会导致数据乱掉。

图4 消费者多线程消费MQ

那如何保证kafka消费者多线程按顺序消费数据呢?

多个线程不能直接拿数据去处理,此时我们可以在同步系统中搞多个内存队列,消费者拿到数据之后,根据每条数据的key做hash取模,把相同id的数据分配到同一个内存队列中去。

每个内存队列里的数据都是有顺序性的,给每个内存队列都对应一个线程,去消费内存队列中的数据。

假如有3条增改删的数据,都是对同一个id的处理,那么hash取模后就会写入到同一个内存队列里去,由同一个线程去消费,然后按顺序写入数据库中。

如果消费者按照单线程消费处理,一条数据耗费几十毫秒,1秒钟只能处理十几条数据,吞吐量就会非常低。如果开启多线程的方式处理,就会几倍的提高吞吐量,同时也保证了数据的顺序性。

整个流程按这样的设计方案来处理,就可以保证数据的顺序性。

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号

好文章,我在看❤️

如何保证消息队列里的数据顺序执行?相关推荐

  1. rocketmq怎么保证消息一致性_如何保证消息队列的高可用和幂等性以及数据丢失,顺序一致性...

    (1)RabbitMQ的高可用性 RabbitMQ是比较有代表性的,因为是基于主从做高可用性的,我们就以他为例子讲解第一种MQ的高可用性怎么实现. rabbitmq有三种模式:单机模式,普通集群模式, ...

  2. 如何保证 Redis 消息队列中的数据不丢失?

    Redis 最常见的业务场景就是缓存读取与存储,而随着时间的推移,有人开始将它作为消息队列来使用了,并且随着 Redis 版本的发展,在 Redis.2.0.0 中新增了发布订阅模式(Pub/Sub) ...

  3. 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性

    [重难点][RabbitMQ 02]如何避免消息重复投递和消息重复消费.如何防止消息丢失.如何保证消息的顺序性.如何保证消息队列的可用性 文章目录 [重难点][RabbitMQ 02]如何避免消息重复 ...

  4. 如何保证消息不被重复消费~~~~~(如何保证消息队列的幂等性)

    分析:这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题.换句话来说,是在考察你的设计能力,这个问题的回答可以根据具体的业务场景来答,没有固定的答案. 回答 ...

  5. 消息队列面试 - 如何保证消息队列的高可用?

    面试题 如何保证消息队列的高可用? 面试官心理分析 如果有人问到你 MQ 的知识,高可用是必问的.上一讲提到,MQ 会导致系统可用性降低.所以只要你用了 MQ,接下来问的一些要点肯定就是围绕着 MQ ...

  6. rocketmq 如何保证高可用_如何保证消息队列是高可用的

    为什么写这篇文章? 博主有两位朋友分别是小A和小B: 小A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑.再不然就是和运营聊聊天,写几个SQL,生成下报表 ...

  7. RTX5 | 消息队列03 - 获取消息队列里消息的数量,并一次性提取出来

    文章目录 一.前言 二.实验目的 三.API 3.1.osMessageQueueGetCount 四.代码 4.1.main.h 4.2.main.c 五.Event Recorder调试 5.1. ...

  8. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  9. java如何保证mq一定被消费_消费端如何保证消息队列MQ的有序消费

    消息无序产生的原因 消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端(Consumer) ...

最新文章

  1. swift Array 数组
  2. oracle 导入excel时间格式,将.xls或者.excel格式的数据导入到Oracle中
  3. 面向大数据的异构内存系统
  4. cocos2d-x 如何使用Visual Studio 2010和xcode 4混合编写手机游戏
  5. php 去掉后导字符,PHP去除字符串最后一个字符的三种方法实例
  6. 中科院信工所经验_在中科院信工所读研是一种怎样的体验?
  7. devexpress控件使用笔记
  8. 当管理遇上“人情”,你会怎么做?
  9. native链接mysql报错_连接报错'mysql_native_password'
  10. 一个非常强大的视频、音频、二维码、图片、压缩等在线网址
  11. Android 利用重力感应调整手机模式
  12. 智道分析吊瓜子的营养价值
  13. php 细表格,使用PHP轻松地创建一个表格 - 小俊学习网
  14. android:exported、enabled属性
  15. Firefox(火狐)好用的插件
  16. arduinopn532模块_Arduino 开源 NFC近场通讯模块 PN532
  17. 2007年中国地方门户网站市场规模达6.1亿元
  18. 多模式MaaS仅仅适用于大城市吗?
  19. 岁岁年年人不同——LVS2019多媒体会议见闻(一)
  20. 河北大学计算机科学与技术专业硬件实训——C语言控制51单片机实现LED小灯的控制

热门文章

  1. 11相机不流畅_小米11有望本月发布,小米10退位让贤,256GB版本跌至3799
  2. java的位置_Java中数据存放的位置
  3. mysql中视图和表的区别及联系_MySQL中Update、select联用操作单表、多表,及视图与临时表的区别...
  4. linux手动调节屏幕亮度命令
  5. 什么是压栈操作?指令:PUSH src ;src为16位操作数
  6. 计算机网络之物理层:1、接口特性、同步异步、串行并行、双工
  7. 【专栏必读】王道考研408计算机组成原理万字笔记和题目题型总结(从学生角度辅助大家理解):各章节导航及思维导图
  8. 1389. 按既定顺序创建目标数组
  9. Anaconda简介以及安装
  10. TCP/IP四层模型及各层协议首部详述(包含IOS7层)