一、背景

由于公司之前使用的队列中间件是kafka,近期变更为使用阿里的RocketMQ,所以对RocketMQ进行一下简单的知识整理。后续研究其内部原理后,再来一篇深入理解。

二、说明

消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

资料整理来源:阿里云官网

三、系统部署架构

如图:

Name Server:是一个几乎无状态节点,可集群部署,在消息队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。

Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。

生产者:与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。

消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

四、常用应用场景:

削峰填谷

诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列 RocketMQ 版可提供削峰填谷的服务来解决该问题。

异步解耦

交易系统作为淘宝/天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列 RocketMQ 版可实现异步通信和应用解耦,确保主站业务的连续性。

顺序收发

细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出(First In First Out,缩写 FIFO)原理类似,消息队列 RocketMQ 版提供的顺序消息即保证消息 FIFO。

分布式事务一致性

交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

大数据分析

数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列 RocketMQ 版与流式计算引擎相结合,可以很方便的实现将业务数据进行实时分析。

分布式缓存同步

天猫双 11 大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过消息队列 RocketMQ 版构建分布式缓存,实时通知商品数据的变化。

更多适用场景示例参考官网:适用场景

五、名词详解

Topic & Tag

Topic:消息主题,一级消息类型,通过 Topic 对消息进行分类。
Tag:消息标签,二级消息类型,用来进一步区分某个 Topic 下的消息分类。
在消息队列 RocketMQ 版中,Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。


消息(Message)

消息队列中信息传递的载体。


Message ID

消息的全局唯一标识,由消息队列 RocketMQ 版系统自动生成,唯一标识某条消息。


Message Key

消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。


Producer

消息生产者,也称为消息发布者,负责生产并发送消息。


Producer 实例

Producer 的一个对象实例,不同的 Producer 实例可以运行在不同进程内或者不同机器上。Producer 实例线程安全,可在同一进程内多线程之间共享。


Consumer

消息消费者,也称为消息订阅者,负责接收并消费消息。


Consumer 实例

Consumer 的一个对象实例,不同的 Consumer 实例可以运行在不同进程内或者不同机器上。一个 Consumer 实例内配置线程池消费消息。


Group

一类 Producer 或 Consumer,这类 Producer 或 Consumer 通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。


Group ID

Group 的标识。


队列

每个 Topic 下会由一到多个队列来存储消息。每个 Topic 对应队列数与消息类型以及实例所处地域(Region)相关。


Exactly-Once 投递语义

Exactly-Once 投递语义是指发送到消息系统的消息只能被 Consumer 处理且仅处理一次,即使 Producer 重试消息发送导致某消息重复投递,该消息在 Consumer 也只被消费一次。

消息队列 RocketMQ 版的 Exactly-Once 投递语义适用于接收消息 > 处理消息 > 结果持久化到数据库的流程,能够保证您的每一条消息消费的最终处理结果写入到您的数据库有且仅有一次,保证消息消费的幂等。

详细接入和使用步骤可参考官方文档:https://help.aliyun.com/document_detail/102777.html?spm=a2c4g.11186623.2.15.77c73c01GofM0A#multiTask17923


集群消费

一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。
适用场景:适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。

具体配置;

    // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

