1. 分布式事务

所谓事务,通俗一点讲就是一系列操作要么同时成功,要么同时失败。而分布式事务就是这一系列的操作在不同的节点上,那要如何保证事务的ACID特性呢。

  • 原子性(atomicity)。一个事务是一个不可分割的工作单位,事务中包括的操作要么都成功,要么都失败。

  • 一致性(consistency)。事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。

  • 隔离性(isolation)。一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。

  • 持久性(durability)。持久性也称永久性(permanence),指一个事务一旦提交或回滚,数据库会对数据持久化的保存。

2. 最终一致性方案

首先,什么叫一致性?一致性指系统中的所有数据备份,在同一时刻具有同样的值,所有节点访问同一份最新的数据副本。那么,什么又叫最终一致性呢。在此之前,先给大家介绍一下BASE理论。

  1. BA(Basically Available):基本可用。在分布式系统出现故障的时候,允许牺牲部分非核心功能的可用性,常用的手段是访问部分功能时进入降级页面,来保障核心业务的可用性。

  2. S(Soft state):软状态。允许系统中的数据存在中间状态,并且认为该状态是不影响系统的整体可用性的,即允许系统在不同节点上的数据备份短暂性的不一致。

  3. E(Eventually consistent):最终一致性。所谓最终一致性,就是数据不可能永久的处于软状态,在一定的时间期限内,所有节点的数据备份应当是一致的,即数据延时一段时间后达到一致性。至于这个时间期限,取决于各种因素,包括业务需求、网络延时、系统负载、存储选型,数据复制方案设计等因素。

3. 可靠消息

所谓的可靠消息,即发布端消息不丢失,可靠抵达队列,消费端可靠接收。以RabbitMQ为例,消息的投递消费过程如下:

1.发布端确认

  1. 如果使用标准的AMQP协议,保证消息不丢失的唯一方法就是使用事务,使通道具有事务性,对每一条消息的发布和提交都是事务性的。在这种情况下,事务是不必要的重量级,并将吞吐量降低了250倍,为了解决这个问题,就引入了确认机制。

    • confirmCallback确认模式

      开启发布者确认

      spring:rabbitmq: publisher-confirm-type: correlated
      

      自定义RabbitTemplate

      @Autowired
      RabbitTemplate rabbitTemplate;@PostConstruct
      public void initRabbitTemplate(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 当前消息的唯一关联数据* @param b 消息是否成功收到* @param s 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("当前消息【"+correlationData+"】==》服务端是否收到:"+b+"==》失败的原因【"+s+"】");}});
      }
      
    • returnCallback未投递到队列退回模式

      开启消息抵达队列确认

      spring: rabbitmq: publisher-returns: true  # 开启发送端消息抵达队列的确认template:   #只有抵达队列,以异步发送优先回调returnCallbackmandatory: true
      

      设置消息抵达队列回调

      @PostConstruct
      public void initRabbitTemplate(){rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 只要消息没有投递给指定的队列,就触发这个失败回调* @param message 投递失败的消息详细信息* @param i 回复状态码* @param s 回复的文本内容* @param s1 当时这个消息发送给哪个交换机* @param s2 当时这个消息用哪个路由键*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println("失败信息【"+message+"】==》状态码【"+i+"】==》文本内容【"+s+"】==》交换机【"+s1+"】==》路由键【"+s2+"】");}});
      }
      

2. 消费端确认

ack机制

默认是自动确认的,只要消息收到,客户端会自动确认,服务端就会移除这个问题

  1. 问题:假如收到很多消息,自动回复给服务器ack,如果一个消息处理成功,宕机了。发生消息丢失

  2. 消费者手动确认模式:只要没有明确告诉MQ,消息被接收,没有Ack,消息就一直是unacked状态,即使服务器宕机,消息也不会丢失,会重新变为Ready状态。

    1. 开启手动确认模式

      spring: raabbitmq: listener:simple:acknowledge-mode: manual
      
    2. 手动签收

      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      try {// 签收 long deliveryTag, boolean mulitiple(是否批量模式)channel.basicAck(deliveryTag,false);
      } catch (IOException e) {e.printStackTrace();
      }// 拒签 long deliveryTag, boolean mulitiple(是否批量模式), boolean requeue(是否重新入队)
      channel.basicNack(deliveryTag,false,false);
      

