电商项目实战之分布式事务解决方案
电商项目实战之分布式事务解决方案
- 本地事务
- 事务隔离级别
- 事务传播机制
- 分布式事务
- CAP理论
- 选举与同步理论
- BASE理论
- 解决方案
- 2PC模式(XA事务)
- 柔性事务-TCC事务补偿型方案
- 柔性事务-最大努力通知型方案
- 柔性事务=可靠消息+最终一致性方案(异步确保型)
- 案例分析
- 订单模型
- 订单状态
- 订单流程
- 订单确认
- 订单确认页数据展示
- 订单确认页数据获取
- 运费收件信息获取
- 订单提交
- 订单数据
- 提交订单
- 订单回滚
- seata解决分布式事务问题(了解)
- 实现过程
- 消息队列实现最终一致性(推荐)
- 延迟队列
- 实现流程
- 延迟队列使用场景
- 订单分布式主体逻辑
- @Bean交换机和队列
- 库存回滚解锁
- 定时关单
- 参考链接
本地事务
事务隔离级别
事务传播机制
spring在TransactionDefinition接口中定义了七个事务传播行为
propagation_requierd:如果当前没有事务,就新建一个事务,如果已存在一个事务中,加入到这个事务中,这是最常见的选择。
propagation_supports:支持当前事务,如果没有当前事务,就以非事务方法执行。
propagation_mandatory:使用当前事务,如果没有当前事务,就抛出异常。
propagation_required_new:新建事务,如果当前存在事务,把当前事务挂起。
propagation_not_supported:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
propagation_never:以非事务方式执行操作,如果当前事务存在则抛出异常。
propagation_nested:如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与propagation_required类似的操作
本地事务失效问题
注意同类中调用的话,被调用事务会失效,原因在于aop。
事务基于代理,同对象的方法动态代理都是同一个。
解决方案是使用代理对象调用。
引用aop-starter后,使用aspectJ,开启AspectJ动态代理,原来默认使用的是jdk动态代理。
解决方案
使用@EnableAspectJAutoProxy(exposeProxy=true)后,就取代了jdk动态代理,它没有接口也可以创建动态代理。设置true是为了对外暴露代理对象。
AopContext.currentProxy()然后强转,就是当前代理对象。
public interface AService { public void a(); public void b(); } /**** 此处的this指向目标对象,因此调用this.b()将不会执行b事务切面,即不会执行事务增强,* 因此b方法的事务定义“@Transactional(propagation = Propagation.REQUIRES_NEW)”将不会实施,* 即结果是b和a方法的事务定义是一样的(我们可以看到事务切面只对a方法进行了事务增强,没有对b方法进行增强)* */@Service()public class AServiceImpl1 implements AService{ @Transactional(propagation = Propagation.REQUIRED) public void a() { this.b(); } @Transactional(propagation = Propagation.REQUIRES_NEW) public void b() { } }
Q1:b中的事务会不会生效?
A1:不会,a的事务会生效,b中不会有事务,因为a中调用b属于内部调用,没有通过代理,所以不会有事务产生。
Q2:如果想要b中有事务存在,要如何做?
A2:<aop:aspectj-autoproxy expose-proxy=“true”> ,设置expose-proxy属性为true,将代理暴露出来,使用AopContext.currentProxy()获取当前代理,将this.b()改为((UserService)AopContext.currentProxy()).b()
public void a() { ((AService) AopContext.currentProxy()).b();//即调用AOP代理对象的b方法即可执行事务切面进行事务增强 }
注意事项
事务传播问题中,传播后事务设置还是原来的,如果不想用原来设置,必须new事务。
Spring中事务的默认实现使用的是AOP,也就是代理的方式,如果大家在使用代码测试时,同一个Service类中的方法相互调用需要使用注入的对象来调用,不要直接使用this.方法名来调用,this.方法名调用是对象内部方法调用,不会通过Spring代理,也就是事务不会起作用
分布式事务
CAP理论
内容介绍
一致性(Consistency)
在分布式系统中的所有数据备份,在同一时刻是否同样的值(等同于所有节点访问同一份最新的数据副本)。
可用性(Availability)
在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
分区容惜性(Partitiontolerance)
大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。
分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务器放在美国,这就是两个区,它们之间可能无法通信。
原则介绍
CAP原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾
CP要求一致性(有一个没同步好就不可用)
AP要求高可用
选举与同步理论
分布式一致性动画演示
http://thesecretlivesofdata.com/raft/
raft协议
是一个实现分布式一致性的协议
结点状态
follower、candidate和leader
选举leader
默认都以follower状态启动,follower监听不到leader,就称为一个candidate;
投票给自己,然后告诉其他人,同时也收到别人的投票信息。根据投票信息和投票信息里带的信息(如那个节点里的数据);
收到投票后,改选一个自己觉得最靠谱的。某一节点收到票数超过一半就变成leader
raft有两个超时时间控制领导选举
选举超时:从follower到candidate的时间,150ms-300ms(自旋时间),这个时间段内没收到leader的心跳就变为候选者。
a. 自旋时间结束后变成candidate,开始一轮新的选举(老师上课举的例子是);
b. 投出去票后重新计时自旋;
c. leader就发送追加日志给follower,follower就正常
消息发送的心跳时间:如10ms,leader收到投票后,下一次心跳时就带上消息,follower收到消息后重置选举时间
leader宕机,follower收不到心跳,开始新的选举
写数据
接下来所有的数据都要先给leader,leader派发给follower;
比如领导收到信息5后,领导先在leader的log中写入变化set 5。(上面的动态红颜色代表没提交),此时5还没提交,而是改了leader的log后;
leader下一次心跳时,顺便带着信息让follower也去改变follower的log,follower写入日志成功后,发送确认ack 5给leader,
leader收到大多数的ack后,leader就自己正式写入数据,然后告诉follower提交写入硬盘/内存吧(这个过程和响应客户端是同时的)。这个过程叫做日志复制(也有过半机制)
然后leader响应说集群写入好了
其他
5台机器因为局域网隔离又分为3、2生成两个leader(导致部分结点消息滞后)
对于1、2结点那个leader:更新log后收不到大多数的ack(得超过1个ack),所以改log不成功,一直保存不成功
对于3、4、5结点的leader:收到消息后更新log并且收到ack过半且超过1个,成功保存。
此时网络又通了,以更高轮选举的leader为主,退位一个leader。那1、2结点日志都回滚,同步新leader的log。这样就都一致性了
更多动画(可以自己选择宕机情况)
raft.github.io
注意事项
集群一般都是单数,因为有过半机制。比如原来集群6个机器,分为2半后,各3个,选leader时谁都拿不到6/2+1=4个投票,所以都没有leader,导致前端请求都无法保存数据。
一般都是保证AP,舍弃C,后续发现扣减不一致后,再恢复。
BASE理论
内容介绍
BASE理论是对CAP理论的延伸,思想是即使无法做到强一致性(CAP的一致性就是强一致性),但可以采用弱一致性,即最终一致性
基本可用(Basically Available)
基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系统不可用。
响应时间上的损失:正常情况下搜索引擎需要在0.5秒之内返回给用户相应的查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了1~2秒。
功能上的损失:购物网站在购物高峰(如双十一)时,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。
软状态(Soft State)
软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体现。mysql replication的异步复制也是一种体现。
最终一致性(Eventual Consistency)
最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。(这也是分布式事务的想法)
一致性分类
从客户端角度,多进程并发访同时,更新过的数据在不同程如何获的不同策珞,决定了不同的一致性。
对于关系型要求更新过据能后续的访同都能看到,这是强一致性;
如果能容忍后经部分过者全部访问不到,则是弱一致性;
如果经过一段时间后要求能访问到更新后的数据,则是最终一致性。
解决方案
2PC模式(XA事务)
内容介绍
数据库支持的2pc(2二阶段提交),又叫做XA Transactions
支持情况:mysql从5.5版本开始支持,SQLserver2005开始支持,Oracle7开始支持。
其中,XA是一个两阶段提交协议,该协议分为以下两个阶段:
第一阶段:事务协调器要求每个涉及到事务的数据库预提交(P090此操作,并反映是否可以提交;
第二阶段:事务协调器要求每个数据库提交数据。
如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。
原理介绍
如图所示,如果有订单服务和库存服务要求分布式事务,要求有一个总的事务管理器将事务分为两个阶段:
第一个阶段是预备(log);
第二个阶段是正式提交(commit)
总事务管理器接收到两个服务都预备好了log(收到ack),就告诉他们commit,如果有一个没准备好,就回滚所有事务。
小结
XA协议比较简单,而且一旦商业数据库实现了XA协议,使用分布式事务的成本也比较低。
性能不理想,特别是在交易下单链路,往往并发量很高,XA无法满足高并发场景;
XA目前在商业数据库支持的比较理想,在mysql数据库中支持的不太理想,mysql的XA实现,没有记录阶段日志,主备切换回导致主库与备库数据不一致。
许多nosql没有支持XA,这让XA的应用场景变得非常狭隘。
也有3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间未收到回应则做出相应处理)。
柔性事务-TCC事务补偿型方案
事务分类
刚性事务:遵循ACID原则,强一致性;
柔性事务:遵循BASE理论,最终一致性。
柔性事务简介
与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。
一阶段prepare行为:调用自定义的prepare逻辑。
二阶段commit行为:调用自定义的commit逻憬。
二阶段rollback行为:调用自定义的rollback逻辑。
TCC模式,是指支持 自定义的 分支事务纳入到全局事务的管理中。
柔性事务-最大努力通知型方案
内容介绍
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接囗进行核对。
这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。
这种方案也是结合MQ进行实现,例如:通过MQ发送就请求,设置最大通知次数。达到通知次数后即不再通知。
案例分析
银行通知、商户通知等(各大交易业务平台间的商户涌知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调。
大业务调用订单、库存、积分。最后积分失败,则一遍遍通知他们回滚
让子业务监听消息队列
如果收不到就重新发
柔性事务=可靠消息+最终一致性方案(异步确保型)
实现方式
业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。
案例分析
内容介绍
以电商项目下订单为例,下订单业务流程涉及跨系统操作:订单服务下订单 —> 库存服务锁库存 —> 用户服务扣减积分
事务保证情景分析
订单服务异常,库存锁定不运行,全部回滚,撤销操作;
库存服务事务自治,锁定失败全部回滚,订单感受到,继续回滚;
库存服务锁定成功,但是网络原因返回数据超时失败问题?
库存服务锁定成功,库存服务下面的逻辑发生故障,订单回滚了,怎么处理?
解决方案
利用消息队列实现最终一致
库存服务锁定成功后发给消息队列消息(当前库存工作单),过段时间自动解锁,解锁时先查询订单的支付状态。解锁成功修改库存工作单详情项状态为已解锁
远程服务假失败:远程服务其实成功了,由于网络故障等没有返回导致订单回滚,库存却扣减;
远程服务执行完成:下面的其他方法出现问题导致已执行的远程请求,肯定不能回滚
订单模型
内容介绍
电商系统涉及到3流,分别是信息流,资金流,物流,而订单系统作为中枢将三者有机的集合起来。
订单生成校验
订单状态
待付款
用户提交订单后,订单进行预下单,目前主流电商网站都会唤起支付,便于用户快速完成支付,需要汪意的是待付款状态下可以对库存进行锁定,锁定库存需要配置支付超时时间,超时后将自动取消订单,订单变更关闭状态。
已付款/代发货
用户完成订单支付,订单系统需要记录支付时间,支付流水单号便于对账,订单下放到WMS系统,仓库进行调拨,配货,分拣,出库等操作。
待收货/已发货
仓储将商品出库后,订单进入物流环节,订单系统需要同步物流信息,便于用户实时知悉物品物流状态。
已完成
用户确认收货后,订单交易完成。后续支付则进行结算,如果订单存在间题进入售后状态。
已取消
付款之前取消订单。包括超时未付款或用户商户取消订单都会产生这种订单状态。
售后中
用户在付款后申请退款,或商家发货后用户申请退换货。
售后也同样存在各种状态:
当发起售后申请后生成售后订单;
售后订单状态为待审核,等待商家审核;
商家审核过后订单状态变更为待退货,等待用户将商品寄回;
商家收到货后订单
订单流程
内容介绍
线上实物订单和虚拟订单的流程,线上实物订单与O2O订单等,需要根据不同的类型进行构建订单流程。
不管类型如何订单都包括正向流程(购买商品)和逆向流程(退换货),正向流程就是一个正常的网购步骤:
订单生成 -> 支付订单 -> 卖家发货 -> 确认收货 -> 交易成功。
订单确认
订单确认页数据展示
- 点击"去结算" -> 订单确认页(详情展示)
展示当前用户收获地址list;
所有选中的购物项list;
支付方式;
送货清单,价格也是最新价格,不是加入购物车时的价格;
优惠信息
- 点击"去结算" -> 订单确认页(携带的数据模型)
要注意生成订单的时候,价格得重新算;
在后面的修改中,会让提交订单时不带着购物车数据,而是在后台重新 查询购物车选项;
会带着总价,比对新总价和就总价是否一致。
订单确认数据模型
public class OrderConfirmVo { // 跳转到确认页时需要携带的数据模型。@Getter@Setter/** 会员收获地址列表 **/private List<MemberAddressVo> memberAddressVos;@Getter @Setter/** 所有选中的购物项 **/private List<OrderItemVo> items;/** 发票记录 **/@Getter @Setter/** 优惠券(会员积分) **/private Integer integration;/** 防止重复提交的令牌 **/@Getter @Setterprivate String orderToken;@Getter @SetterMap<Long,Boolean> stocks;public Integer getCount() { // 总件数Integer count = 0;if (items != null && items.size() > 0) {for (OrderItemVo item : items) {count += item.getCount();}}return count;}/** 计算订单总额**///BigDecimal total;public BigDecimal getTotal() { BigDecimal totalNum = BigDecimal.ZERO;if (items != null && items.size() > 0) {for (OrderItemVo item : items) {//计算当前商品的总价格BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount().toString()));//再计算全部商品的总价格totalNum = totalNum.add(itemPrice);}}return totalNum;}/** 应付价格 **///BigDecimal payPrice;public BigDecimal getPayPrice() {return getTotal();}}
订单确认页数据获取
异步处理
查询购物项(redis)、库存和收货地址(数据库)都要调用远程服务,串行会浪费大量时间,因此我们使用CompletableFuture进行异步编排。
防重处理
为了防止多次重复点击“订单提交按钮”。我们在返回订单确认页时,在redis中生成一个随机的令牌,过期时间为30min,提交订单时会携带这个令牌,我们将会在订单提交的处理页面核验此令牌。
利用CompletableFuture异步获取各项数据
@Override // OrderServiceImplpublic OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {// 获取用户,用用户信息获取购物车MemberRespVo MemberRespVo = LoginUserInterceptor.threadLocal.get();// 封装订单OrderConfirmVo confirmVo = new OrderConfirmVo();// 我们要从request里获取用户数据,但是其他线程是没有这个信息的,// 所以可以手动设置新线程里也能共享当前的request数据RequestAttributes attributes = RequestContextHolder.getRequestAttributes();// 1.远程查询所有的收获地址列表CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {// 因为异步线程需要新的线程,而新的线程里没有request数据,所以我们自己设置进去RequestContextHolder.setRequestAttributes(attributes);List<MemberAddressVo> address;try {address = memberFeignService.getAddress(MemberRespVo.getId());confirmVo.setAddress(address);} catch (Exception e) {log.warn("\n远程调用会员服务失败 [会员服务可能未启动]");}}, executor);// 2. 远程查询购物车服务,并得到每个购物项是否有库存CompletableFuture<Void> cartFuture = CompletableFuture.runAsync(() -> {// 异步线程共享 RequestContextHolder.getRequestAttributes()RequestContextHolder.setRequestAttributes(attributes);// feign在远程调用之前要构造请求 调用很多拦截器// 远程获取用户的购物项List<OrderItemVo> items = cartFeignService.getCurrentUserCartItems();confirmVo.setItems(items);}, executor).thenRunAsync(() -> {RequestContextHolder.setRequestAttributes(attributes);List<OrderItemVo> items = confirmVo.getItems();// 获取所有商品的idList<Long> skus = items.stream().map(item -> item.getSkuId()).collect(Collectors.toList());R hasStock = wmsFeignService.getSkuHasStock(skus);List<SkuStockVo> data = hasStock.getData(new TypeReference<List<SkuStockVo>>() {});if (data != null) {// 各个商品id 与 他们库存状态的映射map // 学习下收集成map的用法Map<Long, Boolean> stocks = data.stream().collect(Collectors.toMap(SkuStockVo::getSkuId, SkuStockVo::getHasStock));confirmVo.setStocks(stocks);}}, executor);// 3.查询用户积分Integer integration = MemberRespVo.getIntegration();confirmVo.setIntegration(integration);// 4.其他数据在类内部自动计算// TODO 5.防重令牌 设置用户的令牌String token = UUID.randomUUID().toString().replace("-", "");confirmVo.setOrderToken(token);// redis中添加用户id,这个设置可以防止订单重复提交。生成完一次订单后删除redisstringRedisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX + MemberRespVo.getId(), token, 10, TimeUnit.MINUTES);// 等待所有异步任务完成CompletableFuture.allOf(getAddressFuture, cartFuture).get();return confirmVo;}
运费收件信息获取
- 注意事项
有货无货状态,每个商品单独查比较麻烦,可以用skuId-list异步调用库存系统查出来;
加上运费,并且切换地址时要重新计算运费、总额;
点击提交订单时计算总额,而不是用当前页面的值,或者比对一下值,不一致让用户重新看订单
邮费数据封装
@Datapublic class FareVo { // 邮费private MemberAddressVo address;private BigDecimal fare;}
将页面选中地址的id传给请求获取邮费
@RequestMapping("/fare/{addrId}")public FareVo getFare(@PathVariable("addrId") Long addrId) {return wareInfoService.getFare(addrId);}@Overridepublic FareVo getFare(Long addrId) {FareVo fareVo = new FareVo();R info = memberFeignService.info(addrId);if (info.getCode() == 0) {MemberAddressVo address = info.getData("memberReceiveAddress", new TypeReference<MemberAddressVo>() {});fareVo.setAddress(address);String phone = address.getPhone();//取电话号的最后两位作为邮费String fare = phone.substring(phone.length() - 2, phone.length());fareVo.setFare(new BigDecimal(fare));}return fareVo;}
订单提交
- 幂等性处理(token令牌机制)
准备好订单确认数据后,返回给用户看运费等信息,同时创建防重令牌redis.set(‘order:token:(userId)’,uuid),一并返回;
用户点击提交订单按钮,带着token(hidden元素带着);
渲染订单确认页,后台处理的时候确认请求带过来token的uuid和redis库中是否一致;
此处是重点,比对后立刻删除,比对和删除要求具有原子性,通过redis-lua脚本完成;
提交订单时不要提交购买的商品,去购物车数据库重新获取即可,防止购物车变化和修改页面值;
但可以提交总额,防止商品金额变了还提交订单,用户不满意;
其他信息可以用token和session获取
订单数据
订单提交携带数据
@Datapublic class OrderSubmitVo {/** 收获地址的id **/private Long addrId;/** 支付方式 **/private Integer payType;//无需提交要购买的商品,去购物车再获取一遍//优惠、发票/** 防重令牌 **/private String orderToken;/** 应付价格 **/private BigDecimal payPrice;/** 订单备注 **/private String remarks;//用户相关的信息,直接去session中取出即可}
成功后转发至支付页面携带的数据
@Datapublic class SubmitOrderResponseVo {// 该实体为order表的映射private OrderEntity order;/** 错误状态码 **/private Integer code;}
提交订单
内容介绍
提交订单成功,则携带返回数据转发至支付页面;
提交订单失败,则携带错误信息重定向至确认页
逻辑分析
在OrderWebController里接收到下单请求,然后去OrderServiceImpl里验证和下单,然后再返回到OrderWebController。相当于OrderWebController是封装了我们原来的OrderServiceImpl,用作web
调用service,具体逻辑是交给orderService.submitOrder(submitVo),service返回了失败Code信息,可以看是什么原因引起的下单失败
@PostMapping("/submitOrder") // OrderWebControllerpublic String submitOrder(OrderSubmitVo submitVo, Model model,RedirectAttributes redirectAttributes){try {// 去OrderServiceImpl服务里验证和下单SubmitOrderResponseVo responseVo = orderService.submitOrder(submitVo);// 下单失败回到订单重新确认订单信息if(responseVo.getCode() == 0){// 下单成功去支付响应model.addAttribute("submitOrderResp", responseVo);// 支付页return "pay";}else{String msg = "下单失败";switch (responseVo.getCode()){case 1: msg += "订单信息过期,请刷新在提交";break;case 2: msg += "订单商品价格发送变化,请确认后再次提交";break;case 3: msg += "商品库存不足";break;}redirectAttributes.addFlashAttribute("msg", msg); // 重定向return "redirect:http://order.gulimall.com/toTrade";}} catch (Exception e) {if (e instanceof NotStockException){String message = e.getMessage();redirectAttributes.addFlashAttribute("msg", message);}return "redirect:http://order.gulimall.com/toTrade";}}
验证原子性令牌
为防止在【获取令牌、对比值和删除令牌】之间发生错误导入令牌校验出错,我们必须使用lua脚本保证原子性操作;
改为先锁库存再生成订单;
库存服务后面讲
// @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别// @Transactional(propagation = Propagation.REQUIRED) 设置事务的传播级别@Transactional(rollbackFor = Exception.class)// @GlobalTransactional(rollbackFor = Exception.class)@Overridepublic SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {// 当前线程共享该对象confirmVoThreadLocal.set(vo);SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();//去创建、下订单、验令牌、验价格、锁定库存...//获取当前用户登录的信息MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get();// 0:正常responseVo.setCode(0);//1、验证令牌是否合法【令牌的对比和删除必须保证原子性】返回 0 - 令牌删除失败 或 1 - 删除成功String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";String orderToken = vo.getOrderToken();//通过lua脚本原子验证令牌和删除令牌Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),Arrays.asList(USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()),orderToken);if (result == 0L) {//令牌验证失败responseVo.setCode(1);return responseVo;} else {//令牌验证成功//1、创建订单、订单项等信息OrderCreateTo order = createOrder();//2、验证价格BigDecimal payAmount = order.getOrder().getPayAmount();BigDecimal payPrice = vo.getPayPrice();if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {//金额对比//TODO 3、保存订单 挪到最后//4、库存锁定,只要有异常,回滚订单数据//订单号、所有订单项信息(skuId,skuNum,skuName)WareSkuLockVo lockVo = new WareSkuLockVo();lockVo.setOrderSn(order.getOrder().getOrderSn());//获取出要锁定的商品数据信息List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {OrderItemVo orderItemVo = new OrderItemVo();orderItemVo.setSkuId(item.getSkuId());orderItemVo.setCount(item.getSkuQuantity());orderItemVo.setTitle(item.getSkuName());return orderItemVo;}).collect(Collectors.toList());lockVo.setLocks(orderItemVos);//TODO 调用远程锁定库存的方法//出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata)//为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务R r = wmsFeignService.orderLockStock(lockVo);if (r.getCode() == 0) {//锁定成功responseVo.setOrder(order.getOrder());// int i = 10/0;// 保存订单saveOrder(order);//TODO 订单创建成功,发送消息给MQrabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());//删除购物车里的数据redisTemplate.delete(CART_PREFIX+memberResponseVo.getId());return responseVo;} else {//锁定失败String msg = (String) r.get("msg");throw new NoStockException(msg);// responseVo.setCode(3);// return responseVo;}} else {responseVo.setCode(2);return responseVo;}}}
订单创建To
最终订单要返回的数据
@Datapublic class OrderCreateTo {private OrderEntity order;private List<OrderItemEntity> orderItems;/** 订单计算的应付价格 **/private BigDecimal payPrice;/** 运费 **/private BigDecimal fare;}
创建订单和订单项
用IdWorker生成订单号,是时间和本身对象的组合;
构建订单。此时还没商品,用threadlocal保存一些当前线程的数据,就不用写形参了;
构建订单项。填入具体的商品,涉及锁库存的问题;
计算价格
//2. 创建订单、订单项OrderCreateTo order =createOrderTo(memberResponseVo,submitVo);private OrderCreateTo createOrderTo(MemberResponseVo memberResponseVo, OrderSubmitVo submitVo) {//2.1 用IdWorker生成订单号String orderSn = IdWorker.getTimeId();//2.2 构建订单OrderEntity entity = buildOrder(memberResponseVo, submitVo,orderSn);//2.3 构建订单项List<OrderItemEntity> orderItemEntities = buildOrderItems(orderSn);//2.4 计算价格compute(entity, orderItemEntities);OrderCreateTo createTo = new OrderCreateTo();createTo.setOrder(entity);createTo.setOrderItems(orderItemEntities);return createTo;}// 构建订单private OrderEntity buildOrder(MemberResponseVo memberResponseVo, OrderSubmitVo submitVo, String orderSn) {OrderEntity orderEntity =new OrderEntity();orderEntity.setOrderSn(orderSn);//1) 设置用户信息orderEntity.setMemberId(memberResponseVo.getId());orderEntity.setMemberUsername(memberResponseVo.getUsername());//2) 获取邮费和收件人信息并设置FareVo fareVo = wareFeignService.getFare(submitVo.getAddrId());BigDecimal fare = fareVo.getFare();orderEntity.setFreightAmount(fare);MemberAddressVo address = fareVo.getAddress();orderEntity.setReceiverName(address.getName());orderEntity.setReceiverPhone(address.getPhone());orderEntity.setReceiverPostCode(address.getPostCode());orderEntity.setReceiverProvince(address.getProvince());orderEntity.setReceiverCity(address.getCity());orderEntity.setReceiverRegion(address.getRegion());orderEntity.setReceiverDetailAddress(address.getDetailAddress());//3) 设置订单相关的状态信息orderEntity.setStatus(OrderStatusEnum.CREATE_NEW.getCode());orderEntity.setConfirmStatus(0);orderEntity.setAutoConfirmDay(7);return orderEntity;}
构建订单项
订单项指的是订单里具体的商品
**StringUtils.collectionToDelimitedString(list, “;分隔符”)**工具可以集合/数组转string;
订单项得算优惠后的价格;
用BigDecimal精确计算
// OrderServiceImplprivate List<OrderItemEntity> buildOrderItems(String orderSn) {// 这里是最后一次来确认购物项的价格 这个远程方法还会查询一次数据库List<OrderItemVo> cartItems = cartFeignService.getCurrentUserCartItems();List<OrderItemEntity> itemEntities = null;if(cartItems != null && cartItems.size() > 0){itemEntities = cartItems.stream().map(cartItem -> {OrderItemEntity itemEntity = buildOrderItem(cartItem);itemEntity.setOrderSn(orderSn);return itemEntity;}).collect(Collectors.toList());}return itemEntities;}/*** 构建某一个订单项*/ // OrderServiceImplprivate OrderItemEntity buildOrderItem(OrderItemVo cartItem) {OrderItemEntity itemEntity = new OrderItemEntity();// 1.订单信息: 订单号// 已经在items里设置了// 2.商品spu信息Long skuId = cartItem.getSkuId();// 远程获取spu的信息R r = productFeignService.getSpuInfoBySkuId(skuId);SpuInfoVo spuInfo = r.getData(new TypeReference<SpuInfoVo>() {});itemEntity.setSpuId(spuInfo.getId());itemEntity.setSpuBrand(spuInfo.getBrandId().toString());itemEntity.setSpuName(spuInfo.getSpuName());itemEntity.setCategoryId(spuInfo.getCatalogId());// 3.商品的sku信息itemEntity.setSkuId(cartItem.getSkuId());itemEntity.setSkuName(cartItem.getTitle());itemEntity.setSkuPic(cartItem.getImage());itemEntity.setSkuPrice(cartItem.getPrice());// 把一个集合按照指定的字符串进行分割得到一个字符串// 属性list生成一个stringString skuAttr = StringUtils.collectionToDelimitedString(cartItem.getSkuAttr(), ";");itemEntity.setSkuAttrsVals(skuAttr);itemEntity.setSkuQuantity(cartItem.getCount());// 4.积分信息 买的数量越多积分越多 成长值越多itemEntity.setGiftGrowth(cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount())).intValue());itemEntity.setGiftIntegration(cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount())).intValue());// 5.订单项的价格信息 优惠金额itemEntity.setPromotionAmount(new BigDecimal("0.0")); // 促销打折itemEntity.setCouponAmount(new BigDecimal("0.0")); // 优惠券itemEntity.setIntegrationAmount(new BigDecimal("0.0")); // 积分// 当前订单项的原价BigDecimal orign = itemEntity.getSkuPrice().multiply(new BigDecimal(itemEntity.getSkuQuantity().toString()));// 减去各种优惠的价格BigDecimal subtract =orign.subtract(itemEntity.getCouponAmount()) // 优惠券逻辑没有写,应该去coupon服务查用户的sku优惠券.subtract(itemEntity.getPromotionAmount()) // 官方促销.subtract(itemEntity.getIntegrationAmount()); // 京豆/积分itemEntity.setRealAmount(subtract);return itemEntity;}
商品项价格计算完毕创建订单
private OrderCreateTo createOrder() {OrderCreateTo orderCreateTo = new OrderCreateTo();// 1. 生成一个订单号String orderSn = IdWorker.getTimeId();// 填充订单的各种基本信息,价格信息OrderEntity orderEntity = buildOrderSn(orderSn);// 2. 获取所有订单项 // 从里面已经设置好了用户该使用的价格List<OrderItemEntity> items = buildOrderItems(orderSn);// 3.根据订单项计算价格 传入订单 、订单项 计算价格、积分、成长值等相关信息computerPrice(orderEntity, items);orderCreateTo.setOrder(orderEntity);orderCreateTo.setOrderItems(items);return orderCreateTo;}
计算总价
private void computerPrice(OrderEntity orderEntity, List<OrderItemEntity> items) {// 叠加每一个订单项的金额BigDecimal coupon = new BigDecimal("0.0");BigDecimal integration = new BigDecimal("0.0");BigDecimal promotion = new BigDecimal("0.0");BigDecimal gift = new BigDecimal("0.0");BigDecimal growth = new BigDecimal("0.0");// 总价BigDecimal totalPrice = new BigDecimal("0.0");for (OrderItemEntity item : items) { // 这段逻辑不是特别合理,最重要的是累积总价,别的可以跳过// 优惠券的金额coupon = coupon.add(item.getCouponAmount());// 积分优惠的金额integration = integration.add(item.getIntegrationAmount());// 打折的金额promotion = promotion.add(item.getPromotionAmount());BigDecimal realAmount = item.getRealAmount();totalPrice = totalPrice.add(realAmount);// 购物获取的积分、成长值gift.add(new BigDecimal(item.getGiftIntegration().toString()));growth.add(new BigDecimal(item.getGiftGrowth().toString()));}// 1.订单价格相关 总额、应付总额orderEntity.setTotalAmount(totalPrice);orderEntity.setPayAmount(totalPrice.add(orderEntity.getFreightAmount()));orderEntity.setPromotionAmount(promotion);orderEntity.setIntegrationAmount(integration);orderEntity.setCouponAmount(coupon);// 设置积分、成长值orderEntity.setIntegration(gift.intValue());orderEntity.setGrowth(growth.intValue());// 设置订单的删除状态orderEntity.setDeleteStatus(OrderStatusEnum.CREATE_NEW.getCode());}
验价
计算完总价后返回主逻辑,将"页面提交的价格"和"后台计算的价格"进行对比,若不同则提示用户商品价格发生变化
// @GlobalTransactional@Transactional@Override // OrderServiceImplpublic SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {// 1. 验证令牌 [必须保证原子性] 返回 0 or 1if (result == 0L) { // 令牌验证失败} else { // 令牌验证成功// 1 .创建订单等信息OrderCreateTo order = createOrder();// 2. 验价BigDecimal payAmount = order.getOrder().getPayAmount();BigDecimal voPayPrice = vo.getPayPrice();// 获取带过来的价格if (Math.abs(payAmount.subtract(voPayPrice).doubleValue()) < 0.01) {/****************/}else {//验价失败responseVo.setCode(2);return responseVo;}}}
保存订单到数据库
private void saveOrder(OrderCreateTo orderCreateTo) {OrderEntity order = orderCreateTo.getOrder();order.setCreateTime(new Date());order.setModifyTime(new Date());this.save(order);orderItemService.saveBatch(orderCreateTo.getOrderItems());}
锁定库存发送延迟队列
锁定库存失败要取消订单
// 在订单里的逻辑:// 前面是创建订单、订单项、验价等逻辑...// .....// List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {OrderItemVo orderItemVo = new OrderItemVo();orderItemVo.setSkuId(item.getSkuId());orderItemVo.setCount(item.getSkuQuantity());return orderItemVo;}).collect(Collectors.toList());// 去锁库存 @RequestMapping("/lock/order")R r = wareFeignService.orderLockStock(orderItemVos);//5.1 锁定库存成功if (r.getCode()==0){responseVo.setOrder(order.getOrder());responseVo.setCode(0);return responseVo;}else {//5.2 锁定库存失败String msg = (String) r.get("msg");throw new NoStockException(msg);}
锁定库存远程服务
找出所有库存大于商品数的仓库;
遍历所有满足条件的仓库,逐个尝试锁库存,若锁库存成功则退出遍历。
/*** 锁定库存WareSkuController* @param vo** 库存解锁的场景* 1)、下订单成功,订单过期没有支付被系统自动取消或者被用户手动取消,都要解锁库存* 2)、下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁* 3)、** @return*/@PostMapping(value = "/lock/order")public R orderLockStock(@RequestBody WareSkuLockVo vo) {try {boolean lockStock = wareSkuService.orderLockStock(vo);return R.ok().setData(lockStock);} catch (NoStockException e) {return R.error(NO_STOCK_EXCEPTION.getCode(),NO_STOCK_EXCEPTION.getMessage());}}/*** 锁定库存WareSkuServiceImpl* 为某个订单锁定库存* @param vo* @return*/@Transactional(rollbackFor = Exception.class)@Overridepublic boolean orderLockStock(WareSkuLockVo vo) {/*** 保存库存工作单详情信息* 便于追溯进行消息撤回*/WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();wareOrderTaskEntity.setOrderSn(vo.getOrderSn());wareOrderTaskEntity.setCreateTime(new Date());wareOrderTaskService.save(wareOrderTaskEntity);//1、按照下单的收货地址,找到一个就近仓库,锁定库存//2、找到每个商品在哪个仓库都有库存List<OrderItemVo> locks = vo.getLocks();List<SkuWareHasStock> collect = locks.stream().map((item) -> {// 创建订单项SkuWareHasStock stock = new SkuWareHasStock();Long skuId = item.getSkuId();stock.setSkuId(skuId);stock.setNum(item.getCount()); // 购买数量//查询这个商品在哪个仓库有库存List<Long> wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId);stock.setWareId(wareIdList);return stock;}).collect(Collectors.toList());//2、锁定库存for (SkuWareHasStock hasStock : collect) {boolean skuStocked = false;Long skuId = hasStock.getSkuId();List<Long> wareIds = hasStock.getWareId();if (org.springframework.util.StringUtils.isEmpty(wareIds)) {//没有任何仓库有这个商品的库存(注意可能会回滚之前的订单项,没关系)throw new NoStockException(skuId);}//1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ//2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所以就不用解锁for (Long wareId : wareIds) {//锁库存,更新sql用到了cas, 锁定成功就返回1,失败就返回0Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());if (count == 1) {skuStocked = true;WareOrderTaskDetailEntity taskDetailEntity = WareOrderTaskDetailEntity.builder().skuId(skuId).skuName("").skuNum(hasStock.getNum()).taskId(wareOrderTaskEntity.getId()).wareId(wareId).lockStatus(1).build();// db保存订单sku项工作单详情,告诉商品锁的哪个库存wareOrderTaskDetailService.save(taskDetailEntity);//TODO 发送库存锁定消息到延迟队列,告诉MQ库存锁定成功StockLockedTo lockedTo = new StockLockedTo();lockedTo.setId(wareOrderTaskEntity.getId());StockDetailTo detailTo = new StockDetailTo();BeanUtils.copyProperties(taskDetailEntity,detailTo);lockedTo.setDetailTo(detailTo);// 发送rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);break; // 一定要跳出,防止重复发送多余消息} else {//当前仓库锁失败,重试下一个仓库}}if (skuStocked == false) {//当前商品所有仓库都没有锁住throw new NoStockException(skuId);}}//3、肯定全部都是锁定成功的return true;}
-- 新建商品库存表DROP TABLE IF EXISTS `wms_ware_sku`;CREATE TABLE `wms_ware_sku` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',`sku_id` bigint(20) NULL DEFAULT NULL COMMENT 'sku_id',`ware_id` bigint(20) NULL DEFAULT NULL COMMENT '仓库id',`stock` int(11) NULL DEFAULT NULL COMMENT '库存数',`sku_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'sku_name',`stock_locked` int(11) NULL DEFAULT 0 COMMENT '锁定库存',PRIMARY KEY (`id`) USING BTREE,INDEX `sku_id`(`sku_id`) USING BTREE,INDEX `ware_id`(`ware_id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '商品库存' ROW_FORMAT = Dynamic;
<!-- cas 锁定库存 --><update id="lockSkuStock">UPDATE `wms_ware_sku` SET stock_locked = stock_locked + #{num}WHERE sku_id = #{skuId} AND ware_id = #{wareId} AND stock-stock_locked >= #{num}</update>
小结
这里通过异常机制控制事务回滚,如果在锁定库存失败则抛出NoStockExceptions,订单服务和库存服务都会回滚。
优化逻辑为:锁库存后,把内容发到消息队列里
消息队列并不立刻消费,而是让其过期,过期后重新入队别的消息队列,别的消息队列拿到后验证订单是否被支付,没被支付的话还原到库存里。
订单回滚
seata解决分布式事务问题(了解)
内容介绍
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
快速开始:http://seata.io/zh-cn/docs/user/quickstart.html
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
流程分析
TM告诉TC开启一个全局事务。
storage注册分支事务,实时向TC汇报分支状态。
account失败,告诉TC失败了,TC回滚全部全局事务。
实现过程
使用@GlobalTransactional 注解在业务方法上
@GlobalTransactionalpublic void purchase(String userId, String commodityCode, int orderCount) {......}
创建日志表
有业务步骤,但是SEATA AT模式需要 UNDO_LOG 表,记录之前执行的操作。每个涉及的子系统对应的数据库都要新建表
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_logCREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,`ext` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
引入依赖
<!-- 带上版本号 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId></dependency>
整合应用
从 https://github.com/seata/seata/archive/v0.7.1.zip 下载服务器软件包senta-server-0.7.1,将其解压缩,作为TC;
为了节省git资源,我们下载源码的项目自己编译;
编译项目
(1) 下载后复制到guli项目下,然后在File -> Project Structure -> Modules 中点击+号Import Module,选择项目里的seata;
(2) 会有报错,protobuf这个包找不到。在idea中安装proto buffer editor插件,重启idea(还找不到就重新编译一下,在mvn中找到seata-serializer子项目,点击protobuf里的compile选项。有个grpc的test报错,先全注释掉)
(3) 有一个server项目,找到注册中心配置resource/registry.conf,修改启动的nacos信息。可以修改注册中心和配置中心(先不用管file.conf)
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa# 修改这个type = "nacos"nacos {# 修改这个serverAddr = "localhost:8848"namespace = "public"cluster = "default"}
(4) 启动server下的主类;
(5) 在nacos中看到一个serverAddr服务
添加注解
在大事务的入口标记注解@GlobalTransactional开启全局事务,并且每个小事务标记注解@Transactional。
@GlobalTransactional@Transactional@Overridepublic SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo) {}
使用参考链接:https://github.com/seata/seata-samples/tree/master/springcloud-jpa-seata
配置数据源
注入 DataSourceProxy
因为Seata 通过代理数据源实现分支事务,如果没有注入,事务无法成功回滚
// 方式一@Configurationpublic class DataSourceConfig {@Bean@ConfigurationProperties(prefix = "spring.datasource")public DruidDataSource druidDataSource() {return new DruidDataSource();}/*** 需要将 DataSourceProxy 设置为主数据源,否则事务无法回滚** @param druidDataSource The DruidDataSource*/@Primary@Bean("dataSource")public DataSource dataSource(DruidDataSource druidDataSource) {return new DataSourceProxy(druidDataSource);}}// 方式二@Configurationpublic class MySeataConfig {@AutowiredDataSourceProperties dataSourceProperties;@Beanpublic DataSource dataSource(DataSourceProperties dataSourceProperties) {HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();if (StringUtils.hasText(dataSourceProperties.getName())) {dataSource.setPoolName(dataSourceProperties.getName());}return new DataSourceProxy(dataSource);}}
注意事项:
file.conf 的 service.vgroup_mapping 配置必须和spring.application.name一致
在GlobalTransactionAutoConfiguration类中,默认会使用 ${spring.application.name}-fescar-service-group作为服务名注册到 Seata Server上(每个小事务也要注册到tc上),如果和file.conf中的配置不一致,会提示 no available server to connect错误
可以通过配置yaml的 spring.cloud.alibaba.seata.tx-service-group 修改后缀,但是必须和file.conf中的配置保持一致。
修改配置文件
在order、ware中都配置好上面的配置;
然后它还要求每个微服务要有register.conf和file.conf;
将register.conf和file.conf复制到需要开启分布式事务的根目录,并修改file.conf中配置vgroup_mapping.${application.name}-fescar-service-group = "default"
service {#vgroup->rgroupvgroup_mapping.gulimall-ware-fescar-service-group = "default"#only support single node default.grouplist = "127.0.0.1:8091"#degrade current not supportenableDegrade = false#disabledisable = false#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanentmax.commit.retry.timeout = "-1"max.rollback.retry.timeout = "-1"}
小结
tcc也可以看samples。
但是上面使用的是AT模式,2pc不适用高并发,发生了几次远程调用。去保护spu,适合使用at模式。
高并发,如下单,at模式有很多锁,影响效率。所以不使用at tcc。
使用消息队列,失败了之后发消息。库存服务本身也可以使用自动解锁模式。
自动解锁:定期全部检索很麻烦,所以引入延迟队列。库存服务订阅消息队列,库存解锁发给消息队列
保存库存工作单和库存工作单详情,锁定库存后数据库记录。后面的事务失败后看前面的库存,有没解锁的就解锁。
锁库存后害怕订单失败,锁库存后发送给消息队列,只不过要暂存一会先别被消费,半小时以后再消费就可以知道大事务成功没有。
消息队列实现最终一致性(推荐)
延迟队列
场景介绍
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有商品库存。
方案对比
定时任务:spring的schedule定时任务轮询数据库
消耗系统内存、增加了数据库的压力、存在较大时间误差;
存在超时和检测时间段错开的情况(时效性问题),最高等2倍的定时任务时间
rabbitmq的消息TTL和死信Exchange结合(推荐)
订单关了之后40分钟后库存检查订单存在还是取消。
下订单延迟队列,不要设置消息过期,要设置为队列过期方式。节省一个交换机,使用bean方式创建交换机。
内容介绍
延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
实现流程
内容简介
rabbitmq可以通过 设置队列的TTL + 死信路由 实现延迟队列
TTL(Time-To-Live 消息存活时间)
RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
死信路由DLX
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
流程图示
针对订单模块创建以上消息队列,创建订单时消息会被发送至队列order.delay.queue,经过TTL的时间后消息会变成死信以order.release.order的路由键经交换机转发至队列order.release.order.queue,再通过监听该队列的消息来实现过期订单的处理。
延迟队列使用场景
为什么不能用定时任务完成?
如果恰好在一次扫描后完成业务逻辑,那么就会等待两个扫描周期才能扫到过期的订单,不能保证时效性。
订单分布式主体逻辑
订单超时未支付触发订单过期状态修改与库存解锁
创建订单时消息会被发送至队列order.delay.queue,经过TTL的时间后消息会变成死信以order.release.order的路由键经交换机转发至队列order.release.order.queue,再通过监听该队列的消息来实现过期订单的处理
如果该订单已支付,则无需处理;
否则说明该订单已过期,修改该订单的状态并通过路由键order.release.other发送消息至队列stock.release.stock.queue进行库存解锁。
库存锁定后延迟检查是否需要解锁库存
在库存锁定后通过路由键stock.locked发送至延迟队列stock.delay.queue,延迟时间到,死信通过路由键stock.release转发至stock.release.stock.queue,通过监听该队列进行判断当前订单状态,来确定库存是否需要解锁。
@Bean交换机和队列
内容介绍
在ware和order中配置好pom、yaml、@EnableRabbit
订单模块
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configurationpublic class MyRabbitMQConfig {/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 *//*** 死信队列** @return*/@Beanpublic Queue orderDelayQueue() {/*Queue(String name, 队列名字boolean durable, 是否持久化boolean exclusive, 是否排他boolean autoDelete, 是否自动删除Map<String, Object> arguments) 属性*/HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "order-event-exchange");arguments.put("x-dead-letter-routing-key", "order.release.order");arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟Queue queue = new Queue("order.delay.queue", true, false, false, arguments);return queue;}/*** 普通队列** @return*/@Beanpublic Queue orderReleaseQueue() {Queue queue = new Queue("order.release.order.queue", true, false, false);return queue;}/*** TopicExchange** @return*/@Beanpublic Exchange orderEventExchange() {/** String name,* boolean durable,* boolean autoDelete,* Map<String, Object> arguments* */return new TopicExchange("order-event-exchange", true, false);}@Beanpublic Binding orderCreateBinding() {/** String destination, 目的地(队列名或者交换机名字)* DestinationType destinationType, 目的地类型(Queue、Exhcange)* String exchange,* String routingKey,* Map<String, Object> arguments* */return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}/*** 订单释放直接和库存释放进行绑定* @return*/@Beanpublic Binding orderReleaseOtherBinding() {return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}/*** 商品秒杀队列* @return*/@Beanpublic Queue orderSecKillOrrderQueue() {Queue queue = new Queue("order.seckill.order.queue", true, false, false);return queue;}@Beanpublic Binding orderSecKillOrrderQueueBinding() {//String destination, DestinationType destinationType, String exchange, String routingKey,// Map<String, Object> argumentsBinding binding = new Binding("order.seckill.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.seckill.order",null);return binding;}}
库存模块
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configurationpublic class MyRabbitMQConfig {/*** 使用JSON序列化机制,进行消息转换* @return*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// @RabbitListener(queues = "stock.release.stock.queue")// public void handle(Message message) {//// }/*** 库存服务默认的交换机* @return*/@Beanpublic Exchange stockEventExchange() {//String name, boolean durable, boolean autoDelete, Map<String, Object> argumentsTopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);return topicExchange;}/*** 普通队列,用于解锁库存* @return*/@Beanpublic Queue stockReleaseStockQueue() {//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsQueue queue = new Queue("stock.release.stock.queue", true, false, false);return queue;}/*** 延迟队列* @return*/@Beanpublic Queue stockDelay() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "stock-event-exchange");arguments.put("x-dead-letter-routing-key", "stock.release");// 消息过期时间 2分钟arguments.put("x-message-ttl", 120000);Queue queue = new Queue("stock.delay.queue", true, false, false,arguments);return queue;}/*** 交换机与普通队列绑定* @return*/@Beanpublic Binding stockLocked() {//String destination, DestinationType destinationType, String exchange, String routingKey,// Map<String, Object> argumentsBinding binding = new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);return binding;}/*** 交换机与延迟队列绑定* @return*/@Beanpublic Binding stockLockedBinding() {return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}}
库存回滚解锁
库存锁定
业务逻辑
由于可能订单回滚的情况,所以为了能够得到库存锁定的信息,在锁定时需要记录库存工作单,其中包括订单信息和锁定库存时的信息(仓库id,商品id,锁了几件…);
在锁定成功后,向延迟队列发消息,带上库存锁定的相关信息
代码逻辑(具体参照上面提交订单)
遍历订单项,遍历每个订单项的每个库存,直到锁到库存;
发消息后库存回滚也没关系,用id是查不到数据库的;
数据库锁定库存SQL
<update id="lockSkuStock">UPDATE `wms_ware_sku` SET stock_locked = stock_locked + #{num}WHERE sku_id = #{skuId} AND ware_id = #{wareId}AND stock-stock_locked >= #{num}</update>
接收消息
业务逻辑
延迟队列会将过期的消息路由至"stock.release.stock.queue",通过监听该队列实现库存的解锁;
为保证消息的可靠到达,我们使用手动确认消息的模式,在解锁成功后确认消息,若出现异常则重新归队
库存解锁
代码逻辑
如果工作单详情不为空,说明该库存锁定成功:
(1) 查询最新的订单状态;
(2) 如果订单不存在,说明订单提交出现异常回滚;
(3) 如果订单存在(但订单处于已取消的状态),我们都对已锁定的库存进行解锁
如果工作单详情为空,说明库存未锁定,自然无需解锁;
为保证幂等性,我们分别对订单的状态和工作单的状态都进行了判断,只有当订单过期且工作单显示当前库存处于锁定的状态时,才进行库存的解锁;
解锁库存同时更改工作单状态为已解锁。
监听器异步解锁
import com.rabbitmq.client.Channel;import com.xunqi.common.to.OrderTo;import com.xunqi.common.to.mq.StockLockedTo;import com.xunqi.gulimall.ware.service.WareSkuService;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.io.IOException;@Slf4j@RabbitListener(queues = "stock.release.stock.queue")@Servicepublic class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;/*** 1、库存自动解锁* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁** 2、订单失败* 库存锁定失败** 只要解锁库存的消息失败,一定要告诉服务解锁失败*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {log.info("******收到解锁库存的信息******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}@RabbitHandlerpublic void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {log.info("******收到订单关闭,准备解锁库存的信息******");try {wareSkuService.unlockStock(orderTo);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
解锁库存核心sql
<update id="unlockStock">UPDATE `wms_ware_sku` SET stock_locked = stock_locked - #{num}WHERE sku_id = #{skuId} AND ware_id = #{wareId}</update>
解锁库存核心代码
@Overridepublic void unlockStock(StockLockedTo to) {log.info("收到解锁库存的消息");//库存工作单的idStockDetailTo detail = to.getDetailTo();Long detailId = detail.getId();/*** 解锁* 1、查询数据库关于这个订单锁定库存信息* 有:证明库存锁定成功了* 解锁:订单状况* 1、没有这个订单,必须解锁库存* 2、有这个订单,不一定解锁库存* 订单状态:已取消:解锁库存* 已支付:不能解锁库存* 没有:就是库存锁定失败, 库存回滚了 这种情况无需回滚*/WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId);if (taskDetailInfo != null) {//查出wms_ware_order_task工作单的信息Long id = to.getId();WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id);//获取订单号查询订单状态 已取消才解锁库存String orderSn = orderTaskInfo.getOrderSn();//远程查询订单信息R orderData = orderFeignService.getOrderStatus(orderSn);if (orderData.getCode() == 0) {//订单数据返回成功OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});//判断订单状态是否已取消或者支付或者订单不存在if (orderInfo == null || orderInfo.getStatus() == 4) {//订单已被取消,才能解锁库存if (taskDetailInfo.getLockStatus() == 1) {//当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);}}} else {//消息拒绝以后重新放在队列里面,让别人继续消费解锁//远程调用服务失败throw new RuntimeException("远程调用服务失败");}} else {//无需解锁}}/*** 解锁库存的方法* @param skuId* @param wareId* @param num* @param taskDetailId*/public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {//库存解锁wareSkuDao.unLockStock(skuId,wareId,num);//更新工作单的状态WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();taskDetailEntity.setId(taskDetailId);//变为已解锁taskDetailEntity.setLockStatus(2);wareOrderTaskDetailService.updateById(taskDetailEntity);}
其他
- 注意远程调用还需要登录的问题,所以设置拦截器不拦截 order/order/status/{orderSn}
boolean match = new AntPathMatcher().match("order/order/status/**", uri);
get方法,安全性还好,如果修改的url呢?前面主要是因为没带redis-key查询session,所以我们或许**可以在远程调用中想办法传入redis-key**。
定时关单
提交订单
详见上方订单提交模块
监听队列
业务逻辑
创建订单的消息会进入延迟队列,最终发送至队列order.release.order.queue,因此我们对该队列进行监听,进行订单的关闭
监听器异步关单
import com.rabbitmq.client.Channel;import com.xunqi.gulimall.order.entity.OrderEntity;import com.xunqi.gulimall.order.service.OrderService;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.io.IOException;@RabbitListener(queues = "order.release.order.queue")@Servicepublic class OrderCloseListener {@Autowiredprivate OrderService orderService;@RabbitHandlerpublic void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());try {orderService.closeOrder(orderEntity);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
关闭订单
业务逻辑
由于要保证幂等性,因此要查询最新的订单状态判断是否需要关单;
关闭订单后也需要解锁库存,因此发送消息进行库存、会员服务对应的解锁
核心代码
/*** 关闭订单* @param orderEntity*/@Overridepublic void closeOrder(OrderEntity orderEntity) {//关闭订单之前先查询一下数据库,判断此订单状态是否已支付OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn",orderEntity.getOrderSn()));//如果订单还处于新创建的状态,说明超时未支付,进行关单if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {//代付款状态进行关单OrderEntity orderUpdate = new OrderEntity();orderUpdate.setId(orderInfo.getId());orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());this.updateById(orderUpdate);// 关单后发送消息给MQ通知其他服务进行关单相关的操作,如解锁库存OrderTo orderTo = new OrderTo();BeanUtils.copyProperties(orderInfo, orderTo);try {//TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);} catch (Exception e) {//TODO 定期扫描数据库,重新发送失败的消息}}}
解锁库存
监听器异步解锁库存
import com.rabbitmq.client.Channel;import com.xunqi.common.to.OrderTo;import com.xunqi.common.to.mq.StockLockedTo;import com.xunqi.gulimall.ware.service.WareSkuService;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.io.IOException;@Slf4j@RabbitListener(queues = "stock.release.stock.queue")@Servicepublic class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;/*** 1、库存自动解锁* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁** 2、订单失败* 库存锁定失败** 只要解锁库存的消息失败,一定要告诉服务解锁失败*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {log.info("******收到解锁库存的信息******");try {//当前消息是否被第二次及以后(重新)派发过来了// Boolean redelivered = message.getMessageProperties().getRedelivered();//解锁库存wareSkuService.unlockStock(to);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}@RabbitHandlerpublic void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {log.info("******收到订单关闭,准备解锁库存的信息******");try {wareSkuService.unlockStock(orderTo);// 手动删除消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {// 解锁失败 将消息重新放回队列,让别人消费channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
解锁库存核心代码
/*** 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理* 导致卡顿的订单,永远都不能解锁库存* @param orderTo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void unlockStock(OrderTo orderTo) {String orderSn = orderTo.getOrderSn();//查一下最新的库存解锁状态,防止重复解锁库存WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);//按照工作单的id找到所有 没有解锁的库存,进行解锁Long id = orderTaskEntity.getId();List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id).eq("lock_status", 1));for (WareOrderTaskDetailEntity taskDetailEntity : list) {unLockStock(taskDetailEntity.getSkuId(),taskDetailEntity.getWareId(),taskDetailEntity.getSkuNum(),taskDetailEntity.getId());}}
参考链接
【谷粒商城】分布式事务与下单
https://blog.csdn.net/hancoder/article/details/114983771
全网最强电商教程《谷粒商城》对标阿里P6/P7,40-60万年薪
https://www.bilibili.com/video/BV1np4y1C7Yf?p=284
mall源码工程
https://github.com/CharlesKai/mall
电商项目实战之分布式事务解决方案相关推荐
- 400集高并发分布式超级电商项目实战
带走一盏渔火 让他温暖我的双眼 留下一段真情 让它停泊在枫桥边 久违的你 一定保存着那套网盘 许多年以后 躺在网盘里的视频 依然尘封未动 涛声依旧不见当初的夜晚 今天的你我 怎样重复昨天的故事 涛哥说 ...
- 电商项目实战之缓存与Redis分布式锁
电商项目实战之缓存与Redis分布式锁 缓存失效 缓存穿透 缓存雪崩 缓存击穿 分布式缓存 分布式锁 SpringBoot整合Redisson实现分布式锁 实现过程 缓存和数据库一致性 场景分析 解决 ...
- java spu sku_SpringBoot电商项目实战 — 商品的SPU/SKU实现
最近事情有点多,所以系列文章已停止好多天了.今天我们继续Springboot电商项目实战系列文章.到目前为止,整个项目的架构和基础服务已经全部实现,分布式锁也已经讲过了.那么,现在应该到数据库设计及代 ...
- 电商项目实战之商品秒杀
电商项目实战之商品秒杀 定时任务 corn表达式 实现方式 基于注解 基于接口 实战 秒杀系统 秒杀系统关注问题 秒杀架构设计 商品上架 获取当前秒杀商品 获取当前商品的秒杀信息 秒杀最终处理 参考链 ...
- 软件测试电商项目实战(写进简历没问题)
前言 说实话,在找项目的过程中,我下载过(甚至付费下载过)N多个项目.联系过很多项目的作者,但是绝大部分项目,在我看来,并不适合你拿来练习,它们或多或少都存在着"问题",比如: 1 ...
- 微信小程序电商项目实战-前言
各位CSDN的朋友,我们都知道,现在微信小程序电商平台特别火爆,所以我将以一个生鲜电商项目为例,为大家讲述微信小程序的实战化开发,价值几万元的成熟项目,你可千万不要错过哦. 大家直接通过视频链接直接看 ...
- 电商项目实战-项目模板-毕业设计
下载地址:电商项目实战项目模板.毕业设计-Web服务器文档类资源-CSDN下载 ├── 基于vue电商管理系统.zip └── 电商项目实战 ├── 10.vuex │ ├── c ...
- 电商项目实战第一节: CSS3+HTML5+JS 设计案例【考拉海购网站】之【顶部导航】
文章目录 [考拉海购网站]之[顶部导航] 第一步,分析布局 第二步,建立基本的文本目录及文件 第三步,根据第一步对导航栏的分析,在html代码里面补全需要的标签 index.html文件代码 第四步, ...
- 前端电商项目实战,如何从 0 开始创造一个【考拉海购官网】?( 共6节教程 )
文章目录 声明 一,关于页面还原度效果比较 二,第一组演示图是 考拉海购官网的 三,第二组演示图是 本次教程从0开发的 四,教程目录(共6节) 五,全部代码下载地址 新手提示 (1)如何从github ...
最新文章
- LeetCode 75. 颜色分类(Sort Colors)
- 网络宣传推广教大家网站的过期页面更合理的处理方法
- 如何升级xcode 中的cocos2dx 到v2.2.2以上版本
- 使用VS2019编写C语言程序,环境安装配置+代码调试
- BaseAdapter封装 实现万能适配器
- 敏捷开发总结(1)软件研发过程
- ASP.NET Core MVC+EF Core从开发到部署
- Numpy库的学习(三)
- 怎么单选_第一届化妆品分类大赛丨用过的化妆刷、过期的口红…该怎么扔
- Mac上编译C++报错
- 怎么查看XP系统是32位还是64位
- 游戏制作人谈10大开发经验
- Windows两台服务器之间实现文件共享
- T1076 正常血压(信息学一本通C++)
- v2ray服务端启动出现panic: runtime error: invalid memory address or nil pointer dereference
- 决定迭代次数的两种效应
- 针对Mrpt/build中的make时u出现的问题ccache: error: Failed to create temporary file for /home/jyy/.ccache/tmp/tm
- 成功编译和运行roslaunch qbo_webi qbo_webi.launch(解决qbo_object_recognition之后的其他问题)
- Navicat15安装使用
- 而立苏宁:零售之王的自我迭代
热门文章
- 多线程一,什么是多线程,创建多线程的几种方式
- java.lang.NoSuchMethodException 的解决方法们(转)
- @程序员 定制专属于你的保温杯
- iframe页面无法跳转问题
- 自然语言菜鸟学习笔记(二)
- python batchnorm2d_BatchNorm2d()理解
- FastJson实现复杂对象序列化与反序列化
- Solidify实现一个智能合约9(数组和string之间的转换关系)
- jecat php toolbox,利用JeCat PHP Toolbox 做网站留言板
- 惨遭 openssl 不同版本毒打的一天