生产者发送流程

  • Kafka 会将发送消息包装为ProducerRecord 对象,ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。
  • 在发送ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
  • 数据被传给分区器。如果之前已经在ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。
  • 如果没有指定分区 ,那么分区器会根据ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。
  • 有一个独立的线程负责把这些记录批次发送到相应的 broker 上。
  • 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。
  • 如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。

创建生产者

在创建生产者对象的时候,要设置一些属性,有三个属性是必选的:

  • bootstrap.servers:指定Broker的地址清单,地址格式为host:port。清单里不需要包含所有的Broker地址,生产者会从给定的Broker里查找到其他Broker的信息;不过建议至少要提供两个Broker的信息保证容错。
  • key.serializer:指定键的序列化器。Broker希望接收到的消息的键和值都是字节数组。这个属性必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer,因此一般不需要实现自定义的序列化器。需要注意的是,key.serializer属性是必须设置的,即使只发送值内容。
  • value.serializer:指定值的序列化器。如果键和值都是字符串,可以使用与key.serializer一样的序列化器,否则需要使用不同的序列化器。

发送方式

  • 发送并忘记(fire-and-forget):把消息发送给服务器,但并不关心消息是否正常到达,也就是上面样例中的方式。大多数情况下,消息会正常到达,这可以由Kafka的高可用性和自动重发机制来保证。不过有时候也会丢失消息。
  • 同步发送:使用send()方法发送消息,它会返回一个Future对象,调用get()方法进行等待,我们就可以知道消息是否发送成功。
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v");
try {producer.send(record).get;
} catch (Exception e) {e.printStackTrace();
}

  • 异步发送:调用send()方法时,同时指定一个回调函数,服务器在返回响应时调用该函数。
ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v");
// 异步发送消息,并监听回调
producer.send(record, new Callback() { // 1@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) { // 2if (exception != null) {// 进行异常处理} else {System.out.printf("topic=%s, partition=%d, offset=%s n", metadata.topic(), metadata.partition(), metadata.offset());}}
});

顺序保证

Kafka可以保证同一个分区里的消息是有序的。考虑一种情况,如果retries为非零整数,同时max.in.flight.requests.per.connection为比1大的数如果某些场景要求消息是有序的,也即生产者在收到服务器响应之前可以发送多个消息,且失败会重试。那么如果第一个批次消息写入失败,而第二个成功,Broker会重试写入第一个批次,如果此时第一个批次写入成功,那么两个批次的顺序就反过来了。也即,要保证消息是有序的,消息是否写入成功也是很关键的。那么如何做呢?在对消息的顺序要严格要求的情况下,可以将retries设置为大于0,max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给Broker。当然这回严重影响生产者的吞吐量。

使用步骤:

导入依赖

<groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.1.0</version>

生产者

public class MyProducer {public static void main(String[] args) {Properties props = new Properties();// Kafka服务端的主机名和端口号props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.85.113:9092,192.168.85.114:9092,192.168.85.118:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 请求延时props.put("linger.ms", 1);// 发送缓存区(RecoderAccumulator)内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//发送数据KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<String, String>("first", "testkafka" + i));}//关闭kafkaProducer.close();}

消费者

顺序是因为发送到不同的分区内

带回调函数的生产者

public class CallBackProducer {public static void main(String[] args) {Properties props = new Properties();// Kafka服务端的主机名和端口号props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.85.113:9092,192.168.85.114:9092,192.168.85.118:9092");// key序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for (int i = 0; i < 10; i++) {//可以指定分区和key值producer.send(new ProducerRecord<String, String>("fourth", "testKafka2" + i),(recordMetadata, ex)->{//成功返回recordMetadataif (null == ex) {System.out.println("offset为+++++" + recordMetadata.offset());System.out.println("分区为++++++" + recordMetadata.partition());} else {//失败返回exceptionex.printStackTrace();}});}producer.close();}
}

自定义分区器/默认分区

public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();public DefaultPartitioner() {}public void configure(Map<String, ?> configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = this.nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return ((PartitionInfo)availablePartitions.get(part)).partition();} else {return Utils.toPositive(nextValue) % numPartitions;}} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}public void close() {}
}

配置自定义的分区器

配置文件内添加自定义的分区器

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.sf.kafak.Partitioner.MyPartitioner");

异步转同步-------应用较少

