大数据之 Kafka API 从入门到放弃

  • 一、Producer API
    • 1、消息发送流程
    • 2、KafkaProducer 发送消息流程
    • 3、异步发送 API
      • 1)导入依赖
      • 2)编写代码
        • 1、不带回调函数的 API
        • 2、带回调函数的 API
    • 4、同步发送 API
  • 二、Consumer API
    • 1、自动提交offset
      • 1)导入依赖
      • 2)编写代码
    • 2、手动提交offset
      • 1)同步提交 offset
      • 2)异步提交 offset
      • 3)数据漏消费和重复消费分析
    • 3、自定义存储 offset
  • 三、自定义 Interceptor
    • 1、拦截器原理
      • (1)configure(configs)
      • (2)onSend(ProducerRecord):
      • (3)onAcknowledgement(RecordMetadata, Exception):
      • (4)close:
    • 2、拦截器案例
      • 1)需求
      • 2)案例实操
        • (1)增加时间戳拦截器
        • (2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
        • (3)producer 主程序
      • 3)测试
        • (1)在 kafka 上启动消费者,然后运行客户端 java 程序。

一、Producer API

1、消息发送流程

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到Kafka broker。

2、KafkaProducer 发送消息流程


相关参数

batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。

linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

3、异步发送 API

1)导入依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

2)编写代码

需要用到的类:

KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord 对象

1、不带回调函数的 API

package org.example.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;/***  分区策略测试1* @ClassName MyProducer* @Author 小坏* @Date 2021/10/29、18:39* @Version 1.0** 生产者* 不存在的主题会创建一个*/
public class MyProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//指定连接的kafka集群properties.put("bootstrap.servers", "hadoop102:9092");//Ack应答级别properties.put("acks", "all");//重试次数properties.put("retries", 3);//批次大小properties.put("batch.size", 16384);//等待时间properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);//RecordAccumulator 缓 冲区大小properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);/*** 使用最后的一个api、只有一个key、所有他自己取轮询*/for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("first", "atguigu","atguigu-" + i)).get();}//关闭资源producer.close();}
}

2、带回调函数的 API

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package org.example.producer;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/***  分区策略测试2* @ClassName CallBackProducer* @Author 小坏* @Date 2021/10/29、19:50* @Version 1.0*/
public class CallBackProducer {public static void main(String[] args) {Properties properties = new Properties();//指定连接的kafka集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);/*** 带回调函数的* 测试发送数据回调的是从零开始的*/for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("aaa", "atguigu-" + i),(recordMetadata, e) -> {if (e == null) {System.out.println(recordMetadata.partition() + "--" + recordMetadata.offset());} else {e.getMessage();}});}producer.close();}
}

4、同步发送 API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。

介绍了
带分区和key的
方法的重载
同步发送

package org.example.producer;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/***  分区策略测试2* @ClassName CallBackProducer* @Author 小坏* @Date 2021/10/29、19:50* @Version 1.0*/
public class CallBackProducer {public static void main(String[] args) {Properties properties = new Properties();//指定连接的kafka集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);/*** 同步发送* 调用 .get(); 方法*/for (int i = 0; i < 10; i++) {Future<RecordMetadata> send = producer.send(new ProducerRecord<>("aaa", "atguigu", "atguigu-" + i));try {send.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}/*** 使用倒数第三个api* 带分区和key的*/
//        for (int i = 0; i < 10; i++) {//            producer.send(new ProducerRecord<>("aaa", 0, "atguigu", "atguigu-" + i),
//                    (recordMetadata, e) -> {//                        if (e == null) {//                            System.out.println(recordMetadata.partition() + "--" + recordMetadata.offset());
//                        } else {//                            e.getMessage();
//                        }
//
//                    });
//        }/*** 方法的重载*  atguigu 哈希值 % 3**  1--3* 1--4* 1--5* 1--6* 1--7* 1--8* 1--9* 1--10* 1--11* 1--12** 都进入了一个分区**/
//        for (int i = 0; i < 10; i++) {//            producer.send(new ProducerRecord<>("aaa", "atguigu", "atguigu-" + i),
//                    (recordMetadata, e) -> {//                        if (e == null) {//                            System.out.println(recordMetadata.partition() + "--" + recordMetadata.offset());
//                        } else {//                            e.getMessage();
//                        }
//
//                    });
//        }producer.close();}
}

二、Consumer API

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

1、自动提交offset

1)导入依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

2)编写代码

需要用到的类:

KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象

为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交offset 的功能。自动提交 offset 的相关参数:

enable.auto.commit:是否开启自动提交 offset 功能
auto.commit.interval.ms:自动提交 offset 的时间间隔

以下为自动提交offset 的代码

