分布式事务

我们知道在单数据库系统中,实现数据的一致性,通过数据库的事务来处理比较简单。在微服务或分布式系统中,各个独立的服务都会有自己的数据库,而不是在同一个数据库中,所以当一组事务(如商品交易中,商品的库存、用户的账户资金和交易记录等)的处理是分布在不同数据库中的,分布式事务就是为了解决在多个数据库节点中保证这些数据的一致性。

分布式事务里有个BASE理论,在分布式数据库中,存在强一致性和弱一致性。

强一致性的好处是,对于开发者来说比较友好,数据始终可以读取到最新值,但这种方式需要复杂的协议,并且需要牺牲很多的性能。

弱一致性,对于开发者来说相对没有那么友好,无法保证读取的值是最新的,但是不需要引入复杂的协议,也不需要牺牲很多的性能。

弱一致性是当今企业采用的主流方案,它并不能保证所有数据的实时一致性,所以有时候实时读取数据是不可信的。它只是在正常的流程中,加入了提供修复数据的可能性,从而减少数据不一致的可能性,大大降低数据不一致的可能性。

什么时候使用分布式事务

对于像电商中用户隐私信息、商品信息、交易记录以及资金等数据,这些具备价值的核心数据,关系到用户隐私和财产的内容,应该考虑使用分布式事务来保证一致性。

但对于用户评价、自身装饰和其他一些非重要的个性化信息,可以采用非事务的处理。因为一个正常的系统出现不一致的情况是小概率事件,而非大概率事件,对于一些小概率的数据丢失,一般来说是允许的。之所以这样选择,主要基于两点,一个是开发者的开发难度;另一个是用户的体验,过多的分布式事务会造成性能的不断丢失

弱一致性分布式事务解决方案有如下几种:

状态表

RabbitMQ可靠事件

最大尝试

TCC模式

幂等性

在分布式事务中,各个访问操作的接口,都需要保证幂等性。

所谓幂等性,是指在HTTP协议中,一次和多次请求某一个资源,对于资源本身应该具有同样的结果,也就是其执行任意多次时,对资源本身所产生的影响,与执行一次时的相同。

实现方式有以下几种:

唯一索引 -- 防止新增脏数据

token机制 -- 防止页面重复提交

悲观锁 -- 获取数据的时候加锁(锁表或锁行)

乐观锁 -- 基于版本号version实现, 在更新数据那一刻校验数据

分布式锁 -- redis(jedis、redisson)或zookeeper实现

状态机 -- 状态变更, 更新数据时判断状态

※说明:如何实现接口的幂等性,可以分篇在接口的幂等性文章里解说。

状态表实现分布式事务

这里拿电商的商品交易为例,讲述下思路:

需要商品数据库:商品表、商品交易明细表;资金数据库:用户账户表、账户交易明细表

主要流程包括:

商品表减商品库存、

商品交易明细表中添加新的交易记录、

用户账户表中扣减用户账户表的资金、

资金交易明细表中记录账户交易明细表

需要准备一个状态表,用redis的Hset数据类型比较合适

这里假设相关的明细记录表中,有4个状态:

1--准备交易,

2--交易成功,

3--被冲正,

4--冲正记录

交易流程

流程说明

在商品服务中,商品减库存后,记录商品交易明细,如果没有异常,就将商品交易记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。

商品服务通过RESTFUL调用资金服务,如果成功,就将账户交易明细表的记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。

最后,读取Redis相关的所有状态位,确定是否所有的操作都为“1—准备提交”状态,如果是,则更新产品服务的记录状态为“2—提交成功”,然后发起资金服务调用,将对应的记录(可通过业务流水号关联)的状态也更新为“2—提交成功”,这样就完成了整个交易。

如果不全部为“1—准备提交”状态,则发起各库的冲正交易,冲掉原有的记录,并且归还商品库存和账户金额。发起冲正交易,把原明细记录状态更新为3--被冲正,并往明细表中添加对应的新记录,状态为4--冲正记录