4. 分布式事务案例

在电商背景下,以订单和库存系统之间的分布式事务为例,来介绍分布式事务基于消息队列的最终一致性方案。下单和扣减库存操作要么同时成功,要么同时失败,是事务的。如果只是本地事务的话,操作同一数据库,依赖数据库本身的事务特性,就可以完成。但是,对于分布式系统而言,订单系统和库存系统是操作不同的数据库的,那要如何实现这样的分布式事务。

1. 业务流程及问题

  1. 一般的业务流程是:下单成功后,远程调用库存服务,扣减库存。伪代码如下

    public void saveOrder(){// 创建订单Order order = createOrder();// 保存订单orderService.save(order);// 远程调用库存服务wareFeignService.sub(order);
    }
    
  2. 订单系统和库存系统本地是满足事务的。即订单服务发生异常,订单回滚;库存服务发生异常,库存是会回滚的。

  3. 由于订单服务是以内嵌的方式远程调用库存服务的,也就是说,库存服务发生异常,订单服务感知到远程调用异常,从而订单会回滚的,对于业务来说,这是没有问题的。

  4. 如果订单服务在远程调用库存服务之前发生异常,订单会回滚,并且也不会调用库存服务来扣减库存,这也是没有问题的。

  5. 如果订单服务在远程调用库存服务之后,并且远程扣减库存操作成功后,发生异常,则订单会回滚,但是远程库存服务是无法回滚的。这就导致了数据的不一致性。

2. 基于消息队列的解决方案分析

我们使用RabbitMQ来实现分布式事务的最终一致性。

1. 业务分析

  1. 由于用户下单和支付并不是同时进行,一般都是下单成功后,30min内可以支付。那我们来思考这样一个问题,如果我们在下单成功就扣减库存的话,会不会有什么问题。

    • 恶意刷单。下单后不支付,导致其他人无法下单。
  2. 那如果支付成功后再扣减库存呢?

    • 在支付订单时,会出现库存不足,支付失败。
  3. 综合这两种情况考虑,我们在下单成功后先锁定库存,支付成功再去扣减库存,如果超时未支付,则解锁库存。

2. 业务流程

  1. 下单成功,锁定库存

  2. 订单支付超时,则需要自动关闭订单。

  3. 订单关闭,库存需要解锁。

3. 定时任务

如何确保下单后,30min内保留订单。最先想到的方法应该时定时任务。

  1. 定时任务使用的是系统时间,我们无法为每一个订单都生成一定时任务。

  2. 我想大家应该都发现了使用定时任务会带来的问题,那就是每一个订单的保留时间并不是一致的30min,订单保留的时间区间为(0min,60min)。即在定时任务即将到来前完成下单,和定时任务刚结束完成下单。

所以使用定时任务来完成这个操作是不可行。

4. RabbitMQ延时队列

利用消息的存活时间和死信来完成延时任务。

1. 消息的存活时间(TTL:Time To Live)

  1. RabbitMQ可以对队列和消息分别设置TTL。

  2. 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

  3. 如果队列和消息都设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息的死亡时间有可能不一样(不同的队列设置)。

  4. 单个消息的TTL,才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。

2. 死信交换机(DLX:Dead Letter Exchanges)

  1. 一个消息如果满足如下条件,就会进入死信路由(不是队列,一个路由可以对应多个队列)

    • 一个消息被消费者拒收,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

    • 消息的TTL到了,消息过期了。

    • 队列长度限制满了,排在前面的消息会被丢弃或者扔到死信路由上。

  2. 在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去

  3. 先控制消息在一段时间后变成死信,然后控制变成死信的消息被路由到某个指定的交换机。二者结合就可以实现一个延时队列。

在下单成功后,发送一条消息到死信队列,经过一段时间(TTL),死信路由到订单释放队列,订单服务监听到消息释放放订单。

5. 分布式解决方案

1. 对以下情况,库存需要解锁。

  1. 首先之前提到的,订单服务在远程调用成功后,发生异常,导致订单回滚,库存也需要解锁。

  2. 订单延时取消,或者主动取消订单,都需要解锁库存。

  3. 订单服务下单成功后,订单服务宕机,超过订单支付时间,仍然无法恢复,导致无法发送消息通知库存服务解锁库存,故需要自动解锁库存。

