Introduction: This post walks through the Kafka producer partitioner API (Java).
It covers the partitioning rules, how partitioners are chosen, the partitioner code and pom.xml configuration, and finally the run results.

Contents

  • Benefits of the Kafka producer partitioner
  • Partitioning rules
  • Partitioner API
    • Configuration
    • Code
      • Consumer code
      • Custom partitioner code
      • Create the CustomProducerCallBackDemo class that uses the custom partitioner
  • Run results

Benefits of the Kafka producer partitioner

1. Easy to scale out in a cluster: each partition can live on a different broker, so a topic can grow with the cluster.
2. Higher concurrency: producers and consumers can work on different partitions in parallel.

Partitioning rules

1. If a partition is specified explicitly, that value is used as the partition directly.
2. If no partition is specified but a key is present, the partition is the key's hash modulo the topic's partition count.
3. If neither a partition nor a key is given, Kafka uses the Sticky Partitioner: it picks a random partition and keeps using it until that partition's batch is full or completed, then randomly picks another partition. (Records used to be round-robined one at a time; now they rotate one batch at a time.) A sketch of the three rules follows this list.
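The three rules correspond to the three ProducerRecord constructor forms. Below is a minimal illustrative sketch; note that the hash computation is a simplification, since the real DefaultPartitioner hashes the serialized key bytes with murmur2 rather than calling hashCode():

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionRuleSketch {
    public static void main(String[] args) {
        // Rule 1: an explicit partition is used as-is (partition 0 here)
        ProducerRecord<String, String> r1 = new ProducerRecord<>("first01", 0, "k", "v");

        // Rule 2: no partition but a key -> hash(key) % partition count
        ProducerRecord<String, String> r2 = new ProducerRecord<>("first01", "k", "v");

        // Rule 3: neither partition nor key -> sticky partitioner assigns one per batch
        ProducerRecord<String, String> r3 = new ProducerRecord<>("first01", "v");

        // Simplified stand-in for rule 2's hash (real code: murmur2 over the key bytes)
        int numPartitions = 3; // assume the topic has 3 partitions
        int partition = ("k".hashCode() & 0x7fffffff) % numPartitions;
        System.out.println("rule-2 style partition for key \"k\": " + partition);
    }
}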

Partitioner API

Configuration

1. Create a Maven project for the Kafka API.
2. Configure pom.xml as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafkaDemmo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
</project>

3. Add a log4j.properties file under main/resources with the following content. (Note: the pom above declares slf4j-nop, which silently discards the Kafka client's SLF4J output, so this file only takes effect if log4j itself ends up on the classpath, e.g. via an slf4j-log4j12 binding.)

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

Code

Consumer code

The consumer below is used later to verify which partition each message landed on:

package com.lqs.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/28 21:58
 */
public class CustomConsumerDemo {

    public static void main(String[] args) {
        Properties properties = new Properties();

        // Broker address for the consumer
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bdc112:9092");

        // Key/value deserializers (required)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Consumer group id (also required)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");

        // Create the consumer
        KafkaConsumer<String, String> stringStringKafkaConsumer = new KafkaConsumer<>(properties);

        // Subscribe to the topic
        ArrayList<String> strings = new ArrayList<>();
        strings.add("first01");
        stringStringKafkaConsumer.subscribe(strings);

        // Poll and print records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = stringStringKafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

Custom partitioner code

package com.lqs.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/28 20:12
 * <p>
 * A custom partitioner:
 * 1. Implement the Partitioner interface
 * 2. Implement its three methods: partition, close, configure
 * 3. In partition(), return the partition number
 */
public class PartitionerDemo implements Partitioner {

    /**
     * Returns the partition for the given record.
     *
     * @param topic      topic name
     * @param key        record key
     * @param keyBytes   serialized key bytes
     * @param value      record value
     * @param valueBytes serialized value bytes
     * @param cluster    cluster metadata (can be used to look up partition info)
     * @return the chosen partition
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route by message content
        String s1 = value.toString();
        int partition;
        if (s1.contains("lqs")) {
            partition = 0;
        } else if (s1.contains("test")) {
            partition = 1;
        } else {
            partition = 2;
        }
        return partition;
    }

    /**
     * Release resources.
     */
    @Override
    public void close() {
    }

    /**
     * Configuration hook.
     *
     * @param configs the configs
     */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
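Before wiring the partitioner into a producer, you can sanity-check its routing by calling partition() directly. This is a hypothetical check, not from the original post; it passes null for the Cluster argument only because PartitionerDemo never reads it:

package com.lqs.kafka.partitioner;

/**
 * Hypothetical quick check of PartitionerDemo's routing rules.
 */
public class PartitionerDemoCheck {
    public static void main(String[] args) {
        PartitionerDemo partitioner = new PartitionerDemo();
        for (String value : new String[]{"lqs0", "test3", "sfa1"}) {
            // Cluster is null because this partitioner ignores it
            int p = partitioner.partition("first01", null, null, value, value.getBytes(), null);
            System.out.println(value + " -> partition " + p); // expect 0, 1, 2
        }
    }
}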

Create the CustomProducerCallBackDemo class that uses the custom partitioner

package com.lqs.kafka.partitioner;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/28 20:20
 */
public class CustomProducerCallBackDemo {

    public static void main(String[] args) {
        // Producer configuration
        Properties properties = new Properties();

        // Broker address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bdc112:9092");

        // Batch size, default 16 KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // Linger time: 1 ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // RecordAccumulator buffer size, default 32 MB
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        // Key/value serializers (required)
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.lqs.kafka.partitioner.PartitionerDemo");

        // Create the producer
        KafkaProducer<String, String> stringStringKafkaProducer = new KafkaProducer<>(properties);

        for (int i = 0; i < 12; i++) {
            if (i % 2 == 0) {
                // "lqs..." messages should be routed to partition 0
                stringStringKafkaProducer.send(new ProducerRecord<>("first01", "lqs" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // Check whether the send succeeded
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println(metadata.toString());
                        }
                    }
                });
            } else {
                // "sfa..." messages contain neither "lqs" nor "test", so they go to partition 2
                stringStringKafkaProducer.send(new ProducerRecord<>("first01", "sfa" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println(metadata.toString());
                        }
                    }
                });
            }
        }

        // Close the producer (this also flushes pending records)
        stringStringKafkaProducer.close();
    }
}
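Callback is a functional interface, so the anonymous classes above can be collapsed into lambdas. A hypothetical helper sketch (sendWithLog is my naming, not from the original post):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendHelpers {

    /**
     * Hypothetical helper: send a record and log the result via a lambda Callback.
     */
    static void sendWithLog(KafkaProducer<String, String> producer, String topic, String value) {
        producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace(); // send failed
            } else {
                System.out.println(metadata.toString()); // prints topic-partition@offset
            }
        });
    }
}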

Run results

Note: remember to start Zookeeper and Kafka before running!

1. Run the consumer first.

2. Run the producer.

3. Check the consumer output for the new records:
Output like the following means the hand-written partitioner works: the "lqs…" messages all landed on partition 0 and the "sfa…" messages on partition 2, matching the routing rules.

ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 24, CreateTime = 1640754679164, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = sfa1)
ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 25, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = sfa3)
ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 26, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = sfa5)
ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 27, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = sfa7)
ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 28, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = sfa9)
ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 29, CreateTime = 1640754679166, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = sfa11)
ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 213, CreateTime = 1640754679156, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = lqs0)
ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 214, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = lqs2)
ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 215, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = lqs4)
ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 216, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = lqs6)
ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 217, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = lqs8)
ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 218, CreateTime = 1640754679166, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = lqs10)