RabbitMQ可靠事件

使用RabbitMQ等消息队列中间件的可靠事件,来实现分布式事务,这里结合SpringBoot

前面有介绍过SpringBoot整合多数据库的文章,这里可以用到,具体参考《Spring Boot学习:MyBatis配置Druid多数据源》,切换数据源使用@DataSource注解,如下

@DataSource(value = DataSourceType.MASTER) //切换到商品数据库

@DataSource(value = DataSourceType.SLAVE) //切换到账户数据库

在此基础上我们加入RabbitMQ实现分布式事务功能

在pom.xml文件中加入依赖

org.springframework.boot

spring-boot-starter-amqp

yml配置文件中,关于RabbitMQ的配置如下:

# Spring 配置

spring:

rabbitmq:

host: localhost

port: 5672

username: admin

password: 123456

#使用发布者确认模式,发布消息者会得到一个“消息是否被服务提供者接收”的确认消息

publisher-confirms: true

#RabbitMQ 队列名称配置

rabbitmq:

queue:

fund: fund

3.创建RabbitMQ配置文件RabbitConfig.java

package com.zhlab.demo.config;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @ClassName RabbitConfig

* @Description //RabbitMQ消息队列配置

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:10

**/

@Configuration

public class RabbitConfig {

// 读取配置属性

@Value("${rabbitmq.queue.fund}")

private String fundQueueName = null;

// 创建RabbitMQ消息队列

@Bean(name="fundQueue")

public Queue createFundQueue() {

return new Queue(fundQueueName);

}

}

创建数据传输对象FundParams.java

package com.zhlab.demo.model;

import java.io.Serializable;

/**

* @ClassName FundParams

* @Description //FundParams

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:30

**/

public class FundParams implements Serializable {

// 序列化版本号

public static final long serialVersionUID = 989878441231256478L;

private Long xid; // 业务流水号

private Long userId; // 用户编号

private Double amount; // 交易金额

public FundParams() {

}

public FundParams(Long xid, Long userId, Double amount) {

this.xid = xid;

this.userId = userId;

this.amount = amount;

}

public Long getXid() {

return xid;

}

public void setXid(Long xid) {

this.xid = xid;

}

public Long getUserId() {

return userId;

}

public void setUserId(Long userId) {

this.userId = userId;

}

public Double getAmount() {

return amount;

}

public void setAmount(Double amount) {

this.amount = amount;

}

}

创建商品服务 业务逻辑PurchaseService.java

package com.zhlab.demo.service.goods;

import com.zhlab.demo.db.DataSourceType;

import com.zhlab.demo.db.annotation.DataSource;

import com.zhlab.demo.model.FundParams;

import com.zhlab.demo.utils.SnowFlakeUtil;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Service;

/**

* @ClassName PurchaseService

* @Description //商品 业务逻辑

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:24

**/

@Service

public class PurchaseService implements RabbitTemplate.ConfirmCallback {

//实现RabbitTemplate.ConfirmCallback接口

//需要实现它定义的confirm方法,这样它便可以作为一个发布者检测消息是否被消费者所接收的确认类

// SnowFlake算法生成ID

SnowFlakeUtil worker = new SnowFlakeUtil(003);

// RabbitMQ模板

@Autowired

private RabbitTemplate rabbitTemplate;

// 读取配置属性

@Value("${rabbitmq.queue.fund}")

private String fundQueueName;

// 购买业务方法

@DataSource(value = DataSourceType.MASTER) //切换到商品数据库

public Long purchase(Long productId, Long userId, Double amount) {

rabbitTemplate.setConfirmCallback(this);//设置了回调类为当前类

// SnowFlake算法生成序列号,用户跨服务的关联,这里用本地自定义方法,可以借助Leaf TinyID等分布式ID生成服务中间件

Long xid = worker.nextId();

// 传递给消费者的参数

FundParams params = new FundParams(xid, userId, amount);

// 发送消息给资金服务做扣款

this.rabbitTemplate.convertAndSend(fundQueueName, params); // ④

System.out.println("执行产品服务逻辑");

return xid;

}

/**

* 确认回调,会异步执行

* @param correlationData --相关数据

* @param ack -- 是否被消费

* @param cause -- 失败原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

/*

* ack代表是否成功。

* 如果投递消息失败,就会先停滞1秒,然后尝试进行冲正交易,冲掉原有交易,这样就可以使得数据平整

*/

if (ack){ // 消息投递成功

System.out.println("执行交易成功");

} else { // 消息投递失败

try {

// 停滞1秒(稍微等待可能没有完成的正常流程),然后发起冲正交易

Thread.sleep(1000);

} catch (Exception ex) {

ex.printStackTrace();

}

System.out.println("尝试产品减库存冲正交易。");

System.out.println("尝试账户扣减冲正交易。");

//在confirm方法中,如果参数ack为false,则说明消息传递失败,就要尝试执行冲正交易,把数据还原回来

System.out.println(cause); // 打印消息投递失败的原因

}

}

}

