Notes on using the Consumer:

  • Messages in a single partition can only be consumed by one Consumer within a ConsumerGroup. In other words, one consumer may consume one or several partitions, but two consumers in the same group can never consume the same partition.
  • A Consumer reads messages from a partition sequentially, in offset order. For a group with no committed offset, the starting position is governed by auto.offset.reset: the default, latest, starts from newly arriving messages only, while earliest starts from the beginning of the partition (see the sketch after this list).
  • A single ConsumerGroup as a whole consumes the messages of all partitions of the topics it subscribes to.
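A minimal sketch of the second bullet, reusing the broker address and topic from the samples below (the group id "fresh-group" is made up for illustration); auto.offset.reset only takes effect when the group has no committed offset yet:

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class OffsetResetSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        // "fresh-group" is a hypothetical group id with no committed offsets yet
        props.setProperty("group.id", "fresh-group");
        // "earliest" -> start from the beginning of each partition;
        // the default, "latest", would start from new messages only
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("jiangzh-topic"));
            // The first poll for this group starts from offset 0 of each partition
            consumer.poll(Duration.ofMillis(10000));
        }
    }
}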
The ConsumerSample class below walks through the common consumption patterns one method at a time:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class ConsumerSample {
    private final static String TOPIC_NAME = "jiangzh-topic";

    public static void main(String[] args) {
//        helloworld();

        // Manual offset commit
//        commitedOffset();

        // Manual commit for each partition
//        commitedOffsetWithPartition();

        // Manually subscribe to specific partition(s) and commit offsets
//        commitedOffsetWithPartition2();

        // Manually set the starting offset, and commit offsets manually
//        controlOffset();

        // Flow control
        controlPause();
    }

    /* This style does show up in real projects, but it is not recommended */
    private static void helloworld() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to one or more topics
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
        }
    }

    /* Manual offset commit */
    private static void commitedOffset() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to one or more topics
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                // Suppose each record is saved to a database here
                // TODO record 2 db
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                // If the save fails, roll back and do NOT commit the offset
            }
            // If everything succeeded, commit the offset manually
            consumer.commitAsync();
        }
    }

    /* Manual offset commit, handling each partition separately */
    private static void commitedOffsetWithPartition() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to one or more topics
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                // Commit the offset for this single partition (note the +1)
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
                System.out.println("=============partition - " + partition + " end================");
            }
        }
    }

    /* Manual offset commit with manually assigned partitions */
    private static void commitedOffsetWithPartition2() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // jiangzh-topic has two partitions: 0 and 1
        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

        // Instead of subscribing to the whole topic...
//        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        // ...assign specific partition(s) of the topic
        consumer.assign(Arrays.asList(p0));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                // Commit the offset for this single partition (note the +1)
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
                System.out.println("=============partition - " + partition + " end================");
            }
        }
    }

    /* Manually set the starting offset, and commit offsets manually */
    private static void controlOffset() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        // Consume a specific partition of the topic
        consumer.assign(Arrays.asList(p0));

        while (true) {
            // Manually set the starting offset
            /*
               1. The application controls where consumption starts
               2. If the program fails, the same batch is consumed again
            */
            /*
               A typical pattern:
               1. The first run consumes from offset 0 (the usual case)
               2. Say 100 records are consumed; offset 101 is stored in Redis
               3. Before each poll, fetch the latest offset from Redis
               4. Consumption resumes from that position
            */
            consumer.seek(p0, 700); // 700 stands in for the offset fetched from Redis
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.err.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                // Commit the offset for this single partition (note the +1)
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
                System.out.println("=============partition - " + partition + " end================");
            }
        }
    }

    /* Flow control - rate limiting */
    private static void controlPause() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // jiangzh-topic has two partitions: 0 and 1
        TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
        // Consume specific partitions of the topic
        consumer.assign(Arrays.asList(p0, p1));

        long totalNum = 40;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                long num = 0;
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                    /*
                       Token-bucket style flow control:
                       1. After receiving a record, try to take a token from the bucket
                       2. If a token is acquired, continue with business processing
                       3. If no token is available, pause the partition and wait
                       4. Once the bucket has enough tokens again, resume the consumer
                    */
                    num++;
                    if (record.partition() == 0) {
                        if (num >= totalNum) {
                            consumer.pause(Arrays.asList(p0));
                        }
                    }
                    if (record.partition() == 1) {
                        if (num == 40) {
                            consumer.resume(Arrays.asList(p0));
                        }
                    }
                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                // Commit the offset for this single partition (note the +1)
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                consumer.commitSync(offset);
                System.out.println("=============partition - " + partition + " end================");
            }
        }
    }
}
ConsumerThreadSample shows the classic multi-threaded pattern: each thread owns its own KafkaConsumer.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerThreadSample {
    private final static String TOPIC_NAME = "jiangzh-topic";

    /*
       The classic pattern: every thread creates its own KafkaConsumer,
       which keeps the (non-thread-safe) consumer confined to a single thread.
    */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);
        t1.start();
        Thread.sleep(15000);
        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.220.128:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
            consumer.assign(Arrays.asList(p0, p1));
        }

        public void run() {
            try {
                while (!closed.get()) {
                    // Process messages
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // Handle this partition's records
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                                    record.partition(), record.offset(), record.key(), record.value());
                        }
                        // Report the new offset back to Kafka (note the +1)
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            } catch (WakeupException e) {
                // Ignore the wakeup if we are shutting down
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}
ConsumerRecordThreadSample shares a single consumer and dispatches each record to a handler on a worker thread.

// A single shared consumer; each record is dispatched to its own handler
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerRecordThreadSample {
    private final static String TOPIC_NAME = "jiangzh-topic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.220.128:9092";
        String groupId = "test";
        int workerNum = 5;

        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
        // execute() loops forever, so run it on its own thread;
        // otherwise main would never reach shutdown()
        new Thread(() -> consumers.execute(workerNum)).start();
        Thread.sleep(1000000);
        consumers.shutdown();
    }

    // Consumer dispatcher: one polling consumer feeding a worker pool
    public static class ConsumerExecutor {
        private final KafkaConsumer<String, String> consumer;
        private ExecutorService executors;

        public ConsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (final ConsumerRecord<String, String> record : records) {
                    executors.submit(new ConsumerRecordWorker(record));
                }
            }
        }

        public void shutdown() {
            // NOTE: KafkaConsumer is not thread-safe; in production, prefer
            // consumer.wakeup() from here and close() inside the polling thread
            // (as in ConsumerThreadSample above)
            if (consumer != null) {
                consumer.close();
            }
            if (executors != null) {
                executors.shutdown();
            }
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }

    // Per-record handler, run on a worker thread
    public static class ConsumerRecordWorker implements Runnable {
        private final ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public void run() {
            // e.g., write the record to a database here
            System.out.println("Thread - " + Thread.currentThread().getName());
            System.err.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

The JoinGroup process

Before a rebalance can happen, the coordinator must already have been determined. The rebalance itself consists of two steps: Join and Sync.

Join:

This step means joining the consumer group: every member sends a JoinGroup request to the coordinator. Once all members have sent their JoinGroup requests, the coordinator selects one consumer to act as leader and sends the group membership and subscription information to that consumer. The leader-election algorithm is simple: if the group currently has no leader, the first consumer to join the group becomes the leader; if that leader later leaves the group, a new one is elected more or less arbitrarily, much like a random pick. The key fields involved are listed below, followed by a small rebalance-listener sketch.

  • protocol_metadata: the consumer's serialized subscription information
  • leader_id: the member_id of the consumer the coordinator has selected as leader
  • member_metadata: the subscription information of the corresponding consumer
  • members: the subscription information of every consumer in the consumer group
  • generation_id: a generation marker, analogous to the epoch discussed earlier for ZooKeeper; it is incremented on every rebalance round. Its main purpose is to protect the consumer group by fencing off stale offset commits: members from the previous generation cannot commit offsets into the new one.
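On the application side, the moment a member rejoins the group and a rebalance runs can be observed with a ConsumerRebalanceListener. A minimal sketch, reusing the broker address and topic from the samples above (committing in onPartitionsRevoked is one common policy, not the only one):

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class RebalanceListenerSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.220.128:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("jiangzh-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before a rebalance takes our partitions away:
                // the last chance to commit offsets for the old generation
                System.out.println("revoked: " + partitions);
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after the SyncGroup response: these are the partitions
                // the leader's assignment plan gave to this member
                System.out.println("assigned: " + partitions);
            }
        });
        // poll(...) loop as in the samples above
    }
}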

Synchronizing Group State

The main logic here is to send a SyncGroupRequest to the GroupCoordinator and process the SyncGroupResponse. In short, this is the step where the leader distributes its partition-assignment plan to every consumer in the group.

Every consumer sends a SyncGroup request to the coordinator, but only the leader's request carries the assignment plan; the other members send essentially empty requests. Once the leader has handed the plan to the coordinator, the coordinator writes each member's share of the result into its SyncGroupResponse, so every member learns exactly which partitions it should consume. Note that the partition-assignment plan for a consumer group is computed on the client side; the assignor that computes it is a consumer configuration, as sketched below.
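Because assignment runs in the client, the strategy is chosen with the consumer's partition.assignment.strategy setting rather than on the broker. A minimal sketch switching to the built-in RoundRobinAssignor (the client also ships RangeAssignor, StickyAssignor, and CooperativeStickyAssignor; the rest of the config mirrors the samples above):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

import java.util.Arrays;
import java.util.Properties;

public class AssignorConfigSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.128:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // The group leader runs this assignor on the client side during
        // the Sync phase (RangeAssignor is the classic default)
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("jiangzh-topic"));
        // poll(...) loop as in the samples above
    }
}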
