为什么80%的码农都做不了架构师?>>>   

#测试代码

上次讲了KafkaProducer的用法和实现代码,这里继续来看看Consumer是怎样工作的。 同样先来看看示例代码:

import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class SimpleConsumer {private Logger LOG = LoggerFactory.getLogger(SimpleConsumer.class);private final ConsumerConnector consumer;private final String topic;private ExecutorService executor;private static String groupId = "";public static void main(String [] args){String zookeeper = "node87:2181";groupId = String.valueOf(new Date().getTime());//每次生成一个新的groupId方便测试String topic = "test1234";int threadCount = 1;SimpleConsumer simpleConsumer = new SimpleConsumer(zookeeper, groupId, topic);simpleConsumer.run(threadCount);try {Thread.sleep(100000);  //等待100秒后关掉服务} catch (InterruptedException e) {//}simpleConsumer.shutdown();}public SimpleConsumer(String a_zookeeper, String a_groupId, String a_topic) {//创建一个ConsumerConnector负责和zookeeper通信,createJavaConsumerConnector(config : ConsumerConfig)是scala的方法。内部实例化了一个//kafka.javaapi.consumer.ZookeeperConsumerConnector(config)对象返回consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));this.topic = a_topic;this.executor = Executors.newCachedThreadPool();}public void shutdown() {if (consumer != null) consumer.shutdown();if (executor != null) executor.shutdown();try {if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");}} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly");}}/*** 读取kafkaStream方法*/public void run(int a_numThreads){ //a_numThreads=1//topic,创建stream的数量Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, a_numThreads);//创建MessageStreams MapMap<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);executor.execute(new ConsumerTest(streams.get(0)));  //因为上面只创建了一条stream,这里直接获取之}public class ConsumerTest implements Runnable {KafkaStream<byte[], byte[]> stream;public ConsumerTest(KafkaStream<byte[], byte[]> stream) {this.stream = stream;}public void run() {//每个stream都支持一个Iterator用来获取消息ConsumerIterator iterator = stream.iterator();LOG.info("groupId:{}",groupId);while(true){try {if(iterator.hasNext()) {MessageAndMetadata<byte[], byte[]> data = iterator.next();LOG.info("message:{}, partition:{}, offset:{},", new String(data.message()), data.partition(), data.offset());}}catch (ConsumerTimeoutException e){System.out.println(Thread.currentThread().getName() + "----" + "超时...");}}}}private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {Properties props = new Properties();props.put("zookeeper.connect", a_zookeeper);  //zookeeper地址props.put("group.id", a_groupId);  //group idprops.put("zookeeper.session.timeout.ms", "4000");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "largest");  //新group-consumer启动后从最新(largest)/最旧(smalles)的数据开始读取props.put("consumer.timeout.ms","3000"); //消费者等待新消息时间,超过此时间没有收到新的消息会抛出一个ConsumerTimeoutException,如果设为-1return new ConsumerConfig(props);}
}

应用代码很简单,消费数据的流程是这样的:

  1. 创建一个ConsumerConnector对象实例,负责和zookeeper通信
  2. ConsumerConnector实例在zookeeper上注册相应节点,初始化若干条Stream负责和kafka-Broker通信。
  3. 每条Stream上都可以创建一个Iterator来获取消息。

#ConsumerConnector接口 这里使用的是kafka通过scala实现此接口的类:kafka.javaapi.consumer.ZookeeperConsumerConnector 下面摘自scaladoc:

ZookeeperConsumerConnector类处理和zookeeper的交互工作,包括:

  1. 在/consumers/[group_id]/注册 每个consumer在一个group中都有自己的唯一id。consumer在创建的时候会在上述路径中创建一个临时节点[ids/节点名],保存此consumer读取的topic列表。Consumer会监视其所在的[group_id]目录的变化,比如说ids目录变化就会触发一次rebalance。这里的id由消费者指定,而不是zk按序生成。 此路径下包含: /consumers/[group_id]/ids。ids目录下为本group中每个存活的consumer都创建一个节点consumer-id /consumers/[group_id]/owners。owners目录下为本group消费的每个的topic创建一个目录,目录中为每个partition创建一个节点,节点的内容为正在消费此partition的consumer-id /consumers/[group_id]offsets。offsets目录下为本group消费的每个的topic创建一个目录,目录中为每个partition创建一个节点,节点的内容为正在消费此partition的offset
  2. 监听broker节点:/brokers/[0...N] --> { "host" : "host:port", "topics" : {"topic1": ["partition1" ... "partitionN"], ..., "topicN": ["partition1" ... "partitionN"] } } /brokers/[ids]/下每一个子节点代表一个正在运行的broker。在kafka的配置中的broker.id参数对应的就是这里的ids。节点内容为json格式,内容为broker监听的host和端口 /broker/[topics]/下包含所有topic的信息

进入ZookeeperConsumerConnector后,首先看到:

private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,val enableFetcher: Boolean) // for testing onlyextends ConsumerConnector {private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)private val messageStreamCreated = new AtomicBoolean(false)//... ...
}

创建了一个val变量(类似于Java中的final) underlying, 其实这是作为一个单例在处理consumer客户端跟zookeeper的交互的核心。 然后是val messageStreamCreated,目的是为了防止多次在同一consumer上创建多次stream.(具体目的还在研究中)

而这里我们发现其实这里存在两个同名不同包的ZookeeperConsumerConnector,java直接调用的是kafka.javaapi.consumer.ZookeeperConsumerConnector, 而在此类内部实例化的时候创建的是一个kafka.consumer.ZookeeperConsumerConnector类的实例。

