文章目录

  • 1. 生产者消息发送流程
    • 1.1 发送原理
    • 1.2 生产者重要参数列表
  • 2. 生产者分区
    • 2.1 分区的优点
    • 2.2 生产者发送消息的分区策略
  • 3. 生产者吞吐量与数据可靠性
    • 3.1 吞吐量
    • 3.2 数据可靠性
  • 4. 生产者数据幂等性与事务
    • 4.1 幂等性
    • 4.2 事务
  • 5. 生产者的数据有序与乱序

1. 生产者消息发送流程

1.1 发送原理

在消息发送的过程中,涉及到两个线程——main线程和sender线程。在main线程中创建了一个双端队列RecordAccumulator,main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator 中拉取消息发送到kafka broker中

1.2 生产者重要参数列表

参数名称 描述
bootstrap.servers 生产者连接集群所需的broker地址清单。可以设置 1 个或者多个,中间用逗号隔开。
注意:这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m
batch.size 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间
acks 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字
retries 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms
enable.idempotence 是否开启幂等性,默认 true,开启幂等性
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd

2. 生产者分区

2.1 分区的优点

  1. 便于合理的使用存储资源。每个partition在一个broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台broker上。合理控制分区的任务,可以实现负载均衡的效果。
  2. 提高并行度。生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据。

2.2 生产者发送消息的分区策略

  1. 默认分区器DefaultPartitioner
    (1)指明partition的情况下,直接将指明的值作为partition值;
    (2)没有指明partition值,但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
    (3)既没有partition值,又没有key值的情况下,kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能使用该分区,等该分区的batch已满或已完成,kafka再随机一个分区进行使用(和上次的分区不同)
    例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16K)或者linger.ms设置的时间到了,kafka再随机一个分区进行使用(如果还是0会继续随机)

  2. 自定义分区器
    研发人员可以根据企业需求,自己重新实现分区器
    (1)定义类实现Partitioner接口
    (2)重写partition()方法

