摘要:

,但是returnLocalTransactionState.COMMIT_MESSAG的时候*服务挂了,那么最终Brock还未收到消息的二次确定,还是个半消息,所以当重新启动的时候还是回调这个回调接口。*如果不先查询上面本地事务的执行情况直接在执行本地事务,那么就相当于成功执行了两次本地事务了。*///TODO2、这里返回要么commit要么rollback。没有必要在返回UNKNOWretur

RocketMQ实现分布式事务

有关RocketMQ实现分布式事务前面写了一篇博客

地事务执行会有三种可能*1、commit成功*2、Rollback失败*3、网络等原因服务宕机收不到返回结果*/log.info("本地事务执行参数,用户id={},商品ID={},销售库存

下面就这个项目做个整体简单介绍,并在文字最下方附上项目Github地址。

优先,缓存作为回退方案模式的。我们之前使用这个模式给用户体验还是挺不错的,首先先请求网络,当网络断开的时候,我们从缓存里面拿到数据。这样就不会使页面异常或空白。但是上面我们已经了解到了缓存了,我们可以

一、项目概述

1、技术架构

项目总体技术选型

onse){returnresponse||fetch(event.request);});}))}});self.addEventListener("activate",function(e){e.

SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4 + Maven3.5.4 + RocketMQ4.3 +MySQL + lombok(插件)

有关SpringCloud主要用到以下四个组建

rderService;}@OverridepublicLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){log.i

Eureka Server +config-server(配置中心)+ Eureka Client + Feign(服务间调用)

配置中心是用MySQL存储数据。

用websocket技术,而不是http请求,我这边是假如)。只有当网络请求失败的时候,我们再从缓存里面去读取数据。但是对于股票K线图中的一些图标展示这样的,因为这些图标是一般不会变的,所以我们更倾向

2、项目整体结构

config-service # 配置中心

eureka # 注册中心

service-order #订单微服务

service-produce #商品微服务

各服务的启动顺序就安装上面的顺序启动。

put(event.request,networkResponse.clone());returnnetworkResponse;});returncachedResponse||fetchPromi

大致流程

ndex.html").then(function(cachedResponse){varfetchPromise=fetch("/index.html").then(function(network

启动后,配置中心、订单微服务、商品微服务都会将信息注册到注册中心。

s.producer.start();}catch(MQClientExceptione){e.printStackTrace();}}/***一般在应用上下文,使用上下文监听器,进行关闭*/publ

如果访问:localhost:7001(注册中心地址),以上服务都出现说明启动成功。

容渲染到页面上去。但是随着技术在不断的改变,现在很多业务逻辑也放在前端,前后端分离,前端是做模板渲染工作,后端只做业务逻辑开发,只提供数据接口。但是我们的web前端开发在数据层这方面来讲还是依赖于服务

3、分布式服务流程

用户在订单微服务下单后,会去回调商品微服务去减库存。这个过程需要事务的一致性。

