消息发送

  1. Producer创建时,会创建一个Sender线程并设置为守护线程。
  2. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
  3. 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
  4. 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。
  5. 落盘到broker成功,返回生产元数据给生产者。
  6. 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回

必要参数配置

  • broker配置
  1. 配置条目的使用方式


2. 配置参数

属性 说明
bootstrap.servers 生产者客户端与broker集群建立初始连接需要的broker地址列表,由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需要写全部的Kafka集群中broker的地址,但也不要写一个,以防该节点宕机的时候不可用。式为: host1:port1,host2:port2,… .
key.serializer 实现了接口org.apache.kafka.common.serialization.Serializer 的key序列化类
value.serializer 实现了接口org.apache.kafka.common.serialization.Serializer 的value序列化类
acks 该选项控制着已发送消息的持久性。acks=0 :生产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息, retries 设置也不起作用,因为客户端不关心消息是否发送失败。客户端收到的消息偏移量永远是-1。acks=1 :leader将记录写到它本地日志,就响应客户端确认消息,而不等待ollower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息。acks=all :leader等待所有同步的副本确认该消息。保证了只要有一个同步副本存在,消息就不会丢失。这是最强的可用性保证。等价于 acks=-1 。默认值为1,字符串。可选值:[all, -1, 0, 1]
compression.type 生产者生成数据的压缩格式。默认是none(没有压缩)。允许的值: none , gzip , snappy 和 lz4 。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的比例。消息批越大,压缩效率越好。字符串类型的值。默认是none
retries 设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并无二至。允许重试但是不设置 max.in.flight.requests.per.connection 为1,存在消息乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了重试,第二个成功了,则第一个消息批在第二个消息批后。int类型的值,默认:0,可选值:[0,…,2147483647]

序列化器


由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。
序列化器的作用就是用于序列化要发送的消息的。
Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数组。

package org.apache.kafka.common.serialization;
import java.io.Closeable;
import java.util.Map;
/**
* 将对象转换为byte数组的接口
* *
该接口的实现类需要提供无参构造器
* @param <T> 从哪个类型转换
*/
public interface Serializer<T> extends Closeable {/**
* 类的配置信息
* @param configs key/value pairs
* @param isKey key的序列化还是value的序列化
*/
void configure(Map<String, ?> configs, boolean isKey);
/**
* 将对象转换为字节数组
* *
@param topic 主题名称
* @param data 需要转换的对象
* @return 序列化的字节数组
*/
byte[] serialize(String topic, T data);
/**
* 关闭序列化器
* 该方法需要提供幂等性,因为可能调用多次。
*/
@Override
void close();

系统提供了该接口的子接口以及实现类:
org.apache.kafka.common.serialization.ByteArraySerializer


org.apache.kafka.common.serialization.ByteBufferSerializer

org.apache.kafka.common.serialization.BytesSerializer

org.apache.kafka.common.serialization.DoubleSerializer

org.apache.kafka.common.serialization.FloatSerializer

org.apache.kafka.common.serialization.IntegerSerializer

org.apache.kafka.common.serialization.StringSerializer

org.apache.kafka.common.serialization.LongSerializer

自定义序列化器

数据的序列化一般生产中使用avro。
自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,并实现其中
的 serialize 方法

public class User {private Integer userId;private String username;public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}
}
  • 实现接口
package com.liu.kafka.serializer;import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}
}
  • 生产者
    public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();// 指定初始连接用到的broker地址configs.put("bootstrap.servers", "192.168.181.140:9092");// 指定key的序列化类configs.put("key.serializer", StringSerializer.class);// 指定value的序列化类configs.put("value.serializer", UserSerializer.class);//        configs.put("acks", "all");
