前言

有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢?

正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。

内容较多,对源码感兴趣的朋友请系好安全带?(源码基于 v0.10.0.0 版本分析)。同时最好是有一定的 Kafka 使用经验,知晓基本的用法。

简单的消息发送

在分析之前先看一个简单的消息发送是怎么样的。

以下代码基于 SpringBoot 构建。

首先创建一个 org.apache.kafka.clients.producer.Producer 的 bean。

主要关注 bootstrap.servers,它是必填参数。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094

其余几个参数暂时不做讨论,后文会有详细介绍。

接着注入这个 bean 即可调用它的发送函数发送消息。

这里我给某一个 Topic 发送了 10W 条数据,运行程序消息正常发送。

但这仅仅只是做到了消息发送,对消息是否成功送达完全没管,等于是纯 异步的方式。

同步

那么我想知道消息到底发送成功没有该怎么办呢?

其实 ProducerAPI 已经帮我们考虑到了,发送之后只需要调用它的 get() 方法即可同步获取发送结果。

发送结果:

这样的发送效率其实是比较低下的,因为每次都需要同步等待消息发送的结果。

异步

为此我们应当采取异步的方式发送,其实 send() 方法默认则是异步的,只要不手动调用 get() 方法。

但这样就没法获知发送结果。

所以查看 send() 的 API 可以发现还有一个参数。

  1. Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);

Callback 是一个回调接口,在消息发送完成之后可以回调我们自定义的实现。

执行之后的结果:

同样的也能获取结果,同时发现回调的线程并不是上文同步时的 主线程,这样也能证明是异步回调的。

同时回调的时候会传递两个参数:

  • RecordMetadata 和上文一致的消息发送成功后的元数据。

  • Exception 消息发送过程中的异常信息。

但是这两个参数并不会同时都有数据,只有发送失败才会有异常信息,同时发送元数据为空。

所以正确的写法应当是:

至于为什么会只有参数一个有值,在下文的源码分析中会一一解释。

源码分析

现在只掌握了基本的消息发送,想要深刻的理解发送中的一些参数配置还是得源码说了算。

首先还是来谈谈消息发送时的整个流程是怎么样的, Kafka 并不是简单的把消息通过网络发送到了 broker中,在 Java 内部还是经过了许多优化和设计。

发送流程

为了直观的了解发送的流程,简单的画了几个在发送过程中关键的步骤。

从上至下依次是:

  • 初始化以及真正发送消息的 kafka-producer-network-thread IO 线程。

  • 将消息序列化。

  • 得到需要发送的分区。

  • 写入内部的一个缓存区中。

  • 初始化的 IO 线程不断的消费这个缓存来发送消息。

步骤解析

接下来详解每个步骤。

初始化

调用该构造方法进行初始化时,不止是简单的将基本参数写入 KafkaProducer。比较麻烦的是初始化 Sender 线程进行缓冲区消费。

初始化 IO 线程处:

可以看到 Sender 线程有需要成员变量,比如:

  1. acks,retries,requestTimeout

等,这些参数会在后文分析。

序列化消息

在调用 send() 函数后其实第一步就是序列化,毕竟我们的消息需要通过网络才能发送到 Kafka。

其中的 valueSerializer.serialize(record.topic(),record.value()); 是一个接口,我们需要在初始化时候指定序列化实现类。

我们也可以自己实现序列化,只需要实现 org.apache.kafka.common.serialization.Serializer 接口即可。

路由分区

接下来就是路由分区,通常我们使用的 Topic 为了实现扩展性以及高性能都会创建多个分区。

如果是一个分区好说,所有消息都往里面写入即可。

但多个分区就不可避免需要知道写入哪个分区。

通常有三种方式。

指定分区

可以在构建 ProducerRecord 为每条消息指定分区。

这样在路由时会判断是否有指定,有就直接使用该分区。

这种一般在特殊场景下会使用。

自定义路由策略

如果没有指定分区,则会调用 partitioner.partition 接口执行自定义分区策略。

而我们也只需要自定义一个类实现 org.apache.kafka.clients.producer.Partitioner接口,同时在创建 KafkaProducer 实例时配置 partitioner.class 参数。

通常需要自定义分区一般是在想尽量的保证消息的顺序性。

或者是写入某些特有的分区,由特别的消费者来进行处理等。

默认策略

最后一种则是默认的路由策略,如果我们啥都没做就会执行该策略。

该策略也会使得消息分配的比较均匀。

