字节架构师:来说说 Kafka 的消费者客户端详解,你都搞懂了吗,Javaweb面试总结
虽然说消费者与消费者组这种模型可以让整体的消费能力具备横向伸缩性,但是对于分区固定的情况下,增加消费者并不一定能提升消费能力,如图所示,此时就有一台打印机无法分配到分区而消费不了数据。
1.2 消息投递模式
之前说过消息队列的两种模式,即点对点和发布订阅模式。而 Kafka 同时支持这两种模式。下面的这个理解很关键。
点对点模式基于队列,类似于同一个消费者组中的数据,由生产者发送数据到分区,然后消费者拉取分区的消息进行消费,此时消息只能被同一个消费者组的消费者消费一次。
发布订阅模式模式就是 kafka 中的分区消息可以被不同消费者组的消费者消费。这就是一对多的广播模式应用。
当然,消费者组是一个逻辑的概念,通过客户端参数 group.id 来配置,默认值为空字符串。而消费者并不是逻辑的概念,它是真正消费数据的实体,可以是线程、也可以是一个机器。
好,明白了消费者与消费者组的概念,接下来我们正式打开 消费者客户端的潘多拉魔盒。
二、Kafka 消费者的应用
同样,消费者也是依赖于 Kafak 的客户端,正常的消费逻辑是下面几个步骤:
1、配置消费者客户端参数及创建相应的消费者实例
2、订阅主题
3、拉取消息并消费
4、提交消费位移
5、关闭消费者实例
这里的位移可能我们还不清楚是什么意思,别急,我们后面会讲到,先来看下一个典型的消费者它应该怎么写。
2.1 消费者客户端演示
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(“bootstrap.servers”, “192.168.81.101:9092”);
props.put(“group.id”, “test”); //消费者组
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(“xiaolei2”));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf(“offset = %d, key = %s, value = %s%n”, record.offset(), record.key(), record.value());
}
}
}
}
2.2 必要参数配置
在创建消费者的时候,Kafka 有 4 个参数 是必填的,比生产者多了一个。
bootstrap.servers : 这个参数用来指定连接 Kafka 集群的 broker 地址列表,可以是单个地址,也可以用逗号分割填上 Kafka 集群地址。
key.deserializer 和 value. deserializer :因为消息发送的时候将key 和 value 进行序列化生成字节数组,因此消费数据的时候需要反序列化为原来的数据。
group.id : 消费者所在组的名称,默认值为 ”“,如果设置为空,则会抛出异常 Exception in thread “main” org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration. 复制代码
2.3 订阅主题与分区
在创建出 consumer 之后,我们需要为它订阅相关的主题,一个消费者可以订阅一个或多个主题。这里可以使用两个 API
consumer.subscribe(Collection topics) :指明需要订阅的主题的集合;
consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的集合。
对于它订阅的是个集合,我们也容易理解,Kafka 可以通过正则表达式 来匹配相关主题,例如下面的这样:
consumer.subscribe(Pattern.compile(“topic-.*”));
但是如果 consumer 重复定义的话,就以后面的为准,下面订阅的就是 xiaolei3 这个主题。
consumer.subscribe(Arrays.asList(“xiaolei2”));
consumer.subscribe(Arrays.asList(“xiaolei3”));
订阅完主题,我们讲讲它怎么定义分区。
直接订阅特定分区。
consumer.assign(Arrays.asList(new TopicPartition(“xiaolei2”,0)));
这里面使用了 assing 方法来订阅特定分区。那如果不知道有哪些分区怎么办呢?
可以使用 KafkaConsumer 的 partitionsFor() 方法用来查询指定主题的元数据信息。
下面这种实现:
consumer.assign(Arrays.asList(new TopicPartition(“xiaolei2”,0)));
ArrayList topicPartitions = new ArrayList<>();
List partitionInfos = consumer.partitionsFor(“xiaolei2”);
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
}
consumer.assign(topicPartitions);
最后,Kafka 中的消费是基于拉取式的,消息的消费分两种,
一个是推送(push):服务端主动把消息发送给消费者,例如微信公众号文章的发送
一个是拉取(poll):消费者主动向服务端发起请求获取。
Kafka 只需要轮询 API 向服务器定时请求数据,一旦消费者订阅了主题,轮询就会处理所有的细节,例如发送心跳、获取数据、分区再平衡等。而我们则处理业务即可。
三、消费位移
3.1 什么是偏移量
对于 Kafka 的分区来说,它的每条消息都有唯一的偏移量,用来展示消息在分区中对应的位置,它是一个单调递增的整数。在 0.9 版本之后 Kafka 的偏移量是存储在 Kafka 的 _consumer_offsets 主题中。消费者在消费完消息之后会向 这个主题中进行 消费位移的提交。消费者在重新启动的时候就会从新的消费位移处开始消费消息。
因为,位移提交是在消费完所有拉取到的消息之后才执行的,如果不能正确提交偏移量,就可能发生数据丢失或重复消费。
如果在消费到 x+2 的时候发生异常,发生故障,在故障恢复后,重新拉取消息还是从 x处开始,那么之前 x到 x+2 的数据就重复消费了。
如果在消费到 x+2 的时候,提前把 offset 提交了,此时消息还没有消费完,然后发生故障,等重启之后,就从新的 offset x+5 处开始消费,那么 x+2 到 x+5 中间的消息就丢失了。
因此,在什么时机提交 偏移量 显的尤为重要,在 Kafka 中位移的提交分为手动提交和自动提交,下面对这两种展示讲解。
3.2 自动提交偏移量
在 Kafka 中默认的消费位移的提交方式是 自动提交。这个在消费者客户端参数 enable.auto.commit 配置,默认为 true。它是定期向 _comsumer_offsets 中提交 poll 拉取下来的最大消息偏移量。定期时间在 auto.commit.interval.ms 配置,默认为 5s。
虽然自动提交消费位移的方式非常方便,让编码更加简洁,但是自动提交是存在问题的,就是我们上面说的数据丢失和重复消费,这两种它一个不落,因此,Kafka 提供了手动提交位移量,更加灵活的处理消费位移。
3.3 手动提交偏移量
开启手动提交位移的前提是需要关闭自动提交配置,将 enable.auto.commit 配置更改为 false。
根据用户需要,这个偏移量值可以是分为两类:
常规的,手动提交拉取到的最大偏移量。
手动提交固定值的偏移量。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
3.3.1 同步提交 offset
由于同步提交 offsets 有失败重试机制,故更加可靠。
public class CustomComsumer {
public static void main(String[] args) {
Properties props = new Properties();
//Kafka集群
props.put(“bootstrap.servers”, “hadoop102:9092”);
//消费者组,只要group.id相同,就属于同一个消费者组
props.put(“group.id”, “test”);
props.put(“enable.auto.commit”, “false”);//关闭自动提交offset
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(“first”));//消费者订阅主题
while (true) {
//消费者拉取数据
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf(“offset = %d, key = %s, value = %s%n”, record.offset(), record.key(), record.value());
}
//同步提交,当前线程会阻塞直到offset提交成功
consumer.commitSync();
}
}
}
3.3.2 异步提交 offset
虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
以下为异步提交offset的示例:
public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();
//Kafka集群
props.put(“bootstrap.servers”, “hadoop102:9092”);
//消费者组,只要group.id相同,就属于同一个消费者组
props.put(“group.id”, “test”);
//关闭自动提交offset
props.put(“enable.auto.commit”, “false”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(“first”));//消费者订阅主题
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
for (ConsumerRecord<String, String> record : records) {
System.out.printf(“offset = %d, key = %s, value = %s%n”, record.offset(), record.key(), record.value());
}
//异步提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println(“Commit failed for” + offsets);
}
}
});
}
}
}
异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待响应。
异步提交的时候同样有失败的情况出现,假设第一次提交了 100 的位移,但是提交失败了,第二次提交了 200 的位移,此时怎么处理?
如果重试,将 100 的位移再次提交,这次提交成功了,就会覆盖 200 的位移,此时变成 100。那么就会出现消费重复的情况,继续从100 处开始消费。
因此,基于这个原因,可以使用 同步 +异步的组合方式,在100 提交之后必须等待请求成功才能提交 200 的位移。
3.3.3 同步加异步提交
在正常的轮询中使用异步提交来保证吞吐量,但是在最后关闭消费者之前,或发生异常之后,此时使用同步提交的方式来保证最后的提交成功。这是在最后做的一次把关。
try {
while (true) {
// 拉取消息逻辑处理
// 异步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 即将要关闭消费者,同步提交保证提交成功
consumer.commitSync();
} finally {
consumer.close();
}
}
3.4 指定位移消费
因为消费位移的存在,我们可以在消费者关闭、宕机重启、再平衡的时候找到存储的位移位置,开始消费,但是消费位移并不是一开始就有的,例如下面这几种情况:
1、当一个新的消费者组建立的时候
2、消费者组内的一个消费者订阅了一个新的主题;
3、_comsumer_offsets 主题的位移信息过期被删除
这几种情况 Kafka 没办法找到 消费位移,就会根据 客户端参数 auto.offset.reset 的配置来决定从何处开始消费,默认为 latest。
earliest:当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费;
latest:当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费该分区下新产生的数据(默认值);
none:当各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的offset,则直接抛出NoOffsetForPartitionException异常;
Kafka 的 auto.offset.reset 参数只能让我们粗粒度的从开头或末尾开始消费,并不能指定准确的位移开始拉取消息,而 KafkaConsumer 中的 seek()方法正好提供了这个功能,可以让我们提前消费和回溯消费,这样为消息的消费提供了很大的灵活性,seek()方法还可以通过 storeOffsetToDB 将消息位移保存在外部存储介质中,还可以配合再平衡监听器来提供更加精准的消费能力。
3.4.1 seek 指定位移消费
seek 方法定义如下:
public void seek(TopicPartition partition, long offset)
partition 表示分区
offset 表示从分区的哪个位置开始消费
afkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(“xiaolei2”));
consumer.poll(Duration.ofMillis(10000));
Set assignment = consumer.assignment();
for (TopicPartition tp : assignment) {
consumer.seek(tp,100);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf(“offset = %d, key = %s, value = %s%n”, record.offset(), record.key(), record.value());
}
}
seek() 方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll() 方法的调用过程中实现的,也就是说,在执行 seek() 方法之前需要先执行一次 poll() 方法,等到分配到分区之后才可以重置消费位置。
因此,在poll()方法中设置一个时间等待分区完成,然后在通过 assignment()方法获取分区信息进行数据消费。
如果在 poll()方法中设置为0 那么就无法获取到分区。这个时间如果太长也会造成不必要的等待,下面看看优化的方案。
3.4.2 seek 指定位移消费优化
consumer.subscribe(Arrays.asList(“xiaolei2”));
Set assignment = new HashSet<>();
while (assignment.size()==0){
consumer.poll(Duration.ofMillis(100));
assignment=consumer.assignment();
}
for (TopicPartition tp : assignment) {
consumer.seek(tp,100);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf(“offset = %d, key = %s, value = %s%n”, record.offset(), record.key(), record.value());
}
}
3.4.3 seek 从分区开头或末尾消费
如果消费者组内的消费者在启动的时候能够找到消费位移,除非发生位移越界,否则 auto.offset.reset 参数不会奏效。此时如果想指定从开头或末尾开始消费,也需要 seek() 方法来实现。
如果按照指定位移消费的话,就需要先获取每个分区的开头或末尾的 offset 了。可以使用 beginningOffsets() 和 endOffsets() 方法。
Set assignment = new HashSet<>();
// 在poll()方法内部执行分区分配逻辑,该循环确保分区已被分配。
// 当分区消息为0时进入此循环,如果不为0,则说明已经成功分配到了分区。
while (assignment.size() == 0) {
consumer.poll(100);
// assignment()方法是用来获取消费者所分配到的分区消息的
// assignment的值为:topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1
assignment = consumer.assignment();
}
// 指定分区从头消费
Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment);
for (TopicPartition tp : assignment) {
Long offset = beginOffsets.get(tp);
System.out.println(“分区 " + tp + " 从 " + offset + " 开始消费”);
consumer.seek(tp, offset);
}
// 指定分区从末尾消费
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
Long offset = endOffsets.get(tp);
System.out.println(“分区 " + tp + " 从 " + offset + " 开始消费”);
consumer.seek(tp, offset);
}
// 再次执行poll()方法,消费拉取到的数据。
// …(省略)
其实,KafkaConsumer 中直接提供了 seekToBeginning() 和 seekToEnd() 方法来实现上述功能。具体定义如下:
public void seekToBeginning(Collection partitions)
public void seekToEnd(Collection part
《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》
【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享
itions)
替代代码如下:
Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment);
for (TopicPartition tp : assignment) {
Long offset = beginOffsets.get(tp);
System.out.println(“分区 " + tp + " 从 " + offset + " 开始消费”);
consumer.seek(tp, offset);
}
3.4.5 根据时间戳消费
比如,我们要消费前天这时刻的消息,此时就无法直接追溯到这个位置了,这时可以使用 KafkaConsumer 的 offsetsForTimes 方法
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
offsetsForTimes() 方法的参数 timestampsToSearch 是一个 Map 类型,其中 key 为待查询的分区,value 为待查询的时间戳,该方法会返回时间戳大于等于查询时间的第一条消息对应的 offset 和 timestamp 。
接下来就以消费当前时间前一天之后的消息为例,代码如下:
Set assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(100);
assignment = consumer.assignment();
}
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
// 设置查询分区时间戳的条件:获取当前时间前一天之后的消息
timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for(TopicPartition tp: assignment){
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
// 如果offsetAndTimestamp不为null,则证明当前分区有符合时间戳条件的消息
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
// 消费记录
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + “:” + record.value() + “:” + record.partition() + “:” + record.timestamp());
}
}
四、控制或关闭消费
KafkaConsumer 提供了对消费速度进行控制的方法,某些时刻,我们可能会关闭或暂停某个分区的消费,而先消费其他分区,当达到一定条件时再恢复这些分区的消费,这两个方法是 pause() (暂停消费) 和 resume()(恢复消费)。
public void pause(Collection partitions) {
this.acquireAndEnsureOpen();
try {
this.log.debug(“Pausing partitions {}”, partitions);
Iterator var2 = partitions.iterator();
while(var2.hasNext()) {
TopicPartition partition = (TopicPartition)var2.next();
this.subscriptions.pause(partition);
}
} finally {
this.release();
}
}
public void resume(Collection partitions) {
this.acquireAndEnsureOpen();
try {
this.log.debug(“Resuming partitions {}”, partitions);
Iterator var2 = partitions.iterator();
while(var2.hasNext()) {
TopicPartition partition = (TopicPartition)var2.next();
this.subscriptions.resume(partition);
}
} finally {
this.release();
}
}
除了暂停和恢复之外,Kafka 还提供了午餐的 paused() 方法来返回暂停的分区集合。
public Set paused()
字节架构师:来说说 Kafka 的消费者客户端详解,你都搞懂了吗,Javaweb面试总结相关推荐
- 背废完虐面试官,字节架构师 8 年心血终成《图解设计模式》手册,拿下 offer 洒洒水啦
引言 记得我第一次做项目的时候,当时没什么经验,开始之前也没有什么规划,想到需要啥功能就去实现啥功能,最后做到一半的时候发现进行不下去了. 这就是吃了没有把设计模式学好的亏,其实我们把程序员分成两类, ...
- 从Java程序员进阶到架构师,6大核心技能要领详解
" java架构师技能将分为如下6大环节:数据结构和算法,Java高级特性,Java web核心,数据库,Java框架与必备工具,系统架构设计. 希望能真正帮助到从程序员进阶到架构师之路的朋 ...
- Akamai首席架构师Will:WebRTC、QUIC、DASH、AV1都前景可观
William Robert Law是Akamai媒体业务群的首席架构师,自从去年邀请他参加LiveVideoStackCon 2017后,我们就亲切的称他Will.在对他的邮件采访中,他谈到了中美科 ...
- 【架构师】零基础到精通——网关详解
博客昵称:架构师Cool 最喜欢的座右铭:一以贯之的努力,不得懈怠的人生. 作者简介:一名退役Coder,软件设计师/鸿蒙高级工程师认证,在备战高级架构师/系统分析师,欢迎关注小弟! 博主小留言:哈喽 ...
- 程序员、架构师、技术经理、技术总监和CTO分别都是干什么的?
前言 作为一名程序员,当亲戚问起职业的时候其实自己都不知道该如何描述,如果你说你是做Java开发或者web前端开发这一类说辞,家里人不仅听得云里雾里可能还会觉得这工作没那么好.正确的做法是回答在某某公 ...
- wsasend发送不可靠_架构师总结:kafka 如何保证数据的可靠性和一致性
Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知.本文从 Producter 往 Broker 发送消息.Topic 分区副本以及 Leader 选举几个角度介绍数据的可靠性. Pro ...
- 关于计算机面试重难点 之 操作系统,字节架构师有话说
用户态与核心态?哪些操作会导致用户态切换到核心态?# 用户态与核心态是指操作系统两种运行级别.操作系统核心的功能与服务(进程)运行在内核态,例如:进程管理.内存管理.设备管理.文件管理等:用户进程只能 ...
- 爽,字节架构师DDD(领域驱动设计)巅峰之作,拆解业务代码真好用
前 言 至少20年前,一些顶尖的软件设计人员就已经认识到领域建模和设计的重要性,但令人惊讶的是,这么长时间以来几乎没有人写出点儿什么,告诉大家应该做哪些工作或如何去做.尽管这些工作还没有被清楚地表述出 ...
- 阿里架构师用3点讲透数据中台,这些都是你没看过的
数据实际上是一个非常传统的行业. 有软件开始的那一天起,数据这个行业就存在了.比如说原来最早的时候,有非常多的数据报表数据可视化,然后到后来,有了商业智能,有了Data Warehouse(就是数据仓 ...
- 一线互联网架构师筑基必备技能之Java篇,一招彻底弄懂!
前言 众所周知,Java开发人员的生存环境可谓是与以前大相径庭,以IT行业发展来说,在十几年前的时候,IT行业的技术人才是稀缺的,程序员最初的招聘行情,只要你会敲"holle world&q ...
最新文章
- SharpDevelop源码分析 (一、序+基本概念) 收藏
- 对 java 同步锁 以及 级别升级的 理解
- oracle ora32771,Oracle的文件号、相对文件号及其他(续)
- Python中lambda的使用,与它的三个好基友介绍!
- php的email函数发送失败,php中mail函数发送邮件失败的解决方法_php技巧
- redis强一致性_Redis的8连击,验证你是否熟练掌握Redis核心知识点。
- The 9 Deep Learning Papers You Need To Know About (Understanding CNNs Part 3)
- 完整的CJK Unicode范围(5.0版)
- centos6.5安装自动化工具ansible和图形化工具tower
- 配置各个连接oracle客户端
- 1.1信息安全基础概念
- “数据科学”课程群与 “数据科学导论”课程建设初探
- (二)如何使用Progress Bar
- Ubuntu 16.04升级到Ubuntu 16.10的方法:
- 计算机英语趣味知识竞赛,(经典)精华版 英语趣味知识竞赛.ppt
- 微光集市-商品及其商品信息的显示(版本1.0)
- BGP选路规则(实验做的有点乱)
- SercureCRT使用
- javascript取本周星期一到星期天对应日期的通用方法
- 虚拟计算服务器吗,云计算服务器是虚拟的吗
热门文章
- php 12306查询结果,使用php怎么编写一个12306余票查询功能
- SDL —— SDL_ttf
- 图片轮播banner实现
- python引入包报错
- 关于C++版本的海图渲染引擎MyS57Map
- gem5中的O3 Pipeline Viewer Visualization实现方法
- android将彩图转为黑白_android 将图片转换成黑白图片
- Python 十进制转换二进制
- 阿里的世界版图——“风清扬”的全球梦
- android icon psd,60个PSD格式的ICON和按钮模板