目录

什么时候会用到消息中间件保证事务一致型

概念

RocketMQ集群概述

大概流程

具体流程如下

代码实现过程

Producer:

Consumer:

踩坑记

案例代码实现


什么时候会用到消息中间件保证事务一致型

前面讲了seata AT 、TCC等模式来保证事务的一致性问题,为什么还要研究可靠消息一致性呢?看分析:

电商核心流程:

订单服务 -> 创建订单 -> 库存服务 -> 扣减库存 -> 积分服务 -> 增加积分 -> 仓储服务 -> 通知发货

核心模块:订单服务、库存服务、积分服务 -> 绑定为一个TCC事务

边缘模块:例如仓储,通过mq去通知,保证最终一致性,也可以业务解耦。(由中间件保证一定会把消息交给下游的库存服务去扣减库存,仓储服务去通知发货等,如果这个过程中有消息发送失败,则可靠消息中间件应该保证不停的重试投递消息。)

参考:https://www.cnblogs.com/fyql/p/12186296.html

核心链路使用seata这种类似于TCC的事务,而像wms这种相当于是分支链路,可以通过MQ进行解耦。

RocketMQ分布式事务:

核心链路使用seata这种类似于TCC的事务,而像wms这种相当于是分支链路,可以通过MQ进行解耦。

但是通过MQ解耦也会带来一些问题,例如消息丢失,消息重复等等问题,因此也需要进行最终一致性的保证。

结合整个订单接口服务,分为两个支付链路,一个是核心链路(订单业务),一个是非核心链路(wms) 整个流程。

先向RocketMQ发送half msg,然后调用核心链路。核心链路要是返回失败,就会走失败的逻辑:退款,更改订单状态为取消,再给rocketmq发送callback废弃掉刚才的消息。

如果成功,就commit msg让消费者可以消费。如果在等待期间,一直没有callback/commit那么mq就会走回调查询具体的状态。最终消费者接收到消息后,消费完成就回复mq一个ack, 如果消费失败了,mq就会重新投递或者换一个服务投递。使用rocketmq的half msg机制,可以实现这一套固定模式的最终一致性,很完善。 这个将wms的操作放在核心链路前面的这个问题,是为了提升整个订单接口服务的效率,因为需要保证最终一致性,那么必然会有消息生产者对MQ的一些操作,包括重试,ack等等,如果将这些逻辑全部都放在核心链路执行完成后再去一一完成,那么可能会耗费一些时间。而通过rocketmq这个模式,可以通过half msg的支持,来将整个与mq的交互过程拆解掉,从而提升效率。

概念

Rocket MQ是阿里开发的一个分布式的开源消息队列组件,目前由Apache开源组织维护,最新版本是5.3.0,已经支持事务消息。

事务消息可以确保本地事务 与 发送消息 之间的原子性,相关概念:

1、Half(Prepare) Message

Producer已经把消息发送给Mq 服务器,但是Mq服务器尚未收到生产者的第二次Ack,这个时候消息会被标记为"temporarily undeliverable",目前消息的状态为 HalfMessage。

2、Message Status Check

网络断开或者Producer应用重启会导致Mq服务器无法从Producer获取第二次ACK,当Mq服务器发现一个消息长时间处于 HalfMessage 状态时(默认为60S,可配置),它会主动请求Producer,查询消息Id对应的最新状态(commit 或者 rollback)。

官网的文档、案例都很全可以直接参考:https://rocketmq.apache.org/

RocketMQ集群概述

1) Name Server

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

2) Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

3) Producer

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

4) Consumer

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

注意,启动时允许手动创建topic:nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &

大概流程

生产者向broker发送一条未经commit不可消费的事务性消息(半消息)。如果发送成功返回SEND_OK,则在executeLocalTransaction(Message msg, Object arg) 方法中

执行本地事务。如果本地事务执行成功则commit(checkLocalTransaction(MessageExt msg)),commit过的消息可正常被服务端消费,执行失败则rollback,rollback的消息则被删除。

具体流程如下

1、Producer 向Mq服务器 发送消息。

2、Mq服务器收到消息并持久化成功之后,会向 Producer确认首次ACK,此时消息处于 HalfMessage 状态,并未发送给对应的Consumer。

3、Producer 开始执行本地事务逻辑。

4、根据事务执行结果,Producer 向Mq服务器提交二次确认(commit 或者 rollback)。Mq Server 收到 Commit 状态则将半消息标记为可投递,Consumer 最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,Consumer 将不会接受该消息。

5、在断网或者应用重启的情况下,二次ACK未成功的发给Mq Server,Mq Server会主动向 Producer 启动消息回查(Message Status Check),

6、Producer 根据事务执行结果,对消息回查返回对应的结果。

7、Mq Server根据返回结果,决定继续投递消息或者丢弃消息(重复第4步操作)。

代码实现过程

Producer:

1、实现 TransactionListener 接口,在 executeLocalTransaction方法里执行本地事务逻辑,在 checkLocalTransaction方法里返回消息id对应的事务状态,用于Mq的消息回查。

2、通过 TransactionMQProducer构造事务消息并发送。

Consumer:

1、从Mq server获取到消息之后,即开始处理本地事务,处理成功后返回 CONSUME_SUCCESS。

2、处理失败则返回 RECONSUME_LATER,Mq server会在稍后重新投递这个消息,又进入步骤1。

注: Consumer 需要做好幂等控制,消息可能会被多次投递到Consumer。

参考代码:https://blog.csdn.net/qq_43747119/article/details/86061818