来看看它的实现:

简单的来说分为以下几步:

  • 获取 Topic 分区数。

  • 将内部维护的一个线程安全计数器 +1。

  • 与分区数取模得到分区编号。

其实这就是很典型的轮询算法,所以只要分区数不频繁变动这种方式也会比较均匀。

写入内部缓存

send() 方法拿到分区后会调用一个 append() 函数:

该函数中会调用一个 getOrCreateDeque() 写入到一个内部缓存中 batches

消费缓存

在最开始初始化的 IO 线程其实是一个守护线程,它会一直消费这些数据。

通过图中的几个函数会获取到之前写入的数据。这块内容可以不必深究,但其中有个 completeBatch 方法却非常关键。

调用该方法时候肯定已经是消息发送完毕了,所以会调用 batch.done() 来完成之前我们在 send() 方法中定义的回调接口。

从这里也可以看出为什么之前说发送完成后元数据和异常信息只会出现一个。

Producer 参数解析

发送流程讲完了再来看看 Producer 中比较重要的几个参数。

acks

acks 是一个影响消息吞吐量的一个关键参数。

主要有 [all、-1,0,1] 这几个选项,默认为 1。

由于 Kafka 不是采取的主备模式,而是采用类似于 Zookeeper 的主备模式。

前提是 Topic 配置副本数量 replica>1

acks=all/-1 时:

意味着会确保所有的 follower 副本都完成数据的写入才会返回。

这样可以保证消息不会丢失!

但同时性能和吞吐量却是最低的。

acks=0 时:

producer 不会等待副本的任何响应,这样最容易丢失消息但同时性能却是最好的!

acks=1 时:

这是一种折中的方案,它会等待副本 Leader 响应,但不会等到 follower 的响应。

一旦 Leader 挂掉消息就会丢失。但性能和消息安全性都得到了一定的保证。

batch.size

这个参数看名称就知道是内部缓存区的大小限制,对他适当的调大可以提高吞吐量。

但也不能极端,调太大会浪费内存。小了也发挥不了作用,也是一个典型的时间和空间的权衡。

上图是几个使用的体现。

retries

retries 该参数主要是来做重试使用,当发生一些网络抖动都会造成重试。

这个参数也就是限制重试次数。

但也有一些其他问题。

  • 因为是重发所以消息顺序可能不会一致,这也是上文提到就算是一个分区消息也不会是完全顺序的情况。

  • 还是由于网络问题,本来消息已经成功写入了但是没有成功响应给 producer,进行重试时就可能会出现 消息重复。这种只能是消费者进行幂等处理。

高效的发送方式

如果消息量真的非常大,同时又需要尽快的将消息发送到 Kafka。一个 producer 始终会收到缓存大小等影响。

那是否可以创建多个 producer 来进行发送呢?

  • 配置一个最大 producer 个数。

  • 发送消息时首先获取一个 producer,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List中,保存时做好同步处理防止并发问题。

  • 获取发送者时可以按照默认的分区策略使用轮询的方式获取(保证使用均匀)。

这样在大量、频繁的消息发送场景中可以提高发送效率减轻单个 producer的压力。

关闭 Producer

最后则是 Producer 的关闭,Producer 在使用过程中消耗了不少资源(线程、内存、网络等)因此需要显式的关闭从而回收这些资源。

默认的 close() 方法和带有超时时间的方法都是在一定的时间后强制关闭。

但在过期之前都会处理完剩余的任务。

所以使用哪一个得视情况而定。

本文转自公众号:crossoverJie

转载于:https://www.cnblogs.com/technologykai/articles/10635036.html