/*** 1. 实现接口 Partitioner* 2. 实现 3 个方法:partition,close,configure* 3. 编写 partition 方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** 尚硅谷大数据技术之 Kafka* —————————————————————————————* 返回信息对应的分区** @param topic      主题* @param key        消息的 key* @param keyBytes   消息的 key 序列化后的字节数组* @param value      消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster    集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")) {partition = 0;} else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map<String, ?> configs) {}
}

(3)在生产者的配置中,添加该分区器参数

3. 生产者吞吐量与数据可靠性

3.1 吞吐量

(1)batch.size:批次大小,默认16K
(2)linger.ms:等待时间,修改为5~100ms。默认0ms
(3)compression.type:压缩snappy。默认none
(4)RecordAccumulator:缓冲区大小,修改为64M,默认32M

3.2 数据可靠性

  1. kafka如何处理某个follower故障,不能与leader进行数据同步的问题?
    leader维护一个动态的in-sync replica set(ISR),意为和leader保持同步的folloer+leader集合(leader: 0,isr:0,1,2)
    如果follower长时间没有向leader发送通信请求或者数据同步,则该follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s

  2. 数据完全可靠的条件
    ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

  3. 可靠性总结
    (1)acks = 0,生产者发送过来数据就不管了,可靠性差,效率高。生产环境中很少使用;
    (2)acks = 1,生产者发送过来数据leader应答,可靠性中等,效率中等。生产环境中多用于传输普通日志,允许丢个别数量;
    (3)ack = -1,生产者发送过来数据leader和ISR队列里面所有follower应答,可靠性高,效率低。生产环境一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

4. 生产者数据幂等性与事务

(1)当ack = 0时:数据最多传递一次,可以保证数据不重复,但是不能保证数据不丢失;
(2)当ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2:数据至少传第一次,可以保证数据不丢失,但是不能保证数据不重复。
(3)对于非常重要的信息,例如钱,要求数据既不能重复也不能丢失。在kafka 0.11之后,引入了一项重大特性:幂等性和事务

4.1 幂等性

  1. 幂等性就是指producer不论向Broker发送多少次重复数据,Broker都只会持久化一条数据,保证了不重复。
  2. 精确一次 = 幂等性 + 至少一次(ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)

重复数据的判断标准:
具有有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是kafka每次重启都会分配一个新的;Partition表示分区号;SeqNumber是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复

  1. 幂等性开启参数 enable.idempotence 默认为 true,false 关闭

4.2 事务

  1. 开启事务,就必须开启幂等性
  2. kafka事务一共有5个api
//1、初始化事务
void initTransactions();//2、开启事务
void beginTransaction() throws ProducerFencedException;//3、在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;//4、提交事务
void commitTransaction() throws ProducerFencedException;//5、放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

5. 生产者的数据有序与乱序

  1. 单分区内有序

(1)在kafka 1.X版本之前保证单分区有序,条件如下

max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)

(2)在kafka 1.X及以后保证单分区有序,条件如下

未开启幂等性
max.in.flight.requests.per.connection需要设置为1开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的
  1. 多分区内无序。如果要让多分区也有序,可以在消费者处添加中间件存储数据

Kafka(二):生产者相关推荐

  1. Kafka的生产者优秀架构设计

    来自:架构之美 前言 Kafka 是一个高吞吐量的分布式的发布订阅消息系统,在全世界都很流行,在大数据项目里面使用尤其频繁.笔者看过多个大数据开源产品的源码,感觉 Kafka 的源码是其中质量比较上乘 ...

  2. Kafka 的生产者优秀架构设计

    作者 | 孙玄 责编 | 郭芮 来源 | 架构之美(ID:beautyArch) Kafka 是一个高吞吐量的分布式的发布订阅消息系统,在全世界都很流行,在大数据项目里面使用尤其频繁.笔者看过多个大数 ...

  3. kafka中生产者和消费者的分区问题

    本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...

  4. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  5. 手撸kafka producer生产者的分区器(partition)API

    简介:本篇博客是对kafka produce 生产者分区器的API(Java) 包含以下内容:分区使用原则,分区器使用原则,分区器相关代码编写及pom.xml配置文件编写,到最后的运行结果. 目录标题 ...

  6. 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

    1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...

  7. 【kafka系列】kafka之生产者发送消息实践

    目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...

  8. 【Kafka】生产者

    客户端开发 生产者就是负责向kafka发送消息的应用程序.一个正常的生产逻辑的步骤为: 配置生产者客户端参数及创建相应的生产者实例. 构建待发送的消息. 发送消息. 关闭生产者实例. 在创建真正的生产 ...

  9. kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?...

    作者 | 废物大师兄 来源 | https://www.cnblogs.com/cjsblog/p/9664536.html 1. 前言 我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名 ...

最新文章

  1. Freemarker整合Spring
  2. 中级程序员教程-Cache映像技术
  3. SSM中进行注解式和XML配置式事务管理
  4. hive执行drop卡死一例:java.lang.NoSuchMethodError: org.apache.commons.lang3.StringUtils.isAnyBlank
  5. html后台数据分类管理,细分数据.html
  6. .NET Core Tools 1.0 版本
  7. CAS自旋锁到底是什么?为什么能实现线程安全?
  8. 批量调度工具 Taskctl 作业类型的维护管理
  9. 基于java的超市管理系统设计(含源文件)
  10. 在开发板显示24位的bmp格式图片
  11. Mybatis插件动态数据库链接
  12. java生成word 框勾_Java根据word模板生成word文档之设计详细思路—word标签定义 | 学步园...
  13. AM437X系列编译环境搭建
  14. 【论文分享】ACL 2020 立场检测相关研究
  15. 线下WINDOWS主机挂载华为云存储
  16. python把字典保存到文件_将Python字典保存到文件中,并定期更新它
  17. win10关闭实时防护的步骤教程
  18. 用牛顿迭代法求方程。
  19. 记录女士出差一周必备物品清单用哪个便签比较好
  20. 抢红包 html 模板,微信抢红包源码和模拟demo

热门文章

  1. Centos8修改源并更新
  2. 酷睿计算机系统吗,酷睿i3和i5的区别是什么?电脑处理器i3和i5的区别介绍
  3. HDU-OJ 杭电1495非常可乐
  4. 庖丁解牛式读《Attention is all your need》
  5. ubuntu 开机启动 ibus 输入法
  6. 计算机命令提示符开热点,win7系统使用cmd命令创建wifi热点的方法
  7. Codeforces A. Bear and Big Brother
  8. 四川2021年高考成绩等位分查询,2019年四川高考等位分查询,志愿填报更精准!...
  9. Nagios监控软件源码安装
  10. python实现http请求并发_Python复习笔记(十)Http协议--Web服务器-并发服务器