目录


一、订单业务模块

  1. 订单流程

  2. 购物车跳转订单确认页

    1. 登录拦截器
    2. 封装vo
    3. Feign远程调用丢失请求头信息
    4. Feign远程异步调用丢失上下文信息
  3. 提交订单接口幂等性

    1. 令牌token机制
    2. 各种锁机制
    3. 各种唯一约束
    4. 防重表
    5. 全局请求唯一id

二、分布式事务

  1. 本地事务
  2. 分布式事务
    • CAP定理
    • BASE理论
    • 强一致性、弱一致性、最终一致性
  3. 分布式事务解决方案
    • 2PC模式
    • 柔性事务-TCC事务补偿方案
    • 柔性事务-最大努力通知型方案
    • 柔性事务-可靠消息+最终一致性方案(异步确认)
  4. 整合Seata分布式事务(强一致性)

三、延时队列实现定时任务

  1. 场景分析
  2. 概念
  3. 代码实现
    1. 订单服务
    2. 库存服务
  4. 如何保证消息的可靠性
    • 消息丢失
    • 消息重复
    • 消息积压

一、订单业务模块

概述

  • 电商系统涉及到 3 流,分别时信息流,资金流,物流,而订单系统作为中枢将三者有机的集
    合起来。
  • 订单模块是电商系统的枢纽,在订单这个环节上需求获取多个模块的数据和信息,同时对这
    些信息进行加工处理后流向下个环节,这一系列就构成了订单的信息流通

1.订单流程

不管类型如何订单都包括正向流程和逆向流程,对应的场景就是购买商品和退换货流程,正
向流程就是一个正常的网购步骤:订单生成–>支付订单–>卖家发货–>确认收货–>交易成功。
而每个步骤的背后,订单是如何在多系统之间交互流转的,可概括如下图

1、订单创建与支付

(1) 、订单创建前需要预览订单,选择收货信息等
(2) 、订单创建需要锁定库存,库存有才可创建,否则不能创建
(3) 、订单创建后超时未支付需要解锁库存
(4) 、支付成功后,需要进行拆单,根据商品打包方式,所在仓库,物流等进行拆单
(5) 、支付的每笔流水都需要记录,以待查账
(6) 、订单创建,支付成功等状态都需要给 MQ 发送消息,方便其他系统感知订阅

2、逆向流程

(1) 、修改订单,用户没有提交订单,可以对订单一些信息进行修改,比如配送信息,
优惠信息,及其他一些订单可修改范围的内容,此时只需对数据进行变更即可。

(2) 、订单取消,用户主动取消订单和用户超时未支付,两种情况下订单都会取消订
单,而超时情况是系统自动关闭订单,所以在订单支付的响应机制上面要做支付的
限时处理,尤其是在前面说的下单减库存的情形下面,可以保证快速的释放库存。
另外需要需要处理的是促销优惠中使用的优惠券,权益等视平台规则,进行相应补
回给用户。

(3) 、退款,在待发货订单状态下取消订单时,分为缺货退款和用户申请退款。如果是
全部退款则订单更新为关闭状态,若只是做部分退款则订单仍需进行进行,同时生
成一条退款的售后订单,走退款流程。退款金额需原路返回用户的账户。

(4) 、发货后的退款,发生在仓储货物配送,在配送过程中商品遗失,用户拒收,用户
收货后对商品不满意,这样情况下用户发起退款的售后诉求后,需要商户进行退款
的审核,双方达成一致后,系统更新退款状态,对订单进行退款操作,金额原路返
回用户的账户,同时关闭原订单数据。仅退款情况下暂不考虑仓库系统变化。如果
发生双方协调不一致情况下,可以申请平台客服介入。在退款订单商户不处理的情
况下,系统需要做限期判断,比如 5 天商户不处理,退款单自动变更同意退款

2.购物车页跳转到订单确认页

用户登录状态下查看购物车商品信息,点击去结算,订单详情页需要显示

  • 商品最新价格、优惠信息
  • 用户的基本信息、地址
  • 支付方式等
  • 订单总价格
1.登录拦截器

首先订单业务都需要登录状态,设置一个全局拦截器LoginInterceptor

package henu.soft.xiaosi.order.interceptor;import henu.soft.common.constant.AuthServerConstant;
import henu.soft.common.to.MemberResponseTo;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;/*** 登录拦截器,未登录的用户不能进入订单服务*/
public class LoginInterceptor implements HandlerInterceptor {public static ThreadLocal<MemberResponseTo> loginUser = new ThreadLocal<>();@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 获取登录状态HttpSession session = request.getSession();MemberResponseTo memberResponseVo = (MemberResponseTo) session.getAttribute(AuthServerConstant.LOGIN_USER);//登陆了if (memberResponseVo != null) {loginUser.set(memberResponseVo);return true;}else {session.setAttribute("msg","请先登录!");response.sendRedirect("http://auth.gulishop.cn/login.html");return false;}}@Overridepublic void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {}
}

注册拦截器

package henu.soft.xiaosi.order.config;import henu.soft.xiaosi.order.interceptor.LoginInterceptor;
import org.springframework.context.annotation.Configuration;import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;@Configuration
public class MyWebConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).addPathPatterns("/**");}
}
2.封装vo
package henu.soft.xiaosi.order.vo;import lombok.Getter;
import lombok.Setter;import java.math.BigDecimal;
import java.util.List;
import java.util.Map;public class OrderConfirmVo {@Getter@Setter/** 会员收获地址列表 **/private List<MemberAddressVo> memberAddressVos;@Getter @Setter/** 所有选中的购物项 **/private List<OrderItemVo> items;/** 发票记录 **/@Getter @Setter/** 优惠券(会员积分) **/private Integer integration;/** 防止重复提交的令牌 **/@Getter @Setterprivate String orderToken;@Getter @SetterMap<Long,Boolean> stocks;public Integer getCount() {Integer count = 0;if (items != null && items.size() > 0) {for (OrderItemVo item : items) {count += item.getCount();}}return count;}/** 订单总额 **///BigDecimal total;//计算订单总额public BigDecimal getTotal() {BigDecimal totalNum = BigDecimal.ZERO;if (items != null && items.size() > 0) {for (OrderItemVo item : items) {//计算当前商品的总价格BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount().toString()));//再计算全部商品的总价格totalNum = totalNum.add(itemPrice);}}return totalNum;}/** 应付价格 **///BigDecimal payPrice;public BigDecimal getPayPrice() {return getTotal();}
}
3.Feign远程调用丢失请求头信息

