JavaEE 企业级分布式高级架构师(二十)RocketMQ学习笔记(2)
RocketMQ学习笔记
- 进阶篇
- 消息样例
- 普通消息
- 消息发送
- 发送同步消息
- 发送异步消息
- 单向发送消息
- 三种发送方式的对比
- 消费消息
- 顺序消息
- 如何保证顺序
- 顺序的实现
- MessageListenerOrderly与MessageListenerConcurrently区别
- 广播消息
- 集群消费模式
- 广播消费模式
- 示例
- 延时消息
- 介绍
- 第三方存储选型要求
- RocketMQ中的延时消息
- 实现原理
- 第一步:修改消息Topic名称和队列信息
- 第二步:转发消息到延迟主题的CosumeQueue中
- 第三步:延迟服务消费SCHEDULE_TOPIC_XXXX消息
- 第四步:将信息重新存储到CommitLog中
- 延迟消息存放
- 批量消息
- 过滤消息
- TAG模式过滤
- SQL表达式过滤
- 支持的语法
- 支持的类型
- 类过滤模式(基于4.2.0版本)
- 事务消息
- 概念介绍
- 分布式事务消息的优势
- 典型场景
- 交互流程
- 事务消息发送步骤
- 事务消息回查步骤
- 注意事项
- 示例
进阶篇
消息样例
- rocketmq提供丰富的消息类型,满足各种严苛场景下的高级特性需求,当前支持的消息类型涵盖普通消息、顺序消息(全局顺序 / 分区顺序)、分布式事务消息、定时消息/延时消息。
- java中要使用rocktmq,需要引入依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version>
</dependency>
普通消息
- 普通消息是指消息队列 RocketMQ 版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。
消息发送
发送同步消息
- 原理:同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
- 应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
// 1、封装producer(生产者组), 2、指定nameServer, 3、开启Producer, 4、构造消息、指定tag、设置key, 5、关闭生产者
发送异步消息
- 原理:异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列 RocketMQ 版的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
- 异步场景:异步发送一般用于链路耗时较⻓,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
单向发送消息
- 原理:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
- 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
三种发送方式的对比
- 下表概括了三者的特点和主要区别:
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向发送 | 最快 | 无 | 可能丢失 |
消费消息
顺序消息
- 顺序消息(FIFO 消息)是消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个 Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。
如何保证顺序
- 在MQ的模型中,顺序需要由3个阶段去保障:
- 消息被发送时保持顺序
- 消息被存储时保持和发送的顺序一致
- 消息被消费时保持和存储的顺序一致
- 发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。
- 如下图所示:对于两个订单的消息的原始数据:a1、b1、b2、a2、a3、b3(绝对时间下发生的顺序)
- 在发送时,a订单的消息需要保持a1、a2、a3的顺序,b订单的消息也相同,但是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息可以在不同的线程中被发送出去。
- 在存储时,需要分别保证a、b订单的消息的顺序,但是a、b订单之间的消息的顺序可以不保证
- a1、b1、b2、a2、a3、b3是可以接受的
- a1、a2、b1、b2、a3、b3也是可以接受的
- a1、a3、b1、b2、a2、b3是不能接受的
- 消费时保证顺序的简单方式就是“什么都不做”,不对收到的消息的顺序进行调整,即只要一个分区的消息只由一个线程处理即可;当然,如果a、b在一个分区中,在收到消息后也可以将他们拆分到不同线程中处理,不过要权衡一下收益。
顺序的实现
- 顺序消息分为全局顺序消息和分区顺序消息。
- 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(First In First Out,简称 FIFO)的顺序进行发布和消费。
- 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
- 消息生产示例:
- 消息消费示例:
MessageListenerOrderly与MessageListenerConcurrently区别
- MessageListenerOrderly:有序消费,同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费。
- MessageListenerConcurrently:并发消费
广播消息
集群消费模式
- 适用场景:适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示:
注意事项
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。
广播消费模式
- 适用场景:适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示:
注意事项
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 每条消息都需要被相同订阅逻辑的多台机器处理。
- 广播模式下服务端不维护消费进度,消费进度在客户端维护。
- 广播模式下,消息队列 RocketMQ 版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
示例
延时消息
介绍
- 延时消息(延迟消息)用于指定消息发送到消息队列 RocketMQ 版的服务端后,延时一段时间才被投递到客户端进行消费(例如 3 秒后才被消费),适用于解决一些消息生产和消费有时间窗口要求的场景,或者通过消息触发延迟任务的场景,类似于延迟队列。
- 场景案例:用户下了一个订单之后,需要在指定时间内(例如30分钟)进行支付,在到期之前可以发送一个消息提醒用户进行支付。
- 实现这类需求通常有两种方式:
- 轮询定时任务:给定周期内扫描所有未支付的订单,查看时间是否到期。
- 延时消息:订单创建的时候发送一条 N 分钟到期的信息,一旦消息消费后便可判断订单是否可以取消。
- 一些消息中间件的Broker端内置了延迟消息支持的能力,核心实现思路都是一样:将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中。如下图所示:
步骤说明:
- producer要将一个延迟消息发送到某个Topic中。
- Broker判断这是一个延迟消息后,将其通过临时存储进行暂存。
- Broker内部通过一个延迟服务(delay service)检查消息是否到期,将到期的消息投递到目标Topic中。这个的延迟服务名字为delay service,不同消息中间件的延迟服务模块名称可能不同。
- 消费者消费目标topic中的延迟投递的消息。
- 显然,临时存储模块和延迟服务模块,是延迟消息实现的关键。上图中,临时存储和延迟服务都是在Broker内部实现,对业务透明。
第三方存储选型要求
对于第三方临时存储,其需要满足以下几个特点:
- 高性能:写入延迟要低,MQ的一个重要作用是削峰填谷,在选择临时存储时,写入性能必须要高,关系型数据库通常不满足需求。
- 高可靠:延迟消息写入后,不能丢失,需要进行持久化,并进行备份。
- 支持排序:支持按照某个字段对消息进行排序,对于延迟消息需要按照时间进行排序。普通消息通常先发送的会被先消费,延迟消息与普通消息不同,需要进行排序。例如先发一条延迟10s的消息,再发一条延迟5s的消息,那么后发送的消息需要被先消费。
- 支持⻓时间保存:一些业务的延迟消息,需要延迟几个月,甚至更⻓,所以延迟消息必须能⻓时间保留。不过通常不建议延迟太⻓时间,存储成本比较大,且业务逻辑可能已经发生变化,已经不需要消费这些消息。
- 例如,滴滴开源的消息中间件DDMQ,底层消息中间件的基础上加了一层代理,独立部署延迟服务模块,使用rocksdb进行临时存储。rocksdb是一个高性能的KV存储,并支持排序。
- 此时对于延迟消息的流转如下图所示:
步骤说明:
- 生产者将发送给producer proxy,proxy判断是延迟消息,将其投递到一个缓冲Topic中。
- delay service启动消费者,用于从缓冲topic中消费延迟消息,以时间为key,存储到rocksdb中。
- delay service判断消息到期后,将其投递到目标Topic中。
- 消费者消费目标topic中的数据。
- 这种方式的好处是,因为delay service的延迟投递能力是独立于broker实现的,不需要对broker做任何改造,对于任意MQ类型都可以提供支持延迟消息的能力,例如DDMQ对RocketMQ、Kafka都提供了秒级精度的延迟消息投递能力,RocketMQ虽然支持延迟消息,但不支持秒级精度。
RocketMQ中的延时消息
- 注意:RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级
String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。
- 例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。
String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d";
实现原理
- 延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存入真实指定的topic下,此时对于consumer端此消息才可⻅,从而被consumer消费。
- 生产者在发送延迟消息非常简单,只需要设置一个延迟级别即可,注意不是具体的延迟时间,如:
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
// 设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);
- 延迟消息在RocketMQ Broker端的流转如下图所示:
步骤说明:
- ① 修改消息Topic名称和队列信息
- ② 转发消息到延迟主题SCHEDULE_TOPIC_XXXX的CosumeQueue中
- ③ 延迟服务消费SCHEDULE_TOPIC_XXXX消息
- ④ 将信息重新存储到CommitLog中
- ⑤ 将消息投递到目标Topic中
- ⑥ 消费者消费目标topic中的数据
第一步:修改消息Topic名称和队列信息
- RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。
- 由于消息一旦存储到ConsumeQueue中,消费者就能消费到,而延迟消息不能被立即消费,所以这里将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
- 同时,还会将消息原来要发送到的目标Topic和队列信息存储到消息的属性中。相关源码如下所示:
// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// ...// Delay Delivery// 如果是延迟消息if (msg.getDelayTimeLevel() > 0) {// 如果设置的级别超过了最大级别,重置延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 修改Topic的投递目标为内部主题SCHEDULE_TOPIC_XXXXtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 根据delayLevel,确定将消息投递到SCHEDULE_TOPIC_XXXX内部的哪个队列中queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 记录原始topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 更新消息投递目标为SCHEDULE_TOPIC_XXXX和queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}}// ...
}
第二步:转发消息到延迟主题的CosumeQueue中
- CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。
投递时间 = 消息存储时间(storeTimestamp) + 延迟级别对应的时间
- 需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,CosumeQueue单个存储单元组成结构如下图所示:
- Commit Log Offset:记录在CommitLog中的位置。
- Size:记录消息的大小。
- Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。这也是为什么java中hashCode方法返回一个int型,只占用4个字节,而这里Message Tag HashCode字段确设计成8个字节的原因。
- 相关源码如下所示:
// CommitLog#checkMessageAndReturnSize
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC) {return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
}↓↓↓↓↓
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,final boolean readBody) {// ...// Timing message processing{// 如果消息需要投递到延迟主题SCHEDULE_TOPIC_XXX中String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}// 如果延迟级别大于0,计算目标投递时间,并将其当做tag哈希值if (delayLevel > 0) {tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}}// ...
}
第三步:延迟服务消费SCHEDULE_TOPIC_XXXX消息
- Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。
- ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。
- 相关源码如下所示:
// ScheduleMessageService#start
public void start() {if (started.compareAndSet(false, true)) {// 1、创建定时器Timerthis.timer = new Timer("ScheduleMessageTimerThread", true);// 2、针对每个延迟级别,创建一个TimerTask// 2.1、迭代每个延迟级别:delayLevelTable是一个Map记录了每个延迟级别对应的延迟时间for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {// 2.2、获得每个每个延迟级别的level和对应的延迟时间Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}// 2.3、针对每个级别创建一个对应的TimerTaskif (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}// ...}
}
- 需要注意的是,每个TimeTask在检查消息是否到期时,首先检查对应队列中尚未投递第一条消息,如果这条消息没到期,那么之后的消息都不会检查。如果到期了,则进行投递,并检查之后的消息是否到期。
第四步:将信息重新存储到CommitLog中
- 在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储。
- 相关源码如下所示:
// DeliverDelayedMessageTimerTask#messageTimeup方法
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {MessageExtBrokerInner msgInner = new MessageExtBrokerInner();// ...TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());// 由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储。long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());msgInner.setTagsCode(tagsCodeValue);// ...return msgInner;
}
延迟消息存放
- 延迟消息存放的结构如下图所示:
- 其中不同的延迟级别放在不同的队列序号下(queueId=delayLevel-1)。每一个延迟级别对应的延迟消息转换为普通消息的位置标识存放在~/store/config/delayOffset.json文件内。
- key为对应的延迟级别,value对应不同延迟级别转换为普通消息的offset值。
{"offsetTable": {3:202, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2, 11:2}
}
- 发送延时消息:
- 消息延时消息:
批量消息
- 批量发送可以提高发送性能,但有一定的限制:
- topic 相同。
- waitStoreMsgOK 相同 (首先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)。
- 不支持延时发送。
- 一批消息的大小不能大于 1M。
- 大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错。
过滤消息
- 消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达式过滤又分为TAG和SQL92模式。
TAG模式过滤
- 发送消息时我们会为每一条消息设置TAG标签,同一大类中的消息放在一个主题TOPIC下,但是如果进行分类我们则可以根据TAG进行分类,每一类消费者可能不是关系某个主题下的所有消息,我们就可以通过TAG进行过滤,订阅关注的某一类数据。
SQL表达式过滤
- SQL92表达式消息过滤,是通过消息的属性运行SQL过滤表达式进行条件匹配,消息发送时需要设置用户的属性putUserProperty方法设置属性。
支持的语法
- 数值比较,如:>、>=、<、<=、BETWEEN、=
- 字符比较,如:=、<>、IN
- IS NULL or IS NOT NULL
- 逻辑连接符:AND、OR、NOT
支持的类型
- 数值型,如:123、3.1415
- 字符型,如:‘abc’ 必须单引号
- NULL、特殊常数
- 布尔值:TRUE、FALSE
类过滤模式(基于4.2.0版本)
- RocketMQ通过定义消息过滤类的接口实现消息过滤。
事务消息
- 消息队列 RocketMQ 版提供的分布式事务消息适用于所有对数据最终一致性有强需求的场景。
概念介绍
- 事务消息:消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能,通过消息队列 RocketMQ 事务消息能达到分布式事务的最终一致。
- 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 版服务端通过扫描发现某条消息⻓期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
分布式事务消息的优势
- 消息队列 RocketMQ 版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
典型场景
- 在淘宝购物⻋下单时,涉及到购物⻋系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物⻋系统只需要订阅消息队列 RocketMQ 版的交易订单消息,做相应的业务处理,即可保证最终的数据一致性。
交互流程
- 事务消息交互流程如下图所示
事务消息发送步骤
- 1、发送方将半事务消息发送至消息队列 RocketMQ 版服务端。
- 2、消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
- 3、发送方开始执行本地事务逻辑。
- 4、发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤
- 1、在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 2、发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 3、发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。
注意事项
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间⻓度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
- 事务性消息可能不止一次被检查或消费,做好幂等性的检查。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
示例
- 要使用RocketMQ的事务消息,要实现一个TransactionListener的接口,这个接口中有两个方法,如下:
public interface TransactionListener {/*** When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.* 执行本地事务* @param msg Half(prepare) message* @param arg Custom business parameter* @return Transaction state*/LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);/*** When no response to prepare(half) message. broker will send check message to check the transaction status, and this* method will be invoked to get local transaction status.* 消息回查后,需要检查对应消息的本地事务执行的最终结果* @param msg Check message* @return Transaction state*/LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
- RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,prepared和commited。当消息执行完send方法后,进入的prepared状态,进入prepared状态以后,就要执行executeLocalTransaction方法,这个方法的返回值有3个,也决定着这个消息的命运:
- LocalTransactionState.COMMIT_MESSAGE:提交消息,这个消息由prepared状态进入到commited状态,消费者可以消费这个消息。
- LocalTransactionState.ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息。
- LocalTransactionState.UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,而最终决定这个消息命运的,是checkLocalTransaction这个方法。
- 当executeLocalTransaction方法返回UNKNOW以后,RocketMQ会每隔一段时间调用一次checkLocalTransaction,这个方法的返回值决定着这个消息的最终归宿。那么checkLocalTransaction这个方法多⻓时间调用一次呢?我们在BrokerConfig类中可以找到:
/*** Transaction message check interval.*/
@ImportantField
private long transactionCheckInterval = 60 * 1000;
- 这个值是在brokder.conf中配置的,默认值是60*1000,也就是1分钟。那么会检查多少次呢?如果每次都返回UNKNOW,也不能无休止的检查吧,我们在BrokerConfig类中可以找到:
/***The maximum number of times the message was checked, if exceed this value, this message will be discarded.* 这个是检查的最大次数,超过这个次数,如果还返回UNKNOW,这个消息将被删除。*/
@ImportantField
private int transactionCheckMax = 15;
- 事务消息中,TransactionListener这个最核心的概念介绍完后,我们看看代码如何实现:
JavaEE 企业级分布式高级架构师(二十)RocketMQ学习笔记(2)相关推荐
- JavaEE 企业级分布式高级架构师(十八)容器虚拟化技术(3)
Kubernetes学习笔记 K8S集群服务搭建 环境准备 机器环境 依赖环境 docker部署 kubeadm(一键安装k8s) 集群安装 依赖镜像 k8s部署 flannel插件 节点Join 节 ...
- JavaEE 企业级分布式高级架构师(十五)FastDFS分布式文件服务器(1)
FastDFS学习笔记 FastDFS介绍 传统文件存储弊端 FastDFS是什么 为什么使用FastDFS FastDFS架构原理分析 架构整体分析 Tracker Server跟踪服务器 Stor ...
- JavaEE 企业级分布式高级架构师(十五)FastDFS分布式文件服务器(3)
FastDFS学习笔记 Java操作FastDFS 测试文件上传 Spring Boot整合FastDFS 实现图片压缩 FastDFS主从文件 应用背景 解决办法 流程说明 Java代码 Nginx ...
- JavaEE 企业级分布式高级架构师(十三)微服务框架 SpringCloud (H 版)(1)
Spring Cloud学习笔记 Spring Cloud入门 分布式技术图谱 Spring Cloud简介 官网介绍 百度百科 总结 Spring Cloud的国内使用情况 Spring Cloud ...
- JavaEE 企业级分布式高级架构师(十七)ElasticSearch全文检索(1)
ElasticSearch学习笔记 基础篇 问题思考 大规模数据如何检索? 传统数据库的应对解决方案? 非关系型数据库的解决方案? 另辟蹊径--完全把数据放入内存怎么样? 全文检索技术 什么是全文检索 ...
- JavaEE 企业级分布式高级架构师(六)MySQL学习笔记(6)
MySQL学习笔记 性能优化篇 性能优化的思路 慢查询日志 慢查询日志介绍 开启慢查询功能 演示一 演示二 分析慢查询日志 MySQL自带的mysqldumpslow 使用percona-toolki ...
- JavaEE 企业级分布式高级架构师(四)SpringMVC学习笔记(4)
SpringMVC学习笔记 高级应用篇 ControllerAdvice @ControllerAdvice @ModelAttribute 作用于方法 作用于方法参数 @InitBinder @Ex ...
- JavaEE 企业级分布式高级架构师课程_汇总贴
总目录: 第一课(2018.7.10) 01 mybatis框架整体概况(2018.7.10)- 转载于:https://www.cnblogs.com/wangjunwei/p/10424103.h ...
- JavaEE 企业级分布式高级架构师(十七)ElasticSearch全文检索(5)
ElasticSearch学习笔记 性能优化篇 ElasticSearch部署 选择合理的硬件配置--尽可能使用 SSD 给JVM配置机器一半的内存,但是不建议超过32G 规模较大的集群配置专有主节点 ...
最新文章
- 卧槽!又一 SQL 神器面世!!
- C++实现求解最大公约数和最小公倍数
- C++Primer 第一章 快速入门 学习
- pycharm提醒:PEP 8: invalid escape sequence xx 解决办法 (转义序列无效,需改成双反斜杠\\)
- python列表索引 end start_Pandas:在Pandas数据帧中查找连续索引的startend值
- [manacher] hdu 3294 Girls#39; research
- Docker部署nginx并修改配置文件
- 【转载】Android网络开发案例
- cpuz测试分数天梯图_2015最新cpu天梯图 cpu性能排行榜
- Java实现文件及文件夹的删除
- python英文参考文献格式_英文论文参考文献标准格式
- OpenCASCADE(OCC)读取STEP模型文件到XDE中
- Win10 chm文件无法打开解决方案
- 北航 华科 计算机学院官网,北京航空航天大学和华中科技大学如何比较?
- 亚洲杯在即,中国男足志在必得。
- enovia PLM : add new value to SPEO
- 修复共享服务器,集群服务器共享磁盘柜的修复案例
- git删除远端分支命令
- 阿里专家梁笑:2018双十一下单成功率99.9%!供应链服务平台如何迎接大促 1
- 计算机电源大小,电源功率到底选多大?老司机告诉你电源功率怎么选?
热门文章
- 英雄联盟无限重新连接服务器,英雄联盟无法连接服务器你想重新连接吗
- PEPS无钥匙进入系统-传统PEPS与数字钥匙系统
- 三、51单片机 使用Proteus仿真实现8位数码管滚动显示(仿真及代码)
- vue stomp 的使用笔记
- java namevaluepair_NameValuePair方式传参数实例教程
- 山东电梯维护服务器,山东省质量技术监督局关于建立电梯维护保养单位告知性登记制度的通知...
- 2022 CCF中国开源大会会议通知(第四轮)
- ACPI相关(10)- Platform Communications Channel
- 正则表达式验证邮箱格式
- 一分钟搞懂云计算和大数据对人到底有啥用?