分布式事务的最终解决

通过MQ的延时消息来解决订单/库存服务直接调用的事务问题

一、如何解决下单系统这个高并发里边的分布式事务呢?

首先,我们肯定不会用 2PC 模式、 TCC-事务补偿性方案,我们也不考虑

最终我们选择可靠消息+最终一致性这种方式

为了保证高并发,订单这一块还是自己回滚,

因为性能不佳,我们弃用seata的方案


1、库存服务自己怎么回滚?

  • 第一种

通过MQ消息队列,我们在提交订单那里,当捕捉到异常要回滚的时候,给库存服务发一个消息,让库存服务自己把库存解锁

这样不需要让库存事务回滚,只需要给它发一个消息,不会损失什么性能

  • 第二种

通过MQ延迟消息

库存服务本身也可以使用自动解锁模式。

怎么自动解锁呢?

需要使用消息队列来完成。

如果你想让我这的哪些库存解锁,首先你需要给我发一个消息告诉我。

然后我们专门的库存解锁服务,去来订阅我们stock.release.stock.queue这个队列里的消息。

那你给我发消息的时候,比如:用路由键stock.release,我知道要库存解锁,

然后,你的消息发给我们这个交换机stock-event-exchange

交换机把这个消息路由给stock.release.stock.queue这个队列。

然后,这个队列stock.release.stock.queue里边存的这些消息都是库存要解锁的消息,我们的库存解锁服务只要收到了,它就会在后台慢慢的解锁消息。

我们不用保证强一致,我们哪怕是二十分钟、三十分钟,乃至于一天以后把这个库存解锁了,最终一致了就行。

所以我们可以来使用消息队列来完成我们的这个最终一致性。


2、锁库存的增强版逻辑

我们想要锁库存的话,我们先来保存一个库存工作单和库存工作单详情

相当于只要我们想要锁库存,我们先给数据库里边保存记录,我要锁库存。

接下来我们就来进行锁,只要锁成功了,那一切ok。

如果锁失败了,数据库里边相当于没有这个锁库存记录。

因为锁失败呢,我们这个本身自己所失败会全部回滚。

但如果可能是这种失败,比如我们来到订单里边,我们库存其实自己锁成功了。但是我们订单下边的其他完了,然后库存要进行解锁。那怎么办呢?

我们可以使用定时任务


二、订单服务的完整消息队列

1、库存自动解锁

库存微服务,有一个它的库存交换机stock-event-exchange.

如果想要解锁库存,应该是这样的。

首先订单创建成功之后,库存锁定成功,然后发一个消息给交换机,

这个消息里面的内容有订单编号、仓库编号、哪个商品锁了几个库存,

这个交换机,绑定了两个队列,

一个是按照stock.release.#模糊匹配的路由键绑定的stock.release.stock.queue队列

一个是stock.delay.queue队列

第一次发的库存锁定成功的消息,先使用路由键叫stock.locked

交换机按照这个路由键,找到stock.delay.queue延时队列

延时队列50分钟以后,用stock.release这个路由键,将死信交给库存交换机stock-event-exchange

交换机收到以后,按照这个路由键查找,发现stock.release.#这个模糊匹配的路由键跟它是一样的,然后被交换机路由到我们这个stock.release.stock.queue队列。

接下来的解锁库存服务,专门来处理stock.release.stock.queue里的消息。

  • 最终实现

  • com.achang.achangmall.order.conf.MQConfig

通过配置类来创建队列、绑定、交换机

