2019独角兽企业重金招聘Python工程师标准>>>

背景

消息队列,在业务解耦、削峰填谷、流量控制、广播消息等场景下都有很好的应用,已经成为很多企业IT系统内部通信重要手段。

现有常用的开源消息中间件有RabbitMQ、Kafka、RocketMQ等,但各自有着不同的应用场景和特点,例如,Kafka注重的是消息的吞吐量,不保证消息存储的可靠性以及一致性,因此多用于日志系统数据的上报;RabbitMQ能保证消息可靠存储投递,但性能较差。

CMQ(Cloud Message Queue)是腾讯云开发的一款高可靠、高可用、高性能的分布式消息队列服务,具有低耦合、消息可靠、强一致性、可扩展性等特点,支持Push/Pull消费模型、消息回溯、延时消息、发布订阅、路由广播、消息加密等一系列功能,以满足更多的mq应用场景。

相对Kafka,CMQ更多注重消息高可靠的应用场景,例如金融、交易、订单等业务;相比RabbitMQ,CMQ在可用性和性能上做了很大的优化和提升。更详细的对比,请参考官网介绍。

本文先简单介绍CMQ底层的架构实现,然后着重结合CMQ的功能特点来介绍CMQ的实践案例,让大家快速理解和上手CMQ的开发。

底层架构

CMQ整体架构如上图所示,每个set由三个broker节点副本组成,保证消息的可靠存储以及高可用性,且基于raft算法保证数据的一致性。CMQ单个set 在CAP理论中优先保证了CP,当SET中过半数节点都正常工作时,才能进行消息的生产消费。

实践案例

一、广播拉取消息模型

CMQ支持队列(queue)和主题(topic)两种模型,如下所示:

其中,queue模型是一对一的消息拉取(pull)模式,client端主动pull消息;而topic模型,也称发布/订阅模型,是一对多的消息推送(push)模式,CMQ服务端广播消息时,根据各个订阅地址主动推送消息给client。两种模型基本能满足大部分应用场景了,对比如下:

queue模型,client端可以灵活根据自身能力去消费pull消息,消息实时性依赖client的消费速度,如果消费速度比生产速度慢,会引起大量消息堆积。

topic模型,服务端主动推送消息,消息实时性比较高,但要求client性能上能及时处理大量推送过来的消息,并且在client发生故障的时候可能会导致丢消息(有消息重发策略做基本保障)。

对于topic模型,有以下特殊场景需求:

client端想根据自身能力去pull消息

创建订阅的时候需要暴露client端的接收消息的地址,但在一些企业内网、vpc网络等特殊情况下,CMQ无法推送到,只能用pull方式获取消息。

针对以上特殊场景,CMQ结合queue和topic两种模型实现了一对多的广播拉取消息模型,如下所示:

topic的订阅者可以是一个queue实例,topic发布消息后,会自动将消息推送到queue,然后client和使用queue模型一样去消费消息即可。

# python sdk demo code: create subscription of queue protocal 
my_sub = my_account.get_subscription(topic_name, subscription_name) 
subscription_meta = SubscriptionMeta() 
subscription_meta.Endpoint = "queue1" 
subscription_meta.Protocal = "queue" 
my_sub.create(subscription_meta)

二、Pull长轮询

对于Queue模型,消费者需要pull获取消息,但问题是:消费者不知道队列什么时候有消息,只能不停轮询请求去pull,如果轮询间隔时间短,在队列长时间没有消息时会耗费消费者请求资源且效率低,如果轮询间隔时间长,则消费速度慢,消息实时性低,且造成消息大量堆积。

针对以上问题,CMQ解决方案是设计了长轮询功能。例如,假设设置队列长轮询时间为10s

当消费者pull消息时,如果队列中有消息则马上返回

如果队列暂时没有消息,消费者pull请求不会马上返回,而是会等待阻塞10s:当10s内有新的生产消息到达队列,CMQ会马上将消息投递给正在阻塞等待的消费者,消费者端感知就是阻塞的pull请求被唤醒并且收到消息返回;当10s内队列都没有消息,则请求返回告诉消费者当前队列没有消息。

# python sdk demo code: receive message through long polling 
pollingWaitSeconds = 3 
recv_msg = my_queue.receive_message(pollingWaitSeconds)

三、延时消息

CMQ提供延时消息功能:消息发送到队列后,从入队时间算起,消息在设置的延时时间后才对消费者可见,即才能被消费者消费到。延时消息功能可以很轻松实现一些定时任务的应用场景。

如上图所示,根据CMQ延迟消息功能实现的定时任务检查告警系统。

