内置分区策略

1、如果指定的partition,那么直接进入该partition
2、如果没有指定partition,但是指定了key,使用key的hash选择partition
3、如果既没有指定partition,也没有指定key,使用轮询的方式进入partition

自定义Hash分区器

实现Partitioner接口中的partition方法,就是怎么分区的~
自定义hash分区器通过计算key的hashCode来进行分区.与key有关系

public class HashPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取指定topic的分区数目Integer partCount = cluster.partitionCountForTopic(topic);//计算分区int partition = Math.abs(key.hashCode()) % partCount;//打印key及分区,方便确认结果System.out.println("key: " + key + "  partition: " + partition);return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

之后需要在producer.properties中添加分区配置信息

partitioner.class=com.hm.kafka.day01.HashPartitioner

测试代码

object kafakaTest2 extends App {private val prop = new Properties()prop.load(kafakaTest2.getClass.getClassLoader.getResourceAsStream("producer.properties"))prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")//获取生产者对象private val producer = new KafkaProducer[Integer, String](prop)//获取记录for(x<- 1 to 10){val message: ProducerRecord[Integer, String] = new ProducerRecord("pet", x, "test"+x)producer.send(message)}producer.close()
}

运行结果
分区的结果和key有关

key: 1  partition: 1
key: 2  partition: 2
key: 3  partition: 0
key: 4  partition: 1
key: 5  partition: 2

其中自定义分区器也可以使用scala实现,更简单

class HashPartitioner extends Partitioner {override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {val parnum: Integer = cluster.partitionCountForTopic(topic)val x: Int = key.hashCode() % parnumprintln(s"key: $key  partition $x")x}override def close(): Unit = {}override def configure(configs: util.Map[String, _]): Unit = {}
}

如何控制分区的个数? 对%x取余,x是多少,分区就是多少个.

自定义随机分区器

class Partitioner2 extends Partitioner {/*** 计算当前消息的分区号*/override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {//先获取分区的数量val parnum: Integer = cluster.partitionCountForTopic(topic)//随机一个分区号,应该是一个0到paranum-1的自然数val x: Int = (Math.random() * parnum).toIntprintln(s"key: $key  partition $x")x}override def close(): Unit = {}override def configure(configs: util.Map[String, _]): Unit = {}
}

测试运行结果如下
随机分区就是利用随机数随机分区的,跟key没有关系

key: 1  partition 2
key: 2  partition 2
key: 3  partition 2
key: 4  partition 2
key: 5  partition 1
key: 6  partition 2
key: 7  partition 2
key: 8  partition 2
key: 9  partition 0
key: 10  partition 2

自定义轮询分区器

class Partitioner3 extends Partitioner {val atomic = new AtomicInteger()/*** 计算当前消息的分区号*/override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {//获取分区的总数量val count: Integer = cluster.partitionCountForTopic(topic)//一个自增变量val i: Int = atomic.getAndIncrement();//自增变量堆分区总数取模得到分区号val i1: Int = i % count//打印下分区号供验证println(s"key: $key  partition $i1")//返回分区号i1}override def close(): Unit = {}override def configure(configs: util.Map[String, _]): Unit = {}
}

运行结果
可以看出,分区是轮询的,然后轮询分区和key也没有什么关系

key: 1  partition 0
key: 2  partition 1
key: 3  partition 2
key: 4  partition 0
key: 5  partition 1
key: 6  partition 2
key: 7  partition 0
key: 8  partition 1
key: 9  partition 2
key: 10  partition 0

总结

  • kafka 分区分内置分区和自定义分区
  • 内置分区又根据有无key 会有不同的分区行为. 有key的话会根据key计算分区,没有key的话就轮询计算分区
  • 自定义分区同样,有key的话可以根据key计算分区. 没有key的话可以整个随机分区或者轮询分区. (随机分区可能导致数据倾斜,还是自定义轮询分区靠谱点,没有key的情况下)

kafka内置分区及自定义分区相关推荐

  1. awk 内置变量与自定义变量

    点击上方"生信科技爱好者",马上关注真爱,请置顶或星标 作者:ghostwu 原文:https://www.cnblogs.com/ghostwu/p/9085653.html A ...

  2. Vue011_ 内置指令与自定义指令

