上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下:

这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/RocketMQ 消费线程模型。

Kafka

kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsumer 本身并没有实现多线程消费逻辑,如需多线程消费,还需要用户自行实现,在这里我会讲到 Kafka 两种多线程消费模型。

1、每个线程维护一个 KafkaConsumer

这样相当于一个进程内拥有多个消费者,也可以说消费组内成员是有多个线程内的 KafkaConsumer 组成的。

但其实这个消费模型是存在很大问题的,从消费消费模型可看出每个 KafkaConsumer 会负责固定的分区,因此无法提升单个分区的消费能力,如果一个主题分区数量很多,只能通过增加 KafkaConsumer 实例提高消费能力,这样一来线程数量过多,导致项目 Socket 连接开销巨大,项目中一般不用该线程模型去消费。

2、单 KafkaConsumer 实例 + 多 worker 线程

针对第一个线程模型的缺点,我们可采取 KafkaConsumer 实例与消息消费逻辑解耦,把消息消费逻辑放入单独的线程中去处理,线程模型如下:

从消费线程模型可看出,当 KafkaConsumer 实例与消息消费逻辑解耦后,我们不需要创建多个 KafkaConsumer 实例就可进行多线程消费,还可根据消费的负载情况动态调整 worker 线程,具有很强的独立扩展性,在公司内部使用的多线程消费模型就是用的单 KafkaConsumer 实例 + 多 worker 线程模型。

但这个消费模型由于消费逻辑是利用多线程进行消费的,因此并不能保证其消息的消费顺序,在这里我们可以引入阻塞队列的模型,一个 woker 线程对应一个阻塞队列,线程不断轮训从阻塞队列中获取消息进行消费,对具有相同 key 的消息进行取模,并放入相同的队列中,实现顺序消费, 消费模型如下:

但是以上两个消费线程模型,存在一个问题:

在消费过程中,如果 Kafka 消费组发生重平衡,此时的分区被分配给其它消费组了,如果拉取回来的消息没有被消费,虽然 Kakfa 可以实现 ConsumerRebalanceListener 接口,在新一轮重平衡前主动提交消费偏移量,但这貌似解决不了未消费的消息被打乱顺序的可能性?

因此在消费前,还需要主动进行判断此分区是否被分配给其它消费者处理,并且还需要锁定该分区在消费当中不能被分配到其它消费者中(但 kafka 目前做不到这一点)。

参考 RocketMQ 的做法:

在消费前主动调用 ProcessQueue#isDropped 方法判断队列是否已过期,并且对该队列进行加锁处理(向 broker 端请求该队列加锁)。

RocketMQ

RocketMQ 不像 Kafka 那么“原生”,RocketMQ 早已为你准备好了你的需求,它本身的消费模型就是单 consumer 实例 + 多 worker 线程模型,有兴趣的小伙伴可以从以下方法观摩 RocketMQ 的消费逻辑:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

RocketMQ 会为每个队列分配一个 PullRequest,并将其放入 pullRequestQueue,PullMessageService 线程会不断轮询从 pullRequestQueue 中取出 PullRequest 去拉取消息,接着将拉取到的消息给到 ConsumeMessageService 处理,ConsumeMessageService 有两个子接口:

// 并发消息消费逻辑实现类

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;

// 顺序消息消费逻辑实现类

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

其中,ConsumeMessageConcurrentlyService 内部有一个线程池,用于并发消费,同样地,如果需要顺序消费,那么 RocketMQ 提供了 ConsumeMessageOrderlyService 类进行顺序消息消费处理。

经过对 Kafka 消费线程模型的思考之后,从 ConsumeMessageOrderlyService 源码中能够看出 RocketMQ 能够实现局部消费顺序,我认为主要有以下两点:

1)RocketMQ 会为每个消息队列建一个对象锁,这样只要线程池中有该消息队列在处理,则需等待处理完才能进行下一次消费,保证在当前 Consumer 内,同一队列的消息进行串行消费。

2)向 Broker 端请求锁定当前顺序消费的队列,防止在消费过程中被分配给其它消费者处理从而打乱消费顺序。

总结

经过这篇文章的分析后,尝试回答文章开头的那个问题:

1)多分区的情况下:

如果想要保证 Kafka 在消费时要保证消费的顺序性,可以使用每个线程维护一个 KafkaConsumer 实例,并且是一条一条地去拉取消息并进行消费(防止重平衡时有可能打乱消费顺序);对于能容忍消息短暂乱序的业务(话说回来, Kafka 集群也不能保证严格的消息顺序),可以使用单 KafkaConsumer 实例 + 多 worker 线程 + 一条线程对应一个阻塞队列消费线程模型。

1)单分区的情况下:

由于单分区不存在重平衡问题,以上两个线程模型的都可以保证消费的顺序性。

另外如果是 RocketMQ,使用 MessageListenerOrderly 监听消费可保证消息消费顺序。

很多人也有这个疑问:既然 Kafka 和 RocketMQ 都不能保证严格的顺序消息,那么顺序消费还有意义吗?

一般来说普通的的顺序消息能够满足大部分业务场景,如果业务能够容忍集群异常状态下消息短暂不一致的情况,则不需要严格的顺序消息。