按照惯例,同样先来看看这个类有什么类属性,

private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,val enableFetcher: Boolean) // for testing onlyextends ConsumerConnector with Logging with KafkaMetricsGroup {private val isShuttingDown = new AtomicBoolean(false) //关闭标识
private val rebalanceLock = new Object //rebalance锁
private var fetcher: Option[ConsumerFetcherManager] = None //
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)private var sessionExpirationListener: ZKSessionExpireListener = null
private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
private var loadBalancerListener: ZKRebalancerListener = nullprivate var offsetsChannel: BlockingChannel = null
private val offsetsChannelLock = new Objectprivate var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null// useful for tracking migration of consumers to store offsets in kafka
private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))val consumerIdString = {var consumerUuid : String = nullconfig.consumerId match {case Some(consumerId) // for testing only=> consumerUuid = consumerIdcase None // generate unique consumerId automatically=> val uuid = UUID.randomUUID()consumerUuid = "%s-%d-%s".format(InetAddress.getLocalHost.getHostName, System.currentTimeMillis,uuid.getMostSignificantBits().toHexString.substring(0,8))}config.groupId + "_" + consumerUuid}this.logIdent = "[" + consumerIdString + "], "

转载于:https://my.oschina.net/djzhu/blog/850282

KafkaConsumer源码解析相关推荐

  1. 谷歌BERT预训练源码解析(二):模型构建

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/weixin_39470744/arti ...

  2. 谷歌BERT预训练源码解析(三):训练过程

    目录 前言 源码解析 主函数 自定义模型 遮蔽词预测 下一句预测 规范化数据集 前言 本部分介绍BERT训练过程,BERT模型训练过程是在自己的TPU上进行的,这部分我没做过研究所以不做深入探讨.BE ...

  3. 谷歌BERT预训练源码解析(一):训练数据生成

    目录 预训练源码结构简介 输入输出 源码解析 参数 主函数 创建训练实例 下一句预测&实例生成 随机遮蔽 输出 结果一览 预训练源码结构简介 关于BERT,简单来说,它是一个基于Transfo ...

  4. Gin源码解析和例子——中间件(middleware)

    在<Gin源码解析和例子--路由>一文中,我们已经初识中间件.本文将继续探讨这个技术.(转载请指明出于breaksoftware的csdn博客) Gin的中间件,本质是一个匿名回调函数.这 ...

  5. Colly源码解析——结合例子分析底层实现

    通过<Colly源码解析--框架>分析,我们可以知道Colly执行的主要流程.本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现.(转载请指明出于break ...

  6. libev源码解析——定时器监视器和组织形式

    我们先看下定时器监视器的数据结构.(转载请指明出于breaksoftware的csdn博客) /* invoked after a specific time, repeatable (based o ...

  7. libev源码解析——定时器原理

    本文将回答<libev源码解析--I/O模型>中抛出的两个问题.(转载请指明出于breaksoftware的csdn博客) 对于问题1:为什么backend_poll函数需要指定超时?我们 ...

  8. libev源码解析——I/O模型

    在<libev源码解析--总览>一文中,我们介绍过,libev是一个基于事件的循环库.本文将介绍其和事件及循环之间的关系.(转载请指明出于breaksoftware的csdn博客) 目前i ...

  9. libev源码解析——调度策略

    在<libev源码解析--监视器(watcher)结构和组织形式>中介绍过,监视器分为[2,-2]区间5个等级的优先级.等级为2的监视器最高优,然后依次递减.不区分监视器类型和关联的文件描 ...

最新文章

  1. VMware介绍与网络的三种模式
  2. GDCM:gdcm::Curve的测试程序
  3. Server Hard drive mode
  4. python实现离线翻译_10分钟教你用Python实现微信翻译机器人
  5. python复数的实部和虚部都是整数嘛_Python学习笔记:从入门到放弃(2)基本语法...
  6. SLF4J和Logback日志框架详解
  7. 收集的图像处理网站http://blog.csdn.net/chief1985/article/details/1898358
  8. visio 生成mysql脚本_Visio2010建立ER图并直接导出为SQL语句
  9. 【代码实现和训练】OCR技术——引入了Attention机制的crnn的印刷体汉字识别
  10. hdu4282 A very hard mathematic problem
  11. 融云亮相GTC全球流量大会 荣膺鲸鸣奖2019年度优秀出海服务商
  12. Android权限请求弹窗自定义,安卓权限申请处理框架Android-UsesPermission
  13. Xmind基础教程-图标
  14. 华为无线设备配置同一业务VLAN的AP间快速漫游
  15. 十一年磨一剑:中科大数学教授成功证明微分几何学两大猜想
  16. layui复选框怎么取值_layui如何获取checkbox复选框的值
  17. 副主任医师计算机英语,副主任医师职称英语网上课堂
  18. 腾讯财团再收购环球音乐10%股权;麦当劳中国携手四大供应商打造湖北供应链智慧产业园 | 美通企业日报...
  19. php将文章生成word,用php生成word模板
  20. 什么品牌蓝牙耳机音质好?五款音质好的蓝牙耳机推荐

热门文章

  1. C++ std::thread
  2. 求二进制数中1的个数(转)
  3. 设置office首字母不变大小的手段
  4. 服务器端 OR 客户端
  5. MongoDB+java+spirng+morphia
  6. Eclipse导入Elasticsearch源码
  7. Kafka生产者源码解析
  8. kafka源码编译及开发环境搭建
  9. matplotlib绘图_手把手教你使用Matplotlib绘图实战
  10. vcpkg安装_[工具]包管理工具Vcpkg 的使用