Kafka最重要的功能之一是实现消息的负载平衡,并保证分布式集群中的排序,否则传统队列中将无法实现。

首先让我们尝试了解问题陈述

让我们假设我们有一个主题,其中发送消息,并且有一个消费者正在使用这些消息。
如果只有一个使用者,它将按消息在队列中的顺序或发送的顺序接收消息。

现在,为了获得更高的性能,我们需要更快地处理消息,因此我们引入了消费者应用程序的多个实例。

如果消息包含任何状态,则将导致问题。

让我们尝试通过一个例子来理解这一点:

如果对于特定的消息ID,我们有3个事件:
第一:创建
第二:更新 第三:删除 我们要求仅在消息的“创建”事件之后才处理消息的“更新”或“删除”事件。 现在,如果两个单独的实例几乎同时获得相同消息的“ CREATE”和“ UPDATE”,则即使另一个实例完成“ CREATE”消息之前,带有“ UPDATE”消息的实例仍有机会尝试对其进行处理。 。 这可能是一个问题,因为使用者将尝试更新尚未创建的消息,并且将引发异常,并且此“更新”可能会丢失。

可能的解决方案

我想到的第一个解决方案是对数据库的乐观锁定,这可以防止这种情况,但是随后需要适应异常情况。 这不是一个非常简单的方法,可能涉及更多的锁定和要处理的并发问题。

另一个更简单的解决方案是,如果特定ID的消息/事件总是转到特定实例,因此它们将是有序的。 在这种情况下,CREATE将始终在UPDATE之前执行,因为这是发送它们的原始顺序。

这就是卡夫卡派上用场的地方。

Kafka在主题内具有“分区”的概念,该概念既可以提供订购保证,又可以在整个消费者流程中提供负载平衡。

每个分区都是有序的,不可变的消息序列,该消息序列被连续附加到提交日志中。 分区中的每个消息均分配有一个顺序ID号,称为偏移量,它唯一地标识分区中的每个消息。

因此,一个主题将具有多个分区,每个分区都保持自己的偏移量。
现在,要确保将具有特定id的事件始终转到特定实例,可以执行以下操作:如果将每个使用者与特定分区绑定,然后确保具有特定id的所有事件和消息始终转到特定实例,则可以完成此操作。特定分区,因此它们始终由同一使用者实例使用。

为了实现此分区,Kafka客户端API为我们提供了两种方法:
1)定义用于分区的键,该键将用作默认分区逻辑的键。
2)编写一个Partitioning类来定义我们自己的分区逻辑。

让我们探索第一个:

默认分区逻辑

默认的分区策略是hash(key)%numPartitions 。 如果键为null,则选择一个随机分区。 所以,如果我们要为分区键是一个特定属性,我们需要将它传递在ProducerRecord构造而从发送消息Producer

让我们来看一个例子:

注意:要运行此示例,我们需要具备以下条件:
1.运行Zookeeper(在localhost:2181)
2.运行Kafka(位于localhost:9092) 3.创建一个带有3个分区的名为“ TRADING-INFO”的主题。(为简单起见,我们可以只有一个代理。) 要完成以上三个步骤,请遵循此处的文档。

假设我们正在发送有关“ TRADING-INFO”主题的交易信息,该信息由消费者消费。

1.贸易舱

(注意:我在这里使用过Lombok )

@Data
@Builder
public class Trade {private String id;private String securityId;private String fundShortName;private String value;
}

2. Kafka客户端依赖

为了制作一个Kafka Producer,我们需要包含Kafka依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.10.0.0</version></dependency>

卡夫卡制片人

public class Producer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties());Runnable task1 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ABCD", 1, 5);Runnable task2 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "PQ1234@1211111111111", 6, 10);Runnable task3 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ZX12345OOO", 11, 15);ExecutorService executorService = Executors.newFixedThreadPool(3);executorService.submit(task1);executorService.submit(task2);executorService.submit(task3);executorService.shutdown();}private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) {for (int i = idStart; i <= idEnd; i++) {Trade trade = Trade.builder().id(i).securityId(securityId).value("abcd").build();try {String s = new ObjectMapper().writeValueAsString(trade);kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s));System.out.println("Sending to " + topic + "msg : " + s);} catch (JsonProcessingException e) {e.printStackTrace();}}}private static Properties getProducerProperties() {Properties props = new Properties();String KAFKA_SERVER_IP = "localhost:9092";props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return props;}}

消费者

public class TConsumer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";final String CONSUMER_GROUP_ID = "consumer-group";KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getConsumerProperties(CONSUMER_GROUP_ID));kafkaConsumer.subscribe(Arrays.asList(TOPIC));while(true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);consumerRecords.forEach(e -> {System.out.println(e.value());});}}private static Properties getConsumerProperties(String consumerGroupId) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", consumerGroupId);props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());return props;}
}

因为我们有3个分区,所以我们将运行3个Consumer实例。

现在,当我们使用不同的线程运行生产者时,生成具有3种“安全类型”消息的消息,这是我们的关键。 我们将看到,特定的实例总是迎合特定的“安全类型”,因此将能够按顺序处理消息。

