欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-analysis-of-old-producer-beginning/


众所周知,目前Kafka的最新版本已经到达1.0.0,很多公司运行的kafka也大多升级到了0.10.x版本,Kafka的Producer客户端早已不再使用0.8.2.x就已基本停止维护的Scala版本的Producer了,那么我们还有必要了解它么?当然很有必要,通过Kafka Old Producer我们可以了解Kafka变迁升级的历史:旧版的Old Producer模型相对简单利于初始了解,通过对Old Producer的了解也可以慢慢的发现隐患的问题,这样进一步可以研究探讨解决方法,最后再通过对新版Producer的学习来提升对Kafka的认知,与此同时也可以让读者在遇到相似问题的时候可以借鉴Kafka的优化过来来优化自己的应用。以铜为鉴,可以正衣冠。

在使用Scala版本的Kafka生产者客户端kafka.javaapi.producer.Producer时,实际上调用的是kafka.producer.Producer类。

package kafka.javaapi.producer
class Producer[K, V](private val underlying : kafka.producer.Producer[K, V]) extends scala.AnyRef {def this(config : kafka.producer.ProducerConfig) = { /* compiled code */ }def send(message : kafka.producer.KeyedMessage[K, V]) : scala.Unit = { /* compiled code */ }def send(messages : java.util.List[kafka.producer.KeyedMessage[K, V]]) : scala.Unit = { /* compiled code */ }def close : scala.Unit = { /* compiled code */ }
}

包括kafka-console-producer.sh的脚本(常用来测试发送消息之用)中,对于0.8.2.x版本如果不指定“-- new-producer”参数;或者对于.0.0版本如果指定“-- old-producer”参数的话,实际上内部调用的都是kafka.producer.Producer这个类。

对于kafka-console-producer.sh脚本的内容如下:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

我们看到实际上kafka-console-producer.sh的内容就是运行kafka.tools.ConsoleProducer而已,可以看到main函数代码块中的config.useOldProducer,这个笔者看的是1.0.0版本的代码,而0.8.2.2版本中的ConsoleProducer对应的是config.useNewProducer,稍有不同而已,不过如果都指定使用旧版的Scala的Producer,那么都是指kafka.producer.OldProducer。

object ConsoleProducer {def main(args: Array[String]) {try {val config = new ProducerConfig(args)val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]reader.init(System.in, getReaderProps(config))val producer =if(config.useOldProducer) {new OldProducer(getOldProducerProps(config))} else {new NewShinyProducer(getNewProducerProps(config))}

进一步剖析,kafka.producer.OldProducer的内部构造很简单,关键代码如下:

class OldProducer(producerProps: Properties) extends BaseProducer {// default to byte array partitionerif (producerProps.getProperty("partitioner.class") == null)producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))

可以看到内部的producer最终还是实例化的kafka.producer.Producer。最终验证了开篇所述的旧版的Kafka生产者客户端即为Kafka.producer.Producer。

新版的Java版的Kafka客户端是:org.apache.kafka.clients.producer.KafkaProducer,读者请注意区分。对于新版的KafkaProducer在以后的文章中会有详细介绍。

下面就来深入了解下Kafka.producer.Producer(下面如无特殊说明都将Kafka.producer.Producer此简称为Producer)了。当实例化Producer的时候,首先要读取、解析以及校验配置信息的合法性,根据配置信息来实例化Producer。Producer的配置项有18个,比如设置分区器、消息压缩方式等,这些都比较好理解,而最主要的配置就是request.required.acks和producer.type这两个配置。

request.required.acks是用来配置生产端消息确认的方式,在0.8.x这个系列的版本之中,可以配置为0,1,-1的值,也可以配置为其他的整数值,用来控制一条消息经由多少个ISR中的副本所在的Broker确认之后才向客户端发送确认信息,这个参数在之后的版本,比如1.0.0版本中就只能设置0,1,-1(all)这3(4)种取值,分别表示:

  1. 当request.required.acks=0时,这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  2. 当request.required.acks=1(默认)时,这意味着producer在ISR中的leader已成功收到数据并得到确认。如果leader宕机了,则会丢失数据。
  3. 当request.required.acks=-1时,producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时,这样就变成了acks=1的情况。为了提高数据的可靠性,可以通过min.insync.replicas参数来辅助作用,当同步副本数不足时,生产者会跑出异常。

有关kafka的消息可靠性的更深层次的讲解可以参考我2017年初的一篇博客:kafka数据可靠性深度解读,这篇博客主要是针对0.8.2.x版本的kafka做深层次的探讨,后续会对1.0.0版本做进一步的说明。

Producer的发送模式分为同步(sync)和异步(async)两种情况,这一点可以通过参数producer.type来配置。同步模式会将消息直接发往broker中,而异步模式则会将消息存入LinkedBlockingQueue中,然后通过一个ProducerSendThread来专门发送消息。为了便于说明,笔者这里先对同步模式的情况来做说明,而异步模式只是在同步模式的基础上做了一些封装而已。

class Producer[K,V](val config: ProducerConfig,private val eventHandler: EventHandler[K,V])  // only for unit testingextends Logging {private val hasShutdown = new AtomicBoolean(false)private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)private var sync: Boolean = trueprivate var producerSendThread: ProducerSendThread[K,V] = nullprivate val lock = new Object()config.producerType match {case "sync" =>case "async" =>sync = falseproducerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,queue,eventHandler,config.queueBufferingMaxMs,config.batchNumMessages,config.clientId)producerSendThread.start()}

在讲述Producer的具体行为之前先来看一个发送方的Demo:

public class ProducerScalaDemo {public static final String brokerList = "xxx.xxx.xxx.xxx:9092";public static final String topic = "topic-zzh";public static void main(String[] args) {Properties properties = new Properties();properties.put("serializer.class", "kafka.serializer.StringEncoder");properties.put("metadata.broker.list", brokerList);properties.put("producer.type", "sync");properties.put("request.required.acks", "1");Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(properties));String message = "kafka_message-" + new Date().getTime() + " edited by hidden.zhu";KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic,null, message);producer.send(keyedMessage);}
}

我们可以看到再初始化Producer的时候之用了ProducerConfig这一个类型的参数,而在Producer的类定义中还用到了EventHandler这个类型的参数。在Scala语言中只有一个主构造函数,这个主构造函数的参数列表就是跟在类名后面括号中的各个的参数,如果要重载的话就需要自定义辅助构造函数,辅助构造函数必须调用主构造函数(this方法)。如此上面这个Demo中很显然的就调用了辅助构造函数来进行实例化,那么我们再来看下其对应的辅助构造函数:

def this(config: ProducerConfig) =this(config,new DefaultEventHandler[K,V](config,CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),new ProducerPool(config)))

