接上一篇《消息中间件核心实体(0)》,这一篇继续介绍消息中间件中的一些实体。

上一篇主要是Message、Topic、TopicMeta和Queue这样最基础的实体,这几篇介绍一些发送和消费的过程中会涉及到的实体和组件。

1. 发送

1.1 增强Message属性

Message一般只包含topic、tag、content这些属性,这些属性也是使用方在发送时会涉及到的内容。但是光有这些属性往往是不够的,比如我们会需要记录产生这条消息的Producer的信息;记录消息的产生时间和产生的IP信息等等。这些信息都是在Client中给消息附加上去的,对发送方来说是透明的,所以不会在Message实体中暴露,而是我们会增加一个实体:EnhancedMessage。

EnhancedMessage继承自Message,并会增加一些如下的属性:

  • bornTime

  • bornAddress

  • producer

  • etc

引申一点,Producer发送消息的大致过程如下:

  1. 增强Message属性,得到EnhancedMessage的实例

  2. 获取可以写入的队列(也可以理解成获取分区)

  3. 向队列写入消息(可以是队列暴露写入接口或者由专门的写入工具写入到队列中)

伪代码:

EnhancedMessage msg = enhance(message);
// 根据消息选择一个可以写入的目标队列
WritableQueue queue = router.select(msg);
// 写入消息(queue实现write方法进行写入)
Result result = queue.write(msg);// write过程
// 将消息序列化成自定义协议的网络包
Packet messagePacket = Serializer.encode(msg);
// 发送网络包
bootstrap.write(messagePacket);

上面的WritableQueue暴露了API去写入,具体实现可以是写入到网络,即远端的一个Partition。而在做单元测试或者本地测试的时候,可以覆盖write的实现,而不用真正写入到网络中,这会使代码更容易测试测试。

上面两幅图是Rocket开源版本中发送相关的一些代码,私以为这段代码非常的不优雅,读起来特别累,特别是requestHeader的各种属性设置。

这段是Rocket开源版本中真正将消息写入到网络的实现,看起来总是非常臃肿,另外不知道是如何mock这些实现以达到在本地做测试的目的的。

1.2 Queue的路由选择

发送过程中会涉及到队列的选择(分区的选择),一条消息最终会根据一定的策略落到一个分区中,这里需要一个组件来完成选择(把这个组件单独抽象出来,这样便于控制写入的目标来进行测试,抽象出来也可以由使用方来实现,这样可以按照使用方自己的场景做特定的路由)。

路由组件非常的简单,一般是Router会根据topic获取到topic的元数据(元数据包含了多有分区的信息),然后根据消息的属性或者用户的参数计算出落到哪个分区,比如可以根据用户的参数对分区总数取模来选择分区,这样可以做到将某一类消息发送到一个分区,比如同一个用户的消息或同一笔订单的不同消息。

这个组件会比较简单,但是在集成的时候需要注意一点,这个组件用户可以自己注入到Producer中来达到控制分区选择策略的目的。

RocketMQ在TopicPublishInfo中实现分区的选择,TopicPublishInfo包含了队列信息(List<MessageQueue> messageQueueList属性),笔者更倾向于抽象出独立的路由组件,以便在特定的场景用户可以自己实现路由,或者在测试时可以做到使用特定路由规则。

2. 消费

消费可以分为多种方式,从获取消息的方式上可以分为Pull和Push两种类型的Consumer;从消费消息的方式上可以分为集群消费和广播消费。这里不展开讨论各种模式的实现(以后单独会讨论Consumer该实现那些内容),会以Push模式&集群消费的Consumer为例,把消费流程中涉及到的一些组件进行介绍。

2.1 分配分区

集群消费中需要保证每个分区有且只有一个Consumer在进行消费。如果某个分区没有Consumer消费,那么使用方拿不到完整的数据;如果某个分区被两个Consumer消费,那么会产生大量的重复消息。所以这里需要实现一个分区分配策略,使在分布式环境中,每个Consumer拿到属于自己的分区,且相互交叉。下面是四个分区两个Consumer默认情况下的分配结果。

