作者 | 草捏子

整理 | 杨碧玉

出品 | 草捏子(ID:chaycao)

头图 |  CSDN 下载自视觉中国

本文将学习 Kafka 生产者的使用和原理,文中使用的 kafka-clients 版本号为2.6.0。下面进入正文,先通过一个示例看下如何使用生产者 API 发送消息。

public class Producer {public static void main(String[] args) {// 1. 配置参数Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 2. 根据参数创建KafkaProducer实例(生产者)KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 3. 创建ProducerRecord实例(消息)ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo", "hello kafka");// 4. 发送消息producer.send(record);// 5. 关闭生产者示例producer.close();}}

关于配置的三个必填参数

首先创建一个 Properties 实例,设置了三个必填参数:

  • bootstrap.servers: broker 的地址清单;

  • key.serializer:消息的键的序列化器;

  • value.serializer:消息的值的序列化器。

由于 broker 希望接受的是字节数组,所以需要将消息中的键值序列化成字节数组。在设置好参数后,根据参数创建 KafkaProducer 实例,也就是用于发送消息的生产者,接着再创建准备发送的消息 ProducerRecord 实例,然后使用 KafkaProducer 的 send 方法发送消息,最后再关闭生产者。

关于 KafkaProducer ,我们先记住两点:

  • 在创建实例的时候,需要指定配置;

  •  send 方法可发送消息。

send方法

关于配置我们先只了解这三个必填参数,下面我们看下 send 方法,关于发送消息的方式有三种

1、发送并忘记(fire-and-forget)

在发送消息给 Kafka 时,不关心消息是否正常到达,只负责成功发送,存在丢失消息的可能。上面给出的示例就是这种方式。

2、同步发送(sync)

send 方法的返回值是一个 Future 对象,当调用其 get 方法时将阻塞等待  Kafka 的响应。如下:

Future<RecordMetadata> recordMetadataFuture = producer.send(record);
RecordMetadata recordMetadata = recordMetadataFuture.get();

RecordMetadata 对象中包含有消息的一些元数据,如消息的主题、分区号、分区中的偏移量、时间戳等。

3、异步发送(async)

在调用 send 方法时,指定回调函数,在 Kafka 返回响应时,将调用该函数。如下:

producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {e.printStackTrace();} else {System.out.println(recordMetadata.topic() + "-"+ recordMetadata.partition() + ":" + recordMetadata.offset());}}
});

onCompletion 有两个参数,其类型分别是 RecordMetadata 和 Exception 。当消息发送成功时, recordMetadata 为非 null ,而 e 将为 null 。当消息发送失败时,则反之。

消息对象ProducerRecord

下面我们认识下消息对象 ProducerRecord ,封装了发送的消息,其定义如下:

public class ProducerRecord<K, V> {private final String topic;  // 主题private final Integer partition;  // 分区号private final Headers headers;  // 消息头部private final K key;  // 键private final V value;  // 值private final Long timestamp;  // 时间戳// ...其他构造方法和成员方法
}

其中主题和值为必填,其余非必填。例如当给出了分区号,则相当于指定了分区,而当未给出分区号时,若给出了键,则可用于计算分区号。关于消息头部和时间戳,暂不讲述。

发送消息时用到的组件

在对生产者对象 KafkaProducer 和消息对象 ProducerRecord 有了认识后,下面我们看下在使用生产者发送消息时,会使用到的组件有生产者拦截器、序列化器和分区器。其架构(部分)如下:

1、生产者拦截器: ProducerInterceptor 接口,主要用于在消息发送前做一些准备工作,比如对消息做过滤,或者修改消息内容,也可以用于在发送回调逻辑前做一些定制化的需求,例如统计类工作。

2、序列化器, Serializer 接口,用于将数据转换为字节数组。

3、分区器, Partitioner 接口,若未指定分区号,且提供 key 。

处理过程

下面结合代码来看下处理过程,加深印象。

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// 拦截器,拦截消息进行处理ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);
}