这里又引入了两个新的东西:DefaultEventHandler和ProducerPool,这个DefaultEventHandler继承了EventHandler这个类,这个是消息发送的关键。而ProducerPool内部是一个HashMap,其中的key是broker的id,而value就是每个broker对应的SyncProducer,这个SyncProducer就是真正的消息发送者。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-analysis-of-old-producer-beginning/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


Kafka解惑之Old Producer(1)—— Beginning相关推荐

  1. Kafka解惑之Old Producer(4)——Case Analysis

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  2. Kafka解惑之Old Producer(3)——Async Analysis

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  3. Kafka解惑之Old Producer(2)——Sync Analysis

    上接Kafka解惑之Old Producer(1)-- Beginning 欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎 ...

  4. 如何创建Kafka客户端:Avro Producer和Consumer Client

    1.目标 - Kafka客户端 在本文的Kafka客户端中,我们将学习如何使用Kafka API 创建Apache Kafka客户端.有几种方法可以创建Kafka客户端,例如最多一次,至少一次,以及一 ...

  5. 【Flink】kafka FlinkKafkaException send data to Kafka old epoch newer producer same transactionalId

    文章目录 1.场景1 1.1 概述 2.场景2 M.参考 1.场景1 1.1 概述 重复问题:[Flink]kafka INVALID_PRODUCER_EPO send data to Kafka ...

  6. 【Flink】kafka INVALID_PRODUCER_EPO send data to Kafka old epoch newer producer same transactionalId

    文章目录 1.场景1 1.1 原因 1.2 解决 1.3 源码 2.类似问题 1.场景1 问题重复:[Flink]kafka FlinkKafkaException send data to Kafk ...

  7. apache kafka源码分析-Producer分析---转载

    原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读 1.Kafka提供了Producer类作为java producer的api,此类有几种发 ...

  8. 漫游Kafka设计篇之Producer和Consumer

    原文地址:http://blog.csdn.net/honglei915/article/details/37564871 Kafka视频教程同步首发,欢迎观看! Kafka Producer 消息发 ...

  9. kafka Failed to send producer

    线上环境出现的问题 版本:kafka-0.10.2.1 现象: 2017-11-29/14:45:02.937/CST WARN [kafka.utils.Logging$class.warn(Log ...

最新文章

  1. PAT_B_1074 宇宙无敌加法器
  2. GDI绘制时钟效果,与系统时间保持同步,基于Winform
  3. linux脚本后台,后台实时分流文件的shell脚本
  4. LeetCode 215 数组中的第K个最大元素
  5. 实用的无锁队列(一)
  6. Java | 原来 try 还可以这样用啊?!
  7. linux工作技能第二发:vi
  8. 《王道计算机考研》:应用层
  9. 虚拟机CentOS系统没有UNIX2dos或dos2UNIX命令的解决方案(参考各路大佬后的总结)
  10. 概率论第一章习题答案以及解析
  11. html5标题居中怎么设置,标题居中怎么设置
  12. DVWA-low通关
  13. 纬度、经度和坐标系网格
  14. 存:科幻推荐书单---超经典科幻必读
  15. 2021年美容师(高级)考试及美容师(高级)考试题
  16. python 实现任务管理清单案例
  17. 同步六进制加法计数电路设计(D触发器)
  18. 人脸识别行业应用状况及发展前景模式分析报告
  19. MYSQL登录遇到的问题:解决ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost:3306‘(10061)
  20. 3dsmax Node Event System

热门文章

  1. Linux Capabilities 入门教程--概念篇
  2. 电脑word在哪_怎么将图片转换成Word?学会这3种方法,轻松将图片转文字!
  3. python 消息队列、异步分布式
  4. 关于使用fastjson统一序列化响应格式。
  5. BootStrap 学习笔记(一)
  6. Docker极简入门
  7. nginx+tomcat的keepalive验证、bio/nio连接比较
  8. 跟着《架构探险》学轻量级微服务架构 (一)
  9. swfheader 0.10 Released(已更正下载地址)
  10. 《数字短片创作(修订版)》——第一部分 剧本创作 第1章 数字短片创意技法 剧本创作的构思...