Spring Cloud —— RocketMQ 的消息类型
导航
- 引言
- 一、普通消息
- 1.1 可靠同步发送
- 1.2 可靠异步发送
- 1.3 单向发送
- 二、顺序消息
- 三、事务消息
- 3.1 什么是事务消息
- 3.2 事务消息示例
- 1、编写本地事务逻辑
- 2、发送半事务消息
- 3、注册本地事务监听器
- 4、测试
引言
本文承接《Spring Cloud —— 消息队列与 RocketMQ》
RocketMQ 提供了多种场景所需的消息类型,包括普通消息、顺序消息、事务消息,本文分别针对这些消息类型予以展开介绍。
一、普通消息
普通消息分为三种发送方式:可靠同步发送、可靠异步发送、单向发送。
简言之,可靠同步发送就是消息发送方直到收到MQ的发送结果才发送下一条消息;可靠异步发送就是消息接收方暂时不关心发送结果,连续发送消息,采用消息发送回调的方式接收MQ的发送结果响应;单向发送就是不同步等待发送结果也不设置任何回调函数。
1.1 可靠同步发送
可靠同步发送,表示发送方会同步等待 MQ 的发送结果,可以使用 rocketMQTemplate.syncSend(…) 来实现。
syncSend 有很多重载方法,包括可以在参数列表中指定一个毫秒级的超时时间。
syncSend 如何设置标签?
syncSend(“topic:tag”, 其他参数);
@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testSyncSend() {SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1:testTag", "这是一条同步消息");log.info("同步消息发送结果:{}", sendResult);}
}
1.2 可靠异步发送
可靠异步发送,表示不等待MQ返回响应,而通过回调接口接收服务器响应,并对发送结果进行处理。异步发送一般用于链路耗时较长,对RT 响应时间较为敏感的业务场景。
由于junit运行完会立即退出,因此需要 Thread.sleep 避免 JVM shutdown,实际开发不需要。
@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testAsyncSend() throws InterruptedException {rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("发送结果:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.error("消息发送异常,{}", throwable);}});System.out.println("================");// 实际开发不需要Thread.sleep(10000);}
}
执行结果:
================
2021-10-05 09:04:16.284 INFO [service-order,,,] 7608 --- [ublicExecutor_1] com.morty.rocketmq.MessageTypeTest : 发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A803781DB858644D46168BB8FC0000, offsetMsgId=C0A8018C00002A9F000000000002FF47, messageQueue=MessageQueue [topic=test-topic-1, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=1]
1.3 单向发送
单向发送,表示发送方只负责发送消息,不等待服务器回应,且没有回调函数触发,即只发送请求不等待应答。
适用于某些耗时非常短,但对可靠性要求不高的场景,例如日志收集。
@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testOneWay() {rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息。");}
}
二、顺序消息
顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
Broker 中默认有4个 ConsumeQueue 用来作为消息的传输通道,如果不做特殊要求,消息会分散到不同的 Queue 中,导致消息的乱序。因此,如果希望消息严格保证顺序发送和接收,就必须可以保证顺序的消息发送 API ,使得这些 Message 可以发送到同一个 Queue 中。
对于可靠同步、可靠异步,以及单向发送的场景,都提供了 xxxSendOrderly(…) 方法,除了保证消息可以分配到同一个 queue 中,以保证消息的有序性之外,没有任何其他区别。
sendOrderly(…) 方法除了需要基本的信息之外,还需要传入一个唯一的 HashKey,只要能够保证唯一即可。
@Test
public void testOneWayOrderly() {rocketMQTemplate.sendOneWayOrderly("test-topic-1", "这是一条单向消息。",String.valueOf(System.currentTimeMillis()));
}
如何验证消息是否被分配到了同一个 queue ?在RocketMQ 控制台的主题中找到如下按钮:
如果消息能够发送到同一个 queue,那么这几个 queue 中只会有一个 queue 的最大位点发生变化,由此就可以推断消息是否被分配到了同一个 queue 中:
三、事务消息
本节内容参考:消息类型-事务消息
3.1 什么是事务消息
RocketMQ 提供了事务消息,通过事务消息就能达到分布式事务的最终一致性。
上图是 RocketMQ 提供的事务消息工作流程图,这是一种非常典型的分布式事务的解决方案。
半事务消息(half message)
指暂不能投递的消息,发送方已经成功地将消息发送到 RocketMQ 服务端,但是MQ未收到生产者对该消息的二次确认,此时该消息被标记为“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查(check back)
MQ服务端针对半事务消息主动向生产者查询其事务状态。由于网络闪断、生产者重启等原因,导致某些事务消息的二次确认丢失,MQ服务端通过扫描发现某些消息长期处于“半事务消息”状态,需要主动向消息生产者询问该消息的最终状态(commit 或 rollback),该询问过程即为消息回查。
3.2 事务消息示例
完成一个订单创建的事务消息案例。本地事务采用本地事务表的方式记录事务的状态。
本地事务表
或本地消息表,是一种记录本地事务状态的独立表结构,专门用于存储事务信息,简化并统一本地事务的回查逻辑。表中的每条记录都代表一个已经成功执行的事务。一般会将本地事务表的入库操作和某个业务放在同一个事务中,这样就可以保证事务信息存在,那么事务一定成功。
事务消息的编码步骤要紧扣 RocketMQ 事务消息的流程。
1、编写本地事务逻辑
为下单逻辑增加事务属性,并在其中加入事务消息记录的逻辑。使用 shop_tx_log 来完成本地事务记录的工作,在执行下单后,同一事务中,完成事务入库的操作。
@Data
@Entity(name = "shop_tx_log")
public class TxLog {@Idprivate String txId;private Date date;
}
public interface TxLogDao extends JpaRepository<TxLog, String> {}
@Transactional
public void createOrder(String txId, Order order) {// 保存订单orderDao.save(order);TxLog txLog = new TxLog();txLog.setTxId(txId);txLog.setDate(new Date());// 记录事务日志txLogDao.save(txLog);
}
2、发送半事务消息
在 OrderService 下新增半事务消息发送接口:
/*** 下单半事务消息*/
public void createOrderHalfMsg(Order order) {String txId = UUID.randomUUID().toString();rocketMQTemplate.sendMessageInTransaction("tx_producer_group","tx_topic",MessageBuilder.withPayload(order).setHeader("txId", txId).build(),order);
}
sendMessageInTransaction(…) 方法传入四个参数
第3个参数:org.springframework.messaging.support.MessageBuilder 用于构建 Message 对象,withPayload() 传入一个核心的消息实体对象,setHeader() 可以为 Message 对象设置消息头,这里把 txId 放入消息头中以备后面的消息回查。
第4个参数:Object 对象,用于后续执行本地事务时需要使用的数据
这一步骤是 RocketMQ 事务消息的第一步——发送半事务消息,也是代表开启一个以RocketMQ 为基础的分布式事务,除了设置一些基本的消息内容之外(分组、主题等),还需要通过构建MessageBuilder来构建Message,并绑定一个该分布式事务的 transaction Id,和执行后面执行本地事务的必要参数。
3、注册本地事务监听器
RocketMQLocalTransactionListener 提供了事务消息流程中“执行本地事务”和“消息回查”两个步骤的监听入口。
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;/*** 本地事务监听器** @data 2021/10/5 15:03*/
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderMQListener implements RocketMQLocalTransactionListener {@Autowiredprivate OrderService orderService;@Autowiredprivate TxLogDao txLogDao;/*** 执行本地事务*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {String txId = (String) message.getHeaders().get("txId");// 执行本地事务orderService.createOrder(txId, (Order) o);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}/*** 事务回查*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String txId = (String) message.getHeaders().get("txId");TxLog txLog = txLogDao.findById(txId).get();if (txLog != null) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.ROLLBACK;}}
}
这里有个小问题,在发送半事务消息的时候,已经有发送结果,那为什么不直接在收到发送成功的响应后直接执行本地事务呢?还要再创建 executeLocalTransaction 这样的回调方法才去执行本地事务?
我认为是因为由MQ主动调用回调函数来执行本地事务具有更强的可靠性。如果直接以发送半消息的结果作为依据来执行本地事务,一旦由于网络或发送端重启等原因未收到半消息的发送结果,就会导致本地事务无法触发,系统的容错性偏低。而提供了回调接口,就可以由MQ来触发本地事务的执行,MQ也可以更好的将本地事务的执行和MQ半事务消息的提交绑定到同一个事务中,更利于事务的管控。
4、测试
上面三步已经基本把事务消息的代码编写完毕,只要在 Controller 层调用 半消息发送方法就可以完成整个事务消息功能。
这里需要对 OrderMQListener 的两个回调函数 executeLocalTransaction() 和 checkLocalTransaction() 打上断点,并检查执行 executeLocalTransaction 时是否完成 txLog 对象的入库。
从测试结果来看,并没有什么问题。
如何测试消息回查呢?我们可以直接在 executeLocalTransaction() 返回前杀死 order-service ,这样MQ Server 就收不到二次确认的信息,从而会触发消息回查方法。
可以使用 kill 命令,这里简单介绍下 Windows 下是如何操作的。
D:\idea-workspace\shop>jps
11792 Jps
18372 RemoteMavenServer
23284 OrderApplication
9780 rocketmq-console-ng-1.0.0.jar
13080 Launcher
15672 nacos-server.jar
20840 ProductApplication
8200
D:\idea-workspace\shop>taskkill -F /pid 23284
成功: 已终止 PID 为 23284 的进程。
OrderApplication 已经停止,再次启动后,不多一会就可以收到 MQ 的消息回查请求触发 checkLocalTransaction() 方法。测试成功!
Spring Cloud —— RocketMQ 的消息类型相关推荐
- 几种常见的微服务架构方案简述——ZeroC IceGrid、Spring Cloud、基于消息队列
微服务架构是当前很热门的一个概念,它不是凭空产生的,是技术发展的必然结果.虽然微服务架构没有公认的技术标准和规范草案,但业界已经有一些很有影响力的开源微服务架构平台,架构师可以根据公司的技术实力并结合 ...
- Spring Cloud Streams Messaging消息驱动微服务实践
作者:禅与计算机程序设计艺术 1.简介 消息驱动微服务是一个新的分布式架构模式,它基于异步通信和事件驱动的消息传递机制,通过轻量级的消息代理与集成框架实现分布式系统的解耦合.弹性伸缩和可靠性保证.Sp ...
- Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务
文章目录 概述 添加依赖 配置文件配置RabbitMQ的地址信息 接口定义 接收方 @EnableBinding @StreamListener 测试 消费组 发送复杂对象 消息回执 代码 概述 官网 ...
- Spring Cloud Stream如何处理消息重复消费
最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题.通过沟通与排查下来主要还是用户对消费组的认识不够.其实,在之前的博文 ...
- java B2B2C Springboot多租户电子商城系统-Spring Cloud Stream(消息驱动)
1.什么是Spring Cloud Stream 愿意了解源码的朋友直接企鹅求求:二一四七七七五六三三 Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架.它可以基于 ...
- spring cloud学习之消息总线(Finchley版本),以及postman下载与使用
首先来学习链接: 方志朋:https://blog.csdn.net/forezp/article/details/81041028 程序猿DD:http://blog.didispace.com/s ...
- 来自 Spring Cloud 官方的消息,Spring Cloud Alibaba 即将毕业
2019 年 7 月 24 日晚,Spring Cloud 官方发布公告: 仓库迁移是官方决定 Spring Cloud Alibaba 即将毕业 根据官方最新的发版规则,我们会把孵化器中的 Spri ...
- 【spring cloud】(六)消息总线——springcloud Bus
各位小伙伴们大家好,欢迎来到这个小扎扎的spring cloud专栏,在这个系列专栏中我对B站尚硅谷阳哥的spring cloud教程进行一个总结,鉴于 看到就是学到.学到就是赚到 精神,这波依然 ...
- Spring Cloud(十一)高可用的分布式配置中心 Spring Cloud Bus 消息总线集成(RabbitMQ)
上一篇文章,留了一个悬念,Config Client 实现配置的实时更新,我们可以使用 /refresh 接口触发,如果所有客户端的配置的更改,都需要手动触发客户端 /refresh ,当服务越来越多 ...
最新文章
- 【opencv】(13) 案例:停车场空余车位检测,附python完整代码
- i2c-tools dected -l
- JavaScript Repeater 模板控件
- hyper-V 检查点
- python etl 大猩猩_Airflow教程-使用Airflow实现ETL调度
- 补码(为什么按位取反再加一):告诉你一个其实很简单的问题
- c和汇编混合编程----main的反汇编
- 加速进军自动驾驶领域,福特计划推出自动驾驶出租车服务
- 可输入可下拉的输入选择框
- K均值聚类关于初始聚类中心的探讨(matlab程序)
- C# 6.0 (C# vNext) 的新功能:Exception-Handling Improvements
- HTML 如何禁用缓存
- JS在与lua的交互心得
- JavaScript之数组学习
- matlab结果导入ug,matlab与UG数据交换.docx
- 张小丫第一次微信支付(讲解)
- 经济法期末模拟试卷及答案
- 用python做一个木马_Python编程简单的木马程序(转载于乌云中)
- Android打开QQ临时会话和打开群聊
- 电脑出现Hold Escape key to prevent StartlsBack from loading,导致电脑闪屏。
热门文章
- code craft_Craft.io调度中使用的重要术语
- 附录:更多集合操作命令
- Windows10安装Anaconda和Pytorch(CPU版,无GPU加速)
- Maven多模块打包
- 计组学习笔记(一):浮点数的表示和运算
- 输入快捷键显示未知命令_「干货」华为VRP基础和常用命令了解一下
- 电子科技大学 高级计算机结构,电子科技大学计算机系统结构作业答案
- python发展路线_Python进阶路径-从学徒到大师
- mysql 压缩的blob不能正常显示中文内容_servlet网页显示MySQL BLOB中文乱码
- python语言逆序符号_python的逆序