登录信息保存在分布式session中,浏览器的cookie保存这些信息,直接浏览器访问controller会带上cookie

但是远程调用的方法创建一个新的request对应的controller不会带上cookie,解决办法是加上feign的拦截器

package henu.soft.xiaosi.cart.config;import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;@Configuration
public class MyFeignConfig {@Beanpublic RequestInterceptor requestInterceptor() {return new RequestInterceptor() {@Overridepublic void apply(RequestTemplate template) {//1. 使用RequestContextHolder拿到常常请求的请求数据,同一个线程内可以获取的到ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (requestAttributes != null) {HttpServletRequest request = requestAttributes.getRequest();if (request != null) {//2. 将老请求得到cookie信息放到feign请求上String cookie = request.getHeader("Cookie");template.header("Cookie", cookie);}}}};}
}
4.Feign远程异步调用丢失上下文信息

Feign远程方法调用异步方法,会开启新的线程,

  • 之前是 主线程---> 拦截器--->controller--->service都是一个主线程,可以拿到ThreadLocal线程共享的cookie信息
  • 现在是 新的异步线程--->拦截器--->controller--->service 每个异步任务对应一个线程,拿不到主线程的cookie信息

解决办法

在异步方法内部重新设置 上下文信息 RequestContextHolder.setRequestAttributes(requestAttributes);

3.提交订单接口幂等性

概念

  • 接口幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因
    为多次点击而产生了副作用;
  • 比如说支付场景,用户购买了商品支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额返发现多扣钱了,流水记录也变成了两条...,这就没有保证接口
    的幂等性。

防止场景

  • 用户多次点击按钮
  • 用户页面回退再次提交
  • 微服务互相调用,由于网络问题,导致请求失败。feign 触发重试机制
  • 其他业务情况

幂等情况, 以 SQL 为例,有些操作是天然幂等的。

  • SELECT * FROM table WHER id=?,无论执行多少次都不会改变状态,是天然的幂等。
  • UPDATE tab1 SET col1=1 WHERE col2=2,无论执行成功多少次状态都是一致的,也是幂等操作。
  • delete from user where userid=1,多次操作,结果一样,具备幂等性
  • insert into user(userid,name) values(1,'a')如 userid 为唯一主键,即重复操作上面的业务,只
    会插入一条用户数据,具备幂等性。
  • UPDATE tab1 SET col1=col1+1 WHERE col2=2,每次执行的结果都会发生变化,不是幂等的。
  • insert into user(userid,name) values(1,'a')如 userid 不是主键,可以重复,那上面业务多次操
    作,数据都会新增多条,不具备幂等性。
1.令牌token机制

1、服务端提供了发送 token 的接口。我们在分析业务的时候,哪些业务是存在幂等问题的,
就必须在执行业务前,先去获取 token,服务器会把 token 保存到 redis 中。

2、然后调用业务接口请求时,把 token 携带过去,一般放在请求头部。

3、服务器判断 token 是否存在 redis 中,存在表示第一次请求,然后删除 token,继续执行业
务。
4、如果判断 token 不存在 redis 中,就表示是重复操作,直接返回重复标记给 client,这样
就保证了业务代码,不被重复执行。

危险性:
1、先删除 token 还是后删除 token;

  • 先删除可能导致,业务确实没有执行,重试还带上之前 token,由于防重设计导致,
    请求还是不能执行。

  • 后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除 token,别
    人继续重试,导致业务被执行两边

  • 我们最好设计为先删除 token,如果业务调用失败,就重新获取 token 再次请求。

2、Token 获取、比较和删除必须是原子性

  • redis.get(token) 、token.equals、redis.del(token)如果这两个操作不是原子,可能导
    致,高并发下,都 get 到同样的数据,判断都成功,继续业务并发执行