dEventListener("install",function(event){event.waitUntil(caches.open(CACHE_NAME).then(function(cache

4、测试流程

页面输入:

tln("consumerstart...");}}至于完整的项目地址见GitHub。GitHubd地址https://github.com/yudiandemingzi/spri

http://localhost:9001/api/v1/order/save?userId=1&productId=1&total=4

订单微服务执行情况(订单服务事务执行成功)

h(caches.open("cache-name").then(function(cache){returnfetch(event.request).then(function(networkRes

商品微服务执行情况(商品服务事务执行成功)

ll;/***官方建议自定义线程给线程取自定义名称发现问题更好排查*/privateExecutorServiceexecutorService=newThreadPoolExecutor(2,5,1

当然你也可以通过修改参数来模拟分布式事务出现的各种情况。

配失败了,都会进入then回调函数,如果匹配到了,说明缓存里面有对应的数据,那么直接从缓存里面返回,如果缓存里面cachedResponse值为undefined,没有的话,那么就重新使用fetch请

ewDefaultMQPushConsumer(consumerGroup);//添加服务器地址consumer.setNamesrvAddr(jms.getNameServer());//添加订阅号

策略现在我们来更新下我们的sw.js文件,该文件来缓存我们index.html,及在index.html使用到的所有静态资源文件。index.html代码改成如下:g

二、MQ中生产者核心代码

这里展示下,生产者发送消息核心代码。

务都出现说明启动成功。3、分布式服务流程用户在订单微服务下单后,会去回调商品微服务去减库存。这个过程需要事务的一致性。4、测试流程页面输入:http://localhost:9001/api/v1/o

@Slf4j

@Component

public class TransactionProducer {

/**

* 需要自定义事务监听器 用于 事务的二次确认 和 事务回查

*/

private TransactionListener transactionListener ;

/**

* 这里的生产者和之前的不一样

*/

private TransactionMQProducer producer = null;

/**

* 官方建议自定义线程 给线程取自定义名称 发现问题更好排查

*/

private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,

new ArrayBlockingQueue(2000), new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r);

thread.setName("client-transaction-msg-check-thread");

return thread;

}

});

public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) {

transactionListener = new TransactionListenerImpl(produceOrderService);

// 初始化 事务生产者

producer = new TransactionMQProducer(jms.getOrderTopic());

// 添加服务器地址

producer.setNamesrvAddr(jms.getNameServer());

// 添加事务监听器

producer.setTransactionListener(transactionListener);

// 添加自定义线程池

producer.setExecutorService(executorService);

start();

}

public TransactionMQProducer getProducer() {

return this.producer;

}

/**

* 对象在使用之前必须要调用一次,只能初始化一次

*/

public void start() {

try {

this.producer.start();

} catch (MQClientException e) {

e.printStackTrace();

}

}

/**

* 一般在应用上下文,使用上下文监听器,进行关闭

*/

public void shutdown() {

this.producer.shutdown();

}

}

/**

* @author xub

* @Description: 自定义事务监听器

* @date 2019/7/15 下午12:20

*/

@Slf4j

class TransactionListenerImpl implements TransactionListener {

@Autowired

private ProduceOrderService produceOrderService ;

public TransactionListenerImpl( ProduceOrderService produceOrderService) {

this.produceOrderService = produceOrderService;

}

@Override

public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

log.info("=========本地事务开始执行=============");

String message = new String(msg.getBody());

JSONObject jsonObject = JSONObject.parseObject(message);

Integer productId = jsonObject.getInteger("productId");

Integer total = jsonObject.getInteger("total");

int userId = Integer.parseInt(arg.toString());

//模拟执行本地事务begin=======

/**

* 本地事务执行会有三种可能

* 1、commit 成功

* 2、Rollback 失败

* 3、网络等原因服务宕机收不到返回结果

*/

log.info("本地事务执行参数,用户id={},商品ID={},销售库存={}",userId,productId,total);

int result = produceOrderService.save(userId, productId, total);

//模拟执行本地事务end========

//TODO 实际开发下面不需要我们手动返回,而是根据本地事务执行结果自动返回

//1、二次确认消息,然后消费者可以消费

if (result == 0) {

return LocalTransactionState.COMMIT_MESSAGE;

}

//2、回滚消息,Broker端会删除半消息

if (result == 1) {

return LocalTransactionState.ROLLBACK_MESSAGE;

}

//3、Broker端会进行回查消息

if (result == 2) {

return LocalTransactionState.UNKNOW;

}

return LocalTransactionState.COMMIT_MESSAGE;

}

/**

* 只有上面接口返回 LocalTransactionState.UNKNOW 才会调用查接口被调用

*

* @param msg 消息

* @return

*/

@Override

public LocalTransactionState checkLocalTransaction(MessageExt msg) {

log.info("==========回查接口=========");

String key = msg.getKeys();

//TODO 1、必须根据key先去检查本地事务消息是否完成。

/**

* 因为有种情况就是:上面本地事务执行成功了,但是return LocalTransactionState.COMMIT_MESSAG的时候

* 服务挂了,那么最终 Brock还未收到消息的二次确定,还是个半消息 ,所以当重新启动的时候还是回调这个回调接口。

* 如果不先查询上面本地事务的执行情况 直接在执行本地事务,那么就相当于成功执行了两次本地事务了。

*/

// TODO 2、这里返回要么commit 要么rollback。没有必要在返回 UNKNOW

return LocalTransactionState.COMMIT_MESSAGE;

}

}

ewDefaultMQPushConsumer(consumerGroup);//添加服务器地址consumer.setNamesrvAddr(jms.getNameServer());//添加订阅号

策略现在我们来更新下我们的sw.js文件,该文件来缓存我们index.html,及在index.html使用到的所有静态资源文件。index.html代码改成如下:g

三、MQ消费端核心代码

这里展示下,消费端消费消息核心代码。消费端和普通消费一样。

优先,网络作为回退方案,第一次请求完成后,我们把请求的数据缓存起来,下次再次执行的时候,我们先从缓存里面读取。因此代码如下:self.addEventListener("fetch",function

@Slf4j

@Component

public class OrderConsumer {

private DefaultMQPushConsumer consumer;

private String consumerGroup = "produce_consumer_group";

public OrderConsumer(@Autowired Jms jms,@Autowired ProduceService produceService) throws MQClientException {

//设置消费组

consumer = new DefaultMQPushConsumer(consumerGroup);

// 添加服务器地址

consumer.setNamesrvAddr(jms.getNameServer());

// 添加订阅号

consumer.subscribe(jms.getOrderTopic(), "*");

// 监听消息

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

MessageExt msg = msgs.get(0);

String message = new String(msgs.get(0).getBody());

JSONObject jsonObject = JSONObject.parseObject(message);

Integer productId = jsonObject.getInteger("productId");

Integer total = jsonObject.getInteger("total");

String key = msg.getKeys();

log.info("消费端消费消息,商品ID={},销售数量={}",productId,total);

try {

produceService.updateStore(productId, total, key);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

} catch (Exception e) {

log.info("消费失败,进行重试,重试到一定次数 那么将该条记录记录到数据库中,进行如果处理");

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

});

consumer.start();

System.out.println("consumer start ...");

}

}

至于完整的项目地址见GitHub。

们使用"网络优先,缓存作为回退方案"模式来做的话,这样确实可以通过请求来显示最新的页面,但是这样做也有缺点,比如我们的index.html页面没有改过任何东西的话,也要从网络上请求,而不是从缓存里面读

面本地事务的执行情况直接在执行本地事务,那么就相当于成功执行了两次本地事务了。*///TODO2、这里返回要么commit要么rollback。没有必要在返回UNKNOWreturnLocalTran

晚安!

ncaches.match(event.request);}))});5.网络优先,缓存作为回退方案,通用回退该模式是先请求网络,如果网络失败的话,则从缓存里面读取,如果缓存里面读取失败的话,我们提供