for (int i = 0; i < 10; i++) {Future<RecordMetadata> fourth = producer.send(new ProducerRecord<String, String>("fourth", 0, "0", "testKafka2" + i), (recordMetadata, ex) -> {//成功返回recordMetadataif (null == ex) {System.out.println("offset为+++++" + recordMetadata.offset() + "分区为++++++" + recordMetadata.partition());} else {//失败返回exceptionex.printStackTrace();}});try {//get方法阻塞当前线程RecordMetadata recordMetadata = fourth.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}
}


消费者

  • 创建

    • 创建Kafka的消费者对象的过程与创建生产者的过程是类似的,需要传入必要的属性。在创建消费者的时候以下以下三个选项是必选的:
    • bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
    • key.deserializer :指定键的反序列化器;
    • value.deserializer :指定值的反序列化器。
String topic = "Hello";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "server:9091");
/*指定分组 ID*/
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

  • 消费

创建了Kafka消费者之后,接着就可以订阅主题了。订阅主题可以使用如下两个 API :

  • consumer.subscribe(Collection topics) :指明需要订阅的主题的集合;
  • consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的集合。
consumer.subscribe(Collections.singletonList(topic))
try {while (true) {// 轮询获取数据ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));for (ConsumerRecord<String, String> record : records) {System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,n",record.topic(), record.partition(), record.key(), record.value(), record.offset());}}
} finally {consumer.close();
}

  • demo
public class MyConsumer {public static void main(String[] args) {Properties props = new Properties();// Kafka服务端的主机名和端口号props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.85.113:9092,192.168.85.114:9092,192.168.85.118:9092");//开启自动提交------------------未开启自动提交时不提交offset,每次拉取都会从头props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 消费者提交offset延时props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);// key/value反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,"bigdata");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);//订阅主题(可以订阅多个主题)consumer.subscribe(Arrays.asList("first","fourth"));while (true) {//每次拉取会拉取多个值ConsumerRecords<String, String> consumerRecords = consumer.poll(100);//消费者拉取的时间间隔//解析拉取的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("key+++++++" + consumerRecord.key()+ "value+++++" + consumerRecord.value()+ "offset++++++" + consumerRecord.offset());}}}
}


OFFSET管理

提交是指更新分区当前位置的操作,分区当前的位置,也就是所谓的偏移量

  • Kafka 的每一条消息都有一个偏移量属性,记录了其在分区中的位置,偏移量是一个单调递增的整数。消费者通过往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有 什么用处。不过,如果有消费者退出或者新分区加入,此时就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。 因为这个原因,所以如果不能正确提交偏移量,就可能会导致数据丢失或者重复出现消费,比如下面情况:
  • 如果提交的偏移量小于客户端处理的最后一个消息的偏移量 ,那么处于两个偏移量之间的消息就会被重复消费;
  • 如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

自动提交:

只需要将消费者的 enable.auto.commit 属性配置为 true 即可完成自动提交的配置。 此时每隔固定的时间,消费者就会把 poll() 方法接收到的最大偏移量进行提交,提交间隔由 auto.commit.interval.ms 属性进行配置,默认值是 5s。

使用自动提交是存在隐患的,假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。

手动提交offset

通常从Kafka拿到的消息是要做业务处理,而且业务处理完成才算真正消费成功。所以在完成业务处理后才能提交offset。手动提交分为:commitSync(同步提交)和commitAsync(异步提交)。

相同点:都会poll的一批数据最高的偏移量提交。

不同点:commitSync会阻塞当前线程,一直到提交成功,并且会自动失败重试;commitAsync则没有失败重试机制,有可能提交失败。

  • 同步提交
public class MyConsumer {public static void main(String[] args) {Properties props = new Properties();// Kafka服务端的主机名和端口号props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.85.113:9092,192.168.85.114:9092,192.168.85.118:9092");//关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// key/value反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,"bigdata");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);//订阅主题(可以订阅多个主题)consumer.subscribe(Arrays.asList("first","fourth"));while (true) {//每次拉取会拉取多个值ConsumerRecords<String, String> consumerRecords = consumer.poll(100);//消费者拉取的时间间隔//解析拉取的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("key+++++++" + consumerRecord.key()+ "value+++++" + consumerRecord.value()+ "offset++++++" + consumerRecord.offset());}//同步提交,阻塞当前线程一直到提交成功//会降低程序的吞吐量consumer.commitSync();}}

  • 异步提交—————生产中使用那个较多
//同步提交,阻塞当前线程一直到提交成功
consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if (null!=e) {System.out.println("commit fail for"+e);}}
});

自定义offset提交/存储(如mysql中)

提交自定义的offset只需要在重载的提交方法中传入偏移量参数即可。

// 同步提交特定偏移量
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
// 异步提交特定偏移量
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback 

无论以哪种方式提交offset都有可能造成重复消费或数据丢失的问题。可以使用自定义的offset存储方式。

kafka0.9版本之前,offset存储在zookeeper中,0.9版本之后,offset存储在kafka一个内置的topic中。还可以自定义存储的offset(主要借助ConsumerRebalanceListener)。


拦截器

Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • configure(configs)

获取配置信息和初始化数据时调用。

  • onSend(ProducerRecord):

该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算

  • onAcknowledgement(RecordMetadata, Exception):

该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率

  • close:

关闭interceptor,主要用于执行一些资源清理工作

如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

拦截器链demo

将时间戳添加到value前,并实现消息数量统计

时间戳拦截器

public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> map) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 创建一个新的record,把时间戳写入消息体的最前部return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),System.currentTimeMillis() + "," + record.value().toString());}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}@Overridepublic void close() {}
}

统计失败成功条数

public class CounterInterceptor implements ProducerInterceptor<String, String> {private int errorCounter = 0;private int successCounter = 0;@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 统计成功和失败的次数if (exception == null) {successCounter++;} else {errorCounter++;}}@Overridepublic void close() {// 保存结果System.out.println("Successful sent: " + successCounter);System.out.println("Failed sent: " + errorCounter);}
}

