原文地址:http://www.aboutyun.com/thread-9938-1-1.html

问题导读
1.Kafka提供了Producer类作为java producer的api,此类有几种发送方式?
2.总结调用producer.send方法包含哪些流程?
3.Producer难以理解的在什么地方?

producer的发送方式剖析
Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式。
sync架构图

async架构图

调用流程如下:

代码流程如下:
Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer、DefaultEventHandler。在创建的同时,会默认new一个ProducerPool,即我们每new一个java的Producer类,就会有创建Producer、EventHandler和ProducerPool,ProducerPool为连接不同kafka broker的池,初始连接个数有broker.list参数决定。
调用producer.send方法流程:
当应用程序调用producer.send方法时,其内部其实调的是eventhandler.handle(message)方法,eventHandler会首先序列化该消息,
eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()
调用逻辑解释:当客户端应用程序调用producer发送消息messages时(既可以发送单条消息,也可以发送List多条消息),调用eventhandler.serialize首先序列化所有消息,序列化操作用户可以自定义实现Encoder接口,下一步调用partitionAndCollate根据topics的messages进行分组操作,messages分配给dataPerBroker(多个不同的Broker的Map),根据不同Broker调用不同的SyncProducer.send批量发送消息数据,SyncProducer包装了nio网络操作信息。
Producer的sync与async发送消息处理,大家看以上架构图一目了然。
partitionAndCollate方法详细作用:获取所有partitions的leader所在leaderBrokerId(就是在该partiionid的leader分布在哪个broker上),
创建一个HashMap>>>,把messages按照brokerId分组组装数据,然后为SyncProducer分别发送消息作准备工作。

名称解释:partKey:分区关键字,当客户端应用程序实现Partitioner接口时,传入参数key为分区关键字,根据key和numPartitions,返回分区(partitions)索引。记住partitions分区索引是从0开始的。

Producer平滑扩容机制
如果开发过producer客户端代码,会知道metadata.broker.list参数,它的含义是kafak broker的ip和port列表,producer初始化时,就连接这几个broker,这时大家会有疑问,producer支持kafka cluster新增broker节点?它又没有监听zk broker节点或从zk中获取broker信息,答案是肯定的,producer可以支持平滑扩容broker,他是通过定时与现有的metadata.broker.list通信,获取新增broker信息,然后把新建的SyncProducer放入ProducerPool中。等待后续应用程序调用。

DefaultEventHandler类中初始化实例化BrokerPartitionInfo类,然后定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分代码如下:def handle(events: Seq[KeyedMessage[K,V]]) {......while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)if (topicMetadataRefreshInterval >= 0 &&SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))sendPartitionPerTopicCache.clear()topicMetadataToRefresh.clearlastTopicMetadataRefreshTime = SystemTime.milliseconds}outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)if (outstandingProduceRequests.size > 0) {info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))//休眠时间,多长时间刷新一次
        Thread.sleep(config.retryBackoffMs)// 生产者定期请求刷新最新topics的broker元数据信息
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)).....}}}

BrokerPartitionInfo的updateInfo方法代码如下:

 def updateInfo(topics: Set[String], correlationId: Int) {var topicsMetadata: Seq[TopicMetadata] = Nil//根据topics列表,meta.broker.list,其他配置参数,correlationId表示请求次数,一个计数器参数而已//创建一个topicMetadataRequest,并随机的选取传入的broker信息中任何一个去取metadata,直到取到为止val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)topicsMetadata = topicMetadataResponse.topicsMetadata// throw partition specific exceptiontopicsMetadata.foreach(tmd =>{trace("Metadata for topic %s is %s".format(tmd.topic, tmd))if(tmd.errorCode == ErrorMapping.NoError) {topicPartitionInfo.put(tmd.topic, tmd)} elsewarn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))tmd.partitionsMetadata.foreach(pmd =>{if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,ErrorMapping.exceptionFor(pmd.errorCode).getClass))} // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
      })})producerPool.updateProducer(topicsMetadata)}

ClientUtils.fetchTopicMetadata方法代码:

def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {var fetchMetaDataSucceeded: Boolean = falsevar i: Int = 0val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)var topicMetadataResponse: TopicMetadataResponse = nullvar t: Throwable = nullval shuffledBrokers = Random.shuffle(brokers) //生成随机数while(i
ProducerPool的updateProducer
def updateProducer(topicMetadata: Seq[TopicMetadata]) {val newBrokers = new collection.mutable.HashSet[Broker]topicMetadata.foreach(tmd => {tmd.partitionsMetadata.foreach(pmd => {if(pmd.leader.isDefined)newBrokers+=(pmd.leader.get)})})lock synchronized {newBrokers.foreach(b => {if(syncProducers.contains(b.id)){syncProducers(b.id).close()syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))} elsesyncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))})}}

当我们启动kafka broker后,并且大量producer和consumer时,经常会报如下异常信息。

  1. root@lizhitao:/opt/soft$ Closing socket connection to 192.168.11.166

复制代码

笔者也是经常很长时间看源码分析,才明白了为什么ProducerConfig配置信息里面并不要求使用者提供完整的kafka集群的broker信息,而是任选一个或几个即可。因为他会通过您选择的broker和topics信息而获取最新的所有的broker信息。
值得了解的是用于发送TopicMetadataRequest的SyncProducer虽然是用ProducerPool.createSyncProducer方法建出来的,但用完并不还回ProducerPool,而是直接Close.

