版本2.4.0

Kafka的客户端消费者在启动的过程中会通过ensureActiveGroup()方法来确保自己是可用的消费者,在这个方法中,会向kafka的broker集群发送join请求,在join请求的response中可以得到该生产者所订阅的topic中被分配得到的分区信息。而接下来的消息拉取将会只请求此处分配得到的topic分区。此时,当前获得的topic分区的消费偏移量还是未知的,在正式拉取消息之前需要构造fetchOffset请求得到具体的偏移量位置以便消费。

private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {Node coordinator = checkAndGetCoordinator();if (coordinator == null)return RequestFuture.coordinatorNotAvailable();log.debug("Fetching committed offsets for partitions: {}", partitions);// construct the requestOffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.groupId,new ArrayList<>(partitions));// send the request with a callbackreturn client.send(coordinator, requestBuilder).compose(new OffsetFetchResponseHandler());
}

每次当kafka的消费者需要通过poll()方法拉取消息的时候,将会通过sendFetches()方法来试图拉取消息。

在准备发送fetch请求拉取消息的时候,首先需要通过prepareFetchRequests()方法来准备fetch请求。

已经完成拉取而没有实际处理的topic分区暂时没有必要再次拉取消息,而过滤掉以上情况的broker分配给该消费者的topic分区,将会用来做发送fetch请求的准备。

private List<TopicPartition> fetchablePartitions() {Set<TopicPartition> exclude = new HashSet<>();if (nextInLineRecords != null && !nextInLineRecords.isFetched) {exclude.add(nextInLineRecords.partition);}for (CompletedFetch completedFetch : completedFetches) {exclude.add(completedFetch.partition);}return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
}

而所要发送的topic分区将会根据其leader副本所在的broker节点构造fetch请求准备发送拉取消息。

for (TopicPartition partition : fetchablePartitions()) {// Use the preferred read replica if set, or the position's leaderSubscriptionState.FetchPosition position = this.subscriptions.position(partition);Node node = selectReadReplica(partition, position.currentLeader.leader, currentTimeMs);if (node == null || node.isEmpty()) {metadata.requestUpdate();} else if (client.isUnavailable(node)) {client.maybeThrowAuthFailure(node);// If we try to send during the reconnect blackout window, then the request is just// going to be failed anyway before being sent, so skip the send for nowlog.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);} else if (this.nodesWithPendingFetchRequests.contains(node.id())) {log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);} else {// if there is a leader and no in-flight requests, issue a new fetchFetchSessionHandler.Builder builder = fetchable.get(node);if (builder == null) {int id = node.id();FetchSessionHandler handler = sessionHandler(id);if (handler == null) {handler = new FetchSessionHandler(logContext, id);sessionHandlers.put(id, handler);}builder = handler.newBuilder();fetchable.put(node, builder);}builder.add(partition, new FetchRequest.PartitionData(position.offset,FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, position.currentLeader.epoch));log.debug("Added {} fetch request for partition {} at position {} to node {}", isolationLevel,partition, position, node);}
}

可以看到,发送到同一个broker的fetch请求将会被集中发送,Kafka消费者客户端将会以异步的方式发送这些fetch请求,在其请求返回的时候进行处理。

long fetchOffset = requestData.fetchOffset;
FetchResponse.PartitionData<Records> fetchData = entry.getValue();log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",isolationLevel, fetchOffset, partition, fetchData);
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,resp.requestHeader().apiVersion()));

异步接收的fetch请求将会被组装成CompletedFetch缓存在completedFetches集合中等待解析。

而后,将会通过fetchRecords()方法中,将completedFetches中的拉取消息的请求从缓存中取出并解析得到所需要的消息。

