卡夫卡如何分区

Kafka最重要的功能之一是实现消息的负载平衡,并保证分布式集群中的排序,否则在传统队列中是不可能的。

首先让我们尝试了解问题陈述

让我们假设我们有一个主题,其中发送消息,并且有一个消费者正在使用这些消息。
如果只有一个使用者,它将按消息在队列中的顺序或发送的顺序接收消息。

现在,为了获得更高的性能,我们需要更快地处理消息,因此我们引入了消费者应用程序的多个实例。

如果消息包含任何状态,则将导致问题。

让我们尝试通过一个例子来理解这一点:

如果对于特定的消息ID,我们有3个事件:
第一:创建
第二:更新 第三:删除 我们要求仅在消息的“创建”事件之后才处理消息的“更新”或“删除”事件。 现在,如果两个单独的实例几乎同时获得同一消息的“ CREATE”和“ UPDATE”,则即使另一个实例完成“ CREATE”消息之前,带有“ UPDATE”消息的实例仍有机会尝试对其进行处理。 。 这可能是一个问题,因为使用者将尝试更新尚未创建的消息,并且将引发异常,并且此“更新”可能会丢失。

可能的解决方案

我想到的第一个解决方案是数据库上的乐观锁,这可以防止这种情况,但是随后需要适应异常情况。 这不是一个非常简单的方法,可能涉及更多的锁定和要处理的并发问题。

另一个更简单的解决方案是,如果特定ID的消息/事件总是转到特定实例,因此它们将是有序的。 在这种情况下,CREATE将始终在UPDATE之前执行,因为这是发送它们的原始顺序。

这就是卡夫卡派上用场的地方。

Kafka在主题中具有“分区”的概念,该概念既可以提供订购保证,又可以在整个消费者流程中提供负载平衡。

每个分区都是有序的,不可变的消息序列,这些消息连续地附加到提交日志中。 分区中的每个消息均分配有一个顺序ID号,称为偏移量,该ID唯一地标识分区中的每个消息。

因此,一个主题将具有多个分区,每个分区保持各自的偏移量。
现在,要确保将具有特定id的事件始终转到特定实例,可以执行以下操作:如果我们将每个使用者与特定分区绑定在一起,然后确保具有特定id的所有事件和消息始终转到特定实例,特定分区,因此它们始终由同一使用者实例使用。

为了实现此分区,Kafka客户端API为我们提供了两种方法:
1)定义用于分区的键,该键将用作默认分区逻辑的键。
2)编写一个Partitioning类来定义我们自己的分区逻辑。

让我们探索第一个:

默认分区逻辑

默认的分区策略是hash(key)%numPartitions 。 如果键为null,则选择一个随机分区。 所以,如果我们要为分区键是一个特定属性,我们需要将它传递在ProducerRecord构造而从发送消息Producer

让我们来看一个例子:

注意:要运行此示例,我们需要具备以下条件:
1.运行Zookeeper(在localhost:2181)
2.运行Kafka(位于localhost:9092) 3.创建一个带有3个分区的名为“ TRADING-INFO”的主题。(为简单起见,我们可以只有一个代理。) 要完成以上三个步骤,请遵循此处的文档。

假设我们正在发送有关“ TRADING-INFO”主题的交易信息,该信息由消费者消费。

1.贸易舱

(注意:我在这里使用过Lombok )

@Data
@Builder
public class Trade {private String id;private String securityId;private String fundShortName;private String value;
}

2. Kafka客户端依赖

为了制作一个Kafka Producer,我们需要包含Kafka依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.10.0.0</version></dependency>

卡夫卡制片人

public class Producer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties());Runnable task1 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ABCD", 1, 5);Runnable task2 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "PQ1234@1211111111111", 6, 10);Runnable task3 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ZX12345OOO", 11, 15);ExecutorService executorService = Executors.newFixedThreadPool(3);executorService.submit(task1);executorService.submit(task2);executorService.submit(task3);executorService.shutdown();}private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) {for (int i = idStart; i <= idEnd; i++) {Trade trade = Trade.builder().id(i).securityId(securityId).value("abcd").build();try {String s = new ObjectMapper().writeValueAsString(trade);kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s));System.out.println("Sending to " + topic + "msg : " + s);} catch (JsonProcessingException e) {e.printStackTrace();}}}private static Properties getProducerProperties() {Properties props = new Properties();String KAFKA_SERVER_IP = "localhost:9092";props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return props;}}

消费者

public class TConsumer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";final String CONSUMER_GROUP_ID = "consumer-group";KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getConsumerProperties(CONSUMER_GROUP_ID));kafkaConsumer.subscribe(Arrays.asList(TOPIC));while(true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);consumerRecords.forEach(e -> {System.out.println(e.value());});}}private static Properties getConsumerProperties(String consumerGroupId) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", consumerGroupId);props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());return props;}
}

由于我们有3个分区,因此我们将运行3个Consumer实例。

现在,当我们使用不同的线程运行生产者时,生成具有3种“安全类型”消息的消息,这是我们的关键。 我们将看到,特定的实例总是迎合特定的“安全类型”,因此将能够按顺序处理消息。

产出

消费者1:

{"id":1,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":2,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":3,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":4,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":5,"securityId":"ABCD","fundShortName":null,"value":"abcd"}

消费者2:

{"id":6,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":7,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":8,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":9,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":10,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}

消费者3:

{"id":11,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":12,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":13,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":14,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":15,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}

因此,这里的3种类型的“ securityIds”生成了不同的哈希值,因此被分配到了不同的分区中,从而确保一种交易总是去往特定的实例。