实现的策略一般是:

  1. 拿到一个Topic所有的分区,对这个列表进行排序

  2. 拿到当前所有的Consumer,对Consumer列表进行排序

  3. 根据自己所处的Consumer列表的位置和Consumer总数,从分区列表中获取对应的一部分

每个分区和Consumer都有唯一的ID,这样各自按照排序后的结果进行分配,可以达到相互不交叉且不遗漏的目的。(在Consumer总数或分区数发生变化的过程中可能分配结果不正确,这个过程是短暂的,且在消费时还会结合锁去保证分区只有一个Consumer消费,所以不会对实际消费产生影响)。

同样记住一点,这个分配策略是需要暴露出去的,系统可以默认实现集群消费和广播消费的基础策略,用户可以实现自己的分配策略注入到系统中。

2.2 消息缓存

消费端一个重要的组件是消息缓存。为了提升性能,在消费端消息的获取和消息的消费是异步的。Consumer内部有线程专门从服务端获取消息写入到消息缓存中,另外有线程从缓存中获取消息调用用户的回调接口来执行业务操作。

消息缓存除了提供基础的put和take来实现存入消息和取出消息,还需要自身容量,水位控制等配置。

本身Buffer不是很复杂的部分,但是需要考虑一些流控策略,比如Buffer使用率到多少时降低从服务端获取数据的频率。

RocketMQ中实现消息缓存由ProcessQueue实现,笔者倾向于独立出Buffer模块,另外Buffer需要提供锁,以实现顺序消费。

2.3 消费进度

还有一个重要的实体是消费进度,系统需要记录“每个”Consumer的消费进度,且这个数据需要被持久化。

消费进度需要记录某个Group对某个Topic的某个分区的消费位点。进度是按照Topic维度去组织的(持久化在服务端),结构如下:

topicgroup0cursor0、cursor1、cursor2...group1...实现的对象应该是:
class Cursors {String topic;Cursor cursor;class Cursor {String group;// 用数组来存储一个group消费的一个topic的所有分区的进度// 分区数一般情况下不会变更(变更场景很少),用数据就可以long[] cursors;}
}

Consumer可以在每一次获取消息时将消费进度提交到服务端,在服务端来更新Cursors内部的数据。

3. 结语

最近两篇内容将一些基础实体和组件简单的介绍了一下,下一篇讨论一下消息应该由Server Push给Consumer还是Consumer主动来Pull消息。

往期文章:

消息中间件核心实体(0)

消息的写入和读取流程

NameServer模块划分

Client模块划分

Broker模块划分

消息中间件架构讨论

业务方对消息中间件的需求

消息中间件中的一些概念

什么是分布式消息中间件?

欢迎关注公众号来交流MQ相关问题。

转载于:https://www.cnblogs.com/hzmark/p/mq_entity_1.html

消息中间件核心实体(1)相关推荐

  1. 基于双向LSTM和迁移学习的seq2seq核心实体识别

    http://spaces.ac.cn/archives/3942/ 暑假期间做了一下百度和西安交大联合举办的核心实体识别竞赛,最终的结果还不错,遂记录一下.模型的效果不是最好的,但是胜在" ...

  2. 开源开放 | 《大词林》开源 75 万核心实体和围绕核心实体的细粒度概念、关系列表...

