Kafka 分区备份实战
1.概述
在 Kafka 集群中,我们可以对每个 Topic 进行一个或是多个分区,并为该 Topic 指定备份数。这部分元数据信息都是存放在 Zookeeper 上,我们可以使用 zkCli 客户端,通过 ls 和 get 命令来查看元数据信息。通过 log.dirs 属性控制消息存放路径,每个分区对应一个文件夹,文件夹命名方式为:TopicName-PartitionIndex,该文件夹下存放这该分区的所有消息和索引文件,如下图所示:
2.内容
Kafka 集群在生产消息入库的时候,通过 Key 来进行分区存储,按照相应的算法,生产分区规则,让所生产的消息按照该规则分布到不同的分区中,以达到水平扩展和负载均衡。而我们在消费这些消息的时候,可以使用多线程来消费该 Topic 下的所有分区中的消息。
分区规则的制定,通过实现 kafka.producer.Partitioner 接口,该接口我们可以进行重写,按照自己的方式去实现分区规则。如下,我们按照 Key 的 Hash 值,然后取模得到分区索引,代码如下所示:
package cn.hadoop.hdfs.kafka.partition;import kafka.producer.Partitioner;import kafka.utils.VerifiableProperties;/*** @Date Nov 3, 2016** @Author dengjie** @Note 先 Hash 再取模,得到分区索引 */public class CustomerPartitioner implements Partitioner { public CustomerPartitioner(VerifiableProperties props) {} public int partition(Object key, int numPartitions) { int partition = 0;String k = (String) key;partition = Math.abs(k.hashCode()) % numPartitions; return partition;}}
在创建 Topic 的时候,若按照上述规则创建分区,分区数最后为 Brokers 的整数倍,这样才能发挥其负载均衡的作用,比如:当前我们集群节点由 3 个 Broker 组成,如下图所示:
2.1 创建分区
我们在创建分区的时候,可以通过 Kafka 提供的客户端命令进行创建,如下,我们创建一个6分区,3备份的一个 Topic,命令如下所示:
./kafka-topics.sh --create --zookeeper k1:2181,k2:2181,k3:2181 --replication-factor 3 --partitions 6 --topic ke_test
这里需要注意的是,指定备份数的时候,备份数要小于等于 Brokers 数。否则创建失败。在创建分区的时候,假设,我们只创建 2 个分区,而我们上述图中, Brokers 有 3 个,会造成有一个 Broker 上没有该 Topic 的分区,以致分布不均。
2.2 分区入库
一般,我们在入库消息的时候,都有使用 Kafka 的 API,如下,我们使用生产 API ,按照上述的 Hash 取模规则,进行分区入库,代码如下所示:
package cn.hadoop.hdfs.kafka.partition;import java.util.List;import java.util.Properties;import cn.hadoop.hdfs.kafka.partition.data.FileRead;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/*** @Date Nov 3, 2016** @Author dengjie** @Note 按照先 Hash 再取模的规则,进行分区入库 */public class PartitionerProducer { public static void main(String[] args) {producerData();} private static void producerData() {Properties props = new Properties();props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("metadata.broker.list", "k1:9092,k2:9092,k3:9092");props.put("partitioner.class", "cn.hadoop.hdfs.kafka.partition.CustomerPartitioner");Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));String topic = "ke_test";List<String> list = FileRead.readData(); for (int i = 0; i < list.size(); i++) {String k = "key" + i;String v = new String(list.get(i));producer.send(new KeyedMessage<String, String>(topic, k, v)); if (i == (list.size() - 1)) { return;}}producer.close();} }
这里,我们分析发现,生产者在生产消息入库时,会按照 CustomerPartitioner 的规则,进行分区入库,在入库时,将 Key 先做 Hash,然后分区数取模(这里分区数是 6).我们计算可以得到一下信息:
hashCode("key0") % 6 = 1hashCode("key1") % 6 = 2hashCode("key2") % 6 = 3hashCode("key3") % 6 = 4hashCode("key4") % 6 = 5hashCode("key5") % 6 = 0// ... 以此循环
按照该表述规则进行分区入库。
2.3 分区入库验证
接下里,我们通过 Kafka 的消费者 API 来验证,在消费时,消费 Topic 各分区的详情,代码如下所示:
package cn.hadoop.hdfs.kafka.partition;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata;/*** @Date Nov 3, 2016** @Author dengjie** @Note 通过 Kafka 的消费者 API 验证分区入库的消息 */public class PartitionerConsumer {public static void main(String[] args) {String topic = "ke_test";ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> mam = it.next();System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] ..");}}private static ConsumerConfig createConsumerConfig() {Properties props = new Properties();props.put("group.id", "group1");props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");props.put("zookeeper.session.timeout.ms", "40000");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "smallest");return new ConsumerConfig(props);} }
这里笔者只是验证消费数据,若在实际生产线上,需将上述单线程消费改造成多线程消费,来提升处理消息的能力。
2.4 验证结果
这里,我们线运行生产者,让其生产消息,并分区入库;然后,在启动消费者,消费消息验证其结果,如下图所示:
转载于:https://blog.51cto.com/12953214/1940568
Kafka 分区备份实战相关推荐
- SpringBoot整合kafka之kafka分区实战
本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 准备工作 程序代码 程序测试 本文小结 准备工作 当然我们也可以不手动创建topic,在执行代码kafkaTemplate. ...
- Kafka原理+操作+实战
Kafka原理+操作+实战 前面我和大家交流了kafka的部署安装.对于部署安装这都是小意思,不值得太多的提及.重点还是需要知道kafka原理.熟练掌握kafka命令以及灵活用于kafka场景. 干货 ...
- 聊一聊Kafka分区的隐藏属性——二次归类
欢迎跳转到本文的原文链接:https://honeypps.com/mq/hidden-attribute-of-kafka-partition/ 在使用Kafka的过程中,分区是一个不可忽视的概念. ...
- Kafka分区分配策略(4)——分配的实施
接上文: 1.[Kafka分区分配策略(1)--RangeAssignor] 2.[Kafka分区分配策略(2)--RoundRobinAssignor和StickyAssignor] 3.[Kafk ...
- Kafka分区分配策略(3)——自定义分区分配策略
接上文: 1.[Kafka分区分配策略(1)--RangeAssignor] 2.[Kafka分区分配策略(2)--RoundRobinAssignor和StickyAssignor] 欢迎支持笔者新 ...
- Kafka分区分配策略(2)——RoundRobinAssignor和StickyAssignor
接上文[Kafka分区分配策略(1)--RangeAssignor] 欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔 ...
- kafka创建topic_Kafka实战宝典:一文带解决Kafka常见故障处理
Kafka自带常用工具 Kafka的bin目录下shell脚本是kafka自带的管理工具,提供topic的创建/删除/配置修改.消费者的监控.分区重载.集群健康监控.收发端TPS压测.跨机房同步等能 ...
- kafka分区机制详解
本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 kafka分区机制 分区个数选择 分区写入策略 轮询策略 随机策略 按键保存策略 本文小结 kafka分区机制 分区机制是k ...
- 图解:Kafka 水印备份机制
作者 | 张乘辉 责编 | 刘静 高可用是很多分布式系统中必备的特征之一,Kafka 日志的高可用是通过基于 leader-follower 的多副本同步实现的,每个分区下有多个副本,其中只有一个是 ...
最新文章
- 三端可调稳压集成电路LM317的多种应用电路
- c++职工管理系统主函数代码
- Deepin15.7 Android8.1 编译 以及问题解决
- Android个人信息管理系统 源代码,个人信息管理系统源代码(自己写的).doc
- 【收藏】go博客 zxysilent / blog
- UDP 组播---基本概念
- ROCKOUT软件测试工程师,具透丨这才是让 iMessage 变得好玩有用的原因:iMessage App Store 详解...
- Mysql 优化(学习笔记二十)
- 【Java数据结构与算法】第二十章 Dijkstra算法和Floyd算法
- 2、AbstractApplicationContext的refresh功能概述
- Unity Transform bug
- 计算机二级真题c.doc,2018计算机二级C语言考试真题试卷汇总.doc
- 汇总!零基础到进阶Graphpad Prism完整指南!教程全方位汇总!
- [转]供应链管理方面的书籍
- NOI / 1.3编程基础之算术表达式与顺序执行——12:计算球的体积
- SAP工厂日历的应用
- sql 2008常用语法语句收集
- 【Redis】Redis的五大数据类型
- icp光谱仪的工作原理_ICP基本原理解析.ppt
- Virustotal的使用
热门文章
- LeetCode刷题(31)
- MyBatis基于Maven入门实例
- [设计模式-行为型]迭代器模式(Iterator)
- python 获取数据库字段类型_python中如何读取数据库数据类型
- js in html5,CSS-in-JS 来做的 5 件事情,一般人都不知道!
- linux+tux游戏,Linux吉祥物游戏SuperTux 0.5.0版发布 类《超级马里奥兄弟》
- linux恢复fat文件系统,使用‘fsck’修复Linux中文件系统错误的方法
- Eclipse安装后启动出现error:could not create the java machine.
- echarts树图节点垂直间距_铝模板的安装、拆除、节点、禁止做法详解
- mysql定时异地备份_MYsql 异地备份脚本