窥探源码,让我更加优雅的使用Kafka生产者!相关推荐

  1. kafka 生产者源码解析

    为学日益,为道日损,损之又损,以至于无为,无为而无不为 0x01: 概述 kafka作为大数据领域消息系统一哥,其架构与代码设计十分巧妙与优雅,从中我们可以学习与借鉴到很多分布式高性能并发与缓存方案, ...

  2. 盘点 HashMap 源码中的那些优雅的设计!

    以下文章来源方志朋的博客,回复"666"获面试宝典 一.HashMap构造器 HashMap总共给我们提供了三个构造器来创建HashMap对象. 1.无参构造函数public Ha ...

  3. python代码大全p-代码这样写更优雅(Python版)

    要写出 Pythonic(优雅的.地道的.整洁的)代码,还要平时多观察那些大牛代码,Github 上有很多非常优秀的源代码值得阅读,比如:requests.flask.tornado,笔者列举一些常见 ...

  4. 源码 状态机_LLVM学习笔记(1)--初探源码

    一直耳闻LLVM相比于GCC: well documented 架构灵活,前后端解耦符合龙书的讲解 昨天读到了一篇虽然概括却很周到的llvm入门导引 陈钦霖:LLVM Pass入门导引​zhuanla ...

  5. 《码农修行 编写优雅代码的32条法则》林文著 笔记

    第一章 可读性 法则01:准确命名 法则02:设置缩进 法则03:保留个性 法则04:语法潜台词 法则05:补充注释 第二章 可靠性 一, 法则06:增强健壮性 二, 法则07:避免过度防御 例:if ...

  6. 新版NFT藏品交易平台/铸造市场盲盒商城仿鲸探源码+教程

    正文: 完整标题: 修改数据库配置:config/database.php 修改public\h5\static\js\index.41991f99.js,替换为你的域名 后台地址:域名/admin ...

  7. Kafka生产者和消费者分区策略部分源码解析

    之前我在看其他的博客时,发现对于kafka consumer的RoundRobin的缺点分析中,有两种观点,一种认为缺点在于如果消费者组中消费者消费的主题不同,或者消费者线程数不同,那么会造成消费者消 ...

  8. Kafka生产者源码解析

    在讲解之前,我们带着以下疑问去看 1.生产者客户端如何获取要生产数据的topic元数据: 2.生产者如何组装消息: 3.生产者组装好消息后是直接发送到broker端吗? 4.消息是如何发送到broke ...

  9. React源码分析与实现(一):组件的初始化与渲染

    原文链接地址:github.com/Nealyang 转载请注明出处 前言 战战兢兢写下开篇...也感谢小蘑菇大神以及网上各路大神的博客资料参考~ 阅读源码的方式有很多种,广度优先法.调用栈调试法等等 ...

  10. k8s angular mysql_Angular 实践:如何优雅地发起和处理请求

    Tips: 本文实现重度依赖 ObservableInput,灵感来自灵雀云同事实现的 asyncData 指令,但之前没有 ObservableInput 的装饰器,处理响应 Input 变更相对麻 ...

最新文章

  1. Javascript中typeof instanceof constructor的区别
  2. Ubuntu 16.04使用root 帐号开启 SSH 登录
  3. xshell连接服务器失败_xshell-ssh连接服务器被经常意外中断
  4. 数字图像处理:第六章 几何运算
  5. Leaflet中使用Leaflet.draw插件实现图形交互绘制和编辑(修改图形坐标点)
  6. 本期期刊主题:ASP.NET技术与JavaScript技巧,包括控件等
  7. CSS中大小单位px,em,rem 以及微信小程序的rpx
  8. 我从to B 角度看百度
  9. STKX组件技术在星地链路中的仿真模式研究
  10. 数据分析 超市条码_超市商品管理的11个常见问题 | 经验借鉴 | 商品管理
  11. 微信小程序:老人疯狂裂变引流视频推广微信小程序
  12. w10如何共享计算机硬盘,w10共享盘怎么设置_win10如何共享硬盘
  13. 三年java不会线程_Java后端开发三年多线程你都懂,问你异步编程你说你没听过???...
  14. 高考加油别学计算机图片,高考加油图片励志壁纸图片欣赏
  15. 2020北京邮电大学计算机学院复试经验分享
  16. Coprime Triples——CodeChef - COPRIME3
  17. 自动钉木箱机器人_一种木箱生产用自动钉装设备的制作方法
  18. 如何把身份证扫描成电子版?证件转电子版,这3个方法超好用
  19. 水库工程标准化运管云系统,科技助力水库工程管理国家样板创建
  20. 1556 Color the ball

热门文章

  1. git使用过程及常用命令
  2. 知识图谱研讨实录09丨肖仰华教授带你读懂知识图谱语言认知
  3. Keras 文字生成系统
  4. 我看《网络营销实战密码:策略 技巧 案例》这本书
  5. 二十一天学通C语言:使用const声明指针变量
  6. Tensorflow:tensor数据类型转换、计算和变换
  7. 自动控制原理概念梳理(脑图)
  8. 苹果系统上安装linux系统,Mac OS X下制作安装Linux系统的USB启动盘
  9. mysql 加密 en_MySQL8.0 的表空间文件加密控制
  10. 力扣-5 最长回文子串