  • 可以在 redis 使用 lua 脚本完成这个操作
    if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 en

1.准备令牌

// 常量
package henu.soft.common.constant;public class OrderConstant {public static final String USER_ORDER_TOKEN_PREFIX = "order:token";
}//6. 防重令牌String token = UUID.randomUUID().toString().replace("-", "");// 存入redisredisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseTo.getId(), token, 30, TimeUnit.MINUTES);// 返回给页面confirmVo.setOrderToken(token);
<form action="http://order.gulishop.cn/submitOrder" method="post"><input id="addrInput" type="hidden" name="addrId"/><input id="payPriceInput" type="hidden" name="payPrice"><input name="orderToken" th:value="${confirmOrder.orderToken}" type="hidden"/><button class="tijiao" type="submit">提交订单</button></form>
2.封装实体

订单实体

package henu.soft.xiaosi.order.entity;import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;import java.math.BigDecimal;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;/*** 订单* * @author xiaosi* @email 2589165806@qq.com* @date 2021-07-22 23:34:48*/
@Data
@TableName("oms_order")
public class OrderEntity implements Serializable {private static final long serialVersionUID = 1L;/*** id*/@TableIdprivate Long id;/*** member_id*/private Long memberId;/*** 订单号*/private String orderSn;/*** 使用的优惠券*/private Long couponId;/*** create_time*/private Date createTime;/*** 用户名*/private String memberUsername;/*** 订单总额*/private BigDecimal totalAmount;/*** 应付总额*/private BigDecimal payAmount;/*** 运费金额*/private BigDecimal freightAmount;/*** 促销优化金额(促销价、满减、阶梯价)*/private BigDecimal promotionAmount;/*** 积分抵扣金额*/private BigDecimal integrationAmount;/*** 优惠券抵扣金额*/private BigDecimal couponAmount;/*** 后台调整订单使用的折扣金额*/private BigDecimal discountAmount;/*** 支付方式【1->支付宝;2->微信;3->银联; 4->货到付款;】*/private Integer payType;/*** 订单来源[0->PC订单;1->app订单]*/private Integer sourceType;/*** 订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】*/private Integer status;/*** 物流公司(配送方式)*/private String deliveryCompany;/*** 物流单号*/private String deliverySn;/*** 自动确认时间(天)*/private Integer autoConfirmDay;/*** 可以获得的积分*/private Integer integration;/*** 可以获得的成长值*/private Integer growth;/*** 发票类型[0->不开发票;1->电子发票;2->纸质发票]*/private Integer billType;/*** 发票抬头*/private String billHeader;/*** 发票内容*/private String billContent;/*** 收票人电话*/private String billReceiverPhone;/*** 收票人邮箱*/private String billReceiverEmail;/*** 收货人姓名*/private String receiverName;/*** 收货人电话*/private String receiverPhone;/*** 收货人邮编*/private String receiverPostCode;/*** 省份/直辖市*/private String receiverProvince;/*** 城市*/private String receiverCity;/*** 区*/private String receiverRegion;/*** 详细地址*/private String receiverDetailAddress;/*** 订单备注*/private String note;/*** 确认收货状态[0->未确认;1->已确认]*/private Integer confirmStatus;/*** 删除状态【0->未删除;1->已删除】*/private Integer deleteStatus;/*** 下单时使用的积分*/private Integer useIntegration;/*** 支付时间*/private Date paymentTime;/*** 发货时间*/private Date deliveryTime;/*** 确认收货时间*/private Date receiveTime;/*** 评价时间*/private Date commentTime;/*** 修改时间*/private Date modifyTime;}

封装的订单vo

package henu.soft.xiaosi.order.vo;import lombok.Data;import java.math.BigDecimal;@Data
public class OrderSubmitVo {/** 收获地址的id **/private Long addrId;/** 支付方式 **/private Integer payType;//无需提交要购买的商品,去购物车再获取一遍//优惠、发票/** 防重令牌 **/private String orderToken;/** 应付价格 **/private BigDecimal payPrice;/** 订单备注 **/private String remarks;//用户相关的信息,直接去session中取出即可
}
3.对应controller
/*** 确认订单* @param submitVo* @param model* @param attributes* @return*/@RequestMapping("/submitOrder")public String submitOrder(OrderSubmitVo submitVo, Model model, RedirectAttributes attributes) {try {SubmitOrderResponseVo responseVo = orderService.submitOrder(submitVo);Integer code = responseVo.getCode();if (code == 0) {model.addAttribute("order", responseVo.getOrder());return "pay";} else {String msg = "下单失败;";switch (code) {case 1:msg += "防重令牌校验失败";break;case 2:msg += "商品价格发生变化";break;}attributes.addFlashAttribute("msg", msg);return "redirect:http://order.gulishop.cn/toTrade";}} catch (Exception e) {if (e instanceof NoStockException) {String msg = "下单失败,商品无库存";attributes.addFlashAttribute("msg", msg);}return "redirect:http://order.gulishop.cn/toTrade";}}
4.对应service

步骤