创建账户服务业务逻辑AccountService.java

package com.zhlab.demo.service.fund;

import com.zhlab.demo.db.DataSourceType;

import com.zhlab.demo.db.annotation.DataSource;

import com.zhlab.demo.model.FundParams;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Service;

/**

* @ClassName AccountService

* @Description //账户 业务逻辑

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:25

**/

@Service

public class AccountService {

/* 消息监听,取YAML文件配置的队列名

*因为消息被消费,所以触发PurchaseService类的confirm方法

*spring.rabbitmq.listener.simple.acknowledge-mode = manual

*如果配置为手动,这里就需要手动确认消息,默认为自动的

*自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。

*这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失

*手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),

*RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。

*这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况

*主要方法:basicAck、basicNack、basicReject根据具体业务情况使用,配合redis做幂等检验

*/

@RabbitListener(queues = "${rabbitmq.queue.fund}")

@DataSource(value = DataSourceType.SLAVE) //切换到账户数据库

public void dealAccount(FundParams params) {

//TODO具体业务逻辑需自己实现

System.out.println("扣减账户金额逻辑......");

}

}

7.写个测试接口来测试一下,创建MqController.java

package com.zhlab.demo.controller;

import com.zhlab.demo.service.goods.PurchaseService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

/**

* @ClassName MqController

* @Description //RabbitMQ可靠消息 接口测试

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 下午 2:25

**/

@RestController

@RequestMapping("/mq")

public class MqController {

@Autowired

private PurchaseService purchaseService;

@GetMapping("/test")

public String testMq() {

return purchaseService.purchase(1L, 1L, 200.0) + "";

}

}

以上就是基于RabbitMQ可靠消息 实现的分布式事务处理,逻辑和说明都在注释里了。

※说明:这样的确认方式,只是保证了事件的有效传递,但是不能保证消费类能够没有异常或者错误发生,当消费类有异常或错误发生时,数据依旧会存在不一致的情况。这样的方式,只是保证了消息传递的有效性,降低了不一致的可能性,从而大大降低了后续需要运维和业务人员处理的不一致数据的数量

TCC补偿事务

TCC代表的是

try(尝试)

confirm(确认)

cancel(取消)

在TCC事务中,要求任何一个服务逻辑都有3个接口,它们对应的就是尝试(try)方法、确认(confirm)方法和取消(cancel)方法。

TCC事务模型

TCC事务的一致性可达99.99%,是一种较为成熟的方案,因此在目前有着较为广泛的应用。

继续通过上面的商品交易流程来解析这个模型:

一阶段

商品表减库存,商品交易明细表记录商品交易明细,并且将对应记录状态设置为“1—准备提交”。

调用账户服务,用户账户表扣减账户资金,账户交易明细表记录交易明细,并且将对应记录状态设置为“1—准备提交”

在一阶段的调用中,如果没有发生异常,就可以执行正常二阶段进行提交了

正常二阶段

商品服务 更新对应记录的状态为“2—提交成功”,使得数据生效