# python sdk demo code: send delayed message 
msg_body = "I am delay message" 
msg = Message(msg_body) 
delaySeconds = 3 
my_queue.send_message(msg, delaySeconds)

在此我向大家推荐一个架构学习交流群。交流学习群号: 744642380, 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良

四、消息回溯

CMQ提供类似于Kafka的消息回溯能力,已经消费删除的消息是可以通过回溯来重新消费的。目前支持指定回溯时间点,在这个时间点开始被删除的消息可以重新消费到。此功能在一些金融业务对账、业务系统重试等场景下有很好的实用性。

最大可回溯时间点 = 当前时间 - 设置的可回溯时长。消息生产时间在这个值之前的不可回溯,之后的可回溯,如下图所示:

# python sdk demo code: rewind the queue 
# backtrack one hour 
backTrackingTime = int(time.time()) - 3600 
my_queue.rewindQueue(backTrackingTime)

五、Topic路由匹配

CMQ topic模型提供类似于RabbitMQ的消息路由匹配功能,在消息广播基础上实现了消息的自动分发。

订阅者可以指定bindingKey,即路由规则,如上所示,*(星号)可以匹配一个单词,#(井号)可以匹配一个或多个单词。例如,生产者发布一个消息,且消息的路由键(routingKey)是”quick.orange.elephant”,那么该消息只会推送给消费者C1;如果routingKey=”quick.orange.rabbit”,则消息会推送给C1和C2;如果routingKey=”lazy.brown.fox”,则消息只会推送给C2。

# python sdk demo code: set topic-subscription route-rule 
my_sub = my_account.get_subscription(topic_name, subscription_name) 
subscription_meta = SubscriptionMeta() 
subscription_meta.Endpoint = "http://test.com" 
subscription_meta.Protocal = "http" 
subscription_meta.bindingKey = ['*.*.rabbit','lazy.#'] 
my_sub.create(subscription_meta)

message = Message() 
message.msgBody = "route msg test" 
my_topic.publish_message(message, 'quick.orange.rabbit')

六、超大消息传输

目前CMQ的队列消息大小最大限制为1MB,而当消息大小不超过64KB时,收发消息的最大QPS限制分别为正常的5k(有特殊需求可调整),当消息大小超过64KB而小于1MB时,CMQ不保证收发消息的QPS性能。因此,支持大于64KB的消息只是为了考虑业务偶尔传输少量大消息且不想做消息分片的应用场景。

一般来说,64KB的消息限制大小基本能满足大部分业务场景需求了,但在某些特殊场景下,消息数据大于64KB甚至大于1MB时,业务和CMQ如何支持这种超大消息的传输呢?这里有两种解决方案:

1.消息分片。类似IP数据包分片传输原理,生产者对消息分片标记后分别发送到队列,消费者从队列取出所有分片消息进行组装。个人方案如下:

每个消息body分为header和data两部分。其中,data就是原消息分片后的内容,header包含三个标记:业务指定消息的ID号,唯一记录一个消息的ID值,具有同一个ID号的消息分片才会在消费端重新组装;分片序号(从1开始),记录一个消息分片的次序编号,消费端依据分片序号依次组装消息;下一分片是否存在的标记,如果是,说明消息包还不完整,否则消息组装完毕。

由于可能存在多个消费者client,不同分片可能被不同client接收到,为了能够组装分片,需要一个集中式的地方存储所有分片并最终组装成完整的消息包,但无疑大大增加了系统设计的复杂度。

2.COS代理存储(COS是腾讯云的对象存储服务)。类似编程中的指针原理,方案如下(具体代码实现参考附件):

生产者先把超大消息的数据以文件形式上传到COS,并返回消息文件的COS URL地址;

生产者将URL地址作为消息发送到CMQ队列中;

消费者从CMQ队列中读取消息,判断消息内容是否是COS的URL地址信息,如果是,则根据URL地址从COS下载相应的消息文件,并从文件中读取出超大消息的数据。

七、消息加密传输

腾讯云提供秘钥管理服务KMS,能对数据进行安全加密。CMQ消息加密功能有以下两种方案:

1.CMQ SDK客户端加密方案。客户端发送消息时,根据设置的CMK(KMS的秘钥ID)调用KMS生成数据秘钥接口,会返回数据秘钥的明文key以及加密后的密文key,使用明文key对消息进行本地加密,然后将加密的数据和密文key作为消息 发送给CMQ;消费者接收消息时,先获取消息中的密文key,调用KMS接口解密(不必每次均调用,可做缓存)得到对应的明文key,最后根据明文key本地解密密文数据即可。具体代码实现参考附件。