//        configs.put("reties", "3");KafkaProducer<String, User> producer = new KafkaProducer<String,User>(configs);User user = new User();user.setUserId(1001);user.setUsername("张三");ProducerRecord<String, User> record = new ProducerRecord<>("tp_user_01",0,user.getUsername(),user);producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("消息发送成功:"+ metadata.topic() + "\t"+ metadata.partition() + "\t"+ metadata.offset());} else {System.out.println("消息发送异常");}});
// 关闭生产者producer.close();}
}

分区器


默认(DefaultPartitioner)分区计算:

  1. 如果record提供了分区号,则使用record提供的分区号
  2. 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。
  4. 会首先在可用的分区中分配分区号
  5. 如果没有可用的分区,则在该主题所有分区中分配分区号



如果要自定义分区器,则需要

  1. 首先开发Partitioner接口的实现类
  2. 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xx.Xxx.class”)

位于 org.apache.kafka.clients.producer 中的分区器接口:

package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;
import java.io.Closeable;
/**
* 分区器接口
*/
public interface Partitioner extends Configurable, Closeable {/**
* 为指定的消息记录计算分区值
* *
@param topic 主题名称
* @param key 根据该key的值进行分区计算,如果没有则为null。
* @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
null
* @param value 根据value值进行分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
null
* @param cluster 当前集群的元数据
*/
public int partition(String topic, Object key, byte[] keyBytes, Object
value, byte[] valueBytes, Cluster cluster);
/**
* 关闭分区器的时候调用该方法
*/
public void close();
}

包 org.apache.kafka.clients.producer.internals 中分区器的默认实现


package org.apache.kafka.clients.producer.internals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
/**
* 默认的分区策略:
* *
如果在记录中指定了分区,则使用指定的分区
* 如果没有指定分区,但是有key的值,则使用key值的散列值计算分区
* 如果没有指定分区也没有key的值,则使用轮询的方式选择一个分区
*/
public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new
ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* 为指定的消息记录计算分区值
* *
@param topic 主题名称
* @param key 根据该key的值进行分区计算,如果没有则为null。
* @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为
null
* @param value 根据value值进行分区计算,如果没有,则为null
* @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为
null
* @param cluster 当前集群的元数据
*/
public int partition(String topic, Object key, byte[] keyBytes, Object
value, byte[] valueBytes, Cluster cluster) {// 获取指定主题的所有分区信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 分区的数量
int numPartitions = partitions.size();
// 如果没有提供key
if (keyBytes == null) {int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions =
cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) %
availablePartitions.size();
return availablePartitions.get(part).partition();
} else {// no partitions are available, give a non-available
partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {// hash the keyBytes to choose a partition
// 如果有,就计算keyBytes的哈希值,然后对当前主题的个数取模
return Utils.toPositive(Utils.murmur2(keyBytes)) %
numPartitions;
}
} p
rivate int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {counter = new
AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter =
topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {counter = currentCounter;
}
} r
eturn counter.getAndIncrement();
} p
ublic void close() {}
}

可以实现Partitioner接口自定义分区器:

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

拦截器


Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用
于实现Client端的定制化控制逻辑。

对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是
org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。

  • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。

  • close:关闭Interceptor,主要用于执行一些资源清理工作

,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意

  • 实体类
package com.lagou.kafka.demo.entity;
public class User {private Integer userId;
private String username;
public Integer getUserId() {return userId;
} p
ublic void setUserId(Integer userId) {this.userId = userId;
} p
ublic String getUsername() {return username;
} p
ublic void setUsername(String username) {this.username = username;
}
}
  • 自定义序列化器

public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}
}
  • 自定义分区器
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
  • 自定义拦截器1