配置文件内添加拦截器链

        //构建拦截链List<String> interceptors = new ArrayList<>();interceptors.add("com.sf.kafka.interceptor.TimeInterceptor");interceptors.add("com.sf.kafka.interceptor.CounterInterceptor");props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

生产者消费者_Kafka之生产者/消费者相关推荐

  1. kafka消费者如何读同一生产者消息_Kafka系列3:深入理解Kafka消费者

    上面两篇聊了Kafka概况和Kafka生产者,包含了Kafka的基本概念.设计原理.设计核心以及生产者的核心原理.本篇单独聊聊Kafka的消费者,包括如下内容:消费者和消费者组 如何创建消费者 如何消 ...

  2. RabbitMQ 实战(四)消费者 ack 以及 生产者 confirms

    2019独角兽企业重金招聘Python工程师标准>>> 这篇文章主要讲 RabbitMQ 中 消费者 ack 以及 生产者 confirms. 如上图,生产者把消息发送到 Rabbi ...

  3. java生产者与消费者问题_java生产者与消费者问题

    为了回忆一下J2SE中的线程互斥与同步问题,所以今天就写个生产者与消费者问题.这个程序大部分时间的结果都基本正确,但某些时候会造成死锁.百思不得其解,将代码贴上,方便以后有更深的体会时再进行修改.也方 ...

  4. java 多线程生产者_java-Runnable加锁实现生产者和消费者的多线程问题

    //库存函数,保存着库存的信息Storage.java public classStorage {//模拟库存 public Integer num=1; }//生产者函数 product.java/ ...

  5. java 生产者消费者代码_Java生产者和消费者代码

    java 生产者消费者代码 This also helps us to understand the concept of synchronised multi-threading in java, ...

  6. PythonRabbitmq文档阅读笔记-生产者数据直接送入队列消费者消费

    Hello World! 使用Pika库连接到Rabbitmq. 本次要实现的功能:生产者生直接发送消息到队列中,消费者消费队列中的数据. 逻辑结构如下: Rabbitmq的通信协议有很多,这里使用A ...

  7. c语言生产者与消费者实验报告,生产者和消费者实验报告.doc

    生产者和消费者实验报告 [实验目的] 加深对进程概念的理解,明确进程和程序的区别. 进一步认识并发执行的实质. 验证用信号量机制实现进程互斥的方法. 验证用信号量机制实现进程同步的方法. [实验要求] ...

  8. 生产者消费者代码_生产者消费者模型:Kotlin 多线程读写文件实例

    计算机科学中的所有问题,都可以通过添加一个间接层来解决. 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题. 生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产 ...

  9. Java多线程-生产者消费者问题(多个消费者多个生产者)

    Java多线程-生产者消费者问题(多个消费者多个生产者) public class ConsumerProcuderDemo {public static void main(String[] arg ...

最新文章

  1. Spark SQL基本操作以及函数的使用
  2. 使用Jupyter Notebook编写技术文档
  3. IronRuby:元编程特性【method_missing】的使用
  4. Docker安装配置Nginx
  5. 从零开始的Unity萌导书#1:Hello,Unity!
  6. netfilter 和 iptables
  7. Git(9):通俗易懂的Git指令
  8. Cadence OrCad Allegro SPB 16.6 下载及安装破解指南
  9. win11找不到开启蓝牙开关,设备管理器也没有蓝牙
  10. 仓储管理之计价方法——月末一次加权平均法
  11. 解决IE浏览器无法使用“IP:端口“直接访问的问题
  12. 基础概念 | 公约数、公倍数、互质数
  13. views是什么意思_views的意思
  14. 怎样在电脑桌面上设置员工生日提醒 每年准时提醒生日的便签
  15. 初识JAVA:华为面试写一个程序:要求出用1,2,5这三个数不同个数组合的和为100的组合个数
  16. 02335网络操作系统
  17. ArcMap制作3D地形图
  18. CSS学习笔记一 ——详细附图、入门必看
  19. 过去两三年 互联网科技公司的掌舵人这样看区块链
  20. Python下载网易云音乐歌单

热门文章

  1. 求一个3*3矩阵两条对角线上元素之和(每个元素只加一次) C语言
  2. VueSummary_note
  3. epoll nio区别_高性能网络服务器编程:为什么linux下epoll是最好,Netty要比NIO.2好?...
  4. c++引用另一个类的方法_转:关于A类,B类,C类IP地址的网段和主机数的计算方法...
  5. wordpress ?php the_time() ?,WordPress时间函数the_time与get_the_time解析
  6. 【快速入门Linux】7_Linux命令—使用su命令切换用户出现认证失败
  7. 解决“chrome正受到自动测试软件的控制”信息栏显示问题(转)
  8. 荒野行动系统推荐观战榜_荒野行动 观战延迟投票结果公示 更新计划抢先看!...
  9. 中班音乐计算机反思,中班音乐教学反思
  10. 怎么查看linux日志里请求量最高的url访问最多的_LoaRunner性能测试系统学习教程:日志文件分析(8)...