Kafka消费逻辑

简介:本文主要叙述KafkaConsumer消费逻辑(本文使用的是flink 中的Kafka-client),
是如何获取获取数据,这里直奔主题,从KafkaConsumer直接看起来。

1、唤醒 KafakaConsumerThread 线程消费
       
KafakaConsumerThread的 run 方法的逻辑主要在 KafakaConsumer#poll(long timeout) 里面,我们每次向 Kafka broker发送请求的时候,通常指定时间内不管有没有数据都会立即返回而不是一直等待知道有数据,这里一直在循环到指定的超时时间为止。参考代码如下:

public ConsumerRecords<K, V> poll(long timeout) {do {//进行一次消费Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);if (!records.isEmpty()) {//获取到数据不为空时,继续新建一个请求(异步)。if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0)client.pollNoWakeup(); //将请求发送出去if (this.interceptors == null)return new ConsumerRecords<>(records);elsereturn this.interceptors.onConsume(new ConsumerRecords<>(records));}// 根据timeout 参数,计算剩余可用的时间long elapsed = time.milliseconds() - start;remaining = timeout - elapsed;} while (remaining > 0); //在超时时间到达前会一直循环的

从上面代码可以看到,在没有获取到数据的时候同时超时时间到达之前,会反复通过调用 pollonce 方法进行消息的拉取,当 pollonce 方法如果获取到数据的时候,都会接着再新增一个FetchRequest请求,并且发送了这个请求,这是为了下一次poll的时候不需要发送请求直接返回数据。
         如果没有数据而且超时时间没到的时候,会一直调用pollonce方法进行一次消息的获取。我们看下KafkaConsumer#pollonce这个方法逻辑。参考代码如下:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {// 确认服务端的GroupCoordinator可用,以及当前consumer已加入消费者群组coordinator.poll(time.milliseconds());//从内存中尝试获取数据Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty())return records;// send any new fetches (won't resend pending fetches)//没有获取到数据,构建请求,将请求对象放入unsent队列。等待发送fetcher.sendFetches();//真正请求broker,获取到的数据存入FetchResponse#responseData中client.poll(pollTimeout, now, new PollCondition() {@Overridepublic boolean shouldBlock() {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasCompletedFetches();}});//是否有新的consumer加入等需要rebalanceif (coordinator.needRejoin())return Collections.emptyMap();//再次从内存尝试获取数据return fetcher.fetchedRecords();
}

上面说过,我们在KafakaConsumer#poll的时候如果获取到了数据的时候会再发送一次请求,如果这个请求请求到了数据,会将这个数据存在内存中,下一次获取的时候直接调用fetcher.fetchedRecords()方法从内存中获取到数据后返回。
          如果这个请求没有返回数据,则再会新增一个请求 (fetcher.sendFetches()) 并发送请求(client.poll),然后再次调用fetcher.fetchedRecords()返回,形成一个循环流程~~。
下面我们分别看下fetcher.sendFetches()、client.poll以及  fetcher.fetchedRecords()这3个方法是如何运行的。
   首先看下fetcher.sendFetches()方法,参考代码如下:

public int sendFetches() {//创建FetchRequests,多个分区首领可能在不同的节点。Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();//根据节点循环,for(){//创建ClientRequest节点信息、分区信息等,存入unsent 队列,并不是真正发送。client.send()//添加监听信息,等待回调将消息等写入Fetcher#LinkedQueue中。.addListener()}
}

将请求进行封装丢进队列中,并且对这个请求加个回调函数方便对请求结果的处理。下一步就是发送这个请求,

2、网络请求

下面我们主要看下这个NetworkClient#poll方法:

public List<ClientResponse> poll(long timeout, long now) {//元数据更新long metadataTimeout = metadataUpdater.maybeUpdate(now);try {//网络IO进行Select.selectthis.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));} catch (IOException e) {log.error("Unexpected error during I/O", e);}// process completed actionslong updatedNow = this.time.milliseconds();List<ClientResponse> responses = new ArrayList<>();handleAbortedSends(responses);//处理已完成的请求handleCompletedSends(responses, updatedNow);//处理请求响应的数据、回调那个添加的监听handleCompletedReceives(responses, updatedNow);handleDisconnections(responses, updatedNow);handleConnections();handleInitiateApiVersionRequests(updatedNow);handleTimedOutRequests(responses, updatedNow);
}

这个方法所做的事情就是更新元数据信息、网络IO处理、后面几个方法都是用来处理请求后的逻辑的。我们还是往下看selector.poll方法做的什么事情,接着上代码:Select#poll。

public void poll(long timeout) throws IOException {//Select.select,获取下个事件的类型,读、写、可连接等int readyKeys = select(timeout);long endSelect = time.nanoseconds();this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());// 根据不同的事件做对应的事情,参照SelectionKey描述,OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPTif (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);}//如果有数据返回记录到completedReceives中addToCompletedReceives();
}

这里先是获取当前事件类型。然后根据事件的类型做相应的读写等操作。

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,boolean isImmediatelyConnected,long currentTimeNanos) {if (isImmediatelyConnected || key.isConnectable()) {if (channel.finishConnect()) {this.connected.add(channel.id());this.sensors.connectionCreated.record();SocketChannel socketChannel = (SocketChannel) key.channel();log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",socketChannel.socket().getReceiveBufferSize(),socketChannel.socket().getSendBufferSize(),socketChannel.socket().getSoTimeout(),channel.id());} elsecontinue;}/* if channel is not ready finish prepare */if (channel.isConnected() && !channel.ready())channel.prepare();/* if channel is ready read from any connections that have readable data */if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {NetworkReceive networkReceive;while ((networkReceive = channel.read()) != null)addToStagedReceives(channel, networkReceive);}/* if channel is ready write to any sockets that have space in their buffer and for which we have data */if (channel.ready() && key.isWritable()) {Send send = channel.write();if (send != null) {this.completedSends.add(send);this.sensors.recordBytesSent(channel.id(), send.size());}}}
}

这个方法就是就是根据不同的key,针对不同的key类型做不同的逻辑处理,是真正将请求发送出去以及接受响应的方法。
具体NIO方面操作就不往下细扒了。我还是回头看下当前请求节点上的channel有可读的数据时,是如何将数据放入Handover 队列里面供下游使用,这里我们看到上述channel.isReadable的时候调用了addToStagedReceives方法,这个方法会将buffer中的数据最终放入Selector# stagedReceives的内存中,而刚才在NetworkClient#poll方法中有个handleCompletedReceives方法,这个方法最终会回调RequestFutureListener#onSuccess() 将消息存入Fetch中的completedFetches。最后被fetcher.fetchedRecords()调用获取。

Kafka Consumer 消费逻辑相关推荐

  1. 记录kafka consumer 消费失败

    简单记录一下困扰一天的kafka问题 背景是拿备份数据启kafka服务,之后项目就连不上kafka 输出大致是这样的: [      Thread-27] o.a.k.c.c.internals.Ab ...

  2. kafka consumer 停止消费topic

    现象 在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令 ...

  3. 1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等

    1.12.Flink Kafka-Connector详解 1.12.1.Kafka Consumer消费策略设置 1.12.2.Kafka Consumer的容错 1.12.3.动态加载Topic 1 ...

  4. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  5. Kafka Consumer位移(Offset)提交——解决Consumer重复消费和消息丢失问题

    本文目录 1.Consumer 位移(offset) 1.2 位移(offset)的作用 2. 位移(offset)提交导致的问题 2.1 消息丢失 2.2 消息重复消费 3 Consumer位移提交 ...

  6. 总结kafka的consumer消费能力很低造成重复消费死循环的情况下的处理方案

    简介 由于项目中需要使用kafka作为消息队列,并且项目是基于spring-boot来进行构建的,所以项目采用了spring-kafka作为原生kafka的一个扩展库进行使用.先说明一下版本: spr ...

  7. kafka 主动消费_Kafka核心API——Consumer消费者

    Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了.因此,本文 ...

  8. kafka 主动消费_Kafka消费组(consumer group)

    在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东西. 一. 误区澄清与概念明确 1 Kafk ...

  9. 【kafka】Kafka consumer处理大消息数据过大导致消费停止问题

    文章目录 1.概述 2.案例分析 3.kafka的设计初衷 3.1 broker 配置 3.2 Consumer 配置 M.扩展 1.概述 转载:https://www.cnblogs.com/wyn ...

  10. 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题

    问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...

最新文章

  1. 面对0基础学IT的一些感想
  2. YAML基础知识及搭建一台简洁版guestbook
  3. 让孩子尽快了解这个世界
  4. python的while循环时if不能打印_Python if语句在while循环中没有响应
  5. LL(1)文法与其对应的FOLLOW,SELECT和FIRST集
  6. [转载] javascript入门_WebAssembly入门-仅使用14行JavaScript
  7. javascript 自定义对象的两种方法
  8. SSL安全证书不受信任怎么办
  9. Command not found 解决
  10. (附源码)node.js物资管理系统 毕业设计 071130
  11. 有向加权图 最大弱连通分支_指数基金介绍专栏:指数基金的加权方式,你都知道哪些?...
  12. Ray入门指南(1)----ray分布式框架的介绍
  13. 【Python数据分析】pandas索引介绍
  14. 近场通信(NFC)手机支付解决方案
  15. Android Camera开发系列(下)——自定义Camera实现拍照查看图片等功能
  16. H3CIE RS+——ospf(1)
  17. 使用Photoshop给黑白照片着色变彩色相片实例教程
  18. 点成分享 | QBC干式血液分析仪打开血常规检测的新天地
  19. 智慧养老之养老院解决方案-养老院人员定位-智慧养老解决方案-新导智能
  20. 查询借出次数超过2次的所有图书的书号和书名;

热门文章

  1. 心形函数的正确打开方式(Unity3D Shader)
  2. 千元4G拍照机皇 中兴红牛手机4月18号开卖
  3. centos:/usr/bin/perl is needed by mysql-community-server
  4. word中公式全篇一次性转math type公式方法,而且出现omml2mml.xsl 问题的解决方法(有效!!!)
  5. 新猿木子李:0基础学python培训教程 Python操作Excel之读取数据
  6. 关于百度 OpenRASP 的个人拙见
  7. 航芯技术分享 | 了解汽车上的OBD
  8. 关于阿里云服务器安全组规则
  9. 关于CSS媒体查询--电脑尺寸大全
  10. 互联网技术人应该如何与上级沟通?