上面是 KafkaProducer 的 send 方法,首先会将消息传给拦截器的 onSend 方法,然后进入 doSend 方法。其中 doSend 方法较长,但内容并不复杂,下面给出了主要步骤的注释。

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {throwIfProducerClosed();// 1.确认数据发送到的topic的metadata可用long nowMs = time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException("Producer closed while send in progress", e);throw e;}nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;// 2.序列化器,序列化消息的key和valuebyte[] serializedKey;try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}// 3.分区器,获取或计算分区号int partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);setReadOnly(record.headers());Header[] headers = record.headers().toArray();int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),compressionType, serializedKey, serializedValue, headers);ensureValidRecordSize(serializedSize);long timestamp = record.timestamp() == null ? nowMs : record.timestamp();if (log.isTraceEnabled()) {log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);}Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);if (transactionManager != null && transactionManager.isTransactional()) {transactionManager.failIfNotReadyForSend();}// 4.消息累加器,缓存消息RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);if (result.abortForNewBatch) {int prevPartition = partition;partitioner.onNewBatch(record.topic(), cluster, prevPartition);partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);if (log.isTraceEnabled()) {log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);}// producer callback will make sure to call both 'callback' and interceptor callbackinterceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);}if (transactionManager != null && transactionManager.isTransactional())transactionManager.maybeAddPartitionToTransaction(tp);// 5.如果batch满了或者消息大小超过了batch的剩余空间需要创建新的batch// 将唤醒sender线程发送消息if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;} catch (ApiException e) {log.debug("Exception occurred during message send:", e);if (callback != null)callback.onCompletion(null, e);this.errors.record();this.interceptors.onSendError(record, tp, e);return new FutureFailure(e);} catch (InterruptedException e) {this.errors.record();this.interceptors.onSendError(record, tp, e);throw new InterruptException(e);} catch (KafkaException e) {this.errors.record();this.interceptors.onSendError(record, tp, e);throw e;} catch (Exception e) {this.interceptors.onSendError(record, tp, e);throw e;}
}

doSend方法

doSend 方法主要分为5个步骤

  • 在发送数据前,先确认数据发送的 topic 的 metadata 是可用的( partition 的 leader 存在即为可用,如果开启了权限控制,则还要求 client 具有相应的权限);

  • 序列化器,序列化消息的 key 和 value ;

  • 分区器,获取或计算分区号;

  • 消息累加器,缓存消息;

  • 在消息累加器中,消息会被放在一个 batch 中,用于批量发送,当 batch 满了或者消息大小超过了 batch 的剩余空间需要创建新的 batch ,则将唤醒 sender 线程发送消息。

关于 meatadata 本文将不深究,序列化器、分区器前文也给出了介绍。下面我们主要看下消息累加器。

消息累加器

消息累加器,其作用是用于缓存消息,以便批量发送消息。在 RecordAccumulator 中用一个 ConcurrentMap < TopicPartition ,  Deque < ProducerBatch >>  batches 的 map 变量保存消息。作为 key 的 TopicPartition 封装了 topic 和分区号,而对应的 value 为 ProducerBatch 的双端队列,也就是将发往同一个分区的消息缓存在 ProducerBatch 中。在发送消息时, Record 会被追加在队列的尾部,即加入到尾部的 ProducerBatch 中,如果 ProducerBatch 的空间不足或队列为空,则将创建新的 ProducerBatch ,然后追加。当 ProducerBatch 满了或创建新的 ProducerBatch 时,将唤醒 Sender 线程从队列的头部获取 ProducerBatch 进行发送。

RecordAccumulator

Sender 线程中会将待发送的 ProducerBatch 将转换成< Integer ,  List < ProducerBatch >>的形式,按 Kafka 节点的 ID 进行分组,然后将同一个 node 的 ProducerBatch 放在一个请求中发送。

Kafak 生产者的内容就先了解到这,下面通过思维导图对本文内容做一个简单的回顾:

参考

  • 《深入理解Kafka核心设计与实践原理》

  • 《Kafka权威指南》

  • Kafka 源码解析之 Producer  发送模型(一):  http://matt33.com/2017/06/25/kafka-producer-send-module/

更多精彩推荐
☞谷歌软件工程师薪资百万,大厂薪资有多高?
☞CSDN 创始人蒋涛:选择长沙作“大本营”,打造开发者中心城市
☞杜甫在线演唱《奇迹再现》、兵马俑真人还原……用AI技术打破次元壁的大谷来参加腾讯全球数字生态大会啦!
☞开放源码,华为鸿蒙HarmonyOS 2.0来了
☞20张图,带你搞懂高并发中的线程与线程池!
☞跨链,该怎么跨?
点分享点点赞点在看