package com.liu.kafka.Interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;public class InterceptorOne  <KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER =LoggerFactory.getLogger(InterceptorOne.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器1---go");
// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();
// 添加消息头headers.add("interceptor", "interceptorOne".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,VALUE>(topic,partition,timestamp,key,value,headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器1---back");if (exception != null) {// 如果发生异常,记录日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
  • 自定义拦截器2
package com.liu.kafka.Interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;public class InterceptorTwo<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器2---go");
// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();
// 添加消息头headers.add("interceptor", "interceptorTwo".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,VALUE>(topic,partition,timestamp,key,value,headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器2---back");if (exception != null) {// 如果发生异常,记录日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
  • 自定义拦截器3
package com.liu.kafka.Interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;public class InterceptorThree<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器3---go");
// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();
// 添加消息头headers.add("interceptor", "interceptorThree".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,VALUE>(topic,partition,timestamp,key,value,headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器3---back");if (exception != null) {// 如果发生异常,记录日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
  • 生产者
package com.liu.kafka.producer;import com.liu.kafka.partitioner.MyPartitioner;
import com.liu.kafka.serializer.User;
import com.liu.kafka.serializer.UserSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;public class MyProducer1 {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.181.140:9092");
// 设置自定义分区器
// configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);configs.put("partitioner.class","com.liu.kafka.partitioner.MyPartitioner");
// 设置拦截器configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.liu.kafka.Interceptor.InterceptorOne," +"com.liu.kafka.Interceptor.InterceptorTwo," +"com.liu.kafka.Interceptor.InterceptorThree");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
// 设置自定义的序列化类configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,UserSerializer.class);KafkaProducer<String, User> producer = new KafkaProducer<String,User>(configs);User user = new User();user.setUserId(1001);user.setUsername("张三");ProducerRecord<String, User> record = new ProducerRecord<>("tp_user_01",0,user.getUsername(),user);producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("消息发送成功:"+ metadata.topic() + "\t"+ metadata.partition() + "\t"+ metadata.offset());} else {System.out.println("消息发送异常");}});
// 关闭生产者producer.close();}
}

原理

KafkaProducer有两个基本线程

主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;
  • 消息收集器RecoderAccumulator为每个分区都维护了一个Deque 类型的双端队列。

  • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响;

  • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。

  • 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。

Sender线程
  • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List 的形式, Node 表示集群的broker节点。
  • 进一步将<Node, List转化为<Node, Request>形式,此时才可以向服务端发送数据。
  • 在发送之前,Sender线程将消息以 Map<NodeId, Deque> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

生产者参数配置补充

参数名称 描述
retry.backoff.ms 在向一个指定的主题分区重发消息的时候,重试之间的等待时间。比如3次重试,每次重试之后等待该时间长度,再接着重试。在一些失败的场景,避免了密集循环的重新发送请求。long型值,默认100。可选值:[0,…]
retries retries重试次数当消息发送出现错误的时候,系统会重发消息。跟客户端收到错误时重发一样。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了
request.timeout.ms 客户端等待请求响应的最大时长。如果服务端响应超时,则会重发请求,除非达到重试次数。该设置应该比 replica.lag.time.max.ms (a brokerconfiguration)要大,以免在服务器延迟时间内重发消息。int类型值,默认:30000,可选值:[0,…]
interceptor.classes 在生产者接收到该消息,向Kafka集群传输之前,由序列化器处理之前,可以通过拦截器对消息进行处理。要求拦截器类必须实现org.apache.kafka.clients.producer.ProducerInterceptor 接口。默认没有拦截器。Map<String, Object> configs中通过List集合配置多个拦截器类名
acks 当生产者发送消息之后,如何确认消息已经发送成功了。支持的值 acks=0:如果设置为0,表示生产者不会等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。该情形不能保证broker是否真的收到了消息,retries配置也不会生效,因为客户端不需要知道消息是否发送成功。发送的消息的返回的消息偏移量永远是-1,acks=1表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认。在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失,acks=all首领分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存货,消息就不会丢失。这是Kafka最强的可靠性保证,等效于 acks=-1
batch.size 当多个消息发送到同一个分区的时候,生产者尝试将多个记录作为一个批来处理。批处理提高了客户端和服务器的处理效率。该配置项以字节为单位控制默认批的大小。所有的批小于等于该值。发送给broker的请求将包含多个批次,每个分区一个,并包含可发送的数据。如果该值设置的比较小,会限制吞吐量(设置为0会完全禁用批处理)。如果设置的很大,又有一点浪费内存,因为Kafka会永远分配这么大的内存来参与到消息的批整合中
client.id 生产者发送请求的时候传递给broker的id字符串。用于在broker的请求日志中追踪什么应用发送了什么消息。一般该id是跟业务有关的字符串
compression.type 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。支持的值:none、gzip、snappy和lz4。压缩是对于整个批来讲的,所以批处理的效率也会影响到压缩的比例
send.buffer.bytes TCP发送数据的时候使用的缓冲区(SO_SNDBUF)大小。如果设置为0,则使用操作系统默认的
buffer.memory 生产者可以用来缓存等待发送到服务器的记录的总内存字节。如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞 max.block.ms 的时间,此后它将引发异常。此设置应大致对应于生产者将使用的总内存,但并非生产者使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩)以及维护运行中的请求。long型数据。默认值:33554432,可选值:[0,…]
connections.max.idle.ms 当连接空闲时间达到这个值,就关闭连接。long型数据,默认:540000
linger.ms 生产者在发送请求传输间隔会对需要发送的消息进行累积,然后作为一个批次发送。一般情况是消息的发送的速度比消息累积的速度慢。有时客户端需要减少请求的次数,即使是在发送负载不大的情况下。该配置设置了一个延迟,生产者不会立即将消息发送到broker,而是等待这么一段时间以累积消息,然后将这段时间之内的消息作为一个批次发送。该设置是批处理的另一个上限:一旦批消息达到了 batch.size 指定的值,消息批会立即发送,如果积累的消息字节数达不到 batch.size 的值,可以设置该毫秒值,等待这么长时间之后,也会发送消息批。该属性默认值是0(没有延迟)。如果设置linger.ms=5 ,则在一个请求发送之前先等待5ms。long型值,默:0,可选值:[0,…]
max.block.ms 控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的时长。当缓存满了或元数据不可用的时候,这些方法阻塞。在用户提供的序列化器和分区器的阻塞时间不计入。long型值,默认:60000,可选值:[0,…]
max.request.size 单个请求的最大字节数。该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多的数据。服务器有自己的限制批大小的设置,与该配置可能不一样。int类型值,默认1048576,可选值:[0,…]
partitioner.class 实现了接口 org.apache.kafka.clients.producer.Partitioner 的分区器实现类。默认值为:org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes TCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。int类型值,默认32768,可选值:[-1,…]
security.protocol 跟broker通信的协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.string类型值,默认:PLAINTEXT
max.in.flight.requests.per.connection 单个连接上未确认请求的最大数量。达到这个数量,客户端阻塞。如果该值大于1,且存在失败的请求,在重试的时候消息顺序不能保证。int类型值,默认5。可选值:[1,…]
reconnect.backoff.max.m 对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。在计算退避增量之后,添加20%的随机抖动以避免连接风暴。long型值,默认1000,可选值:[0,…]
reconnect.backoff.ms 尝试重连指定主机的基础等待时间。避免了到该主机的密集重连。该退避时间应用于该客户端到broker的所有连接。long型值,默认50。可选值:[0,…]