2.CMQ服务端加密方案。该方案,由CMQ服务端和KMS服务打通,CMQ自动对消息加解密,用户无感知,例如,用户通过https接口发送消息,由CMQ自动加密后存储,通过https接口接收消息时,CMQ对消息自动解密后返回给用户。此功能正在开发中。

订阅者可以指定bindingKey,即路由规则,如上所示,*(星号)可以匹配一个单词,#(井号)可以匹配一个或多个单词。例如,生产者发布一个消息,且消息的路由键(routingKey)是”quick.orange.elephant”,那么该消息只会推送给消费者C1;如果routingKey=”quick.orange.rabbit”,则消息会推送给C1和C2;如果routingKey=”lazy.brown.fox”,则消息只会推送给C2。

# python sdk demo code: set topic-subscription route-rule 
my_sub = my_account.get_subscription(topic_name, subscription_name) 
subscription_meta = SubscriptionMeta() 
subscription_meta.Endpoint = "http://test.com" 
subscription_meta.Protocal = "http" 
subscription_meta.bindingKey = ['*.*.rabbit','lazy.#'] 
my_sub.create(subscription_meta)

message = Message() 
message.msgBody = "route msg test" 
my_topic.publish_message(message, 'quick.orange.rabbit')

六、超大消息传输

目前CMQ的队列消息大小最大限制为1MB,而当消息大小不超过64KB时,收发消息的最大QPS限制分别为正常的5k(有特殊需求可调整),当消息大小超过64KB而小于1MB时,CMQ不保证收发消息的QPS性能。因此,支持大于64KB的消息只是为了考虑业务偶尔传输少量大消息且不想做消息分片的应用场景。

一般来说,64KB的消息限制大小基本能满足大部分业务场景需求了,但在某些特殊场景下,消息数据大于64KB甚至大于1MB时,业务和CMQ如何支持这种超大消息的传输呢?这里有两种解决方案:

1.消息分片。类似IP数据包分片传输原理,生产者对消息分片标记后分别发送到队列,消费者从队列取出所有分片消息进行组装。个人方案如下:

每个消息body分为header和data两部分。其中,data就是原消息分片后的内容,header包含三个标记:业务指定消息的ID号,唯一记录一个消息的ID值,具有同一个ID号的消息分片才会在消费端重新组装;分片序号(从1开始),记录一个消息分片的次序编号,消费端依据分片序号依次组装消息;下一分片是否存在的标记,如果是,说明消息包还不完整,否则消息组装完毕。

由于可能存在多个消费者client,不同分片可能被不同client接收到,为了能够组装分片,需要一个集中式的地方存储所有分片并最终组装成完整的消息包,但无疑大大增加了系统设计的复杂度。

2.COS代理存储(COS是腾讯云的对象存储服务)。类似编程中的指针原理,方案如下(具体代码实现参考附件):

生产者先把超大消息的数据以文件形式上传到COS,并返回消息文件的COS URL地址;

生产者将URL地址作为消息发送到CMQ队列中;

消费者从CMQ队列中读取消息,判断消息内容是否是COS的URL地址信息,如果是,则根据URL地址从COS下载相应的消息文件,并从文件中读取出超大消息的数据。

七、消息加密传输

腾讯云提供秘钥管理服务KMS,能对数据进行安全加密。CMQ消息加密功能有以下两种方案:

1.CMQ SDK客户端加密方案。客户端发送消息时,根据设置的CMK(KMS的秘钥ID)调用KMS生成数据秘钥接口,会返回数据秘钥的明文key以及加密后的密文key,使用明文key对消息进行本地加密,然后将加密的数据和密文key作为消息 发送给CMQ;消费者接收消息时,先获取消息中的密文key,调用KMS接口解密(不必每次均调用,可做缓存)得到对应的明文key,最后根据明文key本地解密密文数据即可。具体代码实现参考附件。

2.CMQ服务端加密方案。该方案,由CMQ服务端和KMS服务打通,CMQ自动对消息加解密,用户无感知,例如,用户通过https接口发送消息,由CMQ自动加密后存储,通过https接口接收消息时,CMQ对消息自动解密后返回给用户。此功能正在开发中。

在此我向大家推荐一个架构学习交流群。交流学习群号: 744642380, 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良

转载于:https://my.oschina.net/u/3833719/blog/1801183

