大家好我是月夜枫,我们接着继续去理解最后这条消息是如何被消费者消费掉的。其中最核心的有以下内容。

1、多线程安全问题

当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。

对于线程安全,还可以进一步定义:

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替进行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

1.1生产者

KafkaProducer的实现是线程安全的。

KafkaProducer就是一个不可变类。线程安全的,可以在多个线程中共享单个KafkaProducer实例

所有字段用private final修饰,且不提供任何修改方法,这种方式可以确保多线程安全。

如何节约资源的多线程使用KafkaProducer实例

package com.msb.concurrent;import com.msb.selfserial.User;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 类说明:多线程下使用生产者*/
public class KafkaConProducer {//发送消息的个数private static final int MSG_SIZE = 1000;//负责发送消息的线程池private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static CountDownLatch countDownLatch  = new CountDownLatch(MSG_SIZE);private static User makeUser(int id){User user = new User(id);String userName = "msb_"+id;user.setName(userName);return user;}/*发送消息的任务*/private static class ProduceWorker implements Runnable{private ProducerRecord<String,String> record;private KafkaProducer<String,String> producer;public ProduceWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {this.record = record;this.producer = producer;}public void run() {final String id = Thread.currentThread().getId() +"-"+System.identityHashCode(producer);try {producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(null!=exception){exception.printStackTrace();}if(null!=metadata){System.out.println(id+"|" +String.format("偏移量:%s,分区:%s", metadata.offset(),metadata.partition()));}}});System.out.println(id+":数据["+record+"]已发送。");countDownLatch.countDown();} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的序列化properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);// 构建kafka生产者对象KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);try {for(int i=0;i<MSG_SIZE;i++){User user = makeUser(i);ProducerRecord<String,String> record = new ProducerRecord<String,String>("concurrent-test",null,System.currentTimeMillis(), user.getId()+"", user.toString());executorService.submit(new ProduceWorker(record,producer));}countDownLatch.await();} catch (Exception e) {e.printStackTrace();} finally {producer.close();executorService.shutdown();}}}

1.2消费者

KafkaConsumer的实现不是线程安全的

实现消费者多线程最常见的方式: 线程封闭 ——即为每个线程实例化一个 KafkaConsumer对象

package com.msb.concurrent;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 类说明:多线程下正确的使用消费者,需要记住,一个线程一个消费者*/
public class KafkaConConsumer {public static final int CONCURRENT_PARTITIONS_COUNT = 2;private static ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_PARTITIONS_COUNT);private static class ConsumerWorker implements Runnable{private KafkaConsumer<String,String> consumer;public ConsumerWorker(Map<String, Object> config, String topic) {Properties properties = new Properties();properties.putAll(config);this.consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList(topic));}public void run() {final String ThreadName = Thread.currentThread().getName();try {while(true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String, String> record:records){System.out.println(ThreadName+"|"+String.format("主题:%s,分区:%d,偏移量:%d," +"key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));//do our work}}} finally {consumer.close();}}}public static void main(String[] args) {/*消费配置的实例*/Map<String,Object> properties = new HashMap<String, Object>();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"c_test");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");for(int i = 0; i<CONCURRENT_PARTITIONS_COUNT; i++){//一个线程一个消费者executorService.submit(new ConsumerWorker(properties, "concurrent-test"));}}}

2、群组协调

消费者要加入群组时,会向群组协调器发送一个JoinGroup请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化(新加入或者掉线),主题中分区发生了变化(增加)时发生。

2.1组协调器

组协调器是Kafka服务端自身维护的。

组协调器( GroupCoordinator )可以理解为各个消费者协调器的一个中央处理器, 每个消费者的所有交互都是和组协调器( GroupCoordinator )进行的。

  1. 选举Leader消费者客户端

  1. 处理申请加入组的客户端

  1. 再平衡后同步新的分配方案

  1. 维护与客户端的心跳检测

  1. 管理消费者已消费偏移量,并存储至 __consumer_offset中

kafka上的组协调器( GroupCoordinator )协调器有很多,有多少个 __consumer_offset分区, 那么就有多少个组协调器( GroupCoordinator )

默认情况下, __consumer_offset有50个分区, 每个消费组都会对应其中的一个分区,对应的逻辑为 hash(group.id)%分区数。

2.2消费者协调器

每个客户端(消费者的客户端)都会有一个消费者协调器, 他的主要作用就是向组协调器发起请求做交互, 以及处理回调逻辑

  1. 向组协调器发起入组请求

  1. 向组协调器发起同步组请求(如果是Leader客户端,则还会计算分配策略数据放到入参传入)

  1. 发起离组请求

  1. 保持跟组协调器的心跳线程

  1. 向组协调器发送提交已消费偏移量的请求

2.3消费者加入分组的流程

1、客户端启动的时候, 或者重连的时候会发起JoinGroup的请求来申请加入的组中。

2、当前客户端都已经完成JoinGroup之后, 客户端会收到JoinGroup的回调, 然后客户端会再次向组协调器发起SyncGroup的请求来获取新的分配方案

3、当消费者客户端关机/异常 时, 会触发离组LeaveGroup请求。

当然有主动的消费者协调器发起离组请求,也有组协调器一直会有针对每个客户端的心跳检测, 如果监测失败,则就会将这个客户端踢出Group。

4、客户端加入组内后, 会一直保持一个心跳线程,来保持跟组协调器的一个感知。

并且组协调器会针对每个加入组的客户端做一个心跳监测,如果监测到过期, 则会将其踢出组内并再平衡。

2.4消费者消费的offset的存储

__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

__consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。

kafka-consumer-groups.bat --bootstrap-server :9092 --group c_test --describe

那么如何使用 kafka 提供的脚本查询某消费者组的元数据信息呢?

Math.abs(groupID.hashCode()) % numPartitions,

__consumer_offsets 的每条消息格式大致如图所示

可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值

3、分区再均衡

当消费者群组里的消费者发生变化,或者主题里的分区发生了变化,都会导致再均衡现象的发生。从前面的知识中,我们知道,Kafka中,存在着消费者对分区所有权的关系,

这样无论是消费者变化,比如增加了消费者,新消费者会读取原本由其他消费者读取的分区,消费者减少,原本由它负责的分区要由其他消费者来读取,增加了分区,哪个消费者来读取这个新增的分区,这些行为,都会导致分区所有权的变化,这种变化就被称为 再均衡 。

再均衡对Kafka很重要,这是消费者群组带来高可用性和伸缩性的关键所在。不过一般情况下,尽量减少再均衡,因为再均衡期间,消费者是无法读取消息的,会造成整个群组一小段时间的不可用。

消费者通过向称为群组协调器的broker(不同的群组有不同的协调器)发送心跳来维持它和群组的从属关系以及对分区的所有权关系。如果消费者长时间不发送心跳,群组协调器认为它已经死亡,就会触发一次再均衡。

心跳由单独的线程负责,相关的控制参数为max.poll.interval.ms。

3.1消费者提交偏移量导致的问题

当我们调用poll方法的时候,broker返回的是生产者写入Kafka但是还没有被消费者读取过的记录,消费者可以使用Kafka来追踪消息在分区里的位置,我们称之为 偏移量 。消费者更新自己读取到哪个消息的操作,我们称之为 提交 。

消费者是如何提交偏移量的呢?消费者会往一个叫做_consumer_offset的特殊主题发送一个消息,里面会包括每个分区的偏移量。发生了再均衡之后,消费者可能会被分配新的分区,为了能够继续工作,消费者者需要读取每个分区最后一次提交的偏移量,然后从指定的地方,继续做处理。

分区再均衡的例子:

某软件公司,有一个项目,有两块的工作,有两个码农,一个小王、一个小李,一个负责一块(分区消费),干得好好的。突然一天,小王桌子一拍不干了,老子中了5百万了,不跟你们玩了,立马收拾完电脑就走了。这个时候小李就必须承担两块工作,这个时候就是发生了分区再均衡。

过了几天,你入职,一个萝卜一个坑,你就入坑了,你承担了原来小王的工作。这个时候又会发生了分区再均衡。

1)如果提交的偏移量小于消费者实际处理的最后一个消息的偏移量,处于两个偏移量之间的消息会被重复处理,

2)如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

3.2再均衡监听器实战

我们创建一个分区数是3的主题rebalance

kafka-topics.bat --bootstrap-server localhost:9092  --create --topic rebalance --replication-factor 1 --partitions 3

在为消费者分配新分区或移除旧分区时,可以通过消费者API执行一些应用程序代码,在调用 subscribe()方法时传进去一个 ConsumerRebalancelistener实例就可以了。

ConsumerRebalancelistener有两个需要实现的方法。