  • //1. 验证令牌,前端传递的令牌和redis存储的令牌对比
  • //2. 创建订单、订单项
  • //3. 验价
  • //4. 保存订单
  • //5. 锁定库存(RabbitMQ延时队列)
====================== 前面传给页面的token ==================String token = UUID.randomUUID().toString().replace("-", "");redisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseTo.getId(), token, 30, TimeUnit.MINUTES);confirmVo.setOrderToken(token);=================== 1. 验证防重令牌 ========================MemberResponseTo memberResponseTo = LoginInterceptor.loginUser.get();String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Long execute = redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseTo.getId()), submitVo.getOrderToken());if (execute == 0L) {//1.1 防重令牌验证失败responseVo.setCode(1);return responseVo;} else {xxx}

数据库表信息

 /*** //2. 创建订单、订单项* @param memberResponseVo* @param submitVo* @return*/private OrderCreateTo createOrderTo(MemberResponseTo memberResponseVo, OrderSubmitVo submitVo) {//用IdWorker生成订单号String orderSn = IdWorker.getTimeId();//构建订单OrderEntity entity = buildOrder(memberResponseVo, submitVo,orderSn);//构建订单项List<OrderItemEntity> orderItemEntities = buildOrderItems(orderSn);//计算价格compute(entity, orderItemEntities);OrderCreateTo createTo = new OrderCreateTo();createTo.setOrder(entity);createTo.setOrderItems(orderItemEntities);return createTo;


验价

//3. 验价BigDecimal payAmount = order.getOrder().getPayAmount();BigDecimal payPrice = submitVo.getPayPrice();if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {xxx}
/*** //4. 保存订单* @param orderCreateTo*/private void saveOrder(OrderCreateTo orderCreateTo) {OrderEntity order = orderCreateTo.getOrder();order.setCreateTime(new Date());order.setModifyTime(new Date());this.save(order);orderItemService.saveBatch(orderCreateTo.getOrderItems());}

锁库存

封装vo

package henu.soft.xiaosi.order.vo;import henu.soft.common.to.OrderItemTo;
import lombok.Data;import java.util.List;@Data
public class WareSkuLockVo {private String OrderSn;private List<OrderItemTo> locks;
}
//5. 锁定库存List<OrderItemTo> orderItemTos = order.getOrderItems().stream().map((item) -> {OrderItemTo orderItemTo = new OrderItemTo();orderItemTo.setSkuId(item.getSkuId());orderItemTo.setCount(item.getSkuQuantity());return orderItemTo;}).collect(Collectors.toList());WareSkuLockVo lockVo = new WareSkuLockVo();lockVo.setOrderSn(order.getOrder().getOrderSn());lockVo.setLocks(orderItemTos);R r = wareFeignService.orderLockStock(lockVo);//5.1 锁定库存成功if (r.getCode() == 0) {//                    int i = 10 / 0;responseVo.setOrder(order.getOrder());responseVo.setCode(0);//发送消息到订单延迟队列,判断过期订单rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());//清除购物车记录BoundHashOperations<String, Object, Object> ops = redisTemplate.boundHashOps(CartConstant.CART_PREFIX + memberResponseTo.getId());for (OrderItemEntity orderItem : order.getOrderItems()) {ops.delete(orderItem.getSkuId().toString());}return responseVo;} else {//5.1 锁定库存失败String msg = (String) r.get("msg");throw new NoStockException(msg);}
2.各种锁机制
1、数据库悲观锁
  • select * from xxxx where id = 1 for update;

  • 悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,需要根据实际情况选用。
    另外要注意的是,id 字段一定是主键或者唯一索引,不然可能造成锁表的结果,处理起来会
    非常麻烦。

2、数据库乐观锁
  • update t_goods set count = count -1 , version = version + 1 where good_id=2 and version = 1

  • 这种方法适合在更新的场景中,根据 version 版本,也就是在操作库存前先获取当前商品的 version 版本号,然后操作的时候带上此 version 号。我们梳理下,我们第一次操作库存时,得到 version 为 1,调用库存服务version 变成了 2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传如的 version 还是 1,再执行上面的 sql 语句时,就不会执行;因为 version 已经变为 2 了,where 条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。

  • 乐观锁主要使用于处理读多写少的问题

3、业务层分布式锁
  • 如果多个机器可能在同一时间同时处理相同的数据,比如多台机器定时任务都拿到了相同数
    据处理,我们就可以加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断
    这个数据是否被处理过。
3.各种唯一约束
1、数据库唯一约束
  • 插入数据,应该按照唯一索引进行插入,比如订单号,相同的订单就不可能有两条记录插入。
    我们在数据库层面防止重复。
  • 这个机制是利用了数据库的主键唯一约束的特性,解决了在 insert 场景时幂等问题。但主键
    的要求不是自增的主键,这样就需要业务生成全局唯一的主键。
  • 如果是分库分表场景下,路由规则要保证相同请求下,落地在同一个数据库和同一表中,要
    不然数据库主键约束就不起效果了,因为是不同的数据库和表主键不相关。
2、redis set 防重
  • 很多数据需要处理,只能被处理一次,比如我们可以计算数据的 MD5 将其放入 redis 的 set,
    每次处理数据,先看这个 MD5 是否已经存在,存在就不处理。
4.防重表
  • 使用订单号 orderNo 做为去重表的唯一索引,把唯一索引插入去重表,再进行业务操作,且
    他们在同一个事务中。这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避
    免了幂等问题。这里要注意的是,去重表和业务表应该在同一库中,这样就保证了在同一个
    事务,即使业务操作失败了,也会把去重表的数据回滚。这个很好的保证了数据一致性。

  • 之前说的 redis 防重也算

5.全局请求唯一id
  • 调用接口时,生成一个唯一 id,redis 将数据保存到集合中(去重),存在即处理过。
  • 可以使用 nginx 设置每一个请求的唯一 id;
    proxy_set_header X-Request-Id $request_id

二、分布式事务(提交订单、锁库存)

前面提交订单调用远程的仓库服务锁库存,加上的事务是 本地事务

  • 提交订单加上,出现异常回滚
  • 订单保存成功,但是锁库存失败,还是回滚

但是由于不确定因素

  • 订单保存成功了,库存也锁成功了,但是 由于网络原因 并未正常完成逻辑,导致订单保存回滚,但是锁库存没有回滚
  • 订单提交保存订单之后,还可能调用多个其他的远程服务,远程父事务 并不能 很好的管理 子事务

因此本地事务只能控制本地方法的调用,对于远程调用,因此要求统一的分布式事务管理

1.本地事务

1、事务的基本性质
数据库事务的几个特性:原子性(Atomicity )、一致性( Consistency )、隔离性或独立性( Isolation)
和持久性(Durabilily),简称就是 ACID;

  • 原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败
  • 一致性:数据在事务的前后,业务整体一致。 转账。A:1000;B:1000; 转 200 事务成功; A:800 B:1200
  • 隔离性:事务之间互相隔离。
  • 持久性:一旦事务成功,数据一定会落盘在数据库

2、事务的隔离级别isolation

  • READ UNCOMMITTED(读未提交)
    该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读。
  • READ COMMITTED(读提交)
    一个事务可以读取另一个已提交的事务,多次读取会造成不一样的结果,此现象称为不可重
    复读问题,Oracle 和 SQL Server 的默认隔离级别。
  • REPEATABLE READ(可重复读)
    该隔离级别是 MySQL 默认的隔离级别,在同一个事务里,select 的结果是事务开始时时间
    点的状态,因此,同样的 select 操作读到的结果会是一致的,但是,会有幻读现象。MySQL
    的 InnoDB 引擎可以通过 next-key locks 机制(参考下文"行锁的算法"一节)来避免幻读。
  • SERIALIZABLE(序列化)
    在该隔离级别下事务都是串行顺序执行的,MySQL 数据库的 InnoDB 引擎会给读操作隐式
    加一把读共享锁,从而避免了脏读、不可重读复读和幻读问题。

3、事务的传播行为propagation

  1. PROPAGATION_REQUIRED:如果当前没有事务,就创建一个新事务,如果当前存在事务,
    就加入该事务,该设置是最常用的设置。
  2. PROPAGATION_SUPPORTS:支持当前事务,如果当前存在事务,就加入该事务,如果当
    前不存在事务,就以非事务执行。
  3. PROPAGATION_MANDATORY:支持当前事务,如果当前存在事务,就加入该事务,如果
    当前不存在事务,就抛出异常。
  4. PROPAGATION_REQUIRES_NEW:创建新事务,无论当前存不存在事务,都创建新事务。
  5. PROPAGATION_NOT_SUPPORTED:以非事务方式执行操作,如果当前存在事务,就把当
    前事务挂起。
  6. PROPAGATION_NEVER:以非事务方式执行,如果当前存在事务,则抛出异常。
  7. PROPAGATION_NESTED:如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,
    则执行与 PROPAGATION_REQUIRED 类似的操作。

4、SpringBoot 事务关键点

  1. 事务的自动配置 TransactionAutoConfiguration
  2. 事务的坑
  • 在同一个类里面,编写两个方法,内部调用的时候,会导致事务设置失效。原因是没有用到
    代理对象的缘故。同一个service方法子事务的调用绕过了代理对象,导致直接是方法调用
  • 解决:
    0)、导入 spring-boot-starter-aop
    1)、@EnableTransactionManagement(proxyTargetClass = true)
    2)、@EnableAspectJAutoProxy(exposeProxy=true)
    3)、AopContext.currentProxy() 调用方

2.分布式事务

1、CAP 定理

CAP 原则又称 CAP 定理,指的是在一个分布式系统中

  • 一致性(Consistency):
    在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访
    问同一份最新的数据副本)
  • 可用性(Availability)
    在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据
    更新具备高可用性)
  • 分区容错性(Partition tolerance)
    大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。
    分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务
    器放在美国,这就是两个区,它们之间可能无法通信。

CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

  • 一般来说,分区容错无法避免,因此可以认为 CAP 的 P 总是成立。CAP 定理告诉我们,剩下的 C 和 A 无法同时做到。

  • 分布式系统中实现一致性的 raft 算法,类似redis的主从复制、哨兵模式
    paxos:http://thesecretlivesofdata.com/raft/

  • 对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所
    以节点故障、网络故障是常态,而且要保证服务可用性达到 99.99999%(N 个 9),即保证
    P 和 A,舍弃 C,即不能保证强一致,但是可以弥补强一致

2.BASE理论

是对 CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但

  • 以采用适当的采取弱一致性,即最终一致性。
  • 即不能保证强一致,但是可以弥补强一致

BASE 是指

  • 基本可用(Basically Available)
    (1)基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、
    功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系
    统不可用。
    (2)响应时间上的损失:正常情况下搜索引擎需要在 0.5 秒之内返回给用户相应的
    查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询
    结果的响应时间增加到了 1~2 秒。
    (3)功能上的损失:购物网站在购物高峰(如双十一)时,为了保护系统的稳定性,
    部分消费者可能会被引导到一个降级页面。
  • 软状态( Soft State)(处于失败、成功的中间状态)
    软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布
    式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体
    现。mysql replication 的异步复制也是一种体现。
  • 最终一致性( Eventual Consistency)
    最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状
    态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况
3. 强一致性、弱一致性、最终一致性

从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了
不同的一致性。

  • 对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是强一致性。

  • 如果能容忍后续的部分或者全部访问不到,则是弱一致性。(容忍软件态,容忍弥补一致性的时间操作)

  • 如果经过一段时间后要求能访问到更新后的数据,则是最终一致性

三、分布式事务解决方案

1. 2PC 模式

数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。MySQL 从 5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:

  • 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是
    否可以提交.
  • 第二阶段:事务协调器要求每个数据库提交数据。

其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务
中的那部分信息。

  • XA 协议比较简单,而且一旦商业数据库实现了 XA 协议,使用分布式事务的成本也比较
    低。
  • XA 性能不理想,特别是在交易下单链路,往往并发量很高,XA 无法满足高并发场景
  • XA 目前在商业数据库支持的比较理想,在 mysql 数据库中支持的不太理想,mysql 的XA 实现,没有记录 prepare 阶段日志,主备切换回导致主库与备库数据不一致。
  • 许多 nosql 也没有支持 XA,这让 XA 的应用场景变得非常狭隘。
  • 也有 3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间
    未收到回应则做出相应处理)
2. 柔性事务-TCC 事务补偿型方案(遵循BASE原则)
  • 刚性事务:遵循 ACID 原则,强一致性。
  • 柔性事务:遵循 BASE 理论,最终一致性;与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。该逻辑为父事务、各个子事务调用自己的try接口方法
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。该逻辑为父事务、各个子事务调用自己的confirm接口方法
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。该逻辑为父事务、各个子事务调用自己的cancle接口方法

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中(抽取一层)

3.柔性事务-最大努力通知型方案

按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。

  • 这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通知次数后即不再通知。
  • 案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调
  • 案例:不断提醒订阅MQ的服务父事务执行失败,直到作出回应(如订单下失败解锁库存)
4.柔性事务-可靠消息+最终一致性方案(异步确保型)
  • 实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只
    记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确
    认发送。只有在得到确认发送指令后,实时消息服务才会真正发送