重难点理解:
刷新metadata并不仅在第一次初始化时做。为了能适应kafka broker运行中因为各种原因挂掉、paritition改变等变化,
eventHandler会定期的再去刷新一次该metadata,刷新的间隔用参数topic.metadata.refresh.interval.ms定义,默认值是10分钟。
这里有三点需要强调:

客户端调用send, 才会新建SyncProducer,只有调用send才会去定期刷新metadata在每次取metadata时,kafka会新建一个SyncProducer去取metadata,逻辑处理完后再close。根据当前SyncProducer(一个Broker的连接)取得的最新的完整的metadata,刷新ProducerPool中到broker的连接.每10分钟的刷新会直接重新把到每个broker的socket连接重建,意味着在这之后的第一个请求会有几百毫秒的延迟。如果不想要该延迟,把topic.metadata.refresh.interval.ms值改为-1,这样只有在发送失败时,才会重新刷新。Kafka的集群中如果某个partition所在的broker挂了,可以检查错误后重启重新加入集群,手动做rebalance,producer的连接会再次断掉,直到rebalance完成,那么刷新后取到的连接着中就会有这个新加入的broker。

说明:每个SyncProducer实例化对象会建立一个socket连接

特别注意:
在ClientUtils.fetchTopicMetadata调用完成后,回到BrokerPartitionInfo.updateInfo继续执行,在其末尾,pool会根据上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)

在ProducerPool中,SyncProducer的数目是由该topic的partition数目控制的,即每一个SyncProducer对应一个broker,内部封了一个到该broker的socket连接。每次刷新时,会把已存在SyncProducer给close掉,即关闭socket连接,然后新建SyncProducer,即新建socket连接,去覆盖老的。
如果不存在,则直接创建新的。

转载于:https://www.cnblogs.com/davidwang456/p/4182001.html

apache kafka源码分析-Producer分析---转载相关推荐

  1. Apache Kafka源码剖析:第1篇 网络引擎漫谈(类比法)

    2019独角兽企业重金招聘Python工程师标准>>> 从这一篇开始,我们来研究kafka的网络引擎的源码. 可能很多读者有疑问,说好的Kafka讲解,怎么变成Thrift了? 答案 ...

  2. Apache Kafka源码分析 – Log Management

    LogManager LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录) 首先loadLogs会加 ...

  3. Apache Kafka源码剖析:第5篇 业务API处理

    2019独角兽企业重金招聘Python工程师标准>>> 之前说过了,请求到达业务线程池后,会被处理,但是如何被处理呢?这就是接下来要说的. --------------------- ...

  4. apache kafka技术分享系列(目录索引)--转载

    原文地址:http://blog.csdn.net/lizhitao/article/details/39499283 kafka开发与管理: 1)apache kafka消息服务 2)kafak安装 ...

  5. Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaSelector

    为什么80%的码农都做不了架构师?>>>    ##NioSelector和KafkaSelector有什么区别? 先说结论,KafkaSelector(org.apache.kaf ...

  6. 跟我学Kafka源码Producer分析

    2019独角兽企业重金招聘Python工程师标准>>> 跟我学Kafka源码Producer分析 博客分类: MQ 本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和 ...

  7. kafka源码分析之producer

    Producer的client端 示例代码 Properties props = new Properties(); props.put("bootstrap.servers",  ...

  8. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  9. Kafka 源码分析之网络层(二)

    上一篇介绍了概述和网络层模型实现<Kafka 源码分析之网络层(一)>,本编主要介绍在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送. PS:丰富的 ...

最新文章

  1. 使用jconsole监控JVM内存
  2. 什么是加载因子/负载因子/装载因子
  3. 邮件群发工具的编写(二)数据的保存
  4. span组件内容的刷新(笔记)
  5. 懒加载、瀑布流和LightBox实现图片搜索效果
  6. 计算机交并符号,数学并集符号
  7. 让机器看了几千万篇热门文章总结的17类热门标题方式模板
  8. JSZip,saveAs压缩保存文件
  9. 【计算机网络】物理层
  10. -bash: ifconfig: command not found
  11. 公有云 私有云及架构
  12. 产业互联网的黑马,Testin云测的变革
  13. L1, L2, smooth_L1 Loss函数python实现
  14. TensorFlow 真正从零开始,TensorFlow详细安装入门图文教程
  15. 攻防世界—流量分析1
  16. 给想转行做产品经理的同学
  17. 维表(Dimension Table)与事实表的区别
  18. BUU misc 小明的保险箱
  19. oracle asm中candidate,【Oracle ASM】关于asm实例与db实例中的磁盘状态_详细分析过程...
  20. Latex公式换行时出现 Environment aligned undefined.

热门文章

  1. idea 查看一个类的子类_Java-05-多态、抽象类、接口和内部类
  2. oracle12c分页,ArcSDE10.2.1使用Oracle12c新特性分页
  3. python搭建自动化测试平台_如何用python语言搭建自动化测试环境
  4. 音频在计算机中的存储
  5. 关系到了冰点_疫情下半场,如何修复跌至冰点的亲子关系,让自己和家人活出幸福感?...
  6. add多个文件 git 文件夹_利用 git 提取文件夹下多个特定文件里的内容写到特定文件内...
  7. datetime 取分钟_如何仅从DateTime获取小时和分钟
  8. 计算机 连接ip,连接在Internet上的计算机都拥有IP地址。
  9. php页面中文乱码分析,PHP页面中文乱码分析
  10. c语言求方程组的自然数解,多元一次线性方程自然数解的算法解决办法