只要自己变优秀了,其他的事情才会跟着好起来(上将8)

rocketmq整合mysql事务_分布式事务(4)---RocketMQ实现分布式事务项目相关推荐

  1. sql server 事务_如何使用显式SQL Server事务回滚

    sql server 事务 In this article, we will explore the process of rollback an explicit SQL Server transa ...

  2. springboot mysql事物_在Spring Boot中使用数据库事务

    关于数据库访问还有一个核心操作那就是事务的处理了,前面两篇博客小伙伴们已经见识到Spring Boot带给我们的巨大便利了,其实不用猜,我们也知道Spring Boot在数据库事务处理问题上也给我们带 ...

  3. lua mysql 事务_为什么在 Redis 实现 Lua 脚本事务?-阿里云开发者社区

    在刚过去的几个月中,我一直在构思并尝试在 redis 中实现 lua 脚本的事务功能.没有多少人理解我的想法,所以我将通过一些历史为大家做下解释. MySQL 与 Postgres 在 1998-20 ...

  4. rocketmq 组监听_最全的RocketMQ学习指南,程序员必备的中间件技能

    一.简介 RocketMq是阿里开发出来的一个消息中间件,后捐献给Apache.官网上是这样介绍的: Apache RocketMQ™ is a unified messaging engine, l ...

  5. rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...

    分布式事务 我们知道在单数据库系统中,实现数据的一致性,通过数据库的事务来处理比较简单.在微服务或分布式系统中,各个独立的服务都会有自己的数据库,而不是在同一个数据库中,所以当一组事务(如商品交易中, ...

  6. mysql xa 和普通事务_一文看懂MySQL中基于XA实现的分布式事务

    概述 前面已经介绍了2PC和3PC方面的内容,那么MySQL数据库在分布式事务这块又是怎么规划呢? XA事务简介 XA 事务的基础是两阶段提交协议.需要有一个事务协调者来保证所有的事务参与者都完成了准 ...

  7. mysql三阶段提交实现_基于两阶段提交的分布式事务实现(UP-2PC)

    引言:分布式事务是分布式数据库的基础性功能,在2017年上海MySQL嘉年华(IMG)和中国数据库大会(DTCC2018)中作者都对银联UPSQL Proxy的分布式事务做了简要介绍,受限于交流形式难 ...

  8. Redis 学习笔记-NoSQL数据库 常用五大数据类型 Redis配置文件介绍 Redis的发布和订阅 Redis_事务_锁机制_秒杀 Redis应用问题解决 分布式锁

    1.NoSQL数据库 1.1 NoSQL数据库概述 NoSQL(NosQL = Not Only sQL ),意即"不仅仅是sQL",泛指非关系型的数据库.NoSQL不依赖业务逻辑 ...

  9. ole db 访问接口 sqlncli 无法启动分布式事务_阿里终面:分布式事务原理

    本文提纲如下 前言 单数据源事务 & 多数据源事务 常见分布式事务解决方案 2.1. 分布式事务模型 2.2. 二将军问题和幂等性 2.3. 两阶段提交(2PC) & 三阶段提交(3P ...

最新文章

  1. SDUT 母牛的故事
  2. HDOJ 1509 Windows Message Queue
  3. MyEclipse开发教程:使用REST Web Services管理JPA实体(四)
  4. [Android]你不知道的Android进程化--进程信息
  5. SAP中国研究院再次荣获年度中国最佳雇主,国际友人纷纷发来贺电
  6. wordpress程序安装php多少,2020最新WordPress网站程序详细安装教程
  7. .NET 6 Preview 4 已发布,ASP.NET Core 更新内容
  8. Mysql8.0可以使用解压版 这个比较快 好像现在都是解压版了
  9. 微信JS-SDK实现分享功能
  10. 【TypeScript系列教程11】函数的使用
  11. 告别卷积神经网络CNN?计算机视觉也能用上 Transformer 了
  12. Java NIO网络编程之群聊系统
  13. 《Web Hacking 101》中的链接整理
  14. 51nod--1212 最小生成树
  15. Java序列化 3 连问,这太难了吧!
  16. maven学习系列——(七)Dependency
  17. 液压系统管路流速推荐表_液压系统管道选择标准
  18. postman 415错误
  19. photoShop支持retina显示屏
  20. 基于新浪云服务器的微信公众号

热门文章

  1. 【SQL】分析函数功能-排序
  2. Nginx反向代理导致PHP获取不到正确的HTTP_HOST,SERVER_NAME,客户端IP的解决方法
  3. DIOCP开源项目-高效稳定的服务端解决方案(DIOCP + 无锁队列 + ZeroMQ + QWorkers) 出炉了
  4. Win10微软帐户切换不回Administrator本地帐户的解决方法【亲测】
  5. 彻底解决_OBJC_CLASS_$_某文件名“, referenced from:问题(转)
  6. 关于Ubuntu10.04中使用 apt-get install 安装软件总是出现“E: Package *** has no installation candidate” 错误的解决方案
  7. Xcode 5中缺少Provisioning Profiles菜单项
  8. REST和RESTful有什么区别
  9. 如何确定C语言中数组的大小?
  10. win11还原点如何设置 windows11还原点的设置方法