/**
* 1、做好消息确认机制(pulisher,consumer【手动 ack】)
* 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一
遍
*/CREATE TABLE `mq_message` (`message_id` char(32) NOT NULL, `content` text, `to_exchane` varchar(255) DEFAULT NULL, `routing_key` varchar(255) DEFAULT NULL, `class_type` varchar(255) DEFAULT NULL, `message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb

四、整合Seata

1.概述
  • 官网:http://seata.io/zh-cn/docs/overview/what-is-seata.html
  • Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

  • TC:协调全局
  • TM:控制整个大的事务
  • RM:各个微服务独立的资源管理器,每一个微服务都需要一个回滚日志表,即使不能回滚也要补偿修改的内容
2.建立Seata日志表

每个微服务创建 UNDO_LOG 表,SEATA AT 模式需要 UNDO_LOG 表

-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,`ext` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
3.安装事务协调器TC
  • 下载地址:https://github.com/seata/seata/releases/tag/v1.2.0
  • 解压配置


配置文件file.conf

启动nacos和seata,如果报错参考:报错2_Seata启动报错:Initialization of output ‘file=xxxlogs/seata_gc.log‘ using options ‘(null)‘ failed.

4.整合
  • 导入依赖:https://github.com/seata/seata
<!--        分布式事务seata--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>2.0.1.RELEASE</version><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-all</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId><version>${seata.version}</version></dependency>

  • 配置DataSourceProxy代理各个微服务的数据源
package henu.soft.xiaosi.order.config;import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;import javax.sql.DataSource;@Configuration
public class MySeataConfig {@AutowiredDataSourceProperties dataSourceProperties;@Beanpublic DataSource dataSource(DataSourceProperties dataSourceProperties) {HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();if (StringUtils.hasText(dataSourceProperties.getName())) {dataSource.setPoolName(dataSourceProperties.getName());}return new DataSourceProxy(dataSource);}
}
  • 每个微服务的resource需要放入registry.conf、file.conf,并配置分组

  • 父事务加注解@GlobalTransactional,子事务不用,测试
5.说明

对于普通业务如后台管理的业务,分布式事务解决方案可以使用 Seata的 AT 分布式事务管理,也就是上面的操作

但是对于高并发下的场景,这种是不适合的,需要使用 柔性事务-可靠消息 + 最终一致性方案

  • 父事务失败,发送失败消息给子事务,子事务回滚
  • 延时队列实现定时任务(定时任务太耗费资源),扫描数据库表保存的锁库存记录,根据订单状态判断,然后将失败的锁库存自动解锁

五、延时队列实现定时任务

1.场景分析

定时任务的缺点

  • 耗费系统内存、数据库资源
  • 时效性不能保证,30分钟订单未支付被关闭可能需要多轮才能被扫描出来

父事务下订单,子事务1锁库存,子事务…

  • 不需要分布式事务的场景

    • 订单失败,未进行到锁库存,只需要父事务自动回滚即可。
    • 订单成功,库存锁定成功,其他子事务成功。无需回滚
  • 需要分布式事务的场景

    • 订单成功,锁库存业务也成功,其他远程子事务失败,导致订单回滚,需要自动解锁库存。
    • 订单成功,用户未支付、手动取消,需要自动解锁库存

使用RabbitMQ的延时队列

  • 订单提交之后,先被放到消息队列,到达指定时间30分钟后发送给逻辑业务进行数据库订单保存
  • 订单提交之后,库存锁定成功信息先被放到消息队列,达到指定时间40分钟后检查订单,订单不存在的话自动解锁库存
  • 其实延时队列就是保证 订单状态更新后(已支付、手动取消),判断库存锁定是否逻辑正确,不正确就更正
2.概念
  • 消息的TTL(Time To Live)消息的存活时间
  • 可以对队列、消息设置TTL,前者没有该队列消费者时消息保留最大时间,后者是该消息没有消费者是保留最大时间,超过这个时间成为死信
  • 如果队列和消息都设置了,取二者最小的
  • 通过消息的expiration字段或者 x-message-ttl属性来设置时间

死信会进入死信路由(DLX对应多个队列的路由的Dead Letter Exchage是在普通的路由加上消息转发机制)

  • 被消费者reject拒收的消息,并且参数为requeue为false,即该消息不会被重新放入队列
  • 设置TTL到期的消息
  • 队列长度限制满了。排在前面的消息被丢弃或者扔到死路由的

使用MQ

  • 1、Queue、Exchange、Binding可以@Bean进去
  • 2、监听消息的方法可以有三种参数(不分数量,顺序)
  • Object content, Message message, Channel channel
  • 3、channel可以用来拒绝消息,否则自动ack;

可以控制消息在一段时间变成死信,也可一控制死信转到对应的交换机,结合二者,实现延时队列


建议使用队列过期时间

升级版

  • 不需要分布式事务的场景

    • 订单失败,未进行到锁库存,只需要父事务自动回滚即可。
    • 订单成功,库存锁定成功,其他子事务成功。无需回滚。
  • 需要分布式事务的场景

    • 订单成功,锁库存业务也成功,发送库存锁定信息到延时队列,其他远程子事务失败,导致订单回滚,需要自动解锁库存。
    • 订单成功,锁库存业务也成功,发送库存锁定信息到延时队列,用户未支付、手动取消,需要自动解锁库存

3.代码实现
1.订单服务

订单服务

  • 订单微服务交换机order-event-exchange
  • 延时队列order.delay.queue
  • 死信消费队列order.release.order.queue
  • 两个队列的绑定
package henu.soft.xiaosi.order.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configuration
public class MyRabbitmqConfig {/*订单微服务交换机*/@Beanpublic Exchange orderEventExchange() {/***   String name,*   boolean durable,*   boolean autoDelete,*   Map<String, Object> arguments*/return new TopicExchange("order-event-exchange", true, false);}/*** 延时队列* @return*/@Beanpublic Queue orderDelayQueue() {/**Queue(String name,  队列名字boolean durable,  是否持久化boolean exclusive,  是否排他boolean autoDelete, 是否自动删除Map<String, Object> arguments) 属性*/HashMap<String, Object> arguments = new HashMap<>();//死信交换机arguments.put("x-dead-letter-exchange", "order-event-exchange");//死信路由键arguments.put("x-dead-letter-routing-key", "order.release.order");arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟return new Queue("order.delay.queue",true,false,false,arguments);}/*** 普通队列** @return*/@Beanpublic Queue orderReleaseQueue() {Queue queue = new Queue("order.release.order.queue", true, false, false);return queue;}/*** 创建订单的binding* @return*/@Beanpublic Binding orderCreateBinding() {/*** String destination, 目的地(队列名或者交换机名字)* DestinationType destinationType, 目的地类型(Queue、Exhcange)* String exchange,* String routingKey,* Map<String, Object> arguments* */return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);}@Beanpublic Binding orderReleaseBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}
2.库存服务

库存服务

  • 一个交换机stock-event-exchange
  • 普通队列 stock.release.stock.queue,需要service监听,有消息证明要回滚
  • 延时队列 stock.delay.stock.queue
  • 两个绑定
package henu.soft.xiaosi.ware.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;import java.util.HashMap;/*** 创建MQ的交换机*/
@Configuration
public class MyMQConfig {/*** 交换机* @return*/@Beanpublic Exchange stockEventExchange(){return new TopicExchange("stock-event-exchange",true,false);}/*** 普通队列*/@Beanpublic Queue stockReleaseStockQueue(){// String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> argumentsreturn new Queue("stock.release.stock.queue",true,false,false);}/*** 延时队列*/@Beanpublic Queue stockDelayStockQueue(){// String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments/**Queue(String name,  队列名字boolean durable,  是否持久化boolean exclusive,  是否排他boolean autoDelete, 是否自动删除Map<String, Object> arguments) 属性*/HashMap<String, Object> arguments = new HashMap<>();//死信交换机arguments.put("x-dead-letter-exchange", "stock-event-exchange");//死信路由键arguments.put("x-dead-letter-routing-key", "stock.release");arguments.put("x-message-ttl", 70000); // 消息过期时间 1分钟return new Queue("stock.delay.stock.queue",true,false,false,arguments);}/*** 绑定*/@Beanpublic Binding stockReleaseBinding(){//String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> argumentsreturn new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,"stock-event-exchanges","stock.release.#",null);}/*** 绑定*/@Beanpublic Binding stockLockedBinding(){//String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> argumentsreturn new Binding("stock.delay.stock.queue", Binding.DestinationType.QUEUE,"stock-event-exchanges","stock.delay",null);}
}

保存订单详情日志发给mq

// 待锁的商品、数量、仓库id信息for (SkuLockVo lockVo : lockVos) {boolean lock = true;Long skuId = lockVo.getSkuId();List<Long> wareIds = lockVo.getWareIds();//如果没有满足条件的仓库,抛出异常if (wareIds == null || wareIds.size() == 0) {throw new NoStockException(skuId);} else {for (Long wareId : wareIds) {// 一个个仓库的锁定// 成功返回1Long count = baseMapper.lockWareSku(skuId, lockVo.getNum(), wareId);if (count == 0) {lock = false;} else {//1. 锁定成功,保存订单详情WareOrderTaskDetailEntity detailEntity = WareOrderTaskDetailEntity.builder().skuId(skuId).skuName("").skuNum(lockVo.getNum()).taskId(taskEntity.getId()).wareId(wareId).lockStatus(1).build();wareOrderTaskDetailService.save(detailEntity);//2. 发送库存锁定消息至延迟队列StockLockedTo lockedTo = new StockLockedTo();lockedTo.setId(taskEntity.getId());StockDetailTo detailTo = new StockDetailTo();try {BeanUtils.copyProperties(detailEntity, detailTo);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}lockedTo.setDetailTo(detailTo);rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo);lock = true;break;}}}if (!lock) throw new NoStockException(skuId);}package henu.soft.common.to.mq;import lombok.Data;@Data
public class StockDetailTo {private Long id;/*** sku_id*/private Long skuId;/*** sku_name*/private String skuName;/*** 购买个数*/private Integer skuNum;/*** 工作单id*/private Long taskId;/*** 仓库id*/private Long wareId;/*** 锁定状态*/private Integer lockStatus;
}
3.库存服务监听死信队列
package henu.soft.xiaosi.ware.listener;import com.rabbitmq.client.Channel;import henu.soft.common.to.mq.OrderTo;
import henu.soft.common.to.mq.StockLockedTo;
import henu.soft.xiaosi.ware.service.WareSkuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Slf4j
@Component
@RabbitListener(queues = {"stock.release.stock.queue"})
public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;// 库存回滚@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {log.info("************************收到库存解锁的消息********************************");try {wareSkuService.unlock(stockLockedTo);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 重新放到消息队列,重试机制channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}

之前即使订单失败,父事务回滚了,但是分布式事务锁库存并未回滚,但是保存了

  • 成功的库存锁定信息到消息队列(只有库存锁定成功才会发消息到延时队列)
  • 数据库中间表中也保存了订单、锁库存关系(便于回滚)

现在需要库存服务监听 用于 实现回滚,即从死信队列获取订单详情日志信息

  • 再次查询订单表,无该订单号,证明父事务已经回滚,说明必须回滚锁定的库存,调用unlock()方法再次查询中间表,回滚库存
  • 再次查询订单表,有该订单号,说明没有被取消,变成了支付的订单(因为订单死信队列30分钟会被订单模块监听,若是待付款则订单号直接被关闭了,此时有订单号,证明一定是支付过了),则无需回滚
 /*** 1、没有这个订单,必须解锁库存* 2、有这个订单,不一定解锁库存* *              订单状态:已取消:解锁库存* *                      已支付:不能解锁库存* 消息队列解锁库存** @param stockLockedTo*/@Overridepublic void unlock(StockLockedTo stockLockedTo) {StockDetailTo detailTo = stockLockedTo.getDetailTo();WareOrderTaskDetailEntity detailEntity = wareOrderTaskDetailService.getById(detailTo.getId());//1.如果工作单详情不为空,说明该库存锁定成功if (detailEntity != null) {WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(stockLockedTo.getId());R r = orderFeignService.infoByOrderSn(taskEntity.getOrderSn());if (r.getCode() == 0) {OrderTo order = r.getData(new TypeReference<OrderTo>() {});//没有这个订单||订单状态已经取消 解锁库存if (order == null || order.getStatus() == OrderStatusEnum.CANCLED.getCode()) {//为保证幂等性,只有当工作单详情处于被锁定的情况下才进行解锁if (detailEntity.getLockStatus() == WareTaskStatusEnum.Locked.getCode()) {unlockStock(detailTo.getSkuId(), detailTo.getSkuNum(), detailTo.getWareId(), detailEntity.getId());}}} else {throw new RuntimeException("远程调用订单服务失败");}} else {//无需解锁,因为}}private void unlockStock(Long skuId, Integer skuNum, Long wareId, Long detailId) {//数据库中解锁库存数据baseMapper.unlockStock(skuId, skuNum, wareId);//更新库存工作单详情的状态WareOrderTaskDetailEntity detail = WareOrderTaskDetailEntity.builder().id(detailId).lockStatus(2).build();wareOrderTaskDetailService.updateById(detail);}
4.订单服务监听死信队列关单

分析

  • 下订单业务成功之后,会被放到延时队列,30分钟后进入死信队列,订单模块监听死信队列,此时需要先判断订单状态

    • 订单状态为代付款状态:需要关闭订单
    • 订单状态为已付款、已发货、已完成、已取消、售后中、售后完成等状态:不需要关闭订单

订单状态枚举类

package henu.soft.xiaosi.order.enume;public enum OrderStatusEnume {CREATE_NEW(0,"待付款"),PAYED(1,"已付款"),SENDED(2,"已发货"),RECIEVED(3,"已完成"),CANCLED(4,"已取消"),SERVICING(5,"售后中"),SERVICED(6,"售后完成");private String msg;private Integer code;public String getMsg() {return msg;}public Integer getCode() {return code;}OrderStatusEnume(Integer code, String msg){this.msg = msg;this.code = code;}
}

判断关闭订单

/*** 收到过期的订单信息,准备关闭订单* @param orderEntity*//*** 关闭过期的的订单* @param orderEntity*/@Overridepublic void closeOrder(OrderEntity orderEntity) throws InvocationTargetException, IllegalAccessException {//因为消息发送过来的订单已经是很久前的了,中间可能被改动,因此要查询最新的订单OrderEntity newOrderEntity = this.getById(orderEntity.getId());//如果订单还处于新创建的状态,说明超时未支付,进行关单if (newOrderEntity.getStatus() == OrderStatusEnume.CREATE_NEW.getCode()) {OrderEntity updateOrder = new OrderEntity();updateOrder.setId(newOrderEntity.getId());updateOrder.setStatus(OrderStatusEnume.CANCLED.getCode());this.updateById(updateOrder);//关单后发送消息通知其他服务进行关单相关的操作,如解锁库存OrderTo orderTo = new OrderTo();BeanUtils.copyProperties(newOrderEntity,orderTo);rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other",orderTo);}}

现在出现的问题是

  • 理论上库存死信队列的时长比订单死信队列的时长要长,即 库存是否解锁在 订单状态关闭前判断
  • 现在可能出现网络延迟的原因,导致 库存服务监听的死信队列 先 获得数据,这时候判断订单状态还存在,就没解锁库存,过了一段时间才收到订单关闭的信息,这时候库存就会一直锁定

解决办法

  • 在订单模块在设置一个路由键order.release.order.other.# 该路由键直接 转发到 库存的死信队列stock.release.stock.queue,库存服务监听用于解锁
  • 订单释放和库存释放直接绑定,也就是当有订单关闭的时候通知库存服务,再次判断是否进行库存解锁
/*** 在订单模块在设置一个路由键`order.release.other.#` 该路由键直接 转发到 库存的死信队列`stock.release.stock.queue`,库存服务监听用于解锁* 也就是当有订单关闭的时候通知库存服务,再次判断是否进行库存解锁* @return*/@Beanpublic Binding orderReleaseOrderBinding() {return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}

订单服务

库存服务

5.订单提交保存

4.如何保证消息的可靠性
1.消息丢失
  • 消息发送出去,由于网络问题没有抵达服务器
    • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
    • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
    • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发
  • 消息抵达Broker,Broker要将消息写入queue、磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。 publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
  • 自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
    • 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队
2.消息重复
  • 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
    • 消息消费失败,由于重试机制,自动又将消息发送出去
    • 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
    • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
  • 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
  • rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的
3.消息积压
  • 消费者宕机积压
  • 消费者消费能力不足积压
  • 发送者发送流量太大
    • 上线更多的消费者,进行正常消费
    • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

谷粒商城项目篇13_分布式高级篇_订单业务模块(提交订单幂等性、分布式事务、延时MQ实现定时任务)相关推荐

  1. 谷粒商城电商项目 分布式高级篇

    更多视频,JAVA收徒 QQ:987115885谷粒商城电商项目 分布式高级篇102.全文检索-ElasticSearch-简介.mp4103.全文检索-ElasticSearch-Docker安装E ...

  2. 谷粒商城-分布式高级篇[商城业务-检索服务]

    谷粒商城-分布式基础篇[环境准备] 谷粒商城-分布式基础[业务编写] 谷粒商城-分布式高级篇[业务编写]持续更新 谷粒商城-分布式高级篇-ElasticSearch 谷粒商城-分布式高级篇-分布式锁与 ...

  3. M5(项目)-01-尚硅谷谷粒商城项目分布式基础篇开发文档

    M5(项目)-01-尚硅谷谷粒商城项目分布式基础篇开发文档 分布式基础篇 一.环境搭建 各种开发软件的安装 虚拟机: docker,mysql,redis 主机: Maven, idea(后端),Vs ...

  4. 谷粒商城分布式高级篇(中)

    谷粒商城分布式基础篇 谷粒商城分布式高级篇(上) 谷粒商城分布式高级篇(中) 谷粒商城分布式高级篇(下) 文章目录 商城业务 异步 异步复习 线程池详解 CompletableFuture Compl ...

  5. 谷粒商城-分布式高级篇【业务编写】

    谷粒商城-分布式基础篇[环境准备] 谷粒商城-分布式基础[业务编写] 谷粒商城-分布式高级篇[业务编写]持续更新 谷粒商城-分布式高级篇-ElasticSearch 谷粒商城-分布式高级篇-分布式锁与 ...

  6. 谷粒商城-分布式高级篇[商城业务-秒杀服务]

    谷粒商城-分布式基础篇[环境准备] 谷粒商城-分布式基础[业务编写] 谷粒商城-分布式高级篇[业务编写]持续更新 谷粒商城-分布式高级篇-ElasticSearch 谷粒商城-分布式高级篇-分布式锁与 ...

  7. 谷粒商城项目笔记之分布式基础(一)

    谷粒商城项目之分布式基础 目录 谷粒商城项目之分布式基础 前言 1 项目简介 1.1 项目背景 1.1.1 电商模式 1.1.2 谷粒商城 1.2 项目架构图 1.2.1 项目微服务架构图 1.2.2 ...

  8. 谷粒商城项目搭建思路

    文章目录 基础篇 核心技术点 1. 搭建环境 1.1 安装Linux虚拟机 1.2 安装Docker 1.3 统一开发环境 1.4 搭建后台管理项目 1.5 逆向工程 1.6 测试商品服务功能 1.7 ...

  9. 谷粒商城项目笔记总结(2/2)

    文章目录 商城项目 - 高级篇(下) 商城业务 - 认证服务 1.初始化环境搭建 2.开通阿里云的短信服务 3.整合短信服务 4.发送验证码并防刷 5.注册功能 6.用户名密码登录功能 7.OAuth ...

最新文章

  1. 平民架构的春天——UCloud数据方舟实战记
  2. 一个退休程序员,用高中几何方法,让百年数学难题逼近理论极限
  3. Day 08 周六下午的活动
  4. 双缓冲法解决重绘和闪屏问题
  5. 探讨后端选型中不同语言及对应的Web框架
  6. html特效指令,vue2——指令v-text v-html v-bind
  7. java jdbc init_Java 的JDBC 数据库连接池实现方法
  8. wireshark网卡权限_设置网卡属性用wireshark抓VLAN包
  9. jquery validate验证remote时的多状态问题
  10. 深度总结:软件设计七大原则
  11. PPT镂空字体、填充文字、图片字、拆分字制作
  12. bpsk调制及解调实验_【详解】5G的调制与解调
  13. android 桌球游戏,安卓上目前最台球游戏《台球帝国》测评
  14. ActivityManager 管理Activity
  15. linux+psp+模拟器下载,PSP1.5模拟器全教程+最新版本下载(最终版)
  16. eclipse-登录注册web项目-练
  17. 互联网保险投诉量翻倍 众安与安心财险上榜
  18. 百问网7天物联网智能家居 学习心得 打卡第七天
  19. 帆软Report设置参数列表
  20. 云服务器1:云服务器能干什么

热门文章

  1. 快速了解必要的网络知识
  2. 大学生数学竞赛试题解析选编 [李心灿 等编] 2011年版
  3. pdf2word(pdf转word)
  4. 计算机组成原理--运算方法:加减乘除
  5. java home not set_dbsetup运行时提示JAVA_HOME IS NOT SET
  6. 计算机机房方面的职业资格证,【国家标准】国家职业技能标准 (2019年版) 水文勘测工(33页)-原创力文档...
  7. 最适合女生入门的计算机,计算机有哪些专业适合女生
  8. 工信部整治平台网址屏蔽问题,打击互联网行业垄断任重道远
  9. Linux桌面两大阵营 GNOME与KDE的战争
  10. 软件测试-浪晋的小讲堂-学习笔记