快速了解 Kafka 生产者的使用和原理相关推荐

  1. 10 kafka生产者发送消息的原理

    1.发送原理: 在消息发送的过程中,涉及到了两个线程--main 线程和 Sender 线程.在 main 线程 中创建了一个双端队列 RecordAccumulator.main 线程将消息发送给 ...

  2. kafka使用_Kafka生产者的使用和原理

    本文将学习Kafka生产者的使用和原理,文中使用的kafka-clients版本号为2.6.0.下面进入正文,先通过一个示例看下如何使用生产者API发送消息. public class Produce ...

  3. mysql作为kafka生产者_Kafka之生产者

    [TOC] 从编程的角度而言,生产者就是负责向 Kafka 发送消息的应用程序.在 Kafka 的历史变迁 中, 一共有两个大版本的生产者客户端: 第-个是于 Kafka开源之初使用 Scala语言编 ...

  4. Kafka(生产者)

    Kafka 1.概述 1.1 消息队列 1.1.1 传统消息队列的应用场景 1.1.2 消息队列的两种模式 1.2 kafka基础结构 2.kafka的快速入门 2.1 集群部署 2.1.1 安装ja ...

  5. 2021年大数据Kafka(十):kafka生产者数据分发策略

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 生产者数据分发策略 策略一:用户指定了partition 策 ...

  6. 2021年大数据Kafka(九):kafka消息存储及查询机制原理

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 kafka消息存储及查询机制原理 一.Kafka数据存储机制 ...

  7. Kafka核心设计与实践原理总结:基础篇

    作者:未完成交响曲,资深Java工程师!目前在某一线互联网公司任职,架构师社区合伙人! 一.基本概念 1.体系架构 Producer:生产者 Consumber:消费者 Broker:服务代理节点(k ...

  8. kafka高可用集群原理

    kafka架构原理 kafka生产者发送消息原理

  9. Kafka消费者的使用和原理

    作者 | 草捏子 来源 | 草捏子(ID:chaycao) 头图 |  CSDN 下载自视觉中国 这周我们学习下消费者,还是先从一个消费者的Hello World学起: public class Co ...

最新文章

  1. AI产业智能化白皮书 | 清华x百度:全面解读AI产业化的现在和未来(附下载)...
  2. 计算机考试题 实操,计算机考试实操题-20210604194811.docx-原创力文档
  3. hadoop之 参数调优
  4. ElementUI Pagination 分页器绑定数据
  5. matlab句柄函数@和C++ 中的引用 很像
  6. linux撤销编辑内容,linux编辑利器vim常用操作
  7. 什么推动当今品牌的忠诚度
  8. 【HDU - 6186】CS Course(按位与,按位或,按位异或的区间统计,二进制拆位)
  9. C++对象数组与对象指针
  10. maven00----maven学习说明
  11. 分析师:BTC既是通胀对冲工具 也是有指数级增长潜力的资产
  12. 悦读上品 得乎益友
  13. 同一家公司不同CPU的LINUX,还是有所差异
  14. win7 GHOST删除桌面上IE图标
  15. html实现手机截屏,iPhone手机如何实现网页长截图?
  16. SD/SDHC/SDXC区别
  17. CX8825 3.1A数码显示车充IC 适用于快充方案,2019年最新方案
  18. java ActionListener 接口如何判断触发事件来源。getSource()和 getActionCommand()
  19. 基于JAVA实现GPG加密解密(Windows+java两种方式)
  20. 深度清洁,戴森Cyclone V10无绳吸尘器的高端新味道

热门文章

  1. 【Spark-core学习之六】 Spark资源调度和任务调度
  2. 从一次线上故障思考Java问题定位思路
  3. C++内存管理(转)http://www.cnblogs.com/qiubole/archive/2008/03/07/1094770.html
  4. 关于 rm -rf * 你需要知道的
  5. c# 中dynamic的使用
  6. 20145305 《信息安全系统设计基础》第13周学习总结
  7. 如何自定义Struts2表单验证后的错误信息显示格式
  8. 《Linux编程》上机作业 ·005【进程管理与通信】
  9. 如何在矩池云GPU云中安装MATLAB R2017b软件
  10. 常用排序+查找算法时间复杂度大集合