Message ID 消息的全局唯一标识,由 MQ 系统自动生成,唯一标识某条消息。
Topic 消息主题,一级消息类型,通过 Topic 对消息进行分类。
Tag 消息标签,二级消息类型,用来进一步区分某个 Topic 下的消息分类。
messageTrackList
consumerGroup 消费集群名称
trackType 当前状态。取值如下
CONSUMED:已消费
CONSUMED_BUT_FILTERED:已被过滤
NOT_CONSUME_YET:暂未消费
NOT_ONLINE:客户端不在线
UNKNOWN:其他问题
Operation 重发消息、查看异常

参考代码:https://rocketmq.apache.org/docs/transaction-example/

踩坑记

1、参考:https://www.cnblogs.com/2YSP/p/11616376.html

2、运行 .\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

报:错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_151\lib;C:\Program

解决:

runbroker.cmd

案例代码实现

参考:https://www.cnblogs.com/linjiqin/p/9561641.html

注意:这是老版本的代码,但是还是可以参考的

分布式事务(6)-分布式事务处理技术之RocketMQ相关推荐

  1. 分布式事务解决方案分布式事务原理

    分布式事务解决方案&分布式事务原理 0. 前言 1. 单数据源事务 & 多数据源事务 2. 常见分布式事务解决方案 2.0.什么是分布式事务 2.1. 分布式事务模型 2.2. 二将军 ...

  2. 分布式事务和分布式锁

    为什么要使用分布式事务和分布式锁? 我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务.分布式锁等. 事务的ACID 原子性(Atomicity):多条指令作为一个集体,要么都执行 ...

  3. 分布式事务及分布式框架Seata

    分布式事务 ==分布式事务是什么? ==>本地事务是一个单元的sql,分布式事务也是一个单元的sql,他们区别在于,分布式事务的sql分布在了不同服务上,这里的服务指微服务和数据库服务 ==?为 ...

  4. springcloud分布式事务_Springcloud 分布式事务集成Naco Seata

    前言:分布式系统架构中,最最费劲的是分布式事务,分布式事务解决方案网上大致分为两种 消息一致性 基于TCC分布式事务 不管基于那种解决方案,都是对侵入的代码植入,以大量的代码或者消息来作为代价,来实现 ...

  5. 分布式事务、分布式锁、分布式session

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | cnblogs.com/heqiyoujing ...

  6. 分布式之分布式事务、分布式锁、分布式Session

    点击上方 "程序员小乐"关注, 星标或置顶一起成长 每天凌晨00点00分, 第一时间与你相约 每日英文 It is our choices... that show what we ...

  7. 分布式事务:分布式事务原理概述

    1.什么是分布式事务 分布式事务就是指事务的资源分别位于不同的分布式系统的不同节点之上的事务: 2.分布式事务产生的原因 2.1.数据库分库分表 在单库单表场景下,当业务数据量达到单库单表的极限时,就 ...

  8. ole db 访问接口 sqlncli 无法启动分布式事务_分布式事务,看这篇就够了

    0. 前言 1. 单数据源事务 & 多数据源事务 2. 常见分布式事务解决方案 2.1. 分布式事务模型 2.2. 二将军问题和幂等性 2.3. 两阶段提交(2PC) & 三阶段提交( ...

  9. 【分布式事务】分布式事务

    一.什么是分布式事务 分布式事务就是指事务的参与者.支持事务的服务器.资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上.简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不 ...

最新文章

  1. Linux下DB2数据库安装教程
  2. Invoice校验差异处理
  3. AT89C52编程开发源代码
  4. ​css3属性选择器总结
  5. 单机安装oracle,单机安装oracle系统
  6. 主线程如何与次线程用全局变量通讯
  7. 热门NPM库 “coa” 和“rc” 接连遭劫持,影响全球的 React 管道
  8. struts2+spring+hibernte整合示例
  9. Iphone开发Interface Builder出现Assertion Failure错误
  10. 【图像分割】基于matlab贝叶斯图像分割【含Matlab源码 1677期】
  11. Unity移动端使用 Handheld.PlayFullScreenMovie播放视频参数
  12. css制作类似优惠券的卡片样式(vue)
  13. 软件测试测试常见分类有哪些?
  14. 123457123457#0#-----com.yuming.drawGame01--前拼后广--儿童画画游戏
  15. MPP(无主备)环境搭建
  16. 软件工程和计算机科学考公务员,软件工程可以考公务员吗
  17. 租房需要注意些什么?
  18. 清除IBM小型机橙色告警灯方法
  19. 真正的高手,都在刻意练习
  20. Fe原子辐照轰击多层石墨烯模拟代码

热门文章

  1. ultravnc 设置代理_云立方IP丨IP代理可以实现哪些功能?
  2. 滑铁卢大学计算机学什么,为什么来滑铁卢大学学习数学和计算机
  3. 【待更新】【UWB】UWB 学习、使用及 QCA平台移植实例
  4. 发射瞬时速度约束下的弹道导弹轨迹仿真算法
  5. 山东大学软件学院众智科学与网络化产业(网络、群体与市场)复习笔记
  6. 蚂蚁金融加入以色列区块链隐私解决方案公司A轮融资
  7. windows无法连接到打印机
  8. 2.阿里实人认证 .net 准备工作2 转换demo
  9. 记一次 关于Android studio 编译报错compileDebugJavaWithJavac FAILED
  10. [OHIF-Viewers]医疗数字阅片-医学影像-React/Redux 的好帮手Classnames