这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 )。 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现。

卡夫卡概念

根据官方文件 :

Kafka是一种分布式的,分区的,复制的提交日志服务。 它提供消息传递系统的功能,但具有独特的设计。

Kafka作为集群运行,这些节点称为代理。 代理可以是领导者或副本,以提供高可用性和容错能力。 代理负责分区,分区是存储消息的分发单元。 这些消息是有序的,可以通过名为offset的索引进行访问。 一组分区构成一个主题,是消息的提要。 一个分区可以有不同的使用者,它们使用自己的偏移量访问消息。 生产者将消息发布到Kafka主题中。 Kafka文档中的以下图表可以帮助您理解以下内容:

排队与发布-订阅

消费者群体是另一个关键概念,有助于解释为什么Kafka比RabbitMQ等其他消息传递解决方案更灵活,功能更强大。 消费者与消费者群体相关联。 如果每个使用者都属于同一个使用者组,则主题的消息将在各个使用者之间平均负载均衡; 这就是所谓的“排队模型”。 相反,如果每个使用者都属于不同的使用者组,则所有消息都将在每个客户端中使用。 这就是所谓的“发布-订阅”模型。

您可以混合使用这两种方法,分别针对不同的需求使用不同的逻辑使用者组,并在每个组中有多个使用者以通过并行提高吞吐量。 同样, Kafka文档中的另一个图表:

了解我们的需求

正如我们在以前的文章(见1, 2, 3 )该项目服务发布消息到卡夫卡的话题叫item_deleted 。 此消息将位于该主题的一个分区中。 为了定义消息将驻留在哪个分区中,Kafka提供了三种选择 :

  • 如果记录中指定了分区,请使用它
  • 如果未指定分区但存在键,则根据键的哈希值选择一个分区
  • 如果没有分区或密钥,则以循环方式选择一个分区

我们将使用item_id作为密钥。 执法服务的不同实例中包含的消费者仅对特定分区感兴趣,因为他们保留某些商品的内部状态。 让我们检查不同的Kafka使用者实现,以了解哪种使用最方便。

卡夫卡消费者

卡夫卡共有三个消费者: 高级消费者 , 简单消费者和新消费者

在这三个消费者中, 简单消费者在最低级别上运行。 它满足我们的要求,因为它允许消费者“在流程中仅使用主题中分区的子集”。 但是,正如文档所述:

SimpleConsumer确实需要使用者组中不需要的大量工作:

  • 您必须跟踪应用程序中的偏移量,才能知道从何处停止消费
  • 您必须确定哪个Broker是主题和分区的主要Broker。
  • 您必须处理经纪人负责人变更

如果您阅读了建议的用于处理这些问题的代码,则将不鼓励您使用此使用者。

新使用者提供正确的抽象级别,并允许我们订阅特定的分区。 他们在文档中建议以下用例:

第一种情况是,如果进程正在维护与该分区关联的某种本地状态(例如本地磁盘上的键值存储),因此该进程应仅获取其在磁盘上维护的分区的记录。

不幸的是,我们的系统使用的是Kafka 0.8,而该使用者仅从0.9开始可用。 我们没有足够的资源来迁移到该版本,因此我们需要坚持使用高级消费者

该使用者提供了一个不错的API,但不允许我们订阅特定的分区。 这意味着,执法服务的每个实例都将使用每条消息,即使那些无关的消息也是如此。 我们可以通过为每个实例定义不同的消费者组来实现。

利用Akka Event Bus

在上一篇文章中,我们定义了一些等待ItemDeleted消息的有限状态机ItemDeleted

when(Active) {case Event(ItemDeleted(item), currentItemsToBeDeleted@ItemsToBeDeleted(items)) =>val newItemsToBeDeleted = items.filterNot(_ == item)newItemsToBeDeleted.size match {case 0 => finishWorkWith(CensorResult(Right()))case _ => stay using currentItemsToBeDeleted.copy(items = newItemsToBeDeleted)}}

我们的卡夫卡消费者可以将所有消息转发给那些演员,并让他们丢弃/过滤不相关的物品。 但是,我们不想让参与者浪费大量的冗余和低效的工作,因此我们将添加一层抽象,使他们能够以一种非常有效的方式丢弃适当的消息。

final case class MsgEnvelope(partitionKey: String, payload: ItemDeleted)class ItemDeletedBus extends EventBus with LookupClassification {override type Event = MsgEnvelopeoverride type Classifier = Stringoverride type Subscriber = ActorRefoverride protected def mapSize(): Int = 128override protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event.payloadoverride protected def classify(event: Event): Classifier = event.partitionKeyoverride protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a.compareTo(b)
}

Akka Event Bus按分区为我们提供订阅,而我们的Kafka高级消费者中缺少该分区。 我们将从卡夫卡消费者处发布每条消息到公交车上:

itemDeletedBus.publish(MsgEnvelope(item.partitionKey, ItemDeleted(item)))

