摘要:在kafka-0.8.2之后,producer不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率。

本文分享自华为云社区《kafka消息发送模式》,作者:dayu_dls。

在kafka-0.8.2之后,producer不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率。producer请求会返回一个应答对象,包括偏移量或者错误信。这种异步方地批量的发送消息到kafka broker节点,因而可以减少server端资源的开销。新的producer和所有的服务器网络通信都是异步地,在ack=-1模式下需要等待所有的replica副本完成复制时,可以大幅减少等待时间。

为生产者设置属性

1. bootstrap.servers: 该属性指定broker 的地址清单,地址的格式为host:po 忱。清单里不需要包含所有的broker 地址,生产者会给定的broker 里查找到其他broker 的信息。不过建议至少要提供两个broker 的信息, 一且其中一个若机,生产者仍然能够连接到集群上。

2. key.serializer: broker 希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把Java 对象作为键和值发送给broker 。这样的代码具有良好的可读性,不过生产者需要知道如何把这些Java 对象转换成字节数组。key. serializer必须被设置为一个实现了org.apache.kafka.common.serialization.StringSerializer接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了ByteArraySerializer(这个只做很少的事情)、StringSerializer和IntegeSerializer,因此,如果你只使用常见的几种Java 对象类型,那么就没必要实现自己的序列化器。要注意, key.serializer是必须设置的,就算你打算只发送值内容。

3. value.serializer: 与key.serializer一样,value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与key.serializer一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。

kafka发送端3种不同的发送模式

1、Fire-and-forget

只发送消息,不关心消息是否发送成功。本质上也是一种异步发送的方式,消息先存储在缓冲区中,达到设定条件后批量发送。当然这是kafka吞吐量最高的一种方式,并配合参数acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息。但是也是消息最不可靠的一种方式,因为对于发送失败的消息没有做任何处理。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
try {producer.send(record);
} catch (Exception e) {e.printStackTrace();
}

在发送消息之前有可能会发生异常,例如是序列化消息失败的SerializationException、缓冲区满的BufferExhaustedException、发送超时的TimeoutException或者发送的线程被中断的InterruptException。发送消息之后并没有异常处理。

2、Synchronous send

同步发送,send()方法会返回Futrue对象,通过调用Futrue对象的get()方法,等待直到结果返回,根据返回的结果可以判断是否发送成功。如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个partation上,结合参数设置retries的值让发送失败时重试,设置max_in_flight_requests_per_connection=1,可以控制生产者在收到服务器晌应之前只能发送1个消息,在消息发送成功后立刻flush,从而控制消息顺序发送。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
try {RecordMetadata metadata = producer.send(record).get();
} catch (Exception e) {e.printStackTrace();
}
producer.flush();
producer.close();

在调用send()方法后再调用get()方法等待结果返回。如果发送失败会抛出异常,如果发送成功会返回一个RecordMetadata对象,然后可以调用offset()方法获取该消息在当前分区的偏移量。

KafkaProducer有两种类型的异常,第一种是可以重试的Retriable,该类异常可以通过重新发送消息解决。例如是连接异常后重新连接、“no leader”异常后重新选取新的leader。KafkaProducer可以配置为遇到该类异常后自动重新发送消息直到超过重试次数。第二类是不可重试的,例如是“message size too large”(消息太大),该类异常会马上返回错误。

3、Asynchronous send

异步发送,在调用send()方法的时候指定一个callback函数,当broker接收到返回的时候,该callback函数会被触发执行。如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息,配合参数retries=0,并将发送失败的消息记录到日志文件中;要使用callback函数,先要实现org.apache.kafka.clients.producer.Callback接口,该接口只有一个onCompletion方法。如果发送异常,onCompletion的参数Exception e会为非空。

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);producer.send(myRecord,new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if(e != null) {e.printStackTrace();} else {System.out.println("The offset of the record we just sent is: " + metadata.offset());}}});  

异步发送相关参数

异步发送时,kafka会先把消息存储在缓冲池中,当到达设定条件触发缓冲池消息发送。

(1)消息缓存达到batch.size;

(2)距离上一次消息发送时间间隔linger.ms;

(3)调用flush()方法,会立刻触发发送,并阻塞到当前缓冲区发送完毕;

(4)调用close(),触发发送,完毕后关闭。

buffer.memory

