文章目录

  • 1.Producer API
    • 1.1 消息发送流程
    • 1.2 异步发送API
      • 1.2.1 导入依赖
      • 1.2.2 添加log4j配置文件
      • 1.2.3 编写代码
        • 1.2.3.1 不带回调函数的API
        • 1.2.3.2 带回调函数的API
    • 1.3 自定义分区器
  • 2.Consumer API
    • 2.1 自动提交offset
      • 2.1.1 编写代码
    • 2.2 手动提交offset
      • 2.2.1 同步提交offset
      • 2.2.2 异步提交offset
    • 2.3 数据漏消费和重复消费分析
  • 3.自定义Interceptor
    • 3.1 拦截器原理
    • 3.2 拦截器案例
      • 3.2.1 需求
      • 3.2.2 增加时间戳拦截器
      • 3.2.3 计数拦截器
      • 3.2.4 Producer主程序
      • 3.2.5 测试

1.Producer API

1.1 消息发送流程

​ Kafka的Producer发送消息采用异步发送的方式。在消息发送的过程中,涉及到了两个线程—-main线程和Sender线程,以及一个线程共享变量—-RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

​ 下图为KafkaProducer发送消息流程:

注意:

两个重要参数:
1.batch.size:sender发送数据的前提是数据积累到batch.size(一批数据的大小)。
2.lingertime:如果数据迟迟未达到batch.size,sender等待lingertime之后就会发送数据。

1.2 异步发送API

1.2.1 导入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.12.0</version></dependency>
</dependencies>

1.2.2 添加log4j配置文件

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig"><Appenders><!-- 类型名为Console,名称为必须属性 --><Appender type="Console" name="STDOUT"><!-- 布局为PatternLayout的方式,输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here --><Layout type="PatternLayout"pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" /></Appender></Appenders><Loggers><!-- 可加性为false --><Logger name="test" level="info" additivity="false"><AppenderRef ref="STDOUT" /></Logger><!-- root loggerConfig设置 --><Root level="info"><AppenderRef ref="STDOUT" /></Root></Loggers></Configuration>

1.2.3 编写代码

需要用到的类:

KafkaProducer:创建一个生产者对象,用来发送数据

ProducerConfig:获取所需的一系列配置参数

ProducerRecord:每条数据都要封装成一个ProducerRecord对象

1.2.3.1 不带回调函数的API

CustomProducer.java


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重试次数props.put("retries",1);//批次大小props.put("batch.size",16384);//等待时间props.put("linger.ms",1);//RecordAccumilator缓冲区大小props.put("buffer.memory",33554432);//键和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);//发送的消息内容为:[0,20)for (int i=0;i<20;i++){producer.send(new ProducerRecord<String,String>("topic_jx",String.valueOf(i),String.valueOf(i)));}producer.close();}
}

测试:

1.在shell中先开启consumer端:

如果没有该Topic,会警告然后自动创建,不用担心。

命令:

kafka-console-consumer.sh --bootstrap-server yxp:9092 --topic topic_jx

2.运行ConusmerProducer代码

3.查看Consumer收到的消息

1.2.3.2 带回调函数的API

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadataException,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

注意:当消息发送失败后会自动重试,不需要我们在回调函数中手动调试。

对原ConsumerProducer.java做一下修改,修改后的程序代码为:

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重试次数props.put("retries",1);//批次大小props.put("batch.size",16384);//等待时间props.put("linger.ms",1);//RecordAccumilator缓冲区大小props.put("buffer.memory",33554432);//键和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);//在每次发送的数据的send方法中调用回调函数for (int i=0;i<20;i++){producer.send(new ProducerRecord<String, String>("topic_jx", String.valueOf(i), String.valueOf(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception==null){System.out.println("success->"+metadata.offset());}else{exception.printStackTrace();}}});}producer.close();}
}

发送消息后会调用该方法打印成功与否,控制台消息如下:

这success后跟的是生产者的偏移量。由于已经发送过20条(0-19),因此从20开始

Consumer端收到的消息:


1.3 自定义分区器