如果你对文章还有什么疑问和补充或者发现文中有错误的地方,欢迎留言,我们一起探讨。

原文 张程辉

rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?相关推荐

  1. rdkafka线程过多_Kafka快速入门(十一)——RdKafka源码分析

    Kafka快速入门(十一)--RdKafka源码分析 一.RdKafka C源码分析 1.Kafka OP队列 RdKafka将与Kafka Broke的交互.内部实现的操作都封装成Operator结 ...

  2. rdkafka线程过多_kafka producer性能调优

    1. 介绍 2. 本文的一些前提 讨论的kafka版本为0.10.0 没有broker端的再压缩 消息都有8字节的时间戳介绍信息 3. 优化目标 给定一个要发送的数据集,在满足持久性.有序性的前提下优 ...

  3. rdkafka线程过多_我是如何处理大并发量订单处理的 KafKa部署总结

    今天要介绍的是消息中间件KafKa,应该说是一个很牛的中间件吧,背靠Apache 与很多有名的中间件搭配起来用效果更好哦 ,为什么不用RabbitMQ,因为公司需要它. 网上已经有很多怎么用和用到哪的 ...

  4. java cpu个数_cpu个数、核数、线程数、Java多线程关系的理解

    一 cpu个数.核数.线程数的关系 cpu个数:是指物理上,也及硬件上的核心数: 核数:是逻辑上的,简单理解为逻辑上模拟出的核心数: 线程数:是同一时刻设备能并行执行的程序个数,线程数=cpu个数 * ...

  5. burp爆破线程设置多少_多线程到底需要设置多少个线程?

    我们在使用线程池的时候,会有两个疑问点: 线程池的线程数量设置过多会导致线程竞争激烈 如果线程数量设置过少的话,还会导致系统无法充分利用计算机资源 那么如何设置才不会影响系统性能呢?其实线程池的设置是 ...

  6. 020.day20 线程概述 多线程优缺点 线程的创建 线程常用方法 生命周期 多线程同步...

    目录 多线程 一.线程概述 四.线程常用方法 多线程 一.线程概述 1. 进程 正在执行的应用程序(java.exe),一个可执行的程序一次运行的过程 独立性:不同进程之间相互独立 动态性:是一直活动 ...

  7. java线程集合点_Java多线程学习笔记(三) 甚欢篇

    使人有乍交之欢,不若使其无久处之厌 <小窗幽记>很多时候,我们需要的都不是再多一个线程,我们需要的线程是许多个,我们需要让他们配合.同时我们还有一个愿望就是复用线程,就是将线程当做一个工人 ...

  8. 多线程情况下如何保证线程安全

    一.线程安全等级 其实线程安全并不是一个"非黑即白"单项选择题.按照"线程安全"的安全程度由强到弱来排序,我们可以将java语言中各种操作共享的数据分为以下5类 ...

  9. 为什么线程过多会损害性能

    线程太多 线程是从多核芯片中提取性能的当前选择方法.似乎如果有一点线程是好的,那么很多线程必须更好.实际上,线程太多会使程序陷入瘫痪.本文讨论了为什么以及如何基于任务的编程可以避免该问题.英特尔®线程 ...

最新文章

  1. Ubuntu 系统 在终端中过滤log 特殊的信息
  2. java xftp_IDEA使用Xshell利用Xftp部署javaweb项目
  3. Java8 Lambda不仅仅只是语法糖
  4. CSS3实现页面的平滑过渡
  5. Android webView 缓存 Cache + HTML5离线功能 解决
  6. SAP Commerce的Runtime Attributes
  7. Spring Security和自定义密码编码
  8. OCR文本检测-RRPN
  9. 请检查virtualboxapi是否正确安装_MBR膜组件安装施工方案指南
  10. 【软考】面向对象程序设计复习指南
  11. 电脑管理器地址栏 按右键会有的功能
  12. 修复Git打包的一个Bug
  13. 01. PM之项目启动Kickoff -- 可不只是一起吃个饭
  14. a轮融资1亿多不多_A轮融资一共就三件重要的事情 B轮最重要的两件事
  15. 企业微信根据微信联系人批量自动发送邀请,还可根据手机号批量添加
  16. 使用fs传真模块mod_fax的一点点经验
  17. 关于Beyond Compare 4秘钥过期处理方法,百试不爽
  18. 兔子与狐狸c语言,【狐狸和兔子的故事】_ 狐狸和兔子故事_亲亲宝贝网
  19. “.plt“文件转png格式图片简写
  20. iframe 自适应高度的多种实现方式

热门文章

  1. c语言wpf99乘法表,使用JSP输出九九乘法表
  2. html中的文本格式化标签+多媒体标签+关于IE浏览器兼容的问题(干货!)
  3. import win32com.client在python中报错及其解决办法
  4. Mac 使用常见问题汇集
  5. 嵌入式软件工程师笔试题
  6. 2012 依赖注入框架
  7. json 字符串反序列化成DataSet
  8. qsort的7种用法(转)
  9. 蓝桥杯第五届省赛JAVA真题----最长公共子序列
  10. windows10系统的电脑如何设置密码?