kafka-生产者消息发送流程相关推荐

  1. Kafka生产者——消息发送流程,同步、异步发送API

    生产者消息发送流程 发送原理 Kafka的Producer发送消息采用的是异步发送的方式. 在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAc ...

  2. Kafka 生产者消息发送流程

    1. 数据生产流程解析 Producer创建时,会创建一个Sender线程并设置为守护线程. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在 ...

  3. kafka生产者的发送消息的流程以及代码案例

    一 kafka发送消息流程 1.1 发送流程原理 kafka在发送消息的过程中,主要涉及两个线程main 线程和 Sender 线程. 在 main 线程 中创建了一个双端队列 RecordAccum ...

  4. kafka:消息发送以及消费的过程

    摘要 kafka的存储消息,生产者发送消息,消费者消费消息.这些看起来简单,但实际细想,会有很多问题需要解决:消息是单个单个发送还是批量发送?broker的主题里一有消息就立即推送给消费者吗?生产者的 ...

  5. 《SpringBoot2.0 实战》系列-整合kafka实现消息发送、消费

    之前写过一篇关于ActiveMq的博文,有兴趣的小伙伴可以点击查看.但是ActiveMq总体性能表现一般,如果对消息队列性能要求较高,业务量较大,那我们需要重新考量.本文就介绍一款性能高的消息队列- ...

  6. 【Mac】Mac 下 kafka 生产者 控制台 发送长消息被截断

    1.背景 本地给Kafka发送消息的时候,突然消息太长被截断了.原因还没找到

  7. kafka生产者消息分区策略

    前言 众所周知,kafka的topic具有分区的概念,生产者写入数据到kafka之后,涉及到数据到底写到哪个分区? 常用的分区写入策略 生产者写入消息到topic,Kafka将依据不同的策略将数据分配 ...

  8. iOS msgSend消息发送流程

    objc_msgSend 在iOS中调用方法其实就是在给对象发送某条消息.消息的发送在编译的时候编译器就会把方法转 换为objc_msgSend这个函数.objc_msgSend有俩个隐式的参数,消息 ...

  9. 四十五、Kafka生产者(Producer)API介绍

    前几篇文章我们主要介绍了一些理论上的知识,下面我们来实操一下,本文主要讲解Kafka生产者的API.关注专栏<破茧成蝶--大数据篇>,查看更多相关的内容~ 目录 一.Kafka的消息发送流 ...

