Kafka的入门级API应用
文章目录
- 1.Producer API
- 1.1 消息发送流程
- 1.2 异步发送API
- 1.2.1 导入依赖
- 1.2.2 添加log4j配置文件
- 1.2.3 编写代码
- 1.2.3.1 不带回调函数的API
- 1.2.3.2 带回调函数的API
- 1.3 自定义分区器
- 2.Consumer API
- 2.1 自动提交offset
- 2.1.1 编写代码
- 2.2 手动提交offset
- 2.2.1 同步提交offset
- 2.2.2 异步提交offset
- 2.3 数据漏消费和重复消费分析
- 3.自定义Interceptor
- 3.1 拦截器原理
- 3.2 拦截器案例
- 3.2.1 需求
- 3.2.2 增加时间戳拦截器
- 3.2.3 计数拦截器
- 3.2.4 Producer主程序
- 3.2.5 测试
1.Producer API
1.1 消息发送流程
Kafka的Producer发送消息采用异步发送的方式。在消息发送的过程中,涉及到了两个线程—-main线程和Sender线程,以及一个线程共享变量—-RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
下图为KafkaProducer发送消息流程:
注意:
两个重要参数:
1.batch.size:sender发送数据的前提是数据积累到batch.size(一批数据的大小)。
2.lingertime:如果数据迟迟未达到batch.size,sender等待lingertime之后就会发送数据。
1.2 异步发送API
1.2.1 导入依赖
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.12.0</version></dependency>
</dependencies>
1.2.2 添加log4j配置文件
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig"><Appenders><!-- 类型名为Console,名称为必须属性 --><Appender type="Console" name="STDOUT"><!-- 布局为PatternLayout的方式,输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here --><Layout type="PatternLayout"pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" /></Appender></Appenders><Loggers><!-- 可加性为false --><Logger name="test" level="info" additivity="false"><AppenderRef ref="STDOUT" /></Logger><!-- root loggerConfig设置 --><Root level="info"><AppenderRef ref="STDOUT" /></Root></Loggers></Configuration>
1.2.3 编写代码
需要用到的类:
KafkaProducer:创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord对象
1.2.3.1 不带回调函数的API
CustomProducer.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重试次数props.put("retries",1);//批次大小props.put("batch.size",16384);//等待时间props.put("linger.ms",1);//RecordAccumilator缓冲区大小props.put("buffer.memory",33554432);//键和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);//发送的消息内容为:[0,20)for (int i=0;i<20;i++){producer.send(new ProducerRecord<String,String>("topic_jx",String.valueOf(i),String.valueOf(i)));}producer.close();}
}
测试:
1.在shell中先开启consumer端:
如果没有该Topic,会警告然后自动创建,不用担心。
命令:
kafka-console-consumer.sh --bootstrap-server yxp:9092 --topic topic_jx
2.运行ConusmerProducer代码
3.查看Consumer收到的消息
1.2.3.2 带回调函数的API
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
注意:当消息发送失败后会自动重试,不需要我们在回调函数中手动调试。
对原ConsumerProducer.java做一下修改,修改后的程序代码为:
package com.yxp.kafkareview;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重试次数props.put("retries",1);//批次大小props.put("batch.size",16384);//等待时间props.put("linger.ms",1);//RecordAccumilator缓冲区大小props.put("buffer.memory",33554432);//键和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);//在每次发送的数据的send方法中调用回调函数for (int i=0;i<20;i++){producer.send(new ProducerRecord<String, String>("topic_jx", String.valueOf(i), String.valueOf(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception==null){System.out.println("success->"+metadata.offset());}else{exception.printStackTrace();}}});}producer.close();}
}
发送消息后会调用该方法打印成功与否,控制台消息如下:
这success后跟的是生产者的偏移量。由于已经发送过20条(0-19),因此从20开始
Consumer端收到的消息:
1.3 自定义分区器
MyPartitioner.java
package com.yxp.kafkareview;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class MyPartitionerx implements Partitioner {/***分区规则:value%2==0 放在0分区中,value%2!=0 放在1分区中* @param topic 主题* @param key 消息的key* @param keyBytes key序列化后的字节数组* @param value 消息的value* @param valueBytes value序列化后的字节数组* @param cluster 类似于MR中的context,有获取各种参数的作用* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int partition=1;if (Integer.parseInt(value.toString())%2==0){System.out.println(Integer.parseInt(value.toString())%2);partition=0;}else {System.out.println(Integer.parseInt(value.toString())%2);}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
在Producer对应的类中指定该程序全类名。
ConsumerProducer.java
package com.yxp.kafkareview;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重试次数props.put("retries",1);//批次大小props.put("batch.size",16384);//等待时间props.put("linger.ms",1);//自定义分区器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitionerx.class.getName());//RecordAccumilator缓冲区大小props.put("buffer.memory",33554432);//键和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for (int i=0;i<40;i++){producer.send(new ProducerRecord<String, String>("topic_mem", String.valueOf(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception==null){System.out.println("success->"+metadata.offset());}else{exception.printStackTrace();}}});}producer.close();}
}
2.Consumer API
由于数据在Kafka中是持久化的,所以不用担心数据丢失问题,因此,Consumer消费数据时的可靠性是可以保证的。
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据是必须考虑的问题。
2.1 自动提交offset
2.1.1 编写代码
需要用到的类:
KafkaConsumer:创建一个消费者对象,用来消费数据。
ConsumerConfig:获取所需的一系列配置参数。
ConsumerRecord:每条数据都要封装成一个ConsumerRecord对象。
Kafka提供了自动提交offset的功能。自动提交Offset的相关参数:
enable.auto.commit:是否开启自动提交offset功能。
auto.commit.interval.ms:自动提交offset的时间间隔。
CustomConsumer.java
package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//kafka集群服务器 props.put("bootstrap.servers", "yxp:9092");//消费者的组idprops.put("group.id", "test");//允许自动提交 props.put("enable.auto.commit", "true");//自动提交的间隔时间 props.put("auto.commit.interval.ms", "1000");//key的反序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//value的反序列化器 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定提交到的topic consumer.subscribe(Arrays.asList("topic_mem"));//无限循环的poll数据while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}
2.2 手动提交offset
虽然自动提交offset十分简洁便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机,因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
2.2.1 同步提交offset
由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。
CustomConsumer.java
package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//集群服务器props.put("bootstrap.servers", "yxp:9092");//组idprops.put("group.id", "test");//关闭自动提交props.put("enable.auto.commit", "false");//自动提交间隔props.put("auto.commit.interval.ms", "1000");//key和value的反序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic_mem"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}//消费完之后同步提交offset偏移量consumer.commitSync();}}
}
2.2.2 异步提交offset
虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
CustomConsumer.java
package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//集群服务器props.put("bootstrap.servers", "yxp:9092");//组idprops.put("group.id", "test");//关闭自动提交props.put("enable.auto.commit", "false");//自动提交间隔props.put("auto.commit.interval.ms", "1000");//key和value的反序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic_mem"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}//消费完之后异步提交offset偏移量consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception!=null){exception.printStackTrace();}}});}}
}
2.3 数据漏消费和重复消费分析
无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。
3.自定义Interceptor
3.1 拦截器原理
Producer拦截器(Interceptor)主要用于实现clients端的定制化控制逻辑。
对于Producer而言,Interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(Interceptor chain)。拦截器需要实现ProducerInterceptor接口,其定义的方法包括:
(1)configure(configs):
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
(4)close:
关闭interceptor,主要用于执行一些资源清理工作.
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。这在使用过程中要特别留意。
3.2 拦截器案例
3.2.1 需求
实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
3.2.2 增加时间戳拦截器
TimeInterceptor.java
package com.yxp.kafkareview;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class TimeInterceptor implements ProducerInterceptor<String,String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {//新构建一个ProducerRecorder对象(在值属性那里将时间戳加在原值前面)return new ProducerRecord<>(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+","+ record.value());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
3.2.3 计数拦截器
CounterInterceptor.java
package com.yxp.kafkareview;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:统计发送消息的成功和失败数*/
public class CounterInterceptor implements ProducerInterceptor<String,String > {private int errorCounter=0;private int successCounter=0;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {//统计成功和失败的次数if (exception==null){successCounter++;}else {errorCounter++;}}@Overridepublic void close() {System.out.println("Success sent:"+successCounter);System.out.println("Error sent:"+errorCounter);}@Overridepublic void configure(Map<String, ?> configs) {}
}
3.2.4 Producer主程序
package com.yxp.kafkareview;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.ArrayList;
import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class InterceptorProducer {public static void main(String[] args) {//1.设置配置信息Properties props = new Properties();props.put("bootstrap.servers", "yxp:9092");props.put("acks", "all");props.put("retries", 3);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//2.构建拦截链ArrayList<String> interceptors = new ArrayList<>();interceptors.add(TimeInterceptor.class.getName());interceptors.add(CounterInterceptor.class.getName());props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);KafkaProducer<String, String> producer = new KafkaProducer<>(props);//3.发送消息for (int i=0;i<10;i++){producer.send(new ProducerRecord<>("Topic_new","message"+i));}//4.关闭producerproducer.close();}}
3.2.5 测试
1.在kafka上启动消费者
shell端命令:kafka-console-consumer.sh --bootstrap-server yxp:9092 --topic Topic_new
2.然后运行客户端java程序。
3.观察消费者接受到的信息
Kafka的入门级API应用相关推荐
- 2021年大数据Kafka(五):❤️Kafka的java API编写❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的java API编写 一.生产者代码 第一步: ...
- java kafka api_kafka java API的使用
Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...
- java kafka client_Kafka Client API 基本使用
之前讲过了[Kafka基本概念及原理][1],这次我们来看看Kafka Client的API.要使用Kafka Client的API,首先需要先部署Kafka集群,部署过程请参见[官网][2].然后在 ...
- kafka 命令、API
日萌社 人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新) 大数据组件使用 总文章 kafka 生产/消费API.offs ...
- 简单封装kafka相关的api
一.针对于kafka版本 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafk ...
- Kafka High Level API vs. Low Level API
目录: 1.ConsumerApi 2.High Level Consumer (屏蔽细节管理) 3.Low Level API (细节需要自己处理) 1.Kafka提供了两种Consumer API ...
- kafka(三)kafka steaming high-level api
接上一篇文章 https://blog.csdn.net/qq_44962429/article/details/113809911 1. high level api Kafka Streams D ...
- Kafka之Consumer API详解
Kafka中如何创建消费者Consumer已经在前面给大家详细的讲解过,那么如何使用JAVA来消费topic中的数据呢呢,今天就说说. 还是先创建一个topic,拥有一个副本和一个分区 kafka-t ...
- kafka 消费端 api_在消费者的眼中:您真的需要为您的API提供客户端库吗?
kafka 消费端 api RESTful Web服务和API的优点在于,任何使用HTTP协议的使用者都可以理解和使用它. 但是,同样的难题一遍又一遍地弹出:您是否应该将Web APis与客户端库一起 ...
- kafka jar包_和同事交流不会kafka怎么行,API奉上,不是大神也能编
对于kafka真的是又爱又恨,作为架构和大数据两个方面的通用者, 在这个数据量称雄的时代,越来越起到至关重要的作用,在和同事进行交流的时候,kafka在开发的过程中如何使用能起到最大的效果成为话题之一 ...
最新文章
- java 注解妙用_框架开发之Java注解的妙用
- 【Linux】一步一步学Linux——bind命令(231)
- 商务智能之绩效管理 Performance Management
- 接口 vs 抽象类 的区别
- python 数据类_python数据类
- css 中多种边框的实现小窍门
- JavaScript权威指南 - 数组
- [POI2009]石子游戏Kam
- 硬盘服务器作用,文件服务器有什么作用?
- ETC风头已过,龙头林立,黔通智联此时上市还真缺点儿“想象力”
- Power BI Desktop 中的数据源
- Arduino应用开发——LCD显示图片
- android手机分区调整大小写,如何使用PQMagic调整磁盘分区容量大小
- App Store商店图片文案填写说明
- 服务器系统测试,服务器系统整合测试
- PAT A1038 Recover the Smallest Number ——醉里挑灯看剑
- 兄弟扫描机无法连接计算机,可以网络打印, 但是不能进行网络扫描。
- 【集合论】偏序关系 ( 偏序关系定义 | 偏序集定义 | 大于等于关系 | 小于等于关系 | 整除关系 | 包含关系 | 加细关系 )
- 基于图神经网络的知识追踪的五篇论文
- Python如何读取STL文件,生成STL文件预览图(缩略图)