  1. public void

onPartitionsRevoked( Collection< TopicPartition> partitions)方法会在

再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了

  1. public void

onPartitionsAssigned( Collection< TopicPartition> partitions)方法会在重新分配分区之后和消费者开始读取消息之前被调用。

具体使用,我们先创建一个3分区的主题,然后实验一下,

在再均衡开始之前会触发onPartitionsRevoked方法

在再均衡开始之后会触发onPartitionsRevoked方法

Kafka之消费全流程相关推荐

  1. 面试官:说说Kafka处理请求的全流程

    今天来讲讲 Kafka Broker端处理请求的全流程,剖析下底层的网络通信是如何实现的.Reactor在kafka上的应用. 再说说社区为何在2.3版本将请求类型划分成两大类,又是如何实现两类请求处 ...

  2. 答读者问:Kafka顺序消费吞吐量下降该如何优化?

    大家好,我是威哥,<RocketMQ技术内幕>一书作者,荣获RocketMQ官方社区优秀布道师.CSDN2020博客执之星Top2等荣誉称号.目前担任中通快递技术平台部资深架构师,主要负责 ...

  3. Kafka生产者——消息发送流程,同步、异步发送API

    生产者消息发送流程 发送原理 Kafka的Producer发送消息采用的是异步发送的方式. 在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAc ...