@Configuration
public class MQConfig {@Beanpublic Queue orderDelayQueue(){//死信队列HashMap<String, Object> argument = new HashMap<>();argument.put("x-dead-letter-exchange","order-event-exchange");//死信路由argument.put("x-dead-letter-routing-key","order.release.order");//死信路由键argument.put("x-message-ttl",60000);//消息过期时间 1分钟return new Queue("order.delay.queue",true,false,false,argument);}@Beanpublic Queue orderReleaseOrderQueue(){//普通队列return new Queue("order.release.order.queue",true,false,false,null);}@Beanpublic Exchange orderEventExchange(){//普通的主题交换机(可绑定多个队列,根据条件路由)return new TopicExchange("order-event-exchange",true,false);}@Beanpublic Binding orderCreateOrderBinding(){return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseOrderBinding(){return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}
}
  • 库存服务加mq依赖

achangmall-ware/pom.xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • rabbitmq配置

achangmall-ware/src/main/resources/application.yaml

spring:rabbitmq:host: 192.168.109.101port: 5672virtual-host: /listener:simple:acknowledge-mode: manual
  • 主启动类启动MQ

achangmall-ware/src/main/java/com/achang/achangmall/AchangmallWareApplication.java

@EnableRabbit
  • 配置类

com.achang.achangmall.ware.conf.RabbitMQConfig

@Configuration
public class RabbitMQConfig {@Beanpublic MessageConverter messageConversionException(){return new Jackson2JsonMessageConverter();}@Beanpublic Exchange stockEventExchange(){return new TopicExchange("stock-event-exchange",true,false);}@Beanpublic Queue stockReleaseStockQueue(){return new Queue("stock.release.stock.queue",true,false,false);}@Beanpublic Queue stockDelayQueue(){HashMap<String, Object> argument = new HashMap<>();argument.put("x-dead-letter-exchange","stock-event-exchange");//死信路由argument.put("x-dead-letter-routing-key","stock.release");//死信路由键argument.put("x-message-ttl", 120000L);//消息过期时间 1分钟return new Queue("stock.delay.queue",true,false,false,argument);}@Beanpublic Binding stockReleaseBinding(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}@Beanpublic Binding stockLockedBinding(){return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}
}
  • com.achang.achangmall.ware.controller.WareSkuController
/*** 锁定库存* @param vo** 库存解锁的场景*      1)、下订单成功,订单过期没有支付被系统自动取消或者被用户手动取消,都要解锁库存*      2)、下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁*      3)、   */
@PostMapping(value = "/lock/order")
public R orderLockStock(@RequestBody WareSkuLockVo vo) {//        try {boolean lockStock = wareSkuService.orderLockStock(vo);return R.ok().setData(lockStock);//        } catch (NoStockException e) {//            return R.error(NO_STOCK_EXCEPTION.getCode(),NO_STOCK_EXCEPTION.getMessage());//        }
}
  • 发送到mq消息队列的to

com.achang.common.to.mq.StockLockedTo

/*** @Description: 发送到mq消息队列的to**/
@Data
public class StockLockedTo {/** 库存工作单的id **/private Long id;/** 工作单详情的所有信息 **/private StockDetailTo detailTo;
}
  • com.achang.common.to.mq.StockDetailTo
@Data
public class StockDetailTo {private Long id;/*** sku_id*/private Long skuId;/*** sku_name*/private String skuName;/*** 购买个数*/private Integer skuNum;/*** 工作单id*/private Long taskId;/*** 仓库id*/private Long wareId;/*** 锁定状态*/private Integer lockStatus;}
  • com.achang.achangmall.ware.service.impl.WareSkuServiceImpl
@Service("wareSkuService")
public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> implements WareSkuService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate WareSkuDao wareSkuDao;@Autowiredprivate OrderFeignService orderFeignService;@Autowiredprivate WareOrderTaskDetailService wareOrderTaskDetailService;@Overridepublic PageUtils queryPage(Map<String, Object> params) {QueryWrapper<WareSkuEntity> wrapper = new QueryWrapper<>();String skuId = (String) params.get("skuId");if (!StringUtils.isEmpty(skuId)){wrapper.eq("sku_id",skuId);}String wareId = (String) params.get("wareId");if (!StringUtils.isEmpty(wareId)){}IPage<WareSkuEntity> page = this.page(new Query<WareSkuEntity>().getPage(params),wrapper);return new PageUtils(page);}@Overridepublic List<SkuHasStockVo> getSkuHasStock(List<Long> skuIdList) {List<SkuHasStockVo> collect = skuIdList.stream().map(item -> {SkuHasStockVo vo = new SkuHasStockVo();Long count = baseMapper.getSkuStock(item);vo.setSkuId(item);vo.setHasStock(count == null?false:count>0);return vo;}).collect(Collectors.toList());return collect;}@Autowiredprivate WareOrderTaskService wareOrderTaskService;@Overridepublic boolean orderLockStock(WareSkuLockVo vo) {/*** 保存库存工作单详情信息* 追溯*/WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();wareOrderTaskEntity.setOrderSn(vo.getOrderSn());wareOrderTaskEntity.setCreateTime(new Date());wareOrderTaskService.save(wareOrderTaskEntity);//1、按照下单的收货地址,找到一个就近仓库,锁定库存//2、找到每个商品在哪个仓库都有库存List<OrderItemVo> locks = vo.getLocks();List<SkuWareHasStock> collect = locks.stream().map((item) -> {SkuWareHasStock stock = new SkuWareHasStock();Long skuId = item.getSkuId();stock.setSkuId(skuId);stock.setNum(item.getCount());//查询这个商品在哪个仓库有库存List<Long> wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId);stock.setWareId(wareIdList);return stock;}).collect(Collectors.toList());//2、锁定库存for (SkuWareHasStock hasStock : collect) {boolean skuStocked = false;Long skuId = hasStock.getSkuId();List<Long> wareIds = hasStock.getWareId();if (org.springframework.util.StringUtils.isEmpty(wareIds)) {//没有任何仓库有这个商品的库存//                throw new NoStockException(skuId);}//1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ//2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所有就不用解锁for (Long wareId : wareIds) {//锁定成功就返回1,失败就返回0Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());if (count == 1) {skuStocked = true;WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();taskDetailEntity.setSkuId(skuId);taskDetailEntity.setSkuName("");taskDetailEntity.setSkuNum(hasStock.getNum());taskDetailEntity.setTaskId(wareOrderTaskEntity.getId());taskDetailEntity.setWareId(wareId);taskDetailEntity.setLockStatus(1);wareOrderTaskDetailService.save(taskDetailEntity);//TODO 告诉MQ库存锁定成功StockLockedTo lockedTo = new StockLockedTo();lockedTo.setId(wareOrderTaskEntity.getId());StockDetailTo detailTo = new StockDetailTo();BeanUtils.copyProperties(taskDetailEntity,detailTo);lockedTo.setDetailTo(detailTo);rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);break;} else {//当前仓库锁失败,重试下一个仓库}}if (skuStocked == false) {//当前商品所有仓库都没有锁住throw new NoStockException(skuId);}}//3、肯定全部都是锁定成功的return true;}/*** 解锁库存的方法* @param skuId* @param wareId* @param num* @param taskDetailId*/public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {//库存解锁wareSkuDao.unLockStock(skuId,wareId,num);//更新工作单的状态WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();taskDetailEntity.setId(taskDetailId);//变为已解锁taskDetailEntity.setLockStatus(2);wareOrderTaskDetailService.updateById(taskDetailEntity);}@Dataclass SkuWareHasStock {private Long skuId;private Integer num;private List<Long> wareId;}/*** 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理* 导致卡顿的订单,永远都不能解锁库存* @param orderTo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void unlockStock(OrderTo orderTo) {String orderSn = orderTo.getOrderSn();//查一下最新的库存解锁状态,防止重复解锁库存WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);//按照工作单的id找到所有 没有解锁的库存,进行解锁Long id = orderTaskEntity.getId();List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id).eq("lock_status", 1));for (WareOrderTaskDetailEntity taskDetailEntity : list) {unLockStock(taskDetailEntity.getSkuId(),taskDetailEntity.getWareId(),taskDetailEntity.getSkuNum(),taskDetailEntity.getId());}}@Overridepublic void unlockStock(StockLockedTo to) {//库存工作单的idStockDetailTo detail = to.getDetailTo();Long detailId = detail.getId();/*** 解锁* 1、查询数据库关于这个订单锁定库存信息*   有:证明库存锁定成功了*      解锁:订单状况*          1、没有这个订单,必须解锁库存*          2、有这个订单,不一定解锁库存*              订单状态:已取消:解锁库存*                      已支付:不能解锁库存*/WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId);if (taskDetailInfo != null) {//查出wms_ware_order_task工作单的信息Long id = to.getId();WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id);//获取订单号查询订单状态String orderSn = orderTaskInfo.getOrderSn();//远程查询订单信息R orderData = orderFeignService.getOrderStatus(orderSn);if (orderData.getCode() == 0) {//订单数据返回成功OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});//判断订单状态是否已取消或者支付或者订单不存在if (orderInfo == null || orderInfo.getStatus() == 4) {//订单已被取消,才能解锁库存if (taskDetailInfo.getLockStatus() == 1) {//当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);}}} else {//消息拒绝以后重新放在队列里面,让别人继续消费解锁//远程调用服务失败throw new RuntimeException("远程调用服务失败");}} else {//无需解锁}}}
  • achangmall-ware/src/main/resources/mapper/ware/WareSkuDao.xml
<update id="unLockStock">UPDATE wms_ware_skuSET stock_locked = stock_locked - #{num}WHEREsku_id = ${skuId}AND ware_id = #{wareId}
</update>
  • com.achang.achangmall.ware.service.impl.WareOrderTaskServiceImpl
@Override
public WareOrderTaskEntity getOrderTaskByOrderSn(String orderSn) {WareOrderTaskEntity orderTaskEntity = this.baseMapper.selectOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));return orderTaskEntity;
}
  • com.achang.achangmall.ware.listener.StockReleaseListener
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;/*** 1、库存自动解锁*  下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁**  2、订单失败*      库存锁定失败**   只要解锁库存的消息失败,一定要告诉服务解锁失败*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {log.info("******收到解锁库存的信息******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}@RabbitHandlerpublic void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {log.info("******收到订单关闭,准备解锁库存的信息******");try {wareSkuService.unlockStock(orderTo);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
  • com.achang.achangmall.order.service.impl.OrderServiceImpl
        /*** 按照订单号获取订单信息* @param orderSn* @return*/@Overridepublic OrderEntity getOrderByOrderSn(String orderSn) {OrderEntity orderEntity = this.baseMapper.selectOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));return orderEntity;}
  • com.achang.achangmall.order.controller.OrderController