TCP协议接入示例:

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;import java.util.Properties;public class ConsumerTest {public static void main(String[] args) {Properties properties = new Properties();// 您在控制台创建的 Group IDproperties.put(PropertyKeyConst.GROUP_ID, "XXX");// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.AccessKey, "XXX");// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");// 集群订阅方式 (默认)properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);// 广播订阅方式// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个 Tagpublic Action consume(Message message, ConsumeContext context) {System.out.println("Receive: " + message);return Action.CommitMessage;}});//订阅另外一个 Topic,如需取消订阅该 Topic,请删除该部分的订阅代码,重新启动消费端即可consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部 Tagpublic Action consume(Message message, ConsumeContext context) {System.out.println("Receive: " + message);return Action.CommitMessage;}});consumer.start();System.out.println("Consumer Started");}
}

HTTP 协议接入:
(HTTP 协议目前仅支持集群消费)


广播消费

一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。详情请参见集群消费和广播消费。

适用场景:适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。
注意事项:
  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同订阅逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
    // 广播订阅方式设置properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

定时消息 & 延时消息

定时消息:Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。
延时消息:Producer 将消息发送到消息队列 RocketMQ 版服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到消息队列 RocketMQ 版服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。

适用场景
  • 消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
  • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息。
注意事项

定时和延时消息的 msg.setStartDeliverTime 参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
定时和延时消息的 msg.setStartDeliverTime 参数可设置 40 天内的任何时刻(单位毫秒),超过 40 天消息发送将失败。
StartDeliverTime 是服务端开始向消费端投递的时间。 如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。
设置定时和延时消息的投递时间后,依然受 3 天的消息保存时长限制。
例如,设置定时消息 5 天后才能被消费,如果第 5 天后一直没被消费,那么这条消息将在第 8 天被删除。

代码实现

发定时消息 (TCP)


import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;import com.aliyun.openservices.ons.api.SendResult;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;public class ProducerDelayTest {public static void main(String[] args) {Properties properties = new Properties();// 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.AccessKey, "XXX");// 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");Producer producer = ONSFactory.createProducer(properties);// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可。producer.start();Message msg = new Message( //// Message 所属的 Topic"Topic",// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 版的服务器过滤"tag",// Message Body 可以是任何二进制形式的数据,消息队列 RocketMQ 版不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式"Hello MQ".getBytes());// 设置代表消息的业务关键属性,请尽可能全局唯一// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。// 注意:不设置也不会影响消息正常收发msg.setKey("ORDERID_100");try {// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2016-03-07 16:21:00 投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();msg.setStartDeliverTime(timeStamp);// 发送消息,只要不抛异常就是成功SendResult sendResult = producer.send(msg);System.out.println("Message Id:" + sendResult.getMessageId());} catch (Exception e) {// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());e.printStackTrace();}// 在应用退出前,销毁 Producer 对象// 注意:如果不销毁也没有问题producer.shutdown();}
}       

发延时消息 (TCP)


import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;public class ProducerDelayTest {public static void main(String[] args) {Properties properties = new Properties();// AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.AccessKey, "XXX");// AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");Producer producer = ONSFactory.createProducer(properties);// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可。producer.start();Message msg = new Message( //// 您在控制台创建的 Topic"Topic",// Message Tag, 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 版服务器过滤"tag",// Message Body 可以是任何二进制形式的数据,消息队列 RocketMQ 版不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式"Hello MQ".getBytes());// 设置代表消息的业务关键属性,请尽可能全局唯一。// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。// 注意:不设置也不会影响消息正常收发msg.setKey("ORDERID_100");try {// 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在 3 秒后投递long delayTime = System.currentTimeMillis() + 3000;// 设置消息需要被投递的时间msg.setStartDeliverTime(delayTime);SendResult sendResult = producer.send(msg);// 同步发送消息,只要不抛异常就是成功if (sendResult != null) {System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());}} catch (Exception e) {// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());e.printStackTrace();}// 在应用退出前,销毁 Producer 对象// 注意:如果不销毁也没有问题producer.shutdown();}
}           

发定时消息(Http)


import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;import java.util.Date;public class Producer {public static void main(String[] args) {MQClient mqClient = new MQClient(// 设置HTTP接入域名(此处以公共云生产环境为例)"${HTTP_ENDPOINT}",// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建"${ACCESS_KEY}",// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建"${SECRET_KEY}");// 所属的 Topicfinal String topic = "${TOPIC}";// Topic所属实例ID,默认实例为空final String instanceId = "${INSTANCE_ID}";// 获取Topic的生产者MQProducer producer;if (instanceId != null && instanceId != "") {producer = mqClient.getProducer(instanceId, topic);} else {producer = mqClient.getProducer(topic);}try {// 循环发送4条消息for (int i = 0; i < 4; i++) {TopicMessage pubMsg;if (i % 2 == 0) {// 普通消息pubMsg = new TopicMessage(// 消息内容"hello mq!".getBytes(),// 消息标签"A");// 设置属性pubMsg.getProperties().put("a", String.valueOf(i));// 设置KEYpubMsg.setMessageKey("MessageKey");} else {pubMsg = new TopicMessage(// 消息内容"hello mq!".getBytes(),// 消息标签"A");// 设置属性pubMsg.getProperties().put("a", String.valueOf(i));// 定时消息, 定时时间为10s后pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);}// 同步发送消息,只要不抛异常就是成功TopicMessage pubResultMsg = producer.publishMessage(pubMsg);// 同步发送消息,只要不抛异常就是成功System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());}} catch (Throwable e) {// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);e.printStackTrace();}mqClient.close();}}

事务消息

消息队列 RocketMQ 版提供类似 X/Open XA 的分布事务功能,通过消息队列 RocketMQ 版的事务消息能达到分布式事务的最终一致。

事务消息交互流程如下:


简而言之就是:

  • 生产者将消息发送至MQ服务端,MQ服务端会将消息持久化至文件中;
  • MQ服务端向发送方返回 Ack 确认消息已经发送成功,此时发送方开始执行本地事务逻辑;
  • 发送方根据本地事务执行结果向服务端提交本地事务结果(Commit 或是 Rollback),服务端收到 Commit 状态则将事务消息标记为可投递,订阅方开始消费该消息;
注意事项
  • 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用。与其他类型的消息不同,事务消息有回查机制,回查时消息队列 RocketMQ 版服务端会根据 Group ID 去查询客户端。
  • 通过 ONSFactory.createTransactionProducer 创建事务消息的 Producer 时必须指定 LocalTransactionChecker 的实现类,处理异常情况下事务消息的回查。
  • 事务消息发送完成本地事务后,可在 execute 方法中返回以下三种状态:
TransactionStatus.CommitTransaction:提交事务,允许订阅方消费该消息。
TransactionStatus.RollbackTransaction:回滚事务,消息将被丢弃不允许消费。
TransactionStatus.Unknow:暂时无法判断状态,等待固定时间以后消息队列 RocketMQ 版服务端向发送方进行消息回查。

可通过以下方式给每条消息设定第一次消息回查的最快时间:

Message message = new Message();
// 在消息属性中添加第一次消息回查的最快时间,单位秒。例如,以下设置实际第一次回查时间为 120 秒 ~ 125 秒之间
message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
// 以上方式只确定事务消息的第一次回查的最快时间,实际回查时间向后浮动 0 秒 ~ 5 秒;如第一次回查后事务仍未提交,后续每隔 5 秒回查一次
事务消息具体实现

Tcp(参考官网):https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.2.17.2cf565fegQjBrs#concept-2047089
http(参考官网):https://help.aliyun.com/document_detail/141784.html?spm=a2c4g.11186623.2.24.2cf565fegQjBrs#concept-2047127


顺序消息

顺序消息(FIFO 消息)是消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个 Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。
分为全局顺序消息和分区顺序消息。

注意事项:

顺序消息暂不支持广播模式。
建议同一个 Group ID 只对应一种类型的 Topic,即不同时用于顺序消息和无序消息的收发。
顺序消息不支持异步发送方式,否则将无法严格保证顺序。
对于全局顺序消息,建议至少创建 2 个 SDK 实例。同时运行多个实例,是为了防止工作实例意外退出而导致业务中断。当工作实例退出时,其他实例可以立即接手工作,不会导致业务中断,实际工作的只会有一个实例。

顺序消息收发Java接入实例:https://help.aliyun.com/document_detail/49323.html?spm=a2c4g.11186623.2.18.279e4d93Zjr69r#task-2335090

全局顺序消息

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。

示例:

在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。


分区顺序消息

对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Message Key 是完全不同的概念。
适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照 FIFO 原则进行消息发布和消费的场景。

示例:
  • 用户注册需要发送发验证码,以用户 ID 作为 Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
  • 电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

消息堆积

Producer 已经将消息发送到消息队列 RocketMQ 版的服务端,但由于 Consumer 消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列 RocketMQ 版的服务端保存着未被消费的消息,该状态即消息堆积。


消息过滤

Consumer 可以根据消息标签(Tag)对消息进行过滤,确保 Consumer 最终只接收被过滤后的消息类型。消息过滤在消息队列 RocketMQ 版的服务端完成。

示例代码

发送消息
发送消息时,每条消息必须指明 Tag:

    Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());

订阅所有 Tag
消费者如需订阅某 Topic 下所有类型的消息,Tag 用符号 * 表示:

consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {public Action consume(Message message, ConsumeContext context) {System.out.println(message.getMsgID());return Action.CommitMessage;}});

订阅单个 Tag
消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag:

consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {public Action consume(Message message, ConsumeContext context) {System.out.println(message.getMsgID());return Action.CommitMessage;}});                

订阅多个 Tag
消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用 || 分隔:

    consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {public Action consume(Message message, ConsumeContext context) {System.out.println(message.getMsgID());return Action.CommitMessage;}});

错误示例
同一个消费者多次订阅某个 Topic 下的 Tag,以最后一次订阅的 Tag 为准:

    //如下错误代码中,Consumer 只能订阅到 MQ_TOPIC 下 TagB 的消息,而不能订阅 TagA 的消息。consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {public Action consume(Message message, ConsumeContext context) {System.out.println(message.getMsgID());return Action.CommitMessage;}});consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {public Action consume(Message message, ConsumeContext context) {System.out.println(message.getMsgID());return Action.CommitMessage;}});

订阅关系一致

订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
消息队列 RocketMQ 版里的一个消费者 Group ID 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个消费者 Group ID 下通常会挂载多个 Consumer 实例。
由于消息队列 RocketMQ 版的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的实例需在以下两方面均保持一致:

  • 订阅的 Topic 必须一致
  • 订阅的 Topic 中的 Tag 必须一致

消息轨迹

在一条消息从 Producer 发出到 Consumer 消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从 Producer 发出,经由消息队列 RocketMQ 版服务端,投递给 Consumer 的完整链路,方便定位排查问题。
关于消息轨迹查询详情可参考官网:https://help.aliyun.com/document_detail/43357.html?spm=a2c4g.11186623.2.24.10dd425cmp5p9L#concept-2335151


重置消费位点

以时间轴为坐标,在消息持久化存储的时间范围内(默认 3 天),重新设置 Consumer 对已订阅的 Topic 的消费进度,设置完成后 Consumer 将接收设定时间点之后由 Producer 发送到消息队列 RocketMQ 版服务端的消息。
通过重置消费位点,按需清除堆积的或不想消费的这部分消息再开始消费,或直接跳转到某个时间点消费该时间点之后的消息(不论是否消费过该时间点之前的消息)。
使用重置消费位点功能有以下注意事项:

  • 广播消费模式不支持重置消费位点。
  • 目前不支持指定 Message ID、Message Key 和 Tag 来重置消息的消费位点。
    具体操作流程:https://help.aliyun.com/document_detail/63390.html?spm=a2c4g.11186623.2.25.10dd425cM0vDuZ#concept-2047153

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列 RocketMQ 版会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明 Consumer 在正常情况下无法正确地消费该消息。此时,消息队列 RocketMQ 版不会立刻将消息丢弃,而是将这条消息发送到该 Consumer 对应的特殊队列中。
消息队列 RocketMQ 版将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信队列具有以下特性:
  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 版不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
  • 消息队列 RocketMQ 版控制台提供对死信消息的查询、导出和重发的功能。

死信队列相关其他操作参考:https://help.aliyun.com/document_detail/87277.html?spm=a2c4g.11186623.2.27.10dd425c4KGF0G#concept-2047154


消息路由

消息路由常用于不同地域之间的消息同步,保证地域之间的数据一致性。消息队列 RocketMQ 版的全球消息路由功能依托阿里云优质基础设施实现的高速通道专线,可以高效地实现不同地域之间的消息同步复制。详情请参见全球消息路由。

消息队列 RocketMQ原理和使用整理相关推荐

  1. 芋道 Spring Boot 消息队列 RocketMQ 入门

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  2. 什么是消息队列 RocketMQ 版?

    消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟.高并发.高可用.高可靠的分布式消息中间件.消息队列 RocketMQ 版既可为分布式应用系统提供异步解耦和削峰 ...

  3. 阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证

    近日,由中国信通院和混沌工程实验室联合举办的混沌工程技术沙龙–金融行业精品专场顺利召开,并发布金融级产品稳定性测评成果.在分布式系统稳定性评估体系获奖名单中,阿里云分布式消息队列服务成为通过首批消息队 ...

  4. 消息队列RocketMQ应对双十一流量洪峰的“六大武器”

    作者:不周 审核校对:岁月.明锻 编辑&排版:雯燕 " 4982 亿,58.3 万笔/秒 "的背后 在新冠肺炎疫情催化下,数字化生活方式渐成新常态."4982 亿 ...

  5. 基于消息队列 RocketMQ 的大型分布式应用上云较佳实践

    作者|绍舒 审核&校对:岁月.佳佳 编辑&排版:雯燕 前言 消息队列是分布式互联网架构的重要基础设施,在以下场景都有着重要的应用: 应用解耦 削峰填谷 异步通知 分布式事务 大数据处理 ...

  6. 阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台

    从"消息"到"消息.事件.流"的大融合 消息队列作为当代应用的通信基础设施,微服务架构应用的核心依赖,通过异步解耦能力让用户更高效地构建分布式.高性能.弹性健壮 ...

  7. 基于消息队列 RocketMQ 的大型分布式应用上云最佳实践

    简介:Apache RocketMQ 作为阿里巴巴开源的支撑万亿级数据洪峰的分布式消息中间件,在众多行业广泛应用.在选型过程中,开发者一定会关注开源版与商业版的业务价值对比. 那么,今天就围绕着商业版 ...

  8. 云栖发布|阿里云消息队列 RocketMQ 5.0:消息、事件、流融合处理平台

    简介:RocketMQ5.0 的发布标志着阿里云消息正式从消息领域正式迈向了"消息.事件.流"场景大融合的新局面. 引言:从"消息"到"消息.事件.流 ...

  9. JAVA应用开发MQ实战最佳实践——Series2:消息队列RocketMQ性能测试案例

    简介:JAVA应用开发MQ实战最佳实践--Series2:消息队列RocketMQ性能测试案例 往期内容 JAVA应用开发MQ实战最佳实践--Series1:RocketMQ综述及代码设计 1. 消息 ...

  10. Spring Boot 消息队列 RocketMQ 入门

    转载自  芋道 Spring Boot 消息队列 RocketMQ 入门 摘要: 原创出处 http://www.iocoder.cn/Spring-Boot/RocketMQ/ 「芋道源码」欢迎转载 ...

最新文章

  1. 一重量级联盟成立!北大、浙大、上交大、国科大等34校加入
  2. Java基础:Java变量、数据类型、运算符(2)
  3. 10月份个人技术指标
  4. SAP UI5 应用开发教程之十九 - SAP UI5 数据类型和复杂的数据绑定
  5. 亲一下就搞定的事,绝不花钱解决!
  6. 存储过程是用来干什么的_感情不是用来考验的
  7. Linux之ioctl20160705
  8. 获取公网ip,获取用户城市地址
  9. 翻译:与包括索引列:5级阶梯SQL Server索引
  10. JavaWeb19-HTML篇笔记
  11. 计算机三级c语言题库,全国计算机三级C语言上机题库.doc
  12. wmv怎么转换成视频mp4?
  13. 水平放滑轮组计算机械效率,初中物理中考常用公式-总结-2
  14. AI人工智能算法解析落地实践专栏列表
  15. Ubuntu 16.04 parted 对 GPT 格式硬盘 (12 TB) 分区
  16. 用以促学——Linux进程后台运行的原理、方法、比较及其实现
  17. 创建jira sprint_如何在Excel中创建高级sprint燃尽图
  18. 【大数据处理】广州餐饮店铺爬虫并可视化,上传至hdfs
  19. Linux下的Hall sensor驱动
  20. speedtest插件

热门文章

  1. 成人教育考试报名照片的尺寸是多少?大一寸照片怎么做?
  2. cdlinux中minidwep的使用
  3. 前端彷英雄联盟官网/个人心得
  4. 2020-09-16 multisim14仿真电路数字八路抢答控制器仿真
  5. 简述ip地址的abc类如何划分_ip地址的分类abc类的具体含义与分类方法
  6. ssh工具连接虚拟机(finalshell)
  7. mega软件 linux,Linux系统下使用MegaCli软件对磁盘阵列进行操作
  8. maya2014中uvlayout2.08安装
  9. 核心网upf作用_5G核心网SMF和UPF拓扑增强技术研究
  10. Netty学习之读netty权威指南(三)