2. 解决方案

针对以上情形,库存解锁的方案。

  1. 第一种情况,可以利用库存自动解锁来解决。库存锁定时发送消息到延时队列,经过TTL后,成为死信路由到库存解锁队列,库存服务监听到消息后,解锁库存。

    • 不能在订单未支付时就解锁库存,所以库存自动解锁的延迟时间应该大于订单延时取消的时间。
  2. 对于第二种情况,主动取消或者延时取消,都可以通过库存的自动解锁来完成库存的解锁。

  3. 自动解锁时,需要判断订单的状态,只有为取消状态的订单才可以解锁库存。但是这样仍然会存在问题。

    • 订单服务卡顿,导致订单状态消息一直改不了,而库存消息先到期,查询订单状态为新建状态,不解锁库存,并删除消息,导致库永远无法解锁。

    • 解决:订单超时取消的同时,发送订单取消的消息到队列,库存服务监听该消息,则解锁库存。

    • 为了防止重复解锁,需要满足幂等性。

3. 代码实现

使用消息队列实现分布式事务的最终一致性方案的流程图如下:

1. 订单服务

  1. 创建交换机、队列和绑定关系
@Configuration
public class OrderRabbitMQConfig {/*** 使用JSON序列化机制,进行消息转换* @return*/@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic Exchange orderEventExchange(){return new TopicExchange("order-event-exchange",true,false);}@Beanpublic Queue orderReleaseOrderQueue(){return new Queue("order.release.order.queue",true,false,false);}@Beanpublic Queue orderDelayQueue(){HashMap<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange","order-event-exchange");args.put("x-dead-letter-routing-key","order.release.order");args.put("x-message-ttl",60000);return new Queue("order.delay.queue",true,false,false,args);}@Beanpublic Binding stockReleaseBinding(){return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}@Beanpublic Binding stockLockedBinding(){return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}/*** 订单释放直接和库存释放进行绑定*/@Beanpublic Binding orderReleaseOtherBinding(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}}
  1. 订单创建伪代码
@Transactional
public Order createOrder(){// 创建订单Order order = createOrder();// 远程调用库存服务,锁定库存R r = wareFeignService.orderLockStock(wareSkuLockVo);if (r.getCode() == 0) {// 锁定成功,发送订单创建消息到延时队列rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);} else {// 远程调用失败,抛出异常throw new Exception();}reture order;
}
  1. 订单服务监听订单释放信息,关闭订单
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {@AutowiredOmsOrderService orderService;@RabbitHandlerpublic void listener(OmsOrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期订单,准备关单:"+orderEntity.getBizOrderId());try{orderService.closeOrder(orderEntity.getBizOrderId());channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (Exception e){channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}
}
  1. 关闭订单,发送解锁库存消息。伪代码如下
public void closeOrder(String bizOrderId) {// 查询当前订单是否付款OmsOrderEntity order = getOne(new QueryWrapper<OmsOrderEntity>().eq("biz_order_id", bizOrderId));if (order != null) {// 判断订单状态,为新建状态才取消if (order.getOrderStatus() == OrderStatusConstant.CREATE.getCode()){//过期未支付,取消订单,设置订单状态为取消状态updateOrder.setOrderStatus(OrderStatusConstant.CANCEL.getCode());// 发送MQrabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);}}
}

2. 库存服务

  1. 创建交换机、队列和绑定关系
@Configuration
public class MyRabbitConfig {/*** 使用JSON序列化机制,进行消息转换* @return*/@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic Exchange stockEventExchange(){return new TopicExchange("stock-event-exchange",true,true);}@Beanpublic Queue stockReleaseStockQueue(){return new Queue("stock.release.stock.queue",true,false,false);}@Beanpublic Queue stockDelayQueue(){HashMap<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange","stock-event-exchange");args.put("x-dead-letter-routing-key","stock.release");args.put("x-message-ttl",120000);return new Queue("stock.delay.queue",true,false,false,args);}@Beanpublic Binding stockReleaseBinding(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}@Beanpublic Binding stockLockedBinding(){return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}
}
  1. 库存锁定,伪代码如下
@Transactional
public void orderLockStock() {// 保存工作单taskService.save(task);// 为每件商品锁定库存for (OrderItemVo orderItem : orderItems) {//判断库存是否足够Long count = skuWareDao.lockStock(orderItem);if (count == 1){// 锁定成功// 保存工作单详情taskDetailService.save(taskDetail);// 自动解锁,发送工作单详情,防止回滚后找不到数据// 锁定库存,发送消息到延时队列rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", taskTo);} else {// 锁定失败,抛出异常throw new Exception();}}
}
  1. 监听库存解锁信息
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {@AutowiredWmsSkuWareService wareSkuService;/*** 库存自动解锁*只要解锁库存的消息的失败,需要告诉MQ解锁失败,消息不要删除,重新放回队列* @param taskTo* @param message**/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTaskTo taskTo, Message message, Channel channel) throws IOException {System.out.println("收到解锁库存的消息");try{wareSkuService.unlockStock(taskTo);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (Exception e){channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}@RabbitHandlerpublic void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {System.out.println("订单关闭,准备解锁库存");try{wareSkuService.unlockStock(orderTo);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (Exception e){channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}
}
  1. 自动解锁库存,伪代码如下
@Transactional
public void unlockStock(StockLockedTaskTo taskTo) {// 判断是否存在该任务Detail detail = detailService.getById(taskTo.detailId)if (detail != null){// 查询订单状态,订单状态为【取消】,则解锁库存// 远程调用订单服务,获取订单状态R r = orderFeignService.getOrderStatus(bizOrderId);if (r.getCode() == 0) {// 远程查询成功OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {});if (orderVo == null || orderVo.getOrderStatus() == OrderStatusConstant.CANCEL.getCode()) {// 订单不存在,或者订单已取消,都需要解锁库存if (detail.getLockStatus() == WareTaskStatusConstant.LOCKED.getCode()){//锁定状态下才需要解锁,已解锁的不用在解锁unlockStock(detail.getId(),detail.getSkuId(),detail.getSkuNum());// 更新taskDetail状态为已解锁taskDetailService.updateById(taskDetailEntity);}}} else {throw new RuntimeException("远程调用订单服务失败");}}
}
  1. 订单取消,解锁库存,伪代码如下
@Transactional
public void unlockStock(OrderTo orderTo) {// 无需查询订单最新状态,能来到这,肯定更新了订单状态的// 判断任务是否存在WmsOrderTaskEntity task = taskService.getOne(new QueryWrapper<WmsOrderTaskEntity>().eq("biz_order_id", bizOrderId));if (task != null){// 任务存在,获取任务项状态为锁定状态的所有任务项List<WmsOrderTaskDetailEntity> taskDetails = taskDetailService.list(new QueryWrapper<WmsOrderTaskDetailEntity>().eq("task_id", task.getId()).eq("lock_status", WareTaskStatusConstant.LOCKED.getCode()));if (taskDetails != null && taskDetails.size() > 0){for (WmsOrderTaskDetailEntity taskDetail : taskDetails) {// 解锁库存unlockStock(taskDetail.getId(),taskDetail.getSkuId(),taskDetail.getSkuNum());// 更新taskDetail状态为已解锁taskDetailService.updateById(taskDetailEntity);}}}
}

使用RabbitMQ实现分布式事务的最终一致性方案的大致流程解析完毕。如果对源码感兴趣的,欢迎到github仓库clone。

基于RabbitMQ的分布式事务最终一致性解决方案相关推荐

  1. 一致 先验分布 后验分布_「分布式技术」分布式事务最终一致性解决方案,下篇...

    各位志同道合的朋友们大家好,我是一个一直在一线互联网踩坑十余年的编码爱好者,现在将我们的各种经验以及架构实战分享出来,如果大家喜欢,就关注我,一起将技术学深学透,我会每一篇分享结束都会预告下一专题 上 ...

  2. 6种分布式事务最终一致性解决方案,一次性说清了!

    分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,尤其在微服务架构中,几乎可以说是无法避免,因此也常常被认为是微服务落地的最大阻碍. 随着系统的服务拓扑从单体应用迈 ...

  3. rocketmq分布式事务最终一致性解决方案

    背景 分布式系统中,我们时常会遇到分布式事务的问题,如更新订单然后发送短信提醒,但是这两个操作需要操作不同的数据库,那么此时数据库的事务就不能处理好了 传统方式存在的问题: 1.先发送消息,再执行数据 ...

  4. rabbitmq 查询版本_基于rabbitmq解决分布式事务

    分布式事务要解决的问题是保证二个数据库数据的一致性,本地事务ACID属于刚性事务,基于CAP理论,分布式事务的核心要点柔性事务,最终一致性. 基于rabbitmq解决分布式事务要点如下 生产者采用发送 ...

  5. 分布式事务最终一致性常用方案

    目前的应用系统,不管是企业级应用还是互联网应用,最终数据的一致性是每个应用系统都要面临的问题,随着分布式的逐渐普及,数据一致性更加艰难,但是也很难有银弹的解决方案,也并不是引入特定的中间件或者特定的开 ...

  6. RabbitMQ消息可靠性投递及分布式事务最终一致性实现

    RabbitMQ消息可靠性投递就是保证消息生产者能够将消息百分百投递到RabbitMQ服务器,并在传递过程中不丢失.然而在生产环境中由于网络中断.网络不稳定等原因导致消息在投递过程中丢失,这或许会造成 ...

  7. 阿里 P8 聊分布式事务最终一致性的 6 种解决方案

    分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,尤其在微服务架构中,几乎可以说是无法避免,因此也常常被认为是微服务落地的最大阻碍. 随着系统的服务拓扑从单体应用迈 ...

  8. 分布式事务最终一致性-CAP框架轻松搞定

    前言 对于分布式事务,常用的解决方案根据一致性的程度可以进行如下划分: 强一致性(2PC.3PC):数据库层面的实现,通过锁定资源,牺牲可用性,保证数据的强一致性,效率相对比较低. 弱一致性(TCC) ...

  9. 分布式事务最终一致性mysql_分布式事务最终一致性方案案例

    前言: 以下以网上课程购买流程举一个例子: 如何实现两个分布式服务(订单服务.学习服务)共同完成一件事即订单支付成功自动添加学生选课的需求, 这里的关键是如何保证两个分布式服务的事务的一致性. 订单支 ...

最新文章

  1. int(1) 和 int(10) 有什么区别?资深开发竟然分不清!
  2. 2030全球新出行产业报告:2.2万亿美元蛋糕将这样分
  3. Android组件化打造知乎日报系列(一)—— 项目架构搭建
  4. 第7课第2节_Binder系统_c++实现_编译测试
  5. Caffe学习笔记3——制作并训练自己的数据集
  6. php 代码修改后 重新实例化_从匿名函数到PHP设计模式之容器模式
  7. [机器学习-原理篇]支持向量机(SVM)深入理解
  8. 阿拉伯数字转中文大(小)写的函数
  9. underscore api
  10. LightweightCTI开发实录(5)板卡适配器概述
  11. ASP.NET MVC实践系列11-FCKEditor和CKEditor的使用
  12. 从vim转向Emacs _ emacser.com文章收集
  13. 斯坦福机器学习教程学习笔记之1
  14. python小游戏开题报告范文_课题开题报告范文
  15. 干货分享:app运营推广超实用计划书
  16. Vmware1个服务器2个桌面,VMware设置虚拟机,并配置远程连接桌面
  17. 双轮载人平衡车设计完整教程之调校测试篇
  18. 高德地图POI分类查询
  19. 瑞萨单片机CS+ for CC 与Renesas Flash Programme软件的使用-学习记录
  20. vscode 格式化 json文件

热门文章

  1. 双生视界服务器维护,《双生视界》7月30日7点停服维护公告
  2. mysql怎么自定义一个函数调用_MySQL自定义函数编写
  3. unreal engine实现vive手柄控制
  4. [新]Linux部署DM DEM
  5. 零售电子货架标签解决方案
  6. PostgreSQL中的分区表
  7. mysql1032错误
  8. 为人处世,最重要的是什么?
  9. 实验四 MCS-51内部定时/计数器实验(2)
  10. Python 三元表达式(条件表达式)