/**
* 根据订单编号查询订单状态
*/
@GetMapping(value = "/status/{orderSn}")
public R getOrderStatus(@PathVariable("orderSn") String orderSn) {OrderEntity orderEntity = orderService.getOrderByOrderSn(orderSn);return R.ok().setData(orderEntity);
}
  • com.achang.achangmall.ware.feign.OrderFeignService
    @FeignClient("achangmall-order")public interface OrderFeignService {@GetMapping(value = "/order/order/status/{orderSn}")R getOrderStatus(@PathVariable("orderSn") String orderSn);}
  • com.achang.achangmall.ware.vo.OrderVo
@Data
public class OrderVo {private Long id;/*** member_id*/private Long memberId;/*** 订单号*/private String orderSn;/*** 使用的优惠券*/private Long couponId;/*** create_time*/private Date createTime;/*** 用户名*/private String memberUsername;/*** 订单总额*/private BigDecimal totalAmount;/*** 应付总额*/private BigDecimal payAmount;/*** 运费金额*/private BigDecimal freightAmount;/*** 促销优化金额(促销价、满减、阶梯价)*/private BigDecimal promotionAmount;/*** 积分抵扣金额*/private BigDecimal integrationAmount;/*** 优惠券抵扣金额*/private BigDecimal couponAmount;/*** 后台调整订单使用的折扣金额*/private BigDecimal discountAmount;/*** 支付方式【1->支付宝;2->微信;3->银联; 4->货到付款;】*/private Integer payType;/*** 订单来源[0->PC订单;1->app订单]*/private Integer sourceType;/*** 订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】*/private Integer status;/*** 物流公司(配送方式)*/private String deliveryCompany;/*** 物流单号*/private String deliverySn;/*** 自动确认时间(天)*/private Integer autoConfirmDay;/*** 可以获得的积分*/private Integer integration;/*** 可以获得的成长值*/private Integer growth;/*** 发票类型[0->不开发票;1->电子发票;2->纸质发票]*/private Integer billType;/*** 发票抬头*/private String billHeader;/*** 发票内容*/private String billContent;/*** 收票人电话*/private String billReceiverPhone;/*** 收票人邮箱*/private String billReceiverEmail;/*** 收货人姓名*/private String receiverName;/*** 收货人电话*/private String receiverPhone;/*** 收货人邮编*/private String receiverPostCode;/*** 省份/直辖市*/private String receiverProvince;/*** 城市*/private String receiverCity;/*** 区*/private String receiverRegion;/*** 详细地址*/private String receiverDetailAddress;/*** 订单备注*/private String note;/*** 确认收货状态[0->未确认;1->已确认]*/private Integer confirmStatus;/*** 删除状态【0->未删除;1->已删除】*/private Integer deleteStatus;/*** 下单时使用的积分*/private Integer useIntegration;/*** 支付时间*/private Date paymentTime;/*** 发货时间*/private Date deliveryTime;/*** 确认收货时间*/private Date receiveTime;/*** 评价时间*/private Date commentTime;/*** 修改时间*/private Date modifyTime;}
  • com.achang.achangmall.order.interceptor.LoginUserInterceptor