产出

消费者1:

{"id":1,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":2,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":3,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":4,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":5,"securityId":"ABCD","fundShortName":null,"value":"abcd"}

消费者2:

{"id":6,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":7,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":8,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":9,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":10,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}

消费者3:

{"id":11,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":12,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":13,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":14,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":15,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}

因此,这里的3种类型的“ securityIds”生成了不同的哈希值,因此被分配到不同的分区中,从而确保一种交易类型始终用于特定实例。

现在,如果我们不想使用默认的分区逻辑,并且我们的场景更加复杂,我们将需要实现自己的Partitioner,在下一个博客中,我将解释如何使用它以及它如何工作。

翻译自: https://www.javacodegeeks.com/2016/08/achieving-order-guarnetee-kafka-partitioning.html

通过分区在Kafka中实现订单保证人相关推荐

  1. 干趴面试官系列 | 请你简述一下Kafka中的分区分配

    欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-basic-knowledge-of-partition-assignors/ "请你简述一下Kafka ...

  2. kafka中生产者和消费者的分区问题

    本文来书说下kafka中生产者和消费者的分区问题 文章目录 概述 主题的分区数设置 分区与生产者 分区与消费者 range roundrobin(轮询) 本文参考 本文小结 概述 我们知道,生产者发送 ...

  3. kafka中Topic、Partition、Groups、Brokers概念辨析

    kafka消息队列有两种消费模式,分别是点对点模式和订阅/发布模式.具体比较可以参考Kafka基础–消息队列与消费模式. 下图是一个点对点的Kafka结构示意图,其中有以下几个部分: producer ...

  4. kafka中controller的作用_Kafka 常见问题汇总

    Kafka 如何做到高吞吐.低延迟呢? 这里提下 Kafka 写数据的大致方式:先写操作系统的页缓存(Page Cache),然后由操作系统自行决定何时刷到磁盘. 因此 Kafka 达到高吞吐.低延迟 ...

  5. Kafka科普系列 | 原来Kafka中的选举有这么多?

    欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-basic-knowledge-of-selection/ 面试官在考查你Kafka知识的时候很可能会故弄玄虚的问 ...

  6. Kafka科普系列 | 轻松理解Kafka中的延时操作

    欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-basic-knowledge-of-delay-operation/ 本文起源于之前去面试的一道面试题,面试题大 ...

  7. 关于kafka中的timestamp与offset的对应关系

    关于kafka中的timestamp与offset的对应关系 @(KAFKA)[storm, kafka, 大数据] 关于kafka中的timestamp与offset的对应关系 获取单个分区的情况 ...

  8. kafka 发布订阅_在Kafka中发布订阅模型

    kafka 发布订阅 这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 ). 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现. 卡夫卡概念 根据官方文件 ...

  9. kafka中topic默认属性_分享:Kafka 的 Lag 计算误区及正确实现

    前言 消息堆积是消息中间件的一大特色,消息中间件的流量削峰.冗余存储等功能正是得益于消息中间件的消息堆积能力.然而消息堆积其实是一把亦正亦邪的双刃剑,如果应用场合不恰当反而会对上下游的业务造成不必要的 ...

最新文章

  1. oracle文字与格式字符串不匹配的解决
  2. 计算机英语阅读理解,2017年12月英语四级阅读理解50篇:学习计算机
  3. 2009从知到行知识管理培训公开课最后一期
  4. ML.NET 1.4 发布,跨平台机器学习框架
  5. C++——《算法分析》实验肆——单源最短路径问题
  6. easypoi 如何合并相同的列_easy_poi合并行以及样式调整
  7. LogStash实现MySQL数据增量同步到ElasticSearch
  8. (十六)K-Means聚类
  9. 2.SQL里的聚合函数
  10. 内的图标_从零开始画图标系列:线性图标设计实战演示!
  11. Visual C++ 2010 (中文)学习版 安装教程
  12. 字节跳动前端开发面试题总结,需要的小伙伴来看!
  13. 【架构干货】京东是如何抗住今年春晚百亿次互动的?
  14. The Top 5 cloud security threats presented by Mark Russinovich
  15. 把Wordpress集成到zen-cart里方法 各种修改 经典机制
  16. 微前端 - qiankun
  17. 被陆奇文章刷屏了,细思极恐
  18. 英国哈德斯菲尔德大学留学生本科未毕业如何将留学路进行到底
  19. 信号时频域分析 ——EMD/BEMD/LMD 算法原理
  20. 【计算机考研必备常识】24考研你开始准备了吗?

热门文章

  1. Spring @Required 注释
  2. JS获取自定义属性data-*值与dataset
  3. hashCode到底有什么用?
  4. ssm创建一个查询接口
  5. java中随机生成26个字母组合的随机验证码
  6. myeclipse如何换一个漂亮的主题
  7. mybatis简单案例源码详细【注释全面】——实体层(User.java)
  8. String转Double
  9. 四足爬行机器人运动_有自我意识机器人横空出世,还能自我复制,专家表示需警惕其失控...
  10. docker 买了腾讯服务器后的学习