最新文章

  1. html弹性重叠,关于html5弹性布局(2)
  2. 【转载】用 PHP V5 开发多任务应用程序
  3. 【转】HTML5第一人称射击游戏发布
  4. android骰子游戏代码_真神器!不用手写一行代码就能做网站~
  5. mysql 去重复屈居_mysql去重复关键字distinct的用法
  6. linux中的fbset工具,Fluxbox (简体中文)
  7. git.exe 启动 慢_户外慢生活节来了!南京固城湖水慢城开启春日度假模式
  8. 降低人工智能成本50% 阿里云推出新一代异构实例GN5i
  9. ActionScript 3.0 编程
  10. 使用Confluence如何输出一份结构清晰 可读性高的测试文档?
  11. delphi里的 .pas .dcu .dpk .dpl .res .cfg......分别是什么文件
  12. mangos架设魔兽世界私服
  13. 适配器模式之访问者模式
  14. genymotion配置android模拟器
  15. Apache Tomcat 文件包含漏洞(CNVD-2020-10487,对应 CVE-2020-1938)
  16. 6个实用的 Python 自动化脚本,告别加班,你学会了吗?
  17. “心”苦不“辛”苦 (7.26)
  18. 电商系统,剖析商品模块中商品表(spu)、规格表(sku)的数据库是如何设计的
  19. 烤仔创作者联盟 | NFT是市场的下一个答案?或迎来新一轮“造福潮”
  20. 使用时testng报错问题解决方案

热门文章

  1. 大数据基础之Spark——Spark pregel详细过程,一看就懂
  2. oracle+omf+格式,oracle omf
  3. 贝叶斯与朴素贝叶斯入门及实战
  4. 波士顿房价预测python决策树_波士顿房价预测 - 最简单入门机器学习 - Jupyter
  5. IBM Lotus Connections 2.5 评审指南
  6. 【Stochastic Depth】《Deep Networks with Stochastic Depth》
  7. c语言面试(c语言面试基础知识)
  8. SHINE OPENCART 自适应 多用途主题模板 ABC-0021
  9. Spring Boot-1 (IntelliJ IDEA + gradle)
  10. 3月21日短线黑马牛股公开验证