//用户登录拦截器
@Component
public class LoginUserInterceptor implements HandlerInterceptor {public static ThreadLocal<MemberResponseVo> loginUser = new ThreadLocal<>();@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {boolean match = new AntPathMatcher().match("/order/order/status/**", request.getRequestURI());if (match){return true;}HttpSession session = request.getSession();MemberResponseVo user = (MemberResponseVo) session.getAttribute(AuthServerConstant.LOGIN_USER);if (user!=null){//登录了loginUser.set(user);return true;}else {//没登录session.setAttribute("msg","请先登录");response.sendRedirect("http://auth.achangmall.com/login.html");return false;}}}

2、定时关闭订单

首先订单创建成功之后,使用order.create.order路由键将消息路由到order-event-exchange交换机

交换机发现order.create.order这个路由键绑定的是order.delay.queue这个延时队列,然后就把它放到order.delay.queue队列里

过了30分钟,这个延时队列里面的消息,也就是死信,通过order.release.order又路由回order-event-exchange交换机

然后交换机发现这个路由键对应的是order.release.order.queue这个队列,然后就放到order.release.order.queue这个队列里

最终监听order.release.order.queue这个队列的释放订单服务,发现有消息进来了,就会针对里面的数据对其进行关闭订单

  • 问题

这种关闭订单方式会有一些问题

假设订单创建成功之后,订单服务的机器由于卡顿、消息延迟等原因,导致订单未及时取消

此时库存服务的逻辑是订单创建成功之后,它自己会发一个消息,等 40分钟 以后检查之前下单的订单是否已取消,如果是已取消,则解锁库存

结果,库存服务过来查询时,订单服务由于上述原因没有将订单修改为已取消,所以库存就不会解锁,此时的库存消息就算是消费了

等库存服务都检查完了,此时的订单服务才反应过来,然后把订单状态改为已取消了,但是此时库存服务会再有任何的操作了,因为检查订单的消息已经被消费了,库存永远得不到解锁。

