【Kafka】消息的同步发送和异步发送
文章目录
- 概述
- 1. sync vs async
- 1.1 java代码同步和异步
- 2. 可靠性机制(ack属性配置)
- 2.1 oneway
- 3. 一般配置
- 4. 同步异步和ack的联系和区别
- 参考
概述
kafka有同步(sync)、异步(async)以及oneway这三种发送方式,某些概念上区分也可以分为同步和异步两种,同步和异步的发送方式通过“producer.type”参数指定,而oneway由“request.require.acks”参数指定。
1. sync vs async
在官方文档Producer Configs中有如下:
翻译过来就是:
producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。
为什么是后台线程进行发送? 其实client调用发送接口,所有的数据被临时加入请求队列InFlightRequest,后台线程是通过 读取该队列的数据,进行发送操作的。
对于异步模式,还有4个配套的参数,如下:
Property | Default | Description |
---|---|---|
queue.buffering.max.ms | 5000 | 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。 |
queue.buffering.max.messages | 10000 | 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。 |
queue.enqueue.timeout.ms | -1 | 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。 |
batch.num.messages | 200 | 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量) |
以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。
注:这里的参数是指安装包中的shell脚本命令,而java客户端代码,需要用相应的语法
总结:
- 同步方式,一定是逐条发送的,第一条响应到达后,才会请求第二条
- 异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次
1.1 java代码同步和异步
同步发送
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。
kafkaTemplate.send("testJson", message).get();
异步发送回调
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {@Overridepublic void onFailure(Throwable ex) {ex.printStackTrace();}@Overridepublic void onSuccess(SendResult<String, Message> result) {System.out.println(result.getProducerRecord());System.out.println(result.getRecordMetadata());}
});
2. 可靠性机制(ack属性配置)
producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:
如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。
若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
2.1 oneway
前面只提到了sync和async,那么oneway是什么呢?
oneway是只顾消息发出去而不管死活,消息可靠性最低,但是低延迟、高吞吐,这种对于某些完全对可靠性没有要求的场景还是适用的,即request.required.acks
设置为0。
oneway的效果也是异步的。因此,设置同步和异步非时候,要综合考虑。
3. 一般配置
对于sync的发送方式:
producer.type=sync
request.required.acks=1
对于async的发送方式:
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
对于oneway的发送发送:
producer.type=async '既然都已经设置ack=0忽略高可靠性了,也就没必要设置为同步了'
request.required.acks=0
4. 同步异步和ack的联系和区别
上图分析:
当用户调用send时,就完成数据发送了(对于用户来说),后台线程负责实际发送数据,因此,新版本下,我们说数据发送总是异步的。
send()方法每次只能发送一条数据至InFlightRequest队列
用户可以通过send().get() ,把用户主线程改为同步方式
因此,同步和异步概念 分为用户线程和发送线程,用户线程有同步和异步之分;发送线程只有异步用户线程选择同步,效果是逐条发送,因为请求队列InFlightRequest中永远最多有一条数据。异步+设置 后台线程的异步发送参数:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐条发送。
max.in.flight.requests.per.connection控制只能发送一次请求,发送次数有个窗口,控制该窗口的值,但是每次可发送一批数据;batch.size是控制一批数据的上限,当batch.size=1时,每次最多发送一条。组合在一起就是 只能连续发送一次请求,每次最多发送一条。
同步和异步指client(producer)是否收到leader给的ack后才发下一条,acks = 0, -1和1是指leader节点和follower节点数据同步的方式,可靠性机制,是保证数据能成功备份到其他节点的机制,二者是独立关系,因此可以是下面的组合
同步+ack任意值
异步+ack任意值
但是由于ack的选项有3个,会有最佳搭配的概念,例如:
producer.type=async '既然都已经设置ack=0忽略高可用性了,也就没必要设置为同步了'
request.required.acks=0
既然都已经设置ack=0忽略高可靠性了,ack=0牺牲可靠性换取速度,也就没必要设置为同步了,设置为异步又可以提高数据
参考
Kafka之sync、async以及oneway
kafka 同步、异步发送java代码同步和异步
【Kafka】消息的同步发送和异步发送相关推荐
- 消息的同步发送,异步发送以及消息发送的可靠性
最近写的一个通信框架中有两种最基本的消息发送方式:同步发送和异步发送. 同步方式: 消息的发送方发A送一条消息到接收端B,B收到消息之后需要对消息进行处理,然后发送ACK确认消息回A,A收到B的ACK ...
- 异步发送,那消息可靠性怎么保证?
消息丢失可能发生在生产者发送消息.MQ本身丢失消息.消费者丢失消息3个方面. 生产者丢失 生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消 ...
- 在Java中发送kafka消息
文章目录 一.kafka API简介 二.引入kafka java客户端依赖 三.发送消息 四.消息分区机制 1.自定义分区机制 五.消息序列化 1.自定义序列化器 六.producer拦截器 七.消 ...
- 19 kafka消息队列
文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...
- kafka之Producer同步与异步消息发送及事务幂等性案例应用实战
本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...
- Kafka生产者——消息发送流程,同步、异步发送API
生产者消息发送流程 发送原理 Kafka的Producer发送消息采用的是异步发送的方式. 在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAc ...
- kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程
线上某服务 A 调用服务 B 接口完成一次交易,一次晚上的生产变更之后,系统监控发现服务 B 接口频繁超时,后续甚至返回线程池耗尽错误 Thread pool is EXHAUSTED.因为服务 B ...
- kafka实现异步发送_Kafka Producer 异步发送消息居然也会阻塞?
Kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 Kafka 传输海量消息,在这过程中遇到了一个 Kafka Producer 异步发送消息会被阻塞的问题, ...
- KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)
文章目录 一.基础集成 1. 技术选型 2. 导入依赖 3. kafka配置 4. auto-offset-reset 简述 5. 新增一个订单类 6. 生产者(异步) 7. 消费者 8. kafka ...
最新文章
- “.Net 社区虚拟大会”(dotnetConf) 2016 Day 3 Keynote: Scott Hanselman
- 2018.3.13 12周2次课
- php curl ob start,curl - php中开启缓冲压缩 ob_start('ob_gzhandler') 之后是在什么时候开始的压缩?...
- 数据挖掘十大经典算法之——PageRank 算法
- mpmath.psi python_【Python Package】mpmath学习笔记(2)
- python做一个小游戏_利用python做个小游戏
- 医院耗材管理系统开发_11
- 数据库文件导入SQL数据库
- JSP幼儿园管理系统
- 解决Windows7下virtualbox安装ubuntu出现的0x00000000指令引用0x00000000内存,该内存不能为written问题
- 使用Resnet网络对人脸图像分类识别出男女性别(包含数据集制作+训练+测试)
- 聚美上市后将往何方:服装特卖和100%透明的化妆品渠道
- python自动上传百度网盘_树莓派使用百度云盘自动上传存储监控照片
- python中sub的用法_python 正则表达式篇 - sub 用法
- 怎样写一篇critical review
- 3D游戏建模:女性角色制作
- Python:数据降序排列索引
- 评定职称/积分落户,原来软考证书含金量这么大!
- 64位Windows 8 运行Trial-Reset,但是提示缺少“MSCOMCTL.OCX”的解决方法
- 2018/8/22部分算法总结 二维几何常用算法
热门文章
- HTML-Css文字排版--字体--段落
- 【吐血整理】java三元表达式比较三个数
- 闭包,及闭包中this指向
- 计算机维修工中级在线阅读,《计算机维修工中级2).doc
- 分类树(回归树)的优劣势
- 关闭office Skype for business 自启动 ---> win7【亲测可用,可供参考】
- 如何查找电脑蓝屏原因之详解
- js对象深拷贝的实现
- Monthly Expense (二分初级典例)
- python es 数据库 ik_Centos7 搭建ES搜索引擎,并通过go-mysql-elasticsearch 实现数据同步...