Day434.订单库存服务分布式事务的最终解决 -谷粒商城
分布式事务的最终解决
通过MQ的延时消息来解决订单/库存服务直接调用的事务问题
一、如何解决下单系统这个高并发里边的分布式事务呢?
首先,我们肯定不会用 2PC 模式、 TCC-事务补偿性方案,我们也不考虑
最终我们选择可靠消息+最终一致性这种方式
为了保证高并发,订单这一块还是自己回滚,
因为性能不佳,我们弃用seata的方案
1、库存服务自己怎么回滚?
- 第一种
通过MQ消息队列
,我们在提交订单那里,当捕捉到异常要回滚的时候,给库存服务发一个消息,让库存服务自己把库存解锁
这样不需要让库存事务回滚,只需要给它发一个消息,不会损失什么性能
- 第二种
通过MQ延迟消息
库存服务本身也可以使用自动解锁模式。
怎么自动解锁呢?
需要使用消息队列来完成。
如果你想让我这的哪些库存解锁,首先你需要给我发一个消息告诉我。
然后我们专门的库存解锁服务
,去来订阅我们stock.release.stock.queue
这个队列里的消息。
那你给我发消息的时候,比如:用路由键stock.release
,我知道要库存解锁,
然后,你的消息发给我们这个交换机stock-event-exchange
。
交换机把这个消息路由给stock.release.stock.queue
这个队列。
然后,这个队列stock.release.stock.queue
里边存的这些消息都是库存要解锁的消息,我们的库存解锁服务
只要收到了,它就会在后台慢慢的解锁消息。
我们不用保证强一致,我们哪怕是二十分钟、三十分钟,乃至于一天以后把这个库存解锁了,最终一致了就行。
所以我们可以来使用消息队列来完成我们的这个最终一致性。
2、锁库存的增强版逻辑
我们想要锁库存的话,我们先来保存一个库存工作单和库存工作单详情
相当于只要我们想要锁库存,我们先给数据库里边保存记录,我要锁库存。
接下来我们就来进行锁,只要锁成功了,那一切ok。
如果锁失败了,数据库里边相当于没有这个锁库存记录。
因为锁失败呢,我们这个本身自己所失败会全部回滚。
但如果可能是这种失败,比如我们来到订单里边,我们库存其实自己锁成功了。但是我们订单下边的其他完了,然后库存要进行解锁。那怎么办呢?
我们可以使用定时任务
二、订单服务的完整消息队列
1、库存自动解锁
库存微服务,有一个它的库存交换机stock-event-exchange
.
如果想要解锁库存,应该是这样的。
首先订单创建成功之后,库存锁定成功,然后发一个消息给交换机,
这个消息里面的内容有订单编号、仓库编号、哪个商品锁了几个库存,
这个交换机,绑定了两个队列,
一个是按照stock.release.#
模糊匹配的路由键绑定的stock.release.stock.queue
队列
一个是stock.delay.queue
队列
第一次发的库存锁定成功的消息,先使用路由键叫stock.locked
交换机按照这个路由键,找到stock.delay.queue
延时队列
延时队列50分钟以后,用stock.release
这个路由键,将死信交给库存交换机stock-event-exchange
,
交换机收到以后,按照这个路由键查找,发现stock.release.#
这个模糊匹配的路由键跟它是一样的,然后被交换机路由到我们这个stock.release.stock.queue
队列。
接下来的解锁库存服务,专门来处理stock.release.stock.queue
里的消息。
- 最终实现
- com.achang.achangmall.order.conf.MQConfig
通过配置类来创建队列、绑定、交换机
@Configuration
public class MQConfig {@Beanpublic Queue orderDelayQueue(){//死信队列HashMap<String, Object> argument = new HashMap<>();argument.put("x-dead-letter-exchange","order-event-exchange");//死信路由argument.put("x-dead-letter-routing-key","order.release.order");//死信路由键argument.put("x-message-ttl",60000);//消息过期时间 1分钟return new Queue("order.delay.queue",true,false,false,argument);}@Beanpublic Queue orderReleaseOrderQueue(){//普通队列return new Queue("order.release.order.queue",true,false,false,null);}@Beanpublic Exchange orderEventExchange(){//普通的主题交换机(可绑定多个队列,根据条件路由)return new TopicExchange("order-event-exchange",true,false);}@Beanpublic Binding orderCreateOrderBinding(){return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseOrderBinding(){return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}
}
- 库存服务加mq依赖
achangmall-ware/pom.xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- rabbitmq配置
achangmall-ware/src/main/resources/application.yaml
spring:rabbitmq:host: 192.168.109.101port: 5672virtual-host: /listener:simple:acknowledge-mode: manual
- 主启动类启动MQ
achangmall-ware/src/main/java/com/achang/achangmall/AchangmallWareApplication.java
@EnableRabbit
- 配置类
com.achang.achangmall.ware.conf.RabbitMQConfig
@Configuration
public class RabbitMQConfig {@Beanpublic MessageConverter messageConversionException(){return new Jackson2JsonMessageConverter();}@Beanpublic Exchange stockEventExchange(){return new TopicExchange("stock-event-exchange",true,false);}@Beanpublic Queue stockReleaseStockQueue(){return new Queue("stock.release.stock.queue",true,false,false);}@Beanpublic Queue stockDelayQueue(){HashMap<String, Object> argument = new HashMap<>();argument.put("x-dead-letter-exchange","stock-event-exchange");//死信路由argument.put("x-dead-letter-routing-key","stock.release");//死信路由键argument.put("x-message-ttl", 120000L);//消息过期时间 1分钟return new Queue("stock.delay.queue",true,false,false,argument);}@Beanpublic Binding stockReleaseBinding(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}@Beanpublic Binding stockLockedBinding(){return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}
}
- com.achang.achangmall.ware.controller.WareSkuController
/*** 锁定库存* @param vo** 库存解锁的场景* 1)、下订单成功,订单过期没有支付被系统自动取消或者被用户手动取消,都要解锁库存* 2)、下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁* 3)、 */
@PostMapping(value = "/lock/order")
public R orderLockStock(@RequestBody WareSkuLockVo vo) {// try {boolean lockStock = wareSkuService.orderLockStock(vo);return R.ok().setData(lockStock);// } catch (NoStockException e) {// return R.error(NO_STOCK_EXCEPTION.getCode(),NO_STOCK_EXCEPTION.getMessage());// }
}
- 发送到mq消息队列的to
com.achang.common.to.mq.StockLockedTo
/*** @Description: 发送到mq消息队列的to**/
@Data
public class StockLockedTo {/** 库存工作单的id **/private Long id;/** 工作单详情的所有信息 **/private StockDetailTo detailTo;
}
- com.achang.common.to.mq.StockDetailTo
@Data
public class StockDetailTo {private Long id;/*** sku_id*/private Long skuId;/*** sku_name*/private String skuName;/*** 购买个数*/private Integer skuNum;/*** 工作单id*/private Long taskId;/*** 仓库id*/private Long wareId;/*** 锁定状态*/private Integer lockStatus;}
- com.achang.achangmall.ware.service.impl.WareSkuServiceImpl
@Service("wareSkuService")
public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> implements WareSkuService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate WareSkuDao wareSkuDao;@Autowiredprivate OrderFeignService orderFeignService;@Autowiredprivate WareOrderTaskDetailService wareOrderTaskDetailService;@Overridepublic PageUtils queryPage(Map<String, Object> params) {QueryWrapper<WareSkuEntity> wrapper = new QueryWrapper<>();String skuId = (String) params.get("skuId");if (!StringUtils.isEmpty(skuId)){wrapper.eq("sku_id",skuId);}String wareId = (String) params.get("wareId");if (!StringUtils.isEmpty(wareId)){}IPage<WareSkuEntity> page = this.page(new Query<WareSkuEntity>().getPage(params),wrapper);return new PageUtils(page);}@Overridepublic List<SkuHasStockVo> getSkuHasStock(List<Long> skuIdList) {List<SkuHasStockVo> collect = skuIdList.stream().map(item -> {SkuHasStockVo vo = new SkuHasStockVo();Long count = baseMapper.getSkuStock(item);vo.setSkuId(item);vo.setHasStock(count == null?false:count>0);return vo;}).collect(Collectors.toList());return collect;}@Autowiredprivate WareOrderTaskService wareOrderTaskService;@Overridepublic boolean orderLockStock(WareSkuLockVo vo) {/*** 保存库存工作单详情信息* 追溯*/WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();wareOrderTaskEntity.setOrderSn(vo.getOrderSn());wareOrderTaskEntity.setCreateTime(new Date());wareOrderTaskService.save(wareOrderTaskEntity);//1、按照下单的收货地址,找到一个就近仓库,锁定库存//2、找到每个商品在哪个仓库都有库存List<OrderItemVo> locks = vo.getLocks();List<SkuWareHasStock> collect = locks.stream().map((item) -> {SkuWareHasStock stock = new SkuWareHasStock();Long skuId = item.getSkuId();stock.setSkuId(skuId);stock.setNum(item.getCount());//查询这个商品在哪个仓库有库存List<Long> wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId);stock.setWareId(wareIdList);return stock;}).collect(Collectors.toList());//2、锁定库存for (SkuWareHasStock hasStock : collect) {boolean skuStocked = false;Long skuId = hasStock.getSkuId();List<Long> wareIds = hasStock.getWareId();if (org.springframework.util.StringUtils.isEmpty(wareIds)) {//没有任何仓库有这个商品的库存// throw new NoStockException(skuId);}//1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ//2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所有就不用解锁for (Long wareId : wareIds) {//锁定成功就返回1,失败就返回0Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());if (count == 1) {skuStocked = true;WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();taskDetailEntity.setSkuId(skuId);taskDetailEntity.setSkuName("");taskDetailEntity.setSkuNum(hasStock.getNum());taskDetailEntity.setTaskId(wareOrderTaskEntity.getId());taskDetailEntity.setWareId(wareId);taskDetailEntity.setLockStatus(1);wareOrderTaskDetailService.save(taskDetailEntity);//TODO 告诉MQ库存锁定成功StockLockedTo lockedTo = new StockLockedTo();lockedTo.setId(wareOrderTaskEntity.getId());StockDetailTo detailTo = new StockDetailTo();BeanUtils.copyProperties(taskDetailEntity,detailTo);lockedTo.setDetailTo(detailTo);rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);break;} else {//当前仓库锁失败,重试下一个仓库}}if (skuStocked == false) {//当前商品所有仓库都没有锁住throw new NoStockException(skuId);}}//3、肯定全部都是锁定成功的return true;}/*** 解锁库存的方法* @param skuId* @param wareId* @param num* @param taskDetailId*/public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {//库存解锁wareSkuDao.unLockStock(skuId,wareId,num);//更新工作单的状态WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();taskDetailEntity.setId(taskDetailId);//变为已解锁taskDetailEntity.setLockStatus(2);wareOrderTaskDetailService.updateById(taskDetailEntity);}@Dataclass SkuWareHasStock {private Long skuId;private Integer num;private List<Long> wareId;}/*** 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理* 导致卡顿的订单,永远都不能解锁库存* @param orderTo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void unlockStock(OrderTo orderTo) {String orderSn = orderTo.getOrderSn();//查一下最新的库存解锁状态,防止重复解锁库存WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);//按照工作单的id找到所有 没有解锁的库存,进行解锁Long id = orderTaskEntity.getId();List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id).eq("lock_status", 1));for (WareOrderTaskDetailEntity taskDetailEntity : list) {unLockStock(taskDetailEntity.getSkuId(),taskDetailEntity.getWareId(),taskDetailEntity.getSkuNum(),taskDetailEntity.getId());}}@Overridepublic void unlockStock(StockLockedTo to) {//库存工作单的idStockDetailTo detail = to.getDetailTo();Long detailId = detail.getId();/*** 解锁* 1、查询数据库关于这个订单锁定库存信息* 有:证明库存锁定成功了* 解锁:订单状况* 1、没有这个订单,必须解锁库存* 2、有这个订单,不一定解锁库存* 订单状态:已取消:解锁库存* 已支付:不能解锁库存*/WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId);if (taskDetailInfo != null) {//查出wms_ware_order_task工作单的信息Long id = to.getId();WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id);//获取订单号查询订单状态String orderSn = orderTaskInfo.getOrderSn();//远程查询订单信息R orderData = orderFeignService.getOrderStatus(orderSn);if (orderData.getCode() == 0) {//订单数据返回成功OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});//判断订单状态是否已取消或者支付或者订单不存在if (orderInfo == null || orderInfo.getStatus() == 4) {//订单已被取消,才能解锁库存if (taskDetailInfo.getLockStatus() == 1) {//当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);}}} else {//消息拒绝以后重新放在队列里面,让别人继续消费解锁//远程调用服务失败throw new RuntimeException("远程调用服务失败");}} else {//无需解锁}}}
- achangmall-ware/src/main/resources/mapper/ware/WareSkuDao.xml
<update id="unLockStock">UPDATE wms_ware_skuSET stock_locked = stock_locked - #{num}WHEREsku_id = ${skuId}AND ware_id = #{wareId}
</update>
- com.achang.achangmall.ware.service.impl.WareOrderTaskServiceImpl
@Override
public WareOrderTaskEntity getOrderTaskByOrderSn(String orderSn) {WareOrderTaskEntity orderTaskEntity = this.baseMapper.selectOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));return orderTaskEntity;
}
- com.achang.achangmall.ware.listener.StockReleaseListener
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;/*** 1、库存自动解锁* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁** 2、订单失败* 库存锁定失败** 只要解锁库存的消息失败,一定要告诉服务解锁失败*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {log.info("******收到解锁库存的信息******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}@RabbitHandlerpublic void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {log.info("******收到订单关闭,准备解锁库存的信息******");try {wareSkuService.unlockStock(orderTo);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
- com.achang.achangmall.order.service.impl.OrderServiceImpl
/*** 按照订单号获取订单信息* @param orderSn* @return*/@Overridepublic OrderEntity getOrderByOrderSn(String orderSn) {OrderEntity orderEntity = this.baseMapper.selectOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));return orderEntity;}
- com.achang.achangmall.order.controller.OrderController
/**
* 根据订单编号查询订单状态
*/
@GetMapping(value = "/status/{orderSn}")
public R getOrderStatus(@PathVariable("orderSn") String orderSn) {OrderEntity orderEntity = orderService.getOrderByOrderSn(orderSn);return R.ok().setData(orderEntity);
}
- com.achang.achangmall.ware.feign.OrderFeignService
@FeignClient("achangmall-order")public interface OrderFeignService {@GetMapping(value = "/order/order/status/{orderSn}")R getOrderStatus(@PathVariable("orderSn") String orderSn);}
- com.achang.achangmall.ware.vo.OrderVo
@Data
public class OrderVo {private Long id;/*** member_id*/private Long memberId;/*** 订单号*/private String orderSn;/*** 使用的优惠券*/private Long couponId;/*** create_time*/private Date createTime;/*** 用户名*/private String memberUsername;/*** 订单总额*/private BigDecimal totalAmount;/*** 应付总额*/private BigDecimal payAmount;/*** 运费金额*/private BigDecimal freightAmount;/*** 促销优化金额(促销价、满减、阶梯价)*/private BigDecimal promotionAmount;/*** 积分抵扣金额*/private BigDecimal integrationAmount;/*** 优惠券抵扣金额*/private BigDecimal couponAmount;/*** 后台调整订单使用的折扣金额*/private BigDecimal discountAmount;/*** 支付方式【1->支付宝;2->微信;3->银联; 4->货到付款;】*/private Integer payType;/*** 订单来源[0->PC订单;1->app订单]*/private Integer sourceType;/*** 订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】*/private Integer status;/*** 物流公司(配送方式)*/private String deliveryCompany;/*** 物流单号*/private String deliverySn;/*** 自动确认时间(天)*/private Integer autoConfirmDay;/*** 可以获得的积分*/private Integer integration;/*** 可以获得的成长值*/private Integer growth;/*** 发票类型[0->不开发票;1->电子发票;2->纸质发票]*/private Integer billType;/*** 发票抬头*/private String billHeader;/*** 发票内容*/private String billContent;/*** 收票人电话*/private String billReceiverPhone;/*** 收票人邮箱*/private String billReceiverEmail;/*** 收货人姓名*/private String receiverName;/*** 收货人电话*/private String receiverPhone;/*** 收货人邮编*/private String receiverPostCode;/*** 省份/直辖市*/private String receiverProvince;/*** 城市*/private String receiverCity;/*** 区*/private String receiverRegion;/*** 详细地址*/private String receiverDetailAddress;/*** 订单备注*/private String note;/*** 确认收货状态[0->未确认;1->已确认]*/private Integer confirmStatus;/*** 删除状态【0->未删除;1->已删除】*/private Integer deleteStatus;/*** 下单时使用的积分*/private Integer useIntegration;/*** 支付时间*/private Date paymentTime;/*** 发货时间*/private Date deliveryTime;/*** 确认收货时间*/private Date receiveTime;/*** 评价时间*/private Date commentTime;/*** 修改时间*/private Date modifyTime;}
- com.achang.achangmall.order.interceptor.LoginUserInterceptor
//用户登录拦截器
@Component
public class LoginUserInterceptor implements HandlerInterceptor {public static ThreadLocal<MemberResponseVo> loginUser = new ThreadLocal<>();@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {boolean match = new AntPathMatcher().match("/order/order/status/**", request.getRequestURI());if (match){return true;}HttpSession session = request.getSession();MemberResponseVo user = (MemberResponseVo) session.getAttribute(AuthServerConstant.LOGIN_USER);if (user!=null){//登录了loginUser.set(user);return true;}else {//没登录session.setAttribute("msg","请先登录");response.sendRedirect("http://auth.achangmall.com/login.html");return false;}}}
2、定时关闭订单
首先订单创建成功之后,使用order.create.order
路由键将消息路由到order-event-exchange
交换机
交换机发现order.create.order
这个路由键绑定的是order.delay.queue
这个延时队列,然后就把它放到order.delay.queue
队列里
过了30分钟,这个延时队列里面的消息,也就是死信,通过order.release.order
又路由回order-event-exchange
交换机
然后交换机发现这个路由键对应的是order.release.order.queue
这个队列,然后就放到order.release.order.queue
这个队列里
最终监听order.release.order.queue
这个队列的释放订单服务,发现有消息进来了,就会针对里面的数据对其进行关闭订单
- 问题
这种关闭订单方式会有一些问题
假设订单创建成功之后,订单服务的机器由于卡顿、消息延迟等原因,导致订单未及时取消
此时库存服务的逻辑是订单创建成功之后,它自己会发一个消息,等 40分钟 以后检查之前下单的订单是否已取消,如果是已取消,则解锁库存
结果,库存服务过来查询时,订单服务由于上述原因没有将订单修改为已取消,所以库存就不会解锁,此时的库存消息就算是消费了
等库存服务都检查完了,此时的订单服务才反应过来,然后把订单状态改为已取消了,但是此时库存服务会再有任何的操作了,因为检查订单的消息已经被消费了,库存永远得不到解锁。
- 解决
为了解决这个问题,我们在监听取消订单的消息时,再发一个消息,主动解锁库存。
- com.achang.achangmall.order.listener.OrderCloseListener
/*** @Description: 定时关闭订单**/@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {@Autowiredprivate OrderService orderService;@RabbitHandlerpublic void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());try {orderService.closeOrder(orderEntity);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
- com.achang.achangmall.order.service.impl.OrderServiceImpl
/*** 关闭订单* @param orderEntity*/
@Override
public void closeOrder(OrderEntity orderEntity) {//关闭订单之前先查询一下数据库,判断此订单状态是否已支付OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn",orderEntity.getOrderSn()));if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {//代付款状态进行关单OrderEntity orderUpdate = new OrderEntity();orderUpdate.setId(orderInfo.getId());orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());this.updateById(orderUpdate);// 发送消息给MQOrderTo orderTo = new OrderTo();BeanUtils.copyProperties(orderInfo, orderTo);try {//TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);} catch (Exception e) {//TODO 定期扫描数据库,重新发送失败的消息}}
}
- com.achang.achangmall.order.service.impl.OrderServiceImpl
/*** 提交订单* @param vo* @return*/
// @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别
// @Transactional(propagation = Propagation.REQUIRED) 设置事务的传播级别
@Transactional(rollbackFor = Exception.class)
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {confirmVoThreadLocal.set(vo);SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();//去创建、下订单、验令牌、验价格、锁定库存...//获取当前用户登录的信息MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get();responseVo.setCode(0);//1、验证令牌是否合法【令牌的对比和删除必须保证原子性】String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";String orderToken = vo.getOrderToken();//通过lure脚本原子验证令牌和删除令牌Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),Arrays.asList("order:token" + memberResponseVo.getId()),orderToken);if (result == 0L) {//令牌验证失败responseVo.setCode(1);return responseVo;} else {//令牌验证成功//1、创建订单、订单项等信息OrderCreateTo order = createOrder();//2、验证价格BigDecimal payAmount = order.getOrder().getPayAmount();BigDecimal payPrice = vo.getPayPrice();if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {//金额对比//TODO 3、保存订单saveOrder(order);//4、库存锁定,只要有异常,回滚订单数据//订单号、所有订单项信息(skuId,skuNum,skuName)WareSkuLockVo lockVo = new WareSkuLockVo();lockVo.setOrderSn(order.getOrder().getOrderSn());//获取出要锁定的商品数据信息List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {OrderItemVo orderItemVo = new OrderItemVo();orderItemVo.setSkuId(item.getSkuId());orderItemVo.setCount(item.getSkuQuantity());orderItemVo.setTitle(item.getSkuName());return orderItemVo;}).collect(Collectors.toList());lockVo.setLocks(orderItemVos);//TODO 调用远程锁定库存的方法//出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata)//为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务R r = wmsFeignService.orderLockStock(lockVo);if (r.getCode() == 0) {//锁定成功responseVo.setOrder(order.getOrder());// int i = 10/0;//TODO 订单创建成功,发送消息给MQrabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());//删除购物车里的数据redisTemplate.delete(CART_PREFIX+memberResponseVo.getId());return responseVo;} else {//锁定失败String msg = (String) r.get("msg");throw new NoStockException(msg);// responseVo.setCode(3);// return responseVo;}} else {responseVo.setCode(2);return responseVo;}}
}
3、主动解锁库存
具体是这样的,在释放订单之后,我们主动发一个消息解锁库存,
使用order.release.other
将消息路由到交换机,
交换机根据order.release.other.#
匹配到stock.release.stock.queue
这个队列,并将消息发了过去,
库存服务有对这个队列进行监听,所有一旦有数据来了,就会对其进行解锁库存服务
Day434.订单库存服务分布式事务的最终解决 -谷粒商城相关推荐
- Day431.本地事务分布式事务CAP理论 -谷粒商城
本地事务 一.本地事务 一个或一组SQL语句组成一个执行单元,这个执行单元要么全部执行,要么全部不执行 二.ACID 特性 原子性:一个事务的整体操作不可拆分,要么都成功,要么都失败 一致性:一个事务 ...
- 一行代码就能解决微服务分布式事务问题,你知道GTS怎么做到的吗?
2019独角兽企业重金招聘Python工程师标准>>> GTS直播火热报名中,直播直通车 一.GTS (Global Transaction Service)是啥? GTS(全局事务 ...
- seata xid是什么_微服务分布式事务解决方案-springboot整合分布式seata1.3.0
概述 Seat是蚂蚁金服和阿里巴巴联合推出的一个开源的分布式事务框架,在阿里云商用的叫做GTS. 项目地址:https://github.com/longxiaonan/springcloud-dem ...
- 微服务-分布式事务seata
什么是分布式事务 单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源, 业务操作需要调用三个服务来完成.此时每个服务内部的数据一致性由本地事务来保证,但是全局的 ...
- 谷粒商城项目篇13_分布式高级篇_订单业务模块(提交订单幂等性、分布式事务、延时MQ实现定时任务)
目录 一.订单业务模块 订单流程 购物车跳转订单确认页 登录拦截器 封装vo Feign远程调用丢失请求头信息 Feign远程异步调用丢失上下文信息 提交订单接口幂等性 令牌token机制 各种锁机制 ...
- 微服务分布式事务解决方案Seata
文章目录 一.Seata是什么? 二.使用步骤 1.引入库 2.读入数据 总结 一.什么是Seata? Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用 的分布式事务服务.Sea ...
- .Net Core with 微服务 - 分布式事务 - 2PC、3PC
最近比较忙,好久没更新了.这次我们来聊一聊分布式事务. 在微服务体系下,我们的应用被分割成多个服务,每个服务都配置一个数据库.如果我们的服务划分的不够完美,那么为了完成业务会出现非常多的跨库事务.即使 ...
- 微服务--分布式事务的实现方法及替代方案
这两天正在研究微服务架构中分布式事务的处理方案, 做一个小小的总结, 作为备忘. 如有错误, 欢迎指正! 概念澄清 事务补偿机制: 在事务链中的任何一个正向操作, 都必须存在一个完全符合回滚规则的可逆 ...
- 微服务 分布式事务解决方案
一. 前言 阿里2017云栖大会<破解世界性技术难题!GTS让分布式事务简单高效>中,阿里声称提出了一种破解世界性难题之分布式事务的终极解决方案,无论是可靠性.还是处理速率都领先于市面上所 ...
- 微服务~分布式事务里的最终一致性
本地事务ACID大家应该都知道了,统一提交,失败回滚,严格保证了同一事务内数据的一致性!而分布式事务不能实现这种ACID,它只能实现CAP原则里的某两个,CAP也是分布式事务的一个广泛被应用的原型,C ...
最新文章
- SAP MM Consignment 寄售库存
- python从sqlserver提取数据_通过Python读取sqlserver数据写成json文件的总结
- 正则表达式中的分组的匹配次数的理解
- python getattr和getattribute_详解Python中 __get__和__getattr__和__getattribute__的区别
- 微信企业号让IOS不要识别数字为电话号码的方法
- LeetCode 题 - 53. 最大子序和 python解法
- pythonarp攻击_python通过scapy模块进行arp断网攻击
- cocos2d-xFinalProject踩坑记录(cocosStudio控件获取,角色移动及动画,碰撞检测,背景音乐与场景)...
- python面试专题--with 关键字与上下文管理
- 开源点云数据处理 开源_开源云–充满希望的未来
- l440加装固态硬盘ngff_[转载]Thinkpad E431装NGFF固态硬盘图文详解
- uefiboot 文件_UEFI下win系统启动过程及用bcdboot命令如何修复引导启动
- sketchup 图片转模型_3d模型转su模型(如何将3D模型转化为sketchup)
- ir2104s的自举电容_IR2104s半桥驱动芯片使用经验及注意事项
- 【JavaScript】简易打地鼠游戏
- 探寻埋藏在心底的梦想,社科院与杜兰大学金融管理硕士项目伴你同行
- linux u盘 引导修复工具下载,win7+ubuntu双系统引导修复工具boot repair disk 32+64位ISO版...
- 声明式导航编程式导航
- linux文件损坏怎么修复工具,在Ubuntu操作系统下修复损坏程序包的三种办法
- springboot 启动报错 Unexpected filename extension of file