文章目录

  • 概述
  • 1. sync vs async
    • 1.1 java代码同步和异步
  • 2. 可靠性机制(ack属性配置)
    • 2.1 oneway
  • 3. 一般配置
  • 4. 同步异步和ack的联系和区别
  • 参考

概述

kafka有同步(sync)、异步(async)以及oneway这三种发送方式,某些概念上区分也可以分为同步和异步两种,同步和异步的发送方式通过“producer.type”参数指定,而oneway由“request.require.acks”参数指定。

1. sync vs async

在官方文档Producer Configs中有如下:

翻译过来就是:
producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。

为什么是后台线程进行发送? 其实client调用发送接口,所有的数据被临时加入请求队列InFlightRequest,后台线程是通过 读取该队列的数据,进行发送操作的。

对于异步模式,还有4个配套的参数,如下:

Property Default Description
queue.buffering.max.ms 5000 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
queue.buffering.max.messages 10000 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。
queue.enqueue.timeout.ms -1 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。
batch.num.messages 200 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)

以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。

注:这里的参数是指安装包中的shell脚本命令,而java客户端代码,需要用相应的语法

总结:

  • 同步方式,一定是逐条发送的,第一条响应到达后,才会请求第二条
  • 异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次

1.1 java代码同步和异步

同步发送
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

kafkaTemplate.send("testJson", message).get();

异步发送回调
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {@Overridepublic void onFailure(Throwable ex) {ex.printStackTrace();}@Overridepublic void onSuccess(SendResult<String, Message> result) {System.out.println(result.getProducerRecord());System.out.println(result.getRecordMetadata());}
});

2. 可靠性机制(ack属性配置)

producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:

  • 如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。

  • 若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。

  • 若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。

2.1 oneway

前面只提到了sync和async,那么oneway是什么呢?
oneway是只顾消息发出去而不管死活,消息可靠性最低,但是低延迟、高吞吐,这种对于某些完全对可靠性没有要求的场景还是适用的,即request.required.acks设置为0。

oneway的效果也是异步的。因此,设置同步和异步非时候,要综合考虑。

3. 一般配置

对于sync的发送方式:

producer.type=sync
request.required.acks=1

对于async的发送方式:

producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200

对于oneway的发送发送:

producer.type=async           '既然都已经设置ack=0忽略高可靠性了,也就没必要设置为同步了'
request.required.acks=0

4. 同步异步和ack的联系和区别


上图分析:

  • 当用户调用send时,就完成数据发送了(对于用户来说),后台线程负责实际发送数据,因此,新版本下,我们说数据发送总是异步的。

  • send()方法每次只能发送一条数据至InFlightRequest队列

  • 用户可以通过send().get() ,把用户主线程改为同步方式
    因此,同步和异步概念 分为用户线程和发送线程,用户线程有同步和异步之分;发送线程只有异步

    用户线程选择同步,效果是逐条发送,因为请求队列InFlightRequest中永远最多有一条数据。异步+设置 后台线程的异步发送参数:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐条发送。

    max.in.flight.requests.per.connection控制只能发送一次请求,发送次数有个窗口,控制该窗口的值,但是每次可发送一批数据;batch.size是控制一批数据的上限,当batch.size=1时,每次最多发送一条。组合在一起就是 只能连续发送一次请求,每次最多发送一条。

同步和异步指client(producer)是否收到leader给的ack后才发下一条,acks = 0, -1和1是指leader节点和follower节点数据同步的方式,可靠性机制,是保证数据能成功备份到其他节点的机制,二者是独立关系,因此可以是下面的组合

同步+ack任意值
异步+ack任意值

但是由于ack的选项有3个,会有最佳搭配的概念,例如:

producer.type=async           '既然都已经设置ack=0忽略高可用性了,也就没必要设置为同步了'
request.required.acks=0

既然都已经设置ack=0忽略高可靠性了,ack=0牺牲可靠性换取速度,也就没必要设置为同步了,设置为异步又可以提高数据

参考

Kafka之sync、async以及oneway
kafka 同步、异步发送java代码同步和异步