调用账户服务,使得对应的记录状态也为“2—提交成功”,这样正常的提交就完成了

如果在一阶段发生异常,需要取消操作,可以执行异常二阶段

异常二阶段

商品服务执行冲正交易,冲掉原有的产品交易,将库存归还给商品表

调用账户服务,发起冲正交易,冲掉原有的资金交易,将资金归还到账户里

注意,这些提交和退出机制在TCC中,都需要开发者对接口作幂等性处理

TCC事务机制,也并不能保证所有的数据都是完全一致的,它只是提供了一个可以修复的机制,来降低不一致的情况,从而大大降低后续维护数据的代价。TCC事务也会带来两个较大的麻烦:第一个是,原本的一个方法实现,现在需要拆分为3个方法,代价较大;第二个是,需要开发者自已实现提交和取消方法的幂等性

总结

使用分布式事务,并不是很容易的事情,甚至有些方法还相当复杂。

在互联网中,并不是所有的数据都需要使用分布式事务,所以首先要考虑的是:在什么时候使用分布式事务。即使需要使用分布式事务,有时候也并非需要实时实现数据的一致性,因为可以在后续通过一定的手段来完成。例如电商网站,对买家来说,需要的是快速响应,但对商家来说,就未必需要得到实时数据了,过段时间得到数据也是可以的,而这段时间就可以考虑进行数据补偿了。无论我们如何使用分布式事务,也无法使数据完全达到百分之百的一致性,因此一般金融和电商企业会通过对账等形式来完成最终一致性的操作。

在分布式事务的选择中,都会采用弱一致性代替强一致性,相对来说,弱一致性更加灵活,更方便我们开发。从网站的角度来说,弱一致性可以获得更佳的性能,提升用户的体验,这是互联网应用需要首先考虑的要素。

拓展---电商中的高并发和分布式事务

电商网站中高并发是常见的,高并发是针对用户而言的,比如抢购中,用户只希望短时间内快速抢到商品,而商家对于交易信息可以延迟处理得到。

这就是意味着,对于用户交易部分,要尽可能通过分布式事务进行保证,但而对于商户数据部分,实时性要求相对不是那么高,可以过段时间通过后续手段来补偿修复,从而缩小分布式事务的范围。

确定需要分布式事务的范围

这里可以看出使用分布式事务的主要是请求数据,保证这个过程可以提高数据可靠性。对于商户数据,不需要使用分布式事务,这样可以提升性能,使抢购进行得更快,满足买家的需求,但是这也会引发数据的丢失。为了解决这个问题,后续可以通过和请求数据进行对比来修复数据,使数据达到一致,这个过程可以在高并发过后(一般高并发都是时间段性的,如性价比高的产品发布点、购物节开始时间段)进行,这样商户最终也可以得到可靠的数据,只是不是实时的,但是这并不影响商户和用户的业务。

rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...相关推荐

  1. RabbitMQ原理及SpringBoot整合RabbitMQ

    RabbitMQ原理及SpringBoot整合RabbitMQ 1. RabbitMQ环境搭建 参考:https://blog.csdn.net/u013071014/article/details/ ...

  2. ole db 访问接口 sqlncli 无法启动分布式事务_分布式事务,看这篇就够了

    0. 前言 1. 单数据源事务 & 多数据源事务 2. 常见分布式事务解决方案 2.1. 分布式事务模型 2.2. 二将军问题和幂等性 2.3. 两阶段提交(2PC) & 三阶段提交( ...

  3. .net MySQL事物_在ASP.NET 2.0中操作数据之六十一:在事务里对数据库修改进行封装...

    导言: 正如我们在第16章<概述插入.更新和删除数据>里探讨的那样,GridView控件内建的功能支持对每行数据的编辑和删除功能,你只需要稍稍动一下鼠标就可以创建丰富的数据修改界面而不用写 ...

  4. springboot 事务_原创002 | 搭上SpringBoot事务源码分析专车

    前言 如果这是你第二次看到师长,说明你在觊觎我的美色! 点赞+关注再看,养成习惯 没别的意思,就是需要你的窥屏^_^ 专车介绍 该趟专车是开往Spring Boot事务源码分析的专车 专车问题 为什么 ...

  5. springboot 事务_第六章:springboot开启声明式事务

    springboot中已经默认对jpa.jdbc.mybatis开启了事务,引入他们的依赖关系,事务就默认开启.所以springboot开启事务很简单,只需要一个注解@Transactional就可以 ...

  6. Kafka原理以及SpringBoot整合Kafka

    1.Kafka原理 1. brokers有多个broker组成,broker是指Kafka服务器(192.168.223.140就是其中的一个broker),上面三台Kafka服务器组成了Kafka集 ...

  7. mysql ef 分布式事务_分布式事务系列--分布式跨库查询解决方案 mysql federated引擎的使用...

    背景 在服务高度拆分,数据库不断细化切分的情况下,我们经常有连接多台数据库查询的需求,如果不断的把数据库连接的逻辑添加在代码中,那么这种耦合会越来越严重,这会给程序的拓展和维护带来很大的麻烦. mys ...

  8. rocketmq整合mysql事务_分布式事务(4)---RocketMQ实现分布式事务项目

    摘要: ,但是returnLocalTransactionState.COMMIT_MESSAG的时候*服务挂了,那么最终Brock还未收到消息的二次确定,还是个半消息,所以当重新启动的时候还是回调这 ...

  9. vb6 由于超出容量限制 不能创建新事务_分布式限流?你也能轻松玩转(没啥新技术)...

    点击蓝色「日拱一兵」关注,持续侦破 Java 技术案件 一.什么是限流?为什么要限流? 不知道大家有没有做过帝都的地铁,就是进地铁站都要排队的那种,为什么要这样摆长龙转圈圈?答案就是为了限流!因为一趟 ...

最新文章

  1. 大数据小项目之电视收视率企业项目04--完全分布式搭建
  2. 如何修改app服务器数据库连接,app调用服务器数据库连接
  3. 最快的ASP无组件上传类(4M只需10秒)0.96版
  4. 使用 screen 管理你的远程会话
  5. Spring IOC 容器源码分析 - 余下的初始化工作
  6. 用jQuery设置多个css样式
  7. linux 串口特别是接收
  8. 地理数据库 (geodatabase) 的架构
  9. 5岁儿童自学python编程-适合6-16岁孩子学习的Python编程
  10. maxdea如何计算指数_10分钟计算出指数温度,开始基金定投之旅~
  11. python调用大漠插件、检测么_python调用大漠插件教程05字库
  12. AutoCAD安装及激活
  13. 利用QRmaker制作二维码
  14. java copy-on-write_COW奶牛!Copy On Write机制了解一下
  15. 在ipad上播放flash大集合
  16. 记录一下申请邓白氏编码的完整流程
  17. ccf 考试时间_梳理丨2020年五大学科竞赛考试时间安排出炉!
  18. 2020国开c语言程序设计1075,中央电大秋季C语言程序设计期末试卷及答案代码1075,01(7页)-原创力文档...
  19. 音频文件的结构与规范——RIFF和WAVE音频文件格式
  20. 电脑修改用户(User)文件夹名称

热门文章

  1. 分组后统计总数_大数据时代看排球:排球技术统计能告诉你什么?
  2. C++纯虚函数与抽象类
  3. 如何登陆网页的back office_如何使用iPhone面容ID快速登陆应用或网页
  4. 关于web服务器性能书籍,图书商城系统的Web服务器性能优化研究与实现
  5. 【Python】pandas模块中更改Series的数据类型
  6. PL/SQL Developer的错误提示弹框的文本显示乱码问题
  7. 实现路由器无线接收另一个路由器无线信号搭建网络
  8. 使用网络TCP搭建一个简单文件下载器
  9. 201771010109焦旭超《面向对象程序设计(java)》第十六周学习总结
  10. rocket-console控制台安装