  • 解决

为了解决这个问题,我们在监听取消订单的消息时,再发一个消息,主动解锁库存。

  • com.achang.achangmall.order.listener.OrderCloseListener
/*** @Description: 定时关闭订单**/@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {@Autowiredprivate OrderService orderService;@RabbitHandlerpublic void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());try {orderService.closeOrder(orderEntity);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
  • com.achang.achangmall.order.service.impl.OrderServiceImpl
    /*** 关闭订单* @param orderEntity*/
@Override
public void closeOrder(OrderEntity orderEntity) {//关闭订单之前先查询一下数据库,判断此订单状态是否已支付OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn",orderEntity.getOrderSn()));if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {//代付款状态进行关单OrderEntity orderUpdate = new OrderEntity();orderUpdate.setId(orderInfo.getId());orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());this.updateById(orderUpdate);// 发送消息给MQOrderTo orderTo = new OrderTo();BeanUtils.copyProperties(orderInfo, orderTo);try {//TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);} catch (Exception e) {//TODO 定期扫描数据库,重新发送失败的消息}}
}
  • com.achang.achangmall.order.service.impl.OrderServiceImpl
/*** 提交订单* @param vo* @return*/
// @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别
// @Transactional(propagation = Propagation.REQUIRED)   设置事务的传播级别
@Transactional(rollbackFor = Exception.class)
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {confirmVoThreadLocal.set(vo);SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();//去创建、下订单、验令牌、验价格、锁定库存...//获取当前用户登录的信息MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get();responseVo.setCode(0);//1、验证令牌是否合法【令牌的对比和删除必须保证原子性】String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";String orderToken = vo.getOrderToken();//通过lure脚本原子验证令牌和删除令牌Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),Arrays.asList("order:token" + memberResponseVo.getId()),orderToken);if (result == 0L) {//令牌验证失败responseVo.setCode(1);return responseVo;} else {//令牌验证成功//1、创建订单、订单项等信息OrderCreateTo order = createOrder();//2、验证价格BigDecimal payAmount = order.getOrder().getPayAmount();BigDecimal payPrice = vo.getPayPrice();if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {//金额对比//TODO 3、保存订单saveOrder(order);//4、库存锁定,只要有异常,回滚订单数据//订单号、所有订单项信息(skuId,skuNum,skuName)WareSkuLockVo lockVo = new WareSkuLockVo();lockVo.setOrderSn(order.getOrder().getOrderSn());//获取出要锁定的商品数据信息List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {OrderItemVo orderItemVo = new OrderItemVo();orderItemVo.setSkuId(item.getSkuId());orderItemVo.setCount(item.getSkuQuantity());orderItemVo.setTitle(item.getSkuName());return orderItemVo;}).collect(Collectors.toList());lockVo.setLocks(orderItemVos);//TODO 调用远程锁定库存的方法//出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata)//为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务R r = wmsFeignService.orderLockStock(lockVo);if (r.getCode() == 0) {//锁定成功responseVo.setOrder(order.getOrder());// int i = 10/0;//TODO 订单创建成功,发送消息给MQrabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());//删除购物车里的数据redisTemplate.delete(CART_PREFIX+memberResponseVo.getId());return responseVo;} else {//锁定失败String msg = (String) r.get("msg");throw new NoStockException(msg);// responseVo.setCode(3);// return responseVo;}} else {responseVo.setCode(2);return responseVo;}}
}

3、主动解锁库存

具体是这样的,在释放订单之后,我们主动发一个消息解锁库存,

使用order.release.other将消息路由到交换机,

交换机根据order.release.other.#匹配到stock.release.stock.queue这个队列,并将消息发了过去,

库存服务有对这个队列进行监听,所有一旦有数据来了,就会对其进行解锁库存服务

Day434.订单库存服务分布式事务的最终解决 -谷粒商城相关推荐

  1. Day431.本地事务分布式事务CAP理论 -谷粒商城

    本地事务 一.本地事务 一个或一组SQL语句组成一个执行单元,这个执行单元要么全部执行,要么全部不执行 二.ACID 特性 原子性:一个事务的整体操作不可拆分,要么都成功,要么都失败 一致性:一个事务 ...

  2. 一行代码就能解决微服务分布式事务问题,你知道GTS怎么做到的吗?

    2019独角兽企业重金招聘Python工程师标准>>> GTS直播火热报名中,直播直通车 一.GTS (Global Transaction Service)是啥? GTS(全局事务 ...

  3. seata xid是什么_微服务分布式事务解决方案-springboot整合分布式seata1.3.0

    概述 Seat是蚂蚁金服和阿里巴巴联合推出的一个开源的分布式事务框架,在阿里云商用的叫做GTS. 项目地址:https://github.com/longxiaonan/springcloud-dem ...

  4. 微服务-分布式事务seata

    什么是分布式事务 单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源, 业务操作需要调用三个服务来完成.此时每个服务内部的数据一致性由本地事务来保证,但是全局的 ...

  5. 谷粒商城项目篇13_分布式高级篇_订单业务模块(提交订单幂等性、分布式事务、延时MQ实现定时任务)

    目录 一.订单业务模块 订单流程 购物车跳转订单确认页 登录拦截器 封装vo Feign远程调用丢失请求头信息 Feign远程异步调用丢失上下文信息 提交订单接口幂等性 令牌token机制 各种锁机制 ...

  6. 微服务分布式事务解决方案Seata

    文章目录 一.Seata是什么? 二.使用步骤 1.引入库 2.读入数据 总结 一.什么是Seata? Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用 的分布式事务服务.Sea ...

  7. .Net Core with 微服务 - 分布式事务 - 2PC、3PC

    最近比较忙,好久没更新了.这次我们来聊一聊分布式事务. 在微服务体系下,我们的应用被分割成多个服务,每个服务都配置一个数据库.如果我们的服务划分的不够完美,那么为了完成业务会出现非常多的跨库事务.即使 ...

  8. 微服务--分布式事务的实现方法及替代方案

    这两天正在研究微服务架构中分布式事务的处理方案, 做一个小小的总结, 作为备忘. 如有错误, 欢迎指正! 概念澄清 事务补偿机制: 在事务链中的任何一个正向操作, 都必须存在一个完全符合回滚规则的可逆 ...

  9. 微服务 分布式事务解决方案

    一. 前言 阿里2017云栖大会<破解世界性技术难题!GTS让分布式事务简单高效>中,阿里声称提出了一种破解世界性难题之分布式事务的终极解决方案,无论是可靠性.还是处理速率都领先于市面上所 ...

  10. 微服务~分布式事务里的最终一致性

    本地事务ACID大家应该都知道了,统一提交,失败回滚,严格保证了同一事务内数据的一致性!而分布式事务不能实现这种ACID,它只能实现CAP原则里的某两个,CAP也是分布式事务的一个广泛被应用的原型,C ...

最新文章

  1. SAP MM Consignment 寄售库存
  2. python从sqlserver提取数据_通过Python读取sqlserver数据写成json文件的总结
  3. 正则表达式中的分组的匹配次数的理解
  4. python getattr和getattribute_详解Python中 __get__和__getattr__和__getattribute__的区别
  5. 微信企业号让IOS不要识别数字为电话号码的方法
  6. LeetCode 题 - 53. 最大子序和 python解法
  7. pythonarp攻击_python通过scapy模块进行arp断网攻击
  8. cocos2d-xFinalProject踩坑记录(cocosStudio控件获取,角色移动及动画,碰撞检测,背景音乐与场景)...
  9. python面试专题--with 关键字与上下文管理
  10. 开源点云数据处理 开源_开源云–充满希望的未来
  11. l440加装固态硬盘ngff_[转载]Thinkpad E431装NGFF固态硬盘图文详解
  12. uefiboot 文件_UEFI下win系统启动过程及用bcdboot命令如何修复引导启动
  13. sketchup 图片转模型_3d模型转su模型(如何将3D模型转化为sketchup)
  14. ir2104s的自举电容_IR2104s半桥驱动芯片使用经验及注意事项
  15. 【JavaScript】简易打地鼠游戏
  16. 探寻埋藏在心底的梦想,社科院与杜兰大学金融管理硕士项目伴你同行
  17. linux u盘 引导修复工具下载,win7+ubuntu双系统引导修复工具boot repair disk 32+64位ISO版...
  18. 声明式导航编程式导航
  19. linux文件损坏怎么修复工具,在Ubuntu操作系统下修复损坏程序包的三种办法
  20. springboot 启动报错 Unexpected filename extension of file

热门文章

  1. 外国人怎样看待Ubuntu麒麟?
  2. 租服务器的 直连100m是啥,如何知道我的服务器带宽是独享10M或者100M?
  3. 面试:Android应用的崩溃率
  4. linux各个文件夹,linux各个目录文件夹含义
  5. C++ -Pointer指针总结(一)
  6. 热血江湖网通一服务器不稳定,《热血江湖》网通新服 千呼万唤始出来
  7. Diligent收购领先的批判性见解和分析SaaS提供商Insightia
  8. QML类型:Window
  9. Ubuntu20.04安装视频播放器SMPlayer
  10. Activiti学习之根据条件判断流程走向