package org.example.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;/*** 消费者* 自动提交的缺点* 消费者丢数据、*** @ClassName MyConsumer* @Author 小坏* @Date 2021/10/31、16:09* @Version 1.0*/
public class MyConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");//开启自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交的延迟properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//Key,Value 的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");/*** 如何重复消费主题的数据、* 1、换一个组* 2、设置重置的offset*  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");**  为什么从最大的消费?*  一个组过来消费的时候、相当于断开重新连接、然后会去返回一下以前的数据、重新连接了就不能返回了、找不到了、*  系统就要给你一个、既然没有了就告诉你从这个地方消费、给的时候就有 earliest、latest最大最小值、然后按照这个去消费、】*  发现没有数据的时候同样把这个写进去、然后就从最大的消费*/properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata1");//重置消费者的offset、意为 从零开始properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList("first", "second"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(100);//解析并打印consumerRecordsfor (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key() + "--" + consumerRecord.value());}}}
}

2、手动提交offset

虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。

手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,**都会将本次 poll 的一批数据最高的偏移量提交;**不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。

1)同步提交 offset

由于同步提交 offset 有失败重试机制,故更加可靠,以下为同步提交offset 的示例。

package org.example.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;/*** 消费者* 1)同步提交 offset** @ClassName MyConsumer* @Author 小坏* @Date 2021/10/31、16:09* @Version 1.0*/
public class MyConsumer2 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");//开启自动提交
//        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//自动提交的延迟
//        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//Key,Value 的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");/*** 如何重复消费主题的数据、* 1、换一个组* 2、设置重置的offset*  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");**  为什么从最大的消费?*  一个组过来消费的时候、相当于断开重新连接、然后会去返回一下以前的数据、重新连接了就不能返回了、找不到了、*  系统就要给你一个、既然没有了就告诉你从这个地方消费、给的时候就有 earliest、latest最大最小值、然后按照这个去消费、】*  发现没有数据的时候同样把这个写进去、然后就从最大的消费*/properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata1");//重置消费者的offset、意为 从零开始properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//订阅主题consumer.subscribe(Arrays.asList("first", "second"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(100);//解析并打印consumerRecordsfor (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key() + "--" + consumerRecord.value());}/*** 同步提交,当前线程会阻塞直到 offset 提交成功* 但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。* 因此更多的情况下,会选用异步提交 offset 的方式。*/consumer.commitSync();}}
}

2)异步提交 offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交
offset 的方式。

以下为异步提交 offset 的示例

package org.example.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.Map;
import java.util.Properties;/*** 消费者* 2)异步提交 offset* 虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。* 因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。** @ClassName MyConsumer* @Author 小坏* @Date 2021/10/31、16:09* @Version 1.0*/
public class MyConsumer3 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");//开启自动提交
//        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//自动提交的延迟
//        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//Key,Value 的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");/*** 如何重复消费主题的数据、* 1、换一个组* 2、设置重置的offset*  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");**  为什么从最大的消费?*  一个组过来消费的时候、相当于断开重新连接、然后会去返回一下以前的数据、重新连接了就不能返回了、找不到了、*  系统就要给你一个、既然没有了就告诉你从这个地方消费、给的时候就有 earliest、latest最大最小值、然后按照这个去消费、】*  发现没有数据的时候同样把这个写进去、然后就从最大的消费*/properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata1");//重置消费者的offset、意为 从零开始properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//订阅主题consumer.subscribe(Arrays.asList("first", "second"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(100);//解析并打印consumerRecordsfor (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key() + "--" + consumerRecord.value());}/*** 虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,* 直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。*/consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.out.println("Commit failed for" + offsets);}}});}}
}

3)数据漏消费和重复消费分析

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。

3、自定义存储 offset

Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka
的一个内置的topic 中。除此之外,Kafka 还可以选择自定义存储 offset。

offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。

当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。

消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先
获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其
中提交和获取offset 的方法,需要根据所选的 offset 存储系统自行实现。

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomConsumer {private static Map<TopicPartition, Long> currentOffset = new
HashMap<>();
public static void main(String[] args) {//创建配置信息Properties props = new Properties();
//Kafka 集群props.put("bootstrap.servers", "hadoop102:9092");
//消费者组,只要 group.id 相同,就属于同一个消费者组props.put("group.id", "test");
//关闭自动提交 offsetprops.put("enable.auto.commit", "false");//Key 和 Value 的反序列化类props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");//创建一个消费者KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Arrays.asList("first"), new
ConsumerRebalanceListener() {//该方法会在 Rebalance 之前调用@Overridepublic void
onPartitionsRevoked(Collection<TopicPartition> partitions) {commitOffset(currentOffset);}//该方法会在 Rebalance 之后调用@Overridepublic void
onPartitionsAssigned(Collection<TopicPartition> partitions) {currentOffset.clear();for (TopicPartition partition : partitions) {consumer.seek(partition, getOffset(partition));//
定位到最近提交的 offset 位置继续消费}}});while (true) {ConsumerRecords<String, String> records =
consumer.poll(100);//消费者拉取数据for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value
= %s%n", record.offset(), record.key(), record.value());currentOffset.put(new TopicPartition(record.topic(),
record.partition()), record.offset());}commitOffset(currentOffset);//异步提交}}//获取某分区的最新 offsetprivate static long getOffset(TopicPartition partition) {return 0;}//提交该消费者所有分区的 offsetprivate static void commitOffset(Map<TopicPartition, Long>
currentOffset) {} }