while (recordsRemaining > 0) {if (nextInLineRecords == null || nextInLineRecords.isFetched) {CompletedFetch completedFetch = completedFetches.peek();if (completedFetch == null) break;try {nextInLineRecords = parseCompletedFetch(completedFetch);} catch (Exception e) {// Remove a completedFetch upon a parse with exception if (1) it contains no records, and// (2) there are no fetched records with actual content preceding this exception.// The first condition ensures that the completedFetches is not stuck with the same completedFetch// in cases such as the TopicAuthorizationException, and the second condition ensures that no// potential data loss due to an exception in a following record.FetchResponse.PartitionData partition = completedFetch.partitionData;if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {completedFetches.poll();}throw e;}completedFetches.poll();} else {List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);TopicPartition partition = nextInLineRecords.partition;if (!records.isEmpty()) {List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);if (currentRecords == null) {fetched.put(partition, records);} else {// this case shouldn't usually happen because we only send one fetch at a time per partition,// but it might conceivably happen in some rare cases (such as partition leader changes).// we have to copy to a new list because the old one may be immutableList<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());newRecords.addAll(currentRecords);newRecords.addAll(records);fetched.put(partition, newRecords);}recordsRemaining -= records.size();}}
}

当准备拉取的消息数量小于最大拉取数量或者completedFetches中没有已经缓存的fetch response,则会结束消息的拉取。

在这里nextInLineRecords将会缓存下一个拉取得到的消息集合。

首先通过parseCompletedFetch()方法解析completedFetches顶部的fetch response,里面主要确保得到的fetchOffset与自己之前预测的一致,并更新hw等参数到自己的缓存中,在完成上述操作后,将这一fetch结果从completedFetches中取出,并准备将其放入nextInLineRecords从中获取所得到的消息正文,并更新下一次所想消费的偏移量。而此处得到的结果也正是kafka消费者所需要得到的消息。

kafka java消费者消息拉取相关推荐

  1. 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解

    导语   在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...

  2. RocketMQ:Consumer概述及启动流程与消息拉取源码分析

    文章目录 Consumer 概述 消费者核心类 消费者启动流程 消息拉取 PullMessageService实现机制 ProcessQueue实现机制 消息拉取基本流程 客户端发起消息拉取请求 消息 ...

  3. Consumer消息拉取和消费流程分析

    1. 前言 MQConsumer是RocketMQ提供的消费者接口,从接口定义上可以看到,它主要的功能是订阅感兴趣的Topic.注册消息监听器.启动生产者开始消费消息. ​ 消费者获取消息的模式有两种 ...

  4. Kafka | Java 消费者是如何管理TCP连接的? | 极客时间

    今天我要和你分享的主题是:Kafka 的 Java 消费者是如何管理 TCP 连接的. 在专栏中,我们专门聊过"Java生产者是如何管理 TCP 连接资源的"这个话题,你应该还有印 ...

  5. rocketmq中的消息拉取及并发消费理解

    消息拉取采用单线程形式,便于消息的顺序拉取 默认批量取32个,出现性能考虑,减少网络请求.不能保证会拉取到32个,因为消息队列中的存放的是topic-queueid对应的索引,会包含多个tag,而消息 ...

  6. kafka java客户端消息的分区与缓存发送

    当kafka发送消息的时候,在完成消息的序列化之后,如果没有指定消息的分区,将会通过Partitioner来选择该消息发往的分区,在默认情况下,将采用DefaultPartitioner来进行消息的分 ...

  7. kafka consumer配置拉取速度慢_Kafka分区分配策略(Partition Assignment Strategy)

    众所周知,Apache Kafka是基于生产者和消费者模型作为开源的分布式发布订阅消息系统(当然,目前Kafka定位于an open-source distributed event streamin ...

  8. kafka Java客户端之 consumer API 消费消息

    背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...

  9. RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码

    转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...

最新文章

  1. 想去苹果做AI?看看你够不够格
  2. flowable 配置自定义表单_Flowable用代码自定义流程
  3. Perl 中的正则表达式
  4. thinkphp5/phpstudy分析入口文件index.php及localhost和配置域名访问网站根目录www
  5. ora 00900 已编译但有错误_技术分享|万万没想到!编译错误竟然还没灭绝???
  6. 第二节:细说一下那些由繁变简的语法
  7. 【阿里内推001期】听说你要做中台,阿里中台部门招Java开发
  8. 检测非法键盘hook_反越狱检测解读
  9. 高斯模糊 高斯核函数
  10. Home Assistant 家庭助理安装
  11. 用友U8V系统怎么重启服务器,用友u8怎么重启云服务器
  12. 如何通过织云Lite愉快地玩转TSW
  13. java学习之路2--简单工厂模式实现饮料自动贩卖机
  14. GoldenGate的安全配置
  15. Ubuntu系统入门
  16. 腾讯视频转换mp4格式用什么转换器?电脑怎么把腾讯视频转换成mp4?
  17. openmv一些常见问题与心得总结
  18. 库存出入库管理业务流程图怎么做?
  19. 03.ReactDOM.render
  20. 云图说丨华为云区块链引擎服务:高安全的区块链技术服务平台,轻松部署,快速上链

热门文章

  1. fabric-ca 登记身份时报Error: Response from server: Error Code: 20 - Authentication failure
  2. 诗和远方:无题(四十九)
  3. mysql如何抛出错误信息_如何捕获并重新抛出MySQL中的所有错误
  4. INF=0x3f3f3f3f背后的知识
  5. c++——抽象类以及string知识点补充
  6. Java 面向对象:接口的理解
  7. spin lock自旋锁
  8. Linux时间子系统(二) 软件架构
  9. Mysql 声明变量
  10. auto_cmdb--01之models.py建表