此配置设置生产者可用于缓冲等待发送给brokers消息的总内存字节数,默认为33554432=32MB。如果消息发送到缓存区的速度比发送到broker的速度快,那么生产者会被阻塞(根据max.block.ms配置的时间,默认为60000ms=1分钟,在0.9.0.0版本之前使用block.on.buffer.full配置),之后会抛出异常。

compression.type

生产者对生成的所有数据使用的压缩类型,默认值是none(即不压缩),有效值为none,gzip,snappy或lz4。Snappy压缩技术是Google开发的,它可以在提供较好的压缩比的同时,减少对CPU的使用率并保证好的性能,所以建议在同时考虑性能和带宽的情况下使用。Gzip压缩技术通常会使用更多的CPU和时间,但会产生更好的压缩比,所以建议在网络带宽更受限制的情况下使用。通过启用压缩功能,可以减少网络利用率和存储空间,这往往是向Kafka发送消息的瓶颈。

retries

默认值为0,当设置为大于零的值,客户端会重新发送任何发送失败的消息。注意,此重试与客户端收到错误时重新发送消息是没有区别的。在配置max.in.flight.requests.per.connection不等于1的情况下,允许重试可能会改变消息的顺序,因为如果两个批次的消息被发送到同一个分区,第一批消息发送失败但第二批成功,而第一批消息会被重新发送,则第二批消息会先被写入。注意此参数可能会改变消息的顺序性。

batch.size

当多个消息被发送到同一个分区时,生产者会把它们一起处理。此配置设置用于每批处理使用的内存字节数,默认为16384=16KB。当使用的内存满的时候,生产者会发送当前批次的所有消息。但是,这并不意味着生产者会一直等待使用的内存变满,根据下面linger.ms配置的时间也会触发消息发送。设置较小的值会增加发送的频率,从而可能会减少吞吐量;设置较大的值会使用较多的内存,设置为0会关闭批处理的功能。

linger.ms

此配置设置在发送当前批次消息之前等待新消息的时间量,默认值为0。KafkaProducer会在当前批次使用的内存已满或等待时间到达linger.ms配置时间的时候发送消息。当linger.ms>0时,延时性会增加,但会提高吞吐量,因为会减少消息发送频率。

client.id

用于标识发送消息的客户端,通常用于日志和性能指标以及配额。

max.in.flight.requests.per.connection

此配置设置客户端在单个连接上能够发送的未确认请求的最大数量,默认为5,超过此数量会造成阻塞。设置大的值可以提高吞吐量但会增加内存使用,但是需要注意的是,当设置值大于1而且发送失败时,如果启用了重试配置,有可能会改变消息的顺序。设置为1时,即使重新发送消息,也可以保证发送的顺序和写入的顺序一致。

request.timeout.ms

此配置设置客户端等待请求响应的最长时间,默认为30000ms=30秒,如果在这个时间内没有收到响应,客户端将重发请求,如果超过重试次数将抛异常。此配置应该比replica.lag.time.max.ms(broker配置,默认10秒)大,以减少由于生产者不必要的重试造成消息重复的可能性。

max.block.ms

当发送缓冲区已满或者元数据不可用时,生产者调用send()和partitionsFor()方法会被阻塞,默认阻塞时间为60000ms=1分钟。由于使用用户自定义的序列化器和分区器造成的阻塞将不会计入此时间。

max.request.size

此配置设置生产者在单个请求中能够发送的最大字节数,默认为1048576字节=1MB。例如,你可以发送单个大小为1MB的消息或者1000个大小为1KB的消息。注意,broker也有接收消息的大小限制,使用的配置是message.max.bytes=1000012字节(好奇怪的数字,约等于1MB)。

receive.buffer.bytes和send.buffer.bytes

receive.buffer.bytes:读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小,默认值为32768字节=32KB。如果设置为-1,则将使用操作系统的默认值。
send.buffer.bytes:发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小,默认值为131072字节=128KB。如果设置为-1,则将使用操作系统的默认值。

点击关注,第一时间了解华为云新鲜技术~