三、自定义 Interceptor

1、拦截器原理

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients
端的定制化控制逻辑。

对于 producer 而言,interceptor 使得用户在消息发送前以及 producer
回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

(1)configure(configs)

获取配置信息和初始化数据时调用。

(2)onSend(ProducerRecord):

该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在
消息被序列化以及计算分区前调用该方法
。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。

(3)onAcknowledgement(RecordMetadata, Exception):

该方法会在消息从RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。

(4)close:

关闭 interceptor,主要用于执行一些资源清理工作
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

2、拦截器案例

1)需求

实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

Kafka拦截器

2)案例实操

(1)增加时间戳拦截器

package org.example.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** 带拦截器的生产者* @ClassName InterceptorProducer* @Author 小坏* @Date 2021/11/3、17:31* @Version 1.0*/
public class InterceptorProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//指定连接的kafka集群properties.put("bootstrap.servers", "hadoop102:9092");//Ack应答级别properties.put("acks", "all");//重试次数properties.put("retries", 3);//批次大小properties.put("batch.size", 16384);//等待时间properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);//RecordAccumulator 缓 冲区大小properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//加入拦截器List<String> list =  new ArrayList<>();list.add("org.example.interceptor.TimeInterceptor");list.add("org.example.interceptor.CounterInterceptor");properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,list);KafkaProducer<String, String> producer = new KafkaProducer<>(properties);/*** 使用最后的一个api、只有一个key、所有他自己取轮询*/for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("first", "atguigu","atguigu-" + i)).get();}//关闭资源producer.close();}
}

定义拦截器A