在上一篇文章中,我们展示了如何使用该分区键订阅消息:

itemDeletedBus.subscribe(self, item.partitionKey)

LookupClassification将过滤不需要的消息,因此我们的参与者不会过载。

摘要

由于Kafka提供的灵活性,我们能够设计我们的系统以了解不同的折衷方案。 在接下来的文章中,我们将看到如何协调这些FSM的结果以向客户端提供同步响应。

第一部分 | 第2部分 | 第三部分

翻译自: https://www.javacodegeeks.com/2016/05/publish-subscribe-model-kafka.html

在Kafka中发布订阅模型相关推荐

  1. kafka 发布订阅_在Kafka中发布订阅模型

    kafka 发布订阅 这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 ). 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现. 卡夫卡概念 根据官方文件 ...

  2. 搭建高吞吐量 Kafka 分布式发布订阅消息 集群

    搭建高吞吐量 Kafka 分布式发布订阅消息 集群 简介 Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区. ...

  3. java实现rabbitmq发布/订阅模型(Publish/Subscribe queues), 生产者 消费者 交换机 消息队列

    发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定. ...

  4. 分布式发布订阅模型网络的实现有哪些

    大数据中常用的分布式发布订阅系统: 参考资料: WCF百科介绍 分布式"消息发布者-订阅者"模型的实现--WCF双工通讯特性的应用 [设计模式] 观察者模式(发布-订阅/Publi ...

  5. 3,ActiveMQ-入门(基于JMS发布订阅模型)

    一.Pub/Sub-发布/订阅消息传递模型 在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端.在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅 ...

  6. vue发布订阅模式,发布订阅模型

    1.什么是发布订阅模式 (又叫做观察者模式) 他定义了对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于他的对象都将得到通知. 2.发布订阅模型 1.发布者会给订阅者提供一个方法以便 ...

  7. ros 单向通讯 talker,listener 发布订阅模型

    原文链接: ros 单向通讯 talker,listener 发布订阅模型 上一篇: VirtualBox 端口转发(端口映射) 主机和虚拟机相互访问 下一篇: python 串口编程 发布订阅模型 ...

  8. c#事件的发布-订阅模型_C# 委托和事件 与 观察者模式(发布-订阅模式)讲解 by天命...

    使用面向对象的思想 用c#控制台代码模拟猫抓老鼠 我们先来分析一下猫抓老鼠的过程 1.猫叫了 2.所有老鼠听到叫声,知道是哪只猫来了 3.老鼠们逃跑,边逃边喊:"xx猫来了,快跑啊!我是老鼠 ...

  9. Kafka(分布式发布-订阅消息系统)

    一.简介 Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统. 它最初由LinkedIn公司开发,Linkedin于201 ...

最新文章

  1. springboot使用maven打包无法打进本地包解决方法
  2. python可以给excel编程吗_python – 我可以通过编程方式将matplotlib图形插入Excel吗?...
  3. 【转】3.1(译)构建Async同步基元,Part 1 AsyncManualResetEvent
  4. 聚焦效率与目标差距,数据才是远程办公的内核!
  5. USB 3.0、USB 3.1到底什么区别?
  6. 【LeetCode】10. Regular Expression Matching
  7. dpkg:处理软件包 xxx (--configure)时出错
  8. VMThread占CPU高基本上是JVM在频繁GC导致,原因基本上是冰法下短时间内创建了大量对象堆积造成频繁GC。...
  9. 2020年最好用的手机是哪一款_2020年千元机中性能最好的4款手机,印象最让你深刻的是哪一款?...
  10. 计算机二级c语言编程题库100题下载,计算机二级c语言编程题库(100题).pdf
  11. 第一节 ISM Web工业可视化组态软件简介
  12. win10系统dnf安装不上服务器,升级到Win10正式版后不能玩DNF如何解决?
  13. Linux FTP服务搭建(完整步骤)
  14. 数据结构视频教程 -《吉大刘大有主讲》
  15. 推荐几款绿色无广告良心软件
  16. JTT1078 + netty + rtmp
  17. 中英文标点符号切换的组合键_切换中英文标点快捷键
  18. Attributes as Operators
  19. 微信配置JS接口安全域名问题-Nginx配置
  20. 硬盘安装windows系统

热门文章

  1. 类和对象运行时在内存里是怎么样的?各种变量、方法在运行时是怎么交互的?
  2. eclipse xml文件中按没有提示
  3. Java 父类子类的对象初始化过程
  4. 关于HashMap容量的初始化,还有这么多学问
  5. php如何接收前端返回的各种类型的数据
  6. 虚拟机和linux的安装
  7. 两个map中的数据,按照相同键,将所对应的值相加方法
  8. mysql中的isnull
  9. 2018蓝桥杯省赛---java---C---8(等腰三角形)
  10. JavaWeb前端之AJAX的初步学习