    内置指令与自定义指令 常用内置指令 1) v:text : 更新元素的 textContent 2) v-html : 更新元素的 innerHTML 3) v-if : 如果为 true, 当前标签 ...

  3. 文件自定义变量_awk 内置变量与自定义变量

    点击上方"生信科技爱好者",马上关注真爱,请置顶或星标 作者:ghostwu 原文:https://www.cnblogs.com/ghostwu/p/9085653.html A ...

  4. java 多重注解_Java注解-元数据、注解分类、内置注解和自定义注解

    大家好,我是乐字节的小乐,上次说过了Java多态的6大特性|乐字节,接下来我们来看看Java编程里的注解. Java注解有以下几个知识点:元数据 注解的分类 内置注解 自定义注解 注解处理器 Serv ...

  5. 2字节取值范围_Java注解-元数据、注解分类、内置注解和自定义注解|乐字节

    大家好,我是乐字节的小乐,上次说过了Java多态的6大特性|乐字节,接下来我们来看看Java编程里的注解. Java注解有以下几个知识点: 元数据 注解的分类 内置注解 自定义注解 注解处理器 Ser ...

  6. AngularJS-demo - 常用命令、内置服务、自定义服务、继承

    AngularJS-demo - 常用命令.内置服务.自定义服务.继承 常用命令: ng-app.ng-controller.ng-init.ng-repeat.ng-click 内置服务: $sco ...

  7. form-create教程:给内置组件和自定义组件添加事件

    本文将介绍form-create如何给内置组件和自定义组件添加事件 form-create 是一个可以通过 JSON 生成具有动态渲染.数据收集.验证和提交功能的表单生成器.并且支持生成任何 Vue ...

  8. 大数据入门教程系列之Hive内置函数及自定义函数

    本篇文章主要介绍Hive内置函数以及自定义UDF函数和UDFT函数,自定义UDF函数通过一个国际转换中文的例子说明. 操作步骤: ①.准备数据和环境 ②.演示Hive内置函数 ③.自定义UDF函数编写 ...

  9. 内置函数、自定义函数

    编程语言的函数都可以分为两类:内置函数和自定义函数. 内置函数分为 数字相关的内置函数 # abs(x) 求绝对值print(abs(-10)) #10 """ divm ...

最新文章

  1. 有参转录组常用数据库
  2. python的最大绘图速度_Python数据可视化之高速绘图神器PyQtGraph库,强烈建议收藏...
  3. 河北工业机器人夹爪生产厂家_电动夹爪会成为“标配”吗?
  4. 【数据库原理及应用】经典题库附答案(14章全)——第十三章:面向对象程数据库系统
  5. 算法导论 第十三章 红黑树(python)-1插入
  6. 安装eclipse版本oxygen,及maven导入spring mvc项目并运行
  7. 连亏172亿,割肉卖楼,年收3700亿、闻名全球的巨头,败退中国!
  8. 《Spring Boot官方指南》28.安全
  9. c++自动抢购_小黄人汽车手机支架多功能出风口高档可充电全自动导航卡通支架2元优惠券券后价22.9元...
  10. python遍历目录压缩文件夹_zip包含一个档案文件夹,如何使用Python获取存档中每个zip的注释?...
  11. 手机app通达信添加自定义公式(分时T+0)为例子讲解
  12. 【高德地图】H5 Web端定位当前位置,获取GPS和地址信息?
  13. 等保安全计算环境之Windows(安全审计+入侵防范)(二级)
  14. Ajax请求URL的写法
  15. 计算机网络(一)网络分层及协议
  16. android自定义相机带方框,Android摄像头开发:拍照后添加相框,融合相框和图片为一副 图片...
  17. C#获取本周,上周,下周信息
  18. 税务会计实务【21】
  19. 开放后的繁荣——-揭秘韩国游戏市场
  20. 微信接口开发申请服务器,开发微信服务器接口的实例教程

热门文章

  1. mysql C where语句_MYSQL WHERE语句优化
  2. lisp语言cond和if套用_AutoCAD LISP 循环选择语句cond
  3. python实现k core算法_python实现k-近邻算法
  4. html video拖放设置,HTML5新特性以及video和audio标签和拖放笔记
  5. 唱歌如何保持高位置_高段位女生是如何做到保持新鲜感,让男朋友爱她多年如一日的?...
  6. python网站开发实例 flask_python-flask框架web服务接口开发实例
  7. oracle装一半报错要卸掉,OpenSUSE下oracle11gR2的安装卸载
  8. 鼠标悬浮显示全部内容 不然隐藏部分内容
  9. caffe matlab 重启,Matlab caffe 具体使用方法
  10. python3d绘图代码_python机器学习之3D Matplotlib绘图