  4. 基于神策用户画像,在线教育企业线索标签体系搭建及培育全流程解析

    作者介绍:TigerHu,环球网校大数据营销产品 leader,主导数据产品线和营销 CRM 产品线. 本文内容均从作者真实实践过程出发,结合作者公司与神策数据合作真实场景,从神策用户画像产品出发,全 ...

  5. videojs如何获取请求消息_中通消息平台 Kafka 顺序消费线程模型的实践与优化

    各类消息中间件对顺序消息实现的做法是将具有顺序性的一类消息发往相同的主题分区中,只需要将这类消息设置相同的 Key 即可,而 Kafka 会在任意时刻保证一个消费组同时只能有一个消费者监听消费,因此可 ...

  6. kafka重复消费问题

    开篇提示:kafka重复消费的根本原因就是"数据消费了,但是offset没更新"!而我们要探究一般什么情况下会导致offset没更新? 今天查看Elasticsearch索引的时候 ...

  7. 乾通嗖嗖抢先布局多元化用工 实现全流程数智化人力管理

    12月9日,乾通互连战略升级暨乾通嗖嗖媒体发布会在北京举行.在发布会上,乾通互连对外宣布组织架构再升级,未来将围绕薪酬社保福利及多元化用工两大业务板块进行战略布局,推出多元化用工解决方案--乾通嗖嗖. ...

  8. MMOCR: OpenMMLab 全流程的文字检测识别理解工具箱

    号外号外,继 港中文-商汤OpenMMLab开源全景图!之后,OpenMMLab 又有新成员加入咯-       01       MMOCR 特点 全流程:支持文字检测.文字识别以及其下游任务,比如 ...

  9. kafka的消费顺序_Kafka原理和实践云平台技术栈13

    导读:之前发布了云平台技术栈(ps:点击可查看),本文主要说一下其中的Kafka! 作者:阿龙 cnblogs.com/along21/p/10278100.htm 1.认识 Kafka 1.1 Ka ...

最新文章

  1. 如何在python开发的GUI界面程序中恰当地使用PyExecJS
  2. 450g吐司烘烤温度_教你一手如何判断吐司面包是否烤熟
  3. python中tensorflow的函数简单用法(未完)
  4. java enum 变量_java枚举使用详解
  5. 打造最舒适的webview调试环境
  6. excel vba 特殊符号
  7. 传统开发被冲击得“七零八落”,云原生时代下开发者要如何自救?
  8. C# this关键字(给底层类库扩展成员方法)
  9. 【POJ 2342】Anniversary party(入门树形dp)
  10. 统计系统中所有进程占用内存的方法
  11. 罗技无法使用计算机上的配置文件,罗技游戏软件检测不到游戏启动,导致无法自动切换配置文件...
  12. 顺序表的基本操作代码实现
  13. 个人永久性免费-Excel催化剂功能第44波-可见区域复制粘贴不覆盖隐藏内容
  14. 动作捕捉助力无人车多源传感器信息融合导航技术
  15. 【产业互联网周报】罗永浩AR创业公司获美团领投;英特尔自动驾驶子公司Mobileye敲定IPO条款;星环科技登陆科创板...
  16. android开发--不安装支付宝客户端调H5页面问题
  17. HZAU 1001 Handing Out Candies
  18. 流利阅读 2019.3.18 Can baijiu, China’s sorghum firewater, go global?
  19. Python常用STL
  20. 所谓的光辉岁月,并不是后来闪耀的日子,而是无人问津时你对梦想的偏执。

热门文章

  1. (Talking face) EVP
  2. 使用wifi网卡笔记5---AP模式
  3. 懵懂新手查找区 !!! 超详细项目各个层以及内置小层的用处
  4. 倾斜摄影三维实景模型为智慧城市提供全流程可视化支撑
  5. Ionic4--路由跳转
  6. Capacitated Facility Location Problem (Algorithm Design and Analysis Project)
  7. 机器学习——线性回归、房价预测案例【正规方案与梯度下降】
  8. 10款Windows实用软件推荐,满满的干货,总有一款是你必备的
  9. Mac 下 移动硬盘只读解决方案
  10. windows +linux(ubuntu) 2020.7 双系统最新安装心得