    1<大词林>简介 <大词林>(http://101.200.120.155/)是由哈尔滨工业大学社会计算与信息检索研究中心推出,由我中心秦兵教授和刘铭副教授主持开发,是一个自动 ...

  3. weblogic jms消息 删除_消息队列与消息中间件概述:消息中间件核心概念与技术选型...

    什么是消息? "消息"是在两台计算机间传送的数据单位. 消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象. 什么是队列? 队列(Queue)队列是一种先进先出 ...

  4. 消息中间件的核心思想

    本文来说下消息中间件的核心思想 文章目录 传统的Http协议调用接口存在那些问题 采用多线程异步的形式实现有优缺点 消息中间件核心思想有那些 传统的Http协议调用接口存在那些问题 采用同步的形式调用 ...

  5. golang var 初始化时机_你应该知道的 Go 调度器知识:Go 核心原理 — 协程调度时机...

    点击上方蓝色"Go语言中文网"关注我们,领全套Go资料,每天学习 Go 语言 本文作者:叶不闻 原文链接:https://juejin.im/post/5dafc241f265da ...

  6. 【论文】基于特定实体的文本情感分类总结(PART I)

    最近在看一个比赛:2019 搜狐校园算法大赛,赛题说的是 给定若干文章,目标是判断文章的核心实体以及对核心实体的情感态度.每篇文章识别最多三个核心实体,并分别判断文章对上述核心实体的情感倾向(积极.中 ...

  7. 2021-7-19-OpenStack基础知识学习

    OpenStack基础知识学习 参考文献:Wolf_Coder,百度百科 1,云计算 1.1,出现原因 由亚马逊公司提出.1.随着业务增加公司内部的服务器不够使用,进行虚拟化技术->2.随着公司 ...

  8. 从中国质造到淘宝心选:CBM赋能“数造”新品牌

    CBM 是指 C2B2M 的商业模式, C是消费者,M是制造商(生产者),通过平台直接连接生产者和消费者,形成消费者需求驱动生产供给的这样一种模式.过去一年多,淘宝在CBM这种模式上进行了深入的探索和 ...

  9. DNN 数据访问策略 (转)

    经过几天断断续续的努力,这篇文章终于翻译结束,文章主要讲了DNN的数据访问策略,对于了解系统整体上是如何工作的有一定的帮助,希望能给dnn的初学者一些有用的信息.由于翻译的匆忙+水平有限,错误或不当之 ...

最新文章

  1. R语言data.table导入数据实战:data.table使用by函数进行数据分组(aggregate)
  2. python虚拟环境管理app_pyenv虚拟环境管理python多版本和软件库
  3. 蓝桥杯JAVA省赛2013-----B------4(黄金连分数)
  4. Error: Can't resolve 'babel-loader'
  5. 文章采集伪原创工具_伪原创文章技巧(如何提高伪原创文章的原创度)
  6. 基于BS模式的航材电子商务交易平台(1)
  7. word拼写检查自定义词典下载_取消或开启Word拼写检查和语法(去掉红波浪线)...
  8. visualboyadvance滤镜_研究VisualBoyAdvance的请进
  9. 《机会的数学》--陈希孺
  10. Latex输出大小写罗马数字
  11. treeset可以重复吗_社保和商业医疗险可以重复报销吗?报销攻略请收好
  12. JavaWeb前端: JavaScript 简介
  13. Python 立体图形的画法(一)
  14. Windows系统封装总结
  15. 【沙滩爱心桌面主题】_9.4
  16. 如何更换AirTag电池?
  17. android文本框带图片格式,android 带图片的文本框
  18. 【综述】对话系统中的口语理解技术
  19. RabbitMQ系列3之运行和Rabbit服务
  20. Linux df命令的使用

热门文章

  1. cocoapods的安装(这真是一个神奇的东西,每次安装的方法都不一样,而且很容易出现各种各样的错误)...
  2. [delphi]修改indy源码后重新编译
  3. 关于angularjs input上传图片前获取图片的Size 浅析
  4. 任意阶幻方(魔方矩阵)C语言实现
  5. 无处不在的container_of
  6. flask同源策略解决办法及flask-cors只允许特定域名跨域
  7. 公共交通WiFi末路?公交WiFi重挫 地铁WiFi承受盈利压力
  8. W3 Total Cache+Hacklog Remote Attachment Upyun
  9. Linux—程序包安装与管理
  10. 开发高级 Web 部件