现在,如果我们不想使用默认的分区逻辑并且我们的场景更加复杂,我们将需要实现自己的Partitioner,在下一个博客中,我将解释如何使用它以及它如何工作。

翻译自: https://www.javacodegeeks.com/2016/08/achieving-order-guarnetee-kafka-partitioning.html

卡夫卡如何分区

卡夫卡如何分区_通过分区在卡夫卡实现订单担保人相关推荐

  1. win10只有c盘怎么分区_磁盘分区:系统C盘空间不足怎么办?

    一般来说,驱动器字母"C"就是我们所说的系统分区.该C盘空间不足将影响您的计算机系统的性能,程序和游戏的运行速度.因此,您必须学会如何增加C盘空间让其有更多的自由空间以优化系统程序 ...

  2. parted新建分区_扩展分区及文件系统(Linux)

    操作场景 云硬盘是云上可扩展的存储设备,您可以在创建云硬盘后随时扩展其大小,以增加存储空间,同时不失去云硬盘上原有的数据. 云硬盘扩容 完成后,需要将扩容部分的容量划分至已有分区内,或者将扩容部分的容 ...

  3. sql server 分区_使用分区归档SQL Server数据

    sql server 分区 The Partition feature was introduced in the SQL Server 2005. This article is to cover ...

  4. 小区门禁卡可以复制到手机上吗_怎样把小区门禁卡复制到手机上

    展开全部 使用NFC功能 NFC是Near Field Communication缩写,即近距离无线通讯技术. 目前许多手机厂商62616964757a686964616fe78988e69d8331 ...

  5. mysql按照省市给表分区_表分区-partition

    partition分区: 设置分区限制,t0:1-10,t1:10-20,t2:20-最大值: 插入数据后可以看到topic表出现了t0.t1.t2: 按照散点值分区: 创建地区表: 建立会员表,根据 ...

  6. a卡显存检测软件_科普小课堂,A卡玩家如何轻松超频?

    超频一直以来是DIY玩家最为关注的话题,那什么是显卡超频?简单来说是通过提升显卡核心和显存的频率,使其在更高频率下运行并获得更好的性能. 现如今选购A卡的玩家是越多越多了,其中也不乏一些小白玩家,虽说 ...

  7. cad2014卡顿的解决方法_升级iOS14.1后出现卡顿、闪退?这3种方法可以解决

    随着iOS14.1正式版的推出,大家对于该版本有着很高的关注度,毕竟这是iOS14版本第一次正式的小版本更新,同时也将是新机iPhone12系列的预搭载版本. 但是,随着体验了几天的iOS14.1之后 ...

  8. 电脑突然卡主动不了了_必看!电脑运行卡或软件卡死无响应,怎么办?

    你是否遇到过以下情况:1.电脑突然死机,鼠标都动不了了:2.玩LOL游戏,突然就卡的要死,画面都转换不过来:3.打开一软件,比如:PS.视频播放器等,突然就打不开了,一直卡在那里不动了. 以上情况,相 ...

  9. python做马尔科夫模型预测法_隐马尔可夫模型的前向算法和后向算法理解与实现(Python)...

    前言 隐马尔可夫模型(HMM)是可用于标注问题的统计学习模型,描述由隐藏的马尔可夫链随机生成观测序列的过程,属于生成模型. 马尔可夫模型理论与分析 参考<统计学习方法>这本书,书上已经讲得 ...

最新文章

  1. Informix IDS 11体系操持(918测验)认证指南,第 4 部门: 机能调优(1)
  2. Rational RequisitePro
  3. 5二代配什么主板最好_新教育5:父母什么时间陪伴孩子最好
  4. java基础学习(5)疯狂java讲义第4章课后习题解答源码
  5. NIO中的ByteBuffer读取中文错误的解决方法:MalformedInputException
  6. 徐耀赐教授系列讲座——车道宽度理论在城市道路路网中的应用(编译文本)...
  7. 节理玫瑰花图怎么画_什么软件能便捷地绘制出节理玫瑰花图、水系玫瑰花图?...
  8. LeetCode刷题(158)~从尾到头打印链表【递归|辅助栈】
  9. 赣南师范大学数学与计算机科学学院张志超,张志超 - 南京信息工程大学 - 数学与统计学院...
  10. 程序员必备的5个工作技能
  11. 继续教育统考英语计算机监考严吗,网络教育统考监考严吗
  12. android关于 子控件超出父控件范围的注意点
  13. 解决ERROR: distribution port 25672 in use by another node: rabbit@
  14. CSS 为图片 增加边框效果
  15. 较为精细的陆地和海洋掩膜
  16. Redis6.0以后版本安装报错问题
  17. PCB电路板3D模型3D渲染思路
  18. 软件功能介绍之(数据维护)3.1数据编辑(1)
  19. 为什么要创建对象(实例化)?
  20. c开源hash项目 uthash的用法总结

热门文章

  1. CF613D-Kingdom and its Cities【虚树,LCA,树链剖分,贪心】
  2. Comet OJ(Contest #8)-D菜菜种菜【树状数组,指针】
  3. Codeforces1045I
  4. Spring Cloud 升级最新 Finchley 版本,踩了所有的坑
  5. art-template入门(三)之语法
  6. 史上最全Redis面试题
  7. springboot从控制器请求至页面时js失效的解决方法
  8. 《四世同堂》金句摘抄(十二)
  9. java实现人脸识别源码【含测试效果图】——实体类(Users)
  10. javaWeb服务详解(含源代码,测试通过,注释) ——applicationContext-dao.xml