​ MyPartitioner.java

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class MyPartitionerx implements Partitioner {/***分区规则:value%2==0 放在0分区中,value%2!=0 放在1分区中* @param topic 主题* @param key   消息的key* @param keyBytes key序列化后的字节数组* @param value 消息的value* @param valueBytes value序列化后的字节数组* @param cluster  类似于MR中的context,有获取各种参数的作用* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int partition=1;if (Integer.parseInt(value.toString())%2==0){System.out.println(Integer.parseInt(value.toString())%2);partition=0;}else {System.out.println(Integer.parseInt(value.toString())%2);}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

在Producer对应的类中指定该程序全类名。

ConsumerProducer.java

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();//kafka集群:broker-listprops.put("bootstrap.servers","yxp:9092");//重试次数props.put("retries",1);//批次大小props.put("batch.size",16384);//等待时间props.put("linger.ms",1);//自定义分区器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitionerx.class.getName());//RecordAccumilator缓冲区大小props.put("buffer.memory",33554432);//键和值的序列化器props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for (int i=0;i<40;i++){producer.send(new ProducerRecord<String, String>("topic_mem", String.valueOf(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception==null){System.out.println("success->"+metadata.offset());}else{exception.printStackTrace();}}});}producer.close();}
}

2.Consumer API

​ 由于数据在Kafka中是持久化的,所以不用担心数据丢失问题,因此,Consumer消费数据时的可靠性是可以保证的。

​ 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

所以offset的维护是Consumer消费数据是必须考虑的问题。


2.1 自动提交offset

2.1.1 编写代码

需要用到的类:

KafkaConsumer:创建一个消费者对象,用来消费数据。

ConsumerConfig:获取所需的一系列配置参数。

ConsumerRecord:每条数据都要封装成一个ConsumerRecord对象。

Kafka提供了自动提交offset的功能。自动提交Offset的相关参数:

enable.auto.commit:是否开启自动提交offset功能。

auto.commit.interval.ms:自动提交offset的时间间隔。

CustomConsumer.java

package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//kafka集群服务器 props.put("bootstrap.servers", "yxp:9092");//消费者的组idprops.put("group.id", "test");//允许自动提交 props.put("enable.auto.commit", "true");//自动提交的间隔时间 props.put("auto.commit.interval.ms", "1000");//key的反序列化器        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//value的反序列化器 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//指定提交到的topic consumer.subscribe(Arrays.asList("topic_mem"));//无限循环的poll数据while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

2.2 手动提交offset

​ 虽然自动提交offset十分简洁便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机,因此Kafka还提供了手动提交offset的API

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

2.2.1 同步提交offset

由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。

CustomConsumer.java

package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//集群服务器props.put("bootstrap.servers", "yxp:9092");//组idprops.put("group.id", "test");//关闭自动提交props.put("enable.auto.commit", "false");//自动提交间隔props.put("auto.commit.interval.ms", "1000");//key和value的反序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic_mem"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}//消费完之后同步提交offset偏移量consumer.commitSync();}}
}

2.2.2 异步提交offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

CustomConsumer.java

package com.yxp.kafkareview;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class CustomConsumer {public static void main(String[] args) {Properties props = new Properties();//集群服务器props.put("bootstrap.servers", "yxp:9092");//组idprops.put("group.id", "test");//关闭自动提交props.put("enable.auto.commit", "false");//自动提交间隔props.put("auto.commit.interval.ms", "1000");//key和value的反序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic_mem"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}//消费完之后异步提交offset偏移量consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception!=null){exception.printStackTrace();}}});}}
}

2.3 数据漏消费和重复消费分析

无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费而先消费后提交offset,有可能会造成数据的重复消费


3.自定义Interceptor

3.1 拦截器原理

​ Producer拦截器(Interceptor)主要用于实现clients端的定制化控制逻辑。

​ 对于Producer而言,Interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(Interceptor chain)。拦截器需要实现ProducerInterceptor接口,其定义的方法包括:

(1)configure(configs):

获取配置信息和初始化数据时调用。

(2)onSend(ProducerRecord)

该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。

(3)onAcknowledgement(RecordMetadata, Exception)

该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。

(4)close

关闭interceptor,主要用于执行一些资源清理工作.

如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。这在使用过程中要特别留意。

3.2 拦截器案例


3.2.1 需求

实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。

3.2.2 增加时间戳拦截器

TimeInterceptor.java

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class TimeInterceptor implements ProducerInterceptor<String,String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {//新构建一个ProducerRecorder对象(在值属性那里将时间戳加在原值前面)return new ProducerRecord<>(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+","+ record.value());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

3.2.3 计数拦截器

CounterInterceptor.java

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:统计发送消息的成功和失败数*/
public class CounterInterceptor implements ProducerInterceptor<String,String > {private int errorCounter=0;private int successCounter=0;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {//统计成功和失败的次数if (exception==null){successCounter++;}else {errorCounter++;}}@Overridepublic void close() {System.out.println("Success sent:"+successCounter);System.out.println("Error sent:"+errorCounter);}@Overridepublic void configure(Map<String, ?> configs) {}
}

3.2.4 Producer主程序

package com.yxp.kafkareview;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.ArrayList;
import java.util.Properties;/*** @Author : 尤小鹏* 切忌一味模仿!* 2022/1/6/006* description:*/
public class InterceptorProducer {public static void main(String[] args) {//1.设置配置信息Properties props = new Properties();props.put("bootstrap.servers", "yxp:9092");props.put("acks", "all");props.put("retries", 3);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//2.构建拦截链ArrayList<String> interceptors = new ArrayList<>();interceptors.add(TimeInterceptor.class.getName());interceptors.add(CounterInterceptor.class.getName());props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);KafkaProducer<String, String> producer = new KafkaProducer<>(props);//3.发送消息for (int i=0;i<10;i++){producer.send(new ProducerRecord<>("Topic_new","message"+i));}//4.关闭producerproducer.close();}}

3.2.5 测试

1.在kafka上启动消费者

shell端命令:kafka-console-consumer.sh --bootstrap-server yxp:9092 --topic Topic_new

2.然后运行客户端java程序。

3.观察消费者接受到的信息

Kafka的入门级API应用相关推荐

  1. 2021年大数据Kafka(五):❤️Kafka的java API编写❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的java API编写 一.生产者代码 第一步: ...

  2. java kafka api_kafka java API的使用

    Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...

  3. java kafka client_Kafka Client API 基本使用

    之前讲过了[Kafka基本概念及原理][1],这次我们来看看Kafka Client的API.要使用Kafka Client的API,首先需要先部署Kafka集群,部署过程请参见[官网][2].然后在 ...

  4. kafka 命令、API

    日萌社 人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新) 大数据组件使用 总文章 kafka 生产/消费API.offs ...

  5. 简单封装kafka相关的api

    一.针对于kafka版本 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafk ...

  6. Kafka High Level API vs. Low Level API

    目录: 1.ConsumerApi 2.High Level Consumer (屏蔽细节管理) 3.Low Level API (细节需要自己处理) 1.Kafka提供了两种Consumer API ...

  7. kafka(三)kafka steaming high-level api

    接上一篇文章 https://blog.csdn.net/qq_44962429/article/details/113809911 1. high level api Kafka Streams D ...

  8. Kafka之Consumer API详解

    Kafka中如何创建消费者Consumer已经在前面给大家详细的讲解过,那么如何使用JAVA来消费topic中的数据呢呢,今天就说说. 还是先创建一个topic,拥有一个副本和一个分区 kafka-t ...

  9. kafka 消费端 api_在消费者的眼中:您真的需要为您的API提供客户端库吗?

    kafka 消费端 api RESTful Web服务和API的优点在于,任何使用HTTP协议的使用者都可以理解和使用它. 但是,同样的难题一遍又一遍地弹出:您是否应该将Web APis与客户端库一起 ...

  10. kafka jar包_和同事交流不会kafka怎么行,API奉上,不是大神也能编

    对于kafka真的是又爱又恨,作为架构和大数据两个方面的通用者, 在这个数据量称雄的时代,越来越起到至关重要的作用,在和同事进行交流的时候,kafka在开发的过程中如何使用能起到最大的效果成为话题之一 ...

最新文章

  1. java 注解妙用_框架开发之Java注解的妙用
  2. 【Linux】一步一步学Linux——bind命令(231)
  3. 商务智能之绩效管理 Performance Management
  4. 接口 vs 抽象类 的区别
  5. python 数据类_python数据类
  6. css 中多种边框的实现小窍门
  7. JavaScript权威指南 - 数组
  8. [POI2009]石子游戏Kam
  9. 硬盘服务器作用,文件服务器有什么作用?
  10. ETC风头已过,龙头林立,黔通智联此时上市还真缺点儿“想象力”
  11. Power BI Desktop 中的数据源
  12. Arduino应用开发——LCD显示图片
  13. android手机分区调整大小写,如何使用PQMagic调整磁盘分区容量大小
  14. App Store商店图片文案填写说明
  15. 服务器系统测试,服务器系统整合测试
  16. PAT A1038 Recover the Smallest Number ——醉里挑灯看剑
  17. 兄弟扫描机无法连接计算机,可以网络打印, 但是不能进行网络扫描。
  18. 【集合论】偏序关系 ( 偏序关系定义 | 偏序集定义 | 大于等于关系 | 小于等于关系 | 整除关系 | 包含关系 | 加细关系 )
  19. 基于图神经网络的知识追踪的五篇论文
  20. Python如何读取STL文件,生成STL文件预览图(缩略图)

热门文章

  1. 《雍正皇帝》文化专有词翻译策略的研究现状(Baker)
  2. 15个在线网站检测工具
  3. Android常用控件-01
  4. Badboy 安装 使用 常见问题 badboy当前页面脚本发生错误
  5. 英文转换-在线英文批量转换器免费
  6. 足疗小张和面向对象的7个设计原则
  7. 机器学习实战----初识泰坦尼克
  8. JavaScript 数组方法every()
  9. 软件保护技术 - 基础
  10. Unity3d的场景音效静音处理