由浅入深!全面了解Kafka 生产者解析,赶紧拿下
一、消息发送
1.1 数据生产流程
数据生产流程图解:
- Producer创建时,会创建⼀个Sender线程并设置为守护线程
- ⽣产消息时,内部其实是异步流程;⽣产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)
- 批次发送的条件为:缓冲区数据⼤⼩达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个
- 批次发送后,发往指定分区,然后落盘到 broker;如果⽣产者配置了retrires参数⼤于0并且失败原因允许重试,那么客户端内部会对该消息进⾏重试
- 落盘到broker成功,返回⽣产元数据给⽣产者
- 元数据返回有两种⽅式:⼀种是通过阻塞直接返回,另⼀种是通过回调返回
1.2 必要的参数配置
先来看看我们一般在程序中是怎么配置的:
最常用的配置项:
属性 |
说明 |
重要性 |
bootstrap.servers |
⽣产者客户端与broker集群建⽴初始连接需要的broker地址列表,由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需要写全部的Kafka集群中broker的地址,但也不要写⼀个,以防该节点宕机的时候不可⽤。形式为:host1:port1,host2:port2,.... |
high |
key.serializer |
实现了接⼝ |
high |
value.serializer |
实现了接⼝ |
high |
acks |
该选项控制着已发送消息的持久性。 |
high |
compression.type |
⽣产者⽣成数据的压缩格式。默认是none(没有压缩)。允许的值:none,gzip,snappy和lz4。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的⽐例。消息批越⼤,压缩效率越好。字符串类型的值。默认是none。 |
high |
retries |
设置该属性为⼀个⼤于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并⽆⼆⾄。允许重试但是不设置 |
high |
1.3 拦截器
1.3.1 拦截器介绍
Producer 的拦截器(Interceptor)和 Consumer 的 Interceptor 主要⽤于实现Client端的定制化控制逻辑。
对于Producer⽽⾔,Interceptor使得⽤户在消息发送前以及Producer回调逻辑前有机会对消息做⼀些定制化需求,⽐如修改消息等。同时,Producer允许⽤户指定多个Interceptor按序作⽤于同⼀条消息从⽽形成⼀个拦截链(Interceptor Chain)。Intercetpor 的实现接⼝是
org.apache.kafka.clients.producer.ProducerInterceptor,其定义的⽅法包括:
- onSend(ProducerRecord):该⽅法封装进KafkaProducer.send⽅法中,即运⾏在⽤户主线程中。Producer确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算。
- onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运⾏在Producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢Producer的消息发送效率。
- close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。
如前所述,Interceptor可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调⽤它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。这在使⽤过程中要特别留意。
1.3.2 自定义拦截器
自定义拦截器步骤:
- 实现ProducerInterceptor接⼝
- 在KafkaProducer的设置中设置⾃定义的拦截器
自定义拦截器 1:
public class InterceptorOne<Key, Value> implements ProducerInterceptor<Key, Value> { private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class); @Override public ProducerRecord<Key, Value> onSend(ProducerRecord<Key, Value> record) { System.out.println("拦截器1---go"); // 此处根据业务需要对相关的数据作修改 String topic = record.topic(); Integer partition = record.partition(); Long timestamp = record.timestamp(); Key key = record.key(); Value value = record.value(); Headers headers = record.headers(); // 添加消息头 headers.add("interceptor", "interceptorOne".getBytes()); ProducerRecord<Key, Value> newRecord = new ProducerRecord<Key, Value>(topic, partition, timestamp, key, value, headers); return newRecord; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("拦截器1---back"); if (exception != null) { // 如果发⽣异常,记录⽇志中 LOGGER.error(exception.getMessage()); } } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}
照着 拦截器 1 再加两个拦截器。
生产者
public class MyProducer1 { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { Map<String, Object> configs = new HashMap<>(); // 设置连接Kafka的初始连接⽤到的服务器地址 // 如果是集群,则可以通过此初始连接发现集群中的其他broker configs.put("bootstrap.servers", "192.168.0.102:9092"); // 设置key的序列化器 configs.put("key.serializer", IntegerSerializer.class); // 设置⾃定义的序列化类 configs.put("value.serializer", UserSerializer.class); // 设置自定义分区器 configs.put("partitioner.class", "com.mfc.config.MyPartitioner"); // 设置拦截器 configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.mfc.interceptor.InterceptorOne," + "com.mfc.interceptor.InterceptorTwo," + "com.mfc.interceptor.InterceptorThree"); KafkaProducer<Integer, User> producer = new KafkaProducer<>(configs); User user = new User(); user.setUserId(1001); user.setUsername("阿彪"); // ⽤于封装Producer的消息 ProducerRecord<Integer, User> record = new ProducerRecord<>( "topic_1", // 主题名称 0, // 分区编号 user.getUserId(), // 数字作为key user // user 对象作为value ); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e == null) { System.out.println("消息发送成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset()); } else { System.out.println("消息发送异常"); } } }); // 关闭⽣产者 producer.close(); }}
1.4 序列化器
1.4.1 Kafka 自带序列化器
Kafka使⽤
org.apache.kafka.common.serialization.Serializer接⼝⽤于定义序列化器,将泛型指定类型的数据转换为字节数组。
package org.apache.kafka.common.serialization; import java.io.Closeable;import java.util.Map; /**将对象转换为byte数组的接⼝该接⼝的实现类需要提供⽆参构造器@param <T> 从哪个类型转换*/public interface Serializer<T> extends Closeable { /* 类的配置信息 @param configs key/value pairs @param isKey key的序列化还是value的序列化 */ void configure(Map<String, ?> var1, boolean var2); /* 将对象转换为字节数组 @param topic 主题名称 @param data 需要转换的对象 @return 序列化的字节数组 */ byte[] serialize(String var1, T var2); /* 关闭序列化器 该⽅法需要提供幂等性,因为可能调⽤多次。 */ void close();}
系统提供了该接⼝的⼦接⼝以及实现类:
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.ShortSerializer
1.4.2 自定义序列化器
数据的序列化⼀般⽣产中使⽤ avro。
⾃定义序列化器需要实现
org.apache.kafka.common.serialization.Serializer<T> 接⼝,并实现其中的serialize⽅法。
实体类
public class User { private Integer userId; private String username; // set、get方法省略}
自定义序列化器
public class UserSerializer implements Serializer<User> { @Override public void configure(Map<String, ?> map, boolean b) { // do Nothing } @Override public byte[] serialize(String topic, User user) { try { // 如果数据是null,则返回null if (user == null) return null; Integer userId = user.getUserId(); String username = user.getUsername(); int length = 0; byte[] bytes = null; if (null != username) { bytes = username.getBytes("utf-8"); length = bytes.length; } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length); buffer.putInt(userId); buffer.putInt(length); buffer.put(bytes); return buffer.array(); } catch (UnsupportedEncodingException e) { throw new SerializationException("序列化数据异常"); } } @Override public void close() { // do Nothing }}
生产者:
public class MyProducer1 { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { Map<String, Object> configs = new HashMap<>(); // 设置连接Kafka的初始连接⽤到的服务器地址 // 如果是集群,则可以通过此初始连接发现集群中的其他broker configs.put("bootstrap.servers", "192.168.0.102:9092"); // 设置key的序列化器 configs.put("key.serializer", IntegerSerializer.class); // 设置⾃定义的序列化类 configs.put("value.serializer", UserSerializer.class); KafkaProducer<Integer, User> producer = new KafkaProducer<>(configs); User user = new User(); user.setUserId(1001); user.setUsername("阿彪"); // ⽤于封装Producer的消息 ProducerRecord<Integer, User> record = new ProducerRecord<>( "topic_1", // 主题名称 0, // 分区编号 user.getUserId(), // 数字作为key user // user 对象作为value ); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e == null) { System.out.println("消息发送成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset()); } else { System.out.println("消息发送异常"); } } }); // 关闭⽣产者 producer.close(); }}
1.5 分区器
1.5.1 Kafka 自带分区器
默认(DefaultPartitioner)分区计算:
- 如果record提供了分区号,则使⽤record提供的分区号
- 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
- 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。会⾸先在可⽤的分区中分配分区号如果没有可⽤的分区,则在该主题所有分区中分配分区号。
看一下kafka的生产者(KafkaProducer)源码:
再看Kafka自带的默认分区器(DefaultPartitioner):
默认的分区器实现了 Partitioner 接口,先看一下接口:
public interface Partitioner extends Configurable, Closeable { /** * 为指定的消息记录计算分区值 * * @param topic 主题名称 * @param key 根据该key的值进⾏分区计算,如果没有则为null * @param keyBytes key的序列化字节数组,根据该数组进⾏分区计算。如果没有key,则为null * @param value 根据value值进⾏分区计算,如果没有,则为null * @param valueBytes value的序列化字节数组,根据此值进⾏分区计算。如果没有,则为null * @param cluster 当前集群的元数据 */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); /** * 关闭分区器的时候调⽤该⽅法 */ public void close(); }
1.5.2 自定义分区器
如果要⾃定义分区器,则需要
- ⾸先开发Partitioner接⼝的实现类
- 在KafkaProducer中进⾏设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")
实现Partitioner接⼝⾃定义分区器:
public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}
然后在⽣产者中配置:
二、消息发送原理
原理图解:
由上图可以看出:KafkaProducer 有两个基本线程:
- 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;消息收集器RecoderAccumulator为每个分区都维护了⼀个 Deque<ProducerBatch> 类型的双端队列。ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低⽹络影响;由于⽣产者客户端使⽤ java.io.ByteBuffer 在发送消息之前进⾏消息保存,并维护了⼀个 BufferPool 实现 ByteBuffer的复⽤;该缓存池只针对特定⼤⼩( batch.size 指定)的 ByteBuffer进⾏管理,对于消息过⼤的缓存,不能做到重复利⽤。每次追加⼀条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取⼀个ProducerBatch,判断当前消息的⼤⼩是否可以写⼊该批次中。若可以写⼊则写⼊;若不可以写⼊,则新建⼀个ProducerBatch,判断该消息⼤⼩是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建⽴新的ProducerBatch,这样⽅便进⾏缓存重复利⽤;若超过,则以计算的消息⼤⼩建⽴对应的 ProducerBatch ,缺点就是该内存不能被复⽤了。
- Sender线程:该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的形式, Node 表示集群的broker节点。进⼀步将<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以向服务端发送数据。在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到 InFlightRequests 中进⾏缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压⼒最⼩的⼀个,以实现消息的尽快发出。
三、更多生产者参数配置
参数名称 |
描述 |
retry.backoff.ms |
在向⼀个指定的主题分区重发消息的时候,重试之间的等待时间。 |
retries |
retries重试次数 |
request.timeout.ms |
客户端等待请求响应的最⼤时⻓。如果服务端响应超时,则会重发请求,除⾮达到重试次数。该设置应该⽐replica.lag.time.max.ms (a broker configuration)要⼤,以免在服务器延迟时间内重发消息。int类型值,默认:30000,可选值:[0,...] |
interceptor.classes |
在⽣产者接收到该消息,向Kafka集群传输之前,由序列化器处理之前,可以通过拦截器对消息进⾏处理。 |
acks |
默认值:all。 |
batch.size |
当多个消息发送到同⼀个分区的时候,⽣产者尝试将多个记录作为⼀个批来处理。批处理提⾼了客户端和服务器的处理效率。 |
client.id |
⽣产者发送请求的时候传递给broker的id字符串。 |
compression.type |
⽣产者发送的所有数据的压缩⽅式。默认是none,也就是不压缩。 |
send.buffer.bytes |
TCP发送数据的时候使⽤的缓冲区(SO_SNDBUF)⼤⼩。如果设置为0,则使⽤操作系统默认的。 |
buffer.memory |
⽣产者可以⽤来缓存等待发送到服务器的记录的总内存字节。如果记录的发送速度超过了将记录发送到服务器的速度,则⽣产者将阻塞max.block.ms的时间,此后它将引发异常。此设置应⼤致对应于⽣产者将使⽤的总内存,但并⾮⽣产者使⽤的所有内存都⽤于缓冲。⼀些额外的内存将⽤于压缩(如果启⽤了压缩)以及维护运⾏中的请求。long型数据。默认值:33554432,可选值:[0,...] |
connections.max.idle.ms |
当连接空闲时间达到这个值,就关闭连接。long型数据,默认:540000 |
linger.ms |
⽣产者在发送请求传输间隔会对需要发送的消息进⾏累积,然后作为⼀个批次发送。⼀般情况是消息的发送的速度⽐消息累积的速度慢。有时客户端需要减少请求的次数,即使是在发送负载不⼤的情况下。该配置设置了⼀个延迟,⽣产者不会⽴即将消息发送到broker,⽽是等待这么⼀段时间以累积消息,然后将这段时间之内的消息作为⼀个批次发送。该设置是批处理的另⼀个上限:⼀旦批消息达到了batch.size指定的值,消息批会⽴即发送,如果积累的消息字节数达不到batch.size的值,可以设置该毫秒值,等待这么⻓时间之后,也会发送消息批。该属性默认值是0(没有延迟)。如果设置linger.ms=5,则在⼀个请求发送之前先等待5ms。long型值,默认:0,可选值:[0,...] |
max.block.ms |
控制KafkaProducer.send()和 |
max.request.size |
单个请求的最⼤字节数。该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多的数据。服务器有⾃⼰的限制批⼤⼩的设置,与该配置可能不⼀样。int类型值,默认1048576,可选值:[0,...] |
partitioner.class |
实现了接⼝ |
receive.buffer.bytes |
TCP接收缓存(SO_RCVBUF),如果设置为-1,则使⽤操作系统默认的值。int类型值,默认32768,可选值:[-1,...] |
security.protocol |
跟broker通信的协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. |
max.in.flight.requests.per.connection |
单个连接上未确认请求的最⼤数量。达到这个数量,客户端阻塞。如果该值⼤于1,且存在失败的请求,在重试的时候消息顺序不能保证。 |
reconnect.backoff.max.ms |
对于每个连续的连接失败,每台主机的退避将成倍增加,直⾄达到此最⼤值。在计算退避增量之后,添加20%的随机抖动以避免连接⻛暴。 |
reconnect.backoff.ms |
尝试重连指定主机的基础等待时间。避免了到该主机的密集重连。该退避时间应⽤于该客户端到broker的所有连接。 |
由浅入深!全面了解Kafka 生产者解析,赶紧拿下相关推荐
- kafka 生产者源码解析
为学日益,为道日损,损之又损,以至于无为,无为而无不为 0x01: 概述 kafka作为大数据领域消息系统一哥,其架构与代码设计十分巧妙与优雅,从中我们可以学习与借鉴到很多分布式高性能并发与缓存方案, ...
- mysql作为kafka生产者_Kafka之生产者
[TOC] 从编程的角度而言,生产者就是负责向 Kafka 发送消息的应用程序.在 Kafka 的历史变迁 中, 一共有两个大版本的生产者客户端: 第-个是于 Kafka开源之初使用 Scala语言编 ...
- 快速了解 Kafka 生产者的使用和原理
作者 | 草捏子 整理 | 杨碧玉 出品 | 草捏子(ID:chaycao) 头图 | CSDN 下载自视觉中国 本文将学习 Kafka 生产者的使用和原理,文中使用的 kafka-clients ...
- Kafka 生产者消息发送流程
1. 数据生产流程解析 Producer创建时,会创建一个Sender线程并设置为守护线程. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在 ...
- 2021年大数据Kafka(十):kafka生产者数据分发策略
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 生产者数据分发策略 策略一:用户指定了partition 策 ...
- discard connection丢失数据_python kafka 生产者发送数据的三种方式
python kafka 生产者发送数据的三种方式 发送方式 同步发送 发送数据耗时最长 有发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断 ...
- Kafka深度解析(如何在producer中指定partition)(转)
原文链接:Kafka深度解析 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能 ...
- java实现Kafka生产者示例
使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 3 ...
- Kafka设计解析(七)- 流式计算的新贵 Kafka Stream
http://www.infoq.com/cn/articles/kafka-analysis-part-7 Kafka Stream背景 Kafka Stream是什么 Kafka Stream是A ...
最新文章
- iOS: NSTimer的循环引用(解决)
- 中心频率和一些概念解释
- 操作系统课设--具有优先级的线程调度
- gitlab使用方法
- Java黑皮书课后题第6章:*6.20(计算一个字符串中字母的个数)编写一个方法,使用下面的方法体计算字符串中的字母个数。编写一个测试程序,提示用户输入字符串,然后显示字符串中的字母个数
- 美国读本科出勤率低被休学,无法毕业怎么办
- Windows系统下安装分布式事务组件Seata
- 在Android应用中使用Pull解析XML文件(传智播客视频笔记)
- 编写代码的软件用什么编写的_如何通过像编写代码一样克服对编写的恐惧
- java 遗传算法_[原]遗传算法Java实现源代码
- linux安装python2环境_Python基础手册 2 —— Python 环境搭建(Linux)
- python写一个类方法_python中如何写类
- 当前安装包签名出现异常_安卓系统手机安装应用出现应用签名异常或-22错误(联网验证失败)的应对方法...
- 服务器数据存储在哪个位置,数据存储在云服务器什么地方
- 微信进入公众号提示服务器错误,微信登录公众号提示没有权限访问解决教程
- 数显之家快讯:【SHIO世硕心语】便携显示器可能是智能手机变身电脑最好的配件了!
- 【高德地图进阶】--- 带图片的点(1)
- ActionListenner
- Ubuntu kylin共享文件夹创建(极简)VM VirtualBox
- c语言高精度算法(加法)