【Kafka】消息的同步发送和异步发送相关推荐

  1. 消息的同步发送,异步发送以及消息发送的可靠性

    最近写的一个通信框架中有两种最基本的消息发送方式:同步发送和异步发送. 同步方式: 消息的发送方发A送一条消息到接收端B,B收到消息之后需要对消息进行处理,然后发送ACK确认消息回A,A收到B的ACK ...

  2. 异步发送,那消息可靠性怎么保证?

    消息丢失可能发生在生产者发送消息.MQ本身丢失消息.消费者丢失消息3个方面. 生产者丢失 生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消 ...

  3. 在Java中发送kafka消息

    文章目录 一.kafka API简介 二.引入kafka java客户端依赖 三.发送消息 四.消息分区机制 1.自定义分区机制 五.消息序列化 1.自定义序列化器 六.producer拦截器 七.消 ...

  4. 19 kafka消息队列

    文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...

  5. kafka之Producer同步与异步消息发送及事务幂等性案例应用实战

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...

  6. Kafka生产者——消息发送流程,同步、异步发送API

    生产者消息发送流程 发送原理 Kafka的Producer发送消息采用的是异步发送的方式. 在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAc ...

  7. kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程

    线上某服务 A 调用服务 B 接口完成一次交易,一次晚上的生产变更之后,系统监控发现服务 B 接口频繁超时,后续甚至返回线程池耗尽错误 Thread pool is EXHAUSTED.因为服务 B ...

  8. kafka实现异步发送_Kafka Producer 异步发送消息居然也会阻塞?

    Kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 Kafka 传输海量消息,在这过程中遇到了一个 Kafka Producer 异步发送消息会被阻塞的问题, ...

  9. KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

    文章目录 一.基础集成 1. 技术选型 2. 导入依赖 3. kafka配置 4. auto-offset-reset 简述 5. 新增一个订单类 6. 生产者(异步) 7. 消费者 8. kafka ...

最新文章

  1. “.Net 社区虚拟大会”(dotnetConf) 2016 Day 3 Keynote: Scott Hanselman
  2. 2018.3.13 12周2次课
  3. php curl ob start,curl - php中开启缓冲压缩 ob_start('ob_gzhandler') 之后是在什么时候开始的压缩?...
  4. 数据挖掘十大经典算法之——PageRank 算法
  5. mpmath.psi python_【Python Package】mpmath学习笔记(2)
  6. python做一个小游戏_利用python做个小游戏
  7. 医院耗材管理系统开发_11
  8. 数据库文件导入SQL数据库
  9. JSP幼儿园管理系统
  10. 解决Windows7下virtualbox安装ubuntu出现的0x00000000指令引用0x00000000内存,该内存不能为written问题
  11. 使用Resnet网络对人脸图像分类识别出男女性别(包含数据集制作+训练+测试)
  12. 聚美上市后将往何方:服装特卖和100%透明的化妆品渠道
  13. python自动上传百度网盘_树莓派使用百度云盘自动上传存储监控照片
  14. python中sub的用法_python 正则表达式篇 - sub 用法
  15. 怎样写一篇critical review
  16. 3D游戏建模:女性角色制作
  17. Python:数据降序排列索引
  18. 评定职称/积分落户,原来软考证书含金量这么大!
  19. 64位Windows 8 运行Trial-Reset,但是提示缺少“MSCOMCTL.OCX”的解决方法
  20. 2018/8/22部分算法总结 二维几何常用算法

热门文章

  1. HTML-Css文字排版--字体--段落
  2. 【吐血整理】java三元表达式比较三个数
  3. 闭包,及闭包中this指向
  4. 计算机维修工中级在线阅读,《计算机维修工中级2).doc
  5. 分类树(回归树)的优劣势
  6. 关闭office Skype for business 自启动 ---> win7【亲测可用,可供参考】
  7. 如何查找电脑蓝屏原因之详解
  8. js对象深拷贝的实现
  9. Monthly Expense (二分初级典例)
  10. python es 数据库 ik_Centos7 搭建ES搜索引擎,并通过go-mysql-elasticsearch 实现数据同步...