带你认识三种kafka消息发送模式相关推荐

  1. go 实现 kafka 消息发送、接收

    引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...

  2. 怎么关闭win10虚拟机服务器,如何关闭Win10自带虚拟机?三种关闭Win10专业版系统hyper-v虚拟机的方法...

    在win10专业版系统中,通常是自带有虚拟机Hyper-V,可以帮助大家实现其他系统的操作,但是很多用户并不知道要怎么开启虚拟机Hyper-V,hyper-v可以提高虚拟实现的可用性,但是如果我们使用 ...

  3. RabbitMQ系列-顺序消费模式和迅速消息发送模式

    MQ使用过程中,有些业务场景需要我们保证顺序消费,而如果一个Producer,一个Queue,多个Consumer的情况下是无法保证顺序的; 举例: 1.业务上产生三条消息,分别是对数据的增加.修改. ...

  4. Objective-C(十九、通知-消息发送模式之中的一个)——iOS开发基础

    结合之前的学习笔记以及參考<Objective-C编程全解(第三版)>,对Objective-C知识点进行梳理总结. 知识点一直在变.仅仅是作为參考.以苹果官方文档为准~ 十九.通知-消息 ...

  5. aardio - nanomsg/nnmsg/nnm 消息发送模式图示

    nanomsg提供6种消息发送模式: pair:单线配对模式 push - pull  (pipeline) :任务分发模式 pub(publish) - sub(subscribe) :广播订阅模式 ...

  6. 互联网赚钱三种最基本的模式

    身边经常会有朋友不满足现状,希望摆脱现在的枯燥乏味的工作,创立自己的事业,偶尔与我聊起通过互联网创业的问题,促使我认真的去考虑互联网赚钱的问题来.久而久之,我对互联网创业也有了一定的认识.现在逐渐地写 ...

  7. 领域驱动系列:三种领域逻辑组织模式的本质

    企业应用架构模式中明确提出了三种领域逻辑组织模式:事务脚本.领域模型和表模块.不少人看的云里雾里的,不少人说的似懂非懂的,主要原因是没有从项目的级别的分析和设计经验,只有单个项目模块的开发经验的人很难 ...

  8. 一个星期使用三种不同的开发模式完成资讯类App——《听风资讯》

    文章目录 1.引言 2.App开发模式的主要区别 3.App开发模式在开发项目时所使用到的技术栈 4.App开发时的感想 4.1 Native App(原生App) 4.1.1 Material De ...

  9. 云计算机根据部署方式,云计算的三种类型及部署模式

    云计算的三种类型及部署模式 猫先生 • 2019 年 07 月 08 日 云计算让开发人员和 IT 部门可以全身心投入最有价值的工作,避免采购.维护.容量规划等无价值的工作分散精力.云计算已经日渐普及 ...

最新文章

  1. 剖析Focal Loss损失函数: 消除类别不平衡+挖掘难分样本 | CSDN博文精选
  2. 8款审核AWS帐户安全性的免费工具,你值得拥有
  3. 关于SQLServer2000的全文检索使用心得
  4. 生日小助手官方网站已经发布了!
  5. C语言中static详细分析
  6. 信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言——1080:余数相同问题
  7. 线程间操作无效: 从不是创建控件“Control Name'”的线程访问它问题的解决方案及原理分析...
  8. java 课后习题 计算两个日期之间的天数
  9. KMP算法的浅显解释
  10. weblogic部署项目后内存溢出
  11. 学习笔记9--汽车线控系统技术
  12. 2019年电赛H题电磁炮实录
  13. Unity3D坦克大战项目总结
  14. 东北大学软件项目管理与过程改进复习提纲(2020)——第八章《项目质量管理》
  15. 记得十年前谷歌大量使用python_关于利用Python玩转百万答题
  16. EXCEL数组公式,求多条件下的中位数的实现方法和注意点
  17. android Toast
  18. 高等学校工程教育改革试点的新动向
  19. 迅游服务器延时不稳定,用迅游玩《征途》 网络延迟天堑变通途
  20. day46--快速排序

热门文章

  1. 如何使用开源工具制作YouTube系列
  2. 小小在线教授何为BTC跳矿(教授如何在线扩大收益)
  3. Bootstrap3 插件的调用方式
  4. python爬虫可以用acada_python爬虫从入门到放弃(四)之 Requests库的基本使用(转)...
  5. php跳转分站,根据访客所在城市ip地址自动跳转到分站的php代码
  6. mysql 存储过程 批量导入数据_sql 利用存储过程批量导入数据
  7. java 读取硬件设备发送数据_Java网络编程基础
  8. h5列表 php,H5的标签使用详解
  9. django,cbv,模板层
  10. Centos安装Oracle数据库文本记录