package org.example.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @ClassName TimeInterceptor* @Author 小坏* @Date 2021/11/3、17:12* @Version 1.0*/
public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {//1、取出数据String value = record.value();return new ProducerRecord<String, String>(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "," + value);}/*** 时间拦截器处理业务比较复杂的场景* @param metadata* @param exception*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}}

(2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器

package com.atguigu.kafka.interceptor;
package org.example.interceptor;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @ClassName CounterInterceptor* @Author 小坏* @Date 2021/11/3、17:25* @Version 1.0*/
public class CounterInterceptor implements ProducerInterceptor<String, String> {int success;int error;/*** 处理的数据不需要动** @param record* @return*/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}/*** 重要的在这写** @param metadata* @param exception*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if (metadata != null) {success++;} else {error++;}}@Overridepublic void close() {System.out.println("success:"+success);System.out.println("error:"+error);}@Overridepublic void configure(Map<String, ?> configs) {}
}

(3)producer 主程序

package com.atguigu.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class InterceptorProducer {public static void main(String[] args) throws Exception {// 1 设置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// 2 构建拦截链
List<String> interceptors = new ArrayList<>();
interceptors.add("com.atguigu.kafka.interceptor.TimeInterce
ptor");
interceptors.add("com.atguigu.kafka.interceptor.CounterInte
rceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
interceptors);
String topic = "first";
Producer<String, String> producer = new
KafkaProducer<>(props);
// 3 发送消息
for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new
ProducerRecord<>(topic, "message" + i);producer.send(record);
}
// 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
producer.close();
} }

3)测试

(1)在 kafka 上启动消费者,然后运行客户端 java 程序。

[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic
first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9

大数据之 Kafka API 从入门到放弃 (第四章)相关推荐

  1. 大数据时代:SSAS从入门到放弃

    大数据时代到来,如何从数据中提取.挖掘对业务发展有价值的信息,为业务决策提供有力依据,推动精益化化的企业管理.商业分析师通常会使用各种数据分析工具,例如Excel.Tableau和PowerBI等对数 ...

  2. kafka分区与分组原理_大数据技术-Kafka入门

    在大数据学习当中,主要的学习重点就是大数据技术框架,针对于大数据处理的不同环节,需要不同的技术框架来解决问题.以Kafka来说,主要就是针对于实时消息处理,在大数据平台当中的应用也很广泛.大数据学习一 ...

  3. 视频教程-跟风舞烟学大数据可视化-Echarts从入门到上手实战-JavaScript

    跟风舞烟学大数据可视化-Echarts从入门到上手实战 网名风舞烟,中国科技大学计算机专业.微软认证讲师(MCE).微软数据分析讲师.10多年软件行业从业经验,参与过数百万的企业级ERP系统,在大数据 ...

  4. 车联网大数据框架_大数据基础:ORM框架入门简介

    作为大数据开发技术者,需要掌握扎实的Java基础,这是不争的事实,所以对于Java开发当中需要掌握的重要框架技术,也需要有相应程度的掌握,比如说ORM框架.今天的大数据基础分享,我们就来具体讲一讲OR ...

  5. java从入门到精通_Java大数据:数据库开发从入门到精通

    在Java大数据开发任务当中,数据存储是非常关键的一环,涉及到分布式文件系统.分布式数据库,数据库是后端系统当中支持数据存储的重要组件.今天我们就来聊聊Java大数据,数据库开发从入门到精通,应该如何 ...

  6. 云计算大数据之 Kafka集群搭建

    云计算大数据之 Kafka集群搭建 版权声明: 本文为博主学习整理原创文章,如有不正之处请多多指教. 未经博主允许不得转载.https://blog.csdn.net/qq_42595261/arti ...

  7. 3000门徒内部训练绝密视频(泄密版)第1课:大数据最火爆语言Scala光速入门

    大数据最火爆语言Scala光速入门 scala 可以使用java的库 scala 的工厂方法:apply 条件表达式有返回值 数组可以用to ,箭头 <- 最后一行内容的值是整个代码块的返回值 ...

  8. Java大数据:数据库开发从入门到精通

    在Java大数据开发任务当中,数据存储是非常关键的一环,涉及到分布式文件系统.分布式数据库,数据库是后端系统当中支持数据存储的重要组件.今天我们就来聊聊Java大数据,数据库开发从入门到精通,应该如何 ...

  9. GPS从入门到放弃(四) --- GPS信号结构

    GPS从入门到放弃(四) - GPS信号结构 GPS信号结构可以分为三层: 载波 伪码 数据码 载波 载波是三层里的基础,伪码和数据码都是调制在载波上才能发送.GPS有两个载波频率,L1和L2,L1为 ...

  10. ArcGIS for Desktop入门教程_第四章_入门案例分析 - ArcGIS知乎-新一代ArcGIS问答社区...

    原文:ArcGIS for Desktop入门教程_第四章_入门案例分析 - ArcGIS知乎-新一代ArcGIS问答社区 1 入门案例分析 在第一章里,我们已经对ArcGIS系列软件的体系结构有了一 ...

最新文章

  1. 穷举n位二进制数 (深搜、回溯_子集树)
  2. 图像分析:二值图像连通域标记
  3. pandas 把某一列中字符串变数值_Python学习教程:Python数据分析实战基础 | 初识Pandas...
  4. 第一百一十二期:96秒100亿!如何抗住双11高并发流量?
  5. OAF_OAF组件系列1 - Item Style汇总(概念)
  6. 《JavaScript开发框架权威指南》——1.4 查找、添加和删除Bower包
  7. 论文笔记_S2D.18_2019-ICRA_DeepFusion: 基于单视图深度和梯度预测的单目SLAM实时稠密三维重建
  8. 使用pdfbox分页保存pdf为图片
  9. H3C-NE实验主要命令
  10. 女人长点心就收藏它肯定会用到的
  11. The server encountered an internal error that prevented it from fulfilling this request.解决方法
  12. 霍纳法则c语言算法代码,霍纳法则(Horner Rule)介绍及C语言实现
  13. uni-app真机运行app时报错:TypeError: Cannot read property ‘call’ of undefined
  14. 中易云嵌入式网关丨性能卓越+性价比高+应用场景丰富
  15. 【大学生软件测试基础】图书阅读指南 - 决策表法
  16. Storage 的使用
  17. 阿里巴巴二重身ABBC Coin虚涨逾100%
  18. 被微软收购两年后,GitHub 怎么样了?
  19. 准确率100%,阿里商旅账单系统架构设计实践
  20. 借大数据玩自主酒店,携程的酒店业之困能不能解?

热门文章

  1. 教育统计系统服务器,2020版教育事业统计软件常见问题及解决方案
  2. win10各版本的历史记录
  3. Android对接蓝牙打印机
  4. 决策树算法(ID3算法)
  5. 两台win7电脑,双网卡主机共享网络(局域网和Internet)给从机
  6. ubuntu/window安装dukto
  7. 数据分析-常用分析方法-(1)描述性分析-用Excel实现
  8. python等值线如何设置高度的范围和间隔_matlab 等值线间距问题
  9. phy芯片测试寄存器_DM9000寄存器功能详细介绍
  10. C# WinAPI 编程详解(一)