深入分析消息列队CMQ七大功能相关推荐

  1. 消息队列 CMQ 七大功能实践案例

    背景 消息队列,在业务解耦.削峰填谷.流量控制.广播消息等场景下都有很好的应用,已经成为很多企业IT系统内部通信重要手段. 现有常用的开源消息中间件有RabbitMQ.Kafka.RocketMQ等, ...

  2. 腾讯云分布式高可靠消息队列 CMQ 架构

    在分布式大行其道的今天,我们在系统内部.平台之间广泛运用消息中间件进行数据交换及解耦.CMQ 是腾讯云内部自研基于的高可靠.强一致.可扩展分 布式消息队列,在腾讯内部包括微信手机 QQ 业务红包.腾讯 ...

  3. 在PHP中如何使用消息列队

    /*** 消息列队服务* @author zhou.tingze* @example* -----------------------------------Create--------------- ...

  4. android 微信浮窗实现_Android实现类似qq微信消息悬浮窗通知功能

    实现方法:(需要开启悬浮窗通知权限.允许应用在其他应用上显示) 一.利用headsup 悬挂式Notification,他是5.0中新增的,也就是API中的Headsup的Notification,可 ...

  5. 阿里云消息队列 2021 新功能新特性重要里程碑

    关于阿里云消息队列更多新功能新特性及详细介绍,欢迎大家查阅官网各产品"新功能发布记录": [1]  RocketMQ: ​​https://help.aliyun.com/docu ...

  6. SIP应答消息状态码与功能

    SIP应答消息状态码与功能 类型 状态码 状态说明 临时应答(1XX) 100 Trying 正在处理中 180 Ringing 振铃 181 call being forwarder 呼叫正在前向 ...

  7. 微信公众号python人工智能回复_python实现微信机器人: 登录微信、消息接收、自动回复功能...

    安装wxpy pip install -u wxpy 登录微信 # 导入模块 from wxpy import * # 初始化机器人,扫码登陆 bot = bot() 运行以上代码,会生成一个二维码, ...

  8. java消息推送怎么实现_PHP实现的消息实时推送功能

    本文实例讲述了PHP实现的消息实时推送功能.分享给大家供大家参考,具体如下: 入口文件index.html <!DOCTYPE HTML> <html> <head> ...

  9. 微信客服为什么不能人工服务器,微信客服消息格式限制及功能

    微信客服消息格式限制及功能,不知小伙伴们有没有关注过微信客服,有没有接触过微信客服.下面小编就来为大家讲解一下微信客服消息格式限制及功能. 微信客服消息格式有什么限制? 文字:长度不得超过600字. ...

最新文章

  1. __attribute__编绎属性、关于__init、__initdata、__exit、__exitdata及类似的宏、关于initcall的一些宏定义、__setup宏的来源及使用...
  2. html5在线裁剪,HTML5 Canvas裁剪clip
  3. ASP.NET Atlas简单控件介绍——InputControl,TextBox,Button和CheckBox
  4. arcgis python 教程-按区域消除--arcgis python实例教程
  5. 微软职位内部推荐-Software Development Engineering II
  6. SharePoint 2010-在ribbon上添加表单,将默认control加到自定义group中
  7. raid读写速度对比_组建RAID 0前后的读写速度对比
  8. 发生在我们身边的灵异事件 - 发生在台湾奇萊山的一些灵异事件
  9. 一个IOS音乐播放器源码
  10. 云计算教程入门视频课件:云计算中心怎么选址?
  11. 软件开发过程与项目管理
  12. 为什么服务网页打开需要很久,点击之后计算机在处理什么,为什么我打开电脑桌面全部显示出来后要等很久才能打开连接.doc...
  13. 成功启动electron-egg项目,electron+vue的傻瓜式搭建
  14. 炸了!3年图片都没了!
  15. Linux命令详解(2) – mv
  16. 织梦php开发tags功能开发,织梦全网最新联动筛选功能的实现(单选和多选)可显示分类的文章...
  17. 使用 vue-i18n 进行Vue国际化处理,使项目切换中英文
  18. 非零基础自学Golang 第15章 Go命令行工具 15.4 注释文档(doc)
  19. 一个高性能无锁非阻塞链表队列
  20. 【技术点】数据结构--B树系列之B+树(五)

热门文章

  1. How to be a master?
  2. stm32f103——滴答定时器
  3. 实验鼠、实验猴生意火了:单价过万 利润率高过茅台
  4. html5腾讯地图api应用
  5. 实例讲解西门子 S7-1500 与75台 S7-200smart 以太网通讯,怎么做到的?
  6. bjui用框架提交form表单
  7. MnasNet:移动端模型的自动化神经架构搜索方法
  8. 读《A survey of deep learning techniques for autonomous driving》自动驾驶综述
  9. 原来order by 中也可以使用子查询
  10. 7-2 心理阴影面积