写在开头

这是我在观看黑马Redis视频教程中根据PPT和上课内容,个人写的笔记,中间有部分来源于百度,如有侵权,联系我删除。

文章目录

  • 写在开头
  • 实战篇-黑马点评项目
    • 导入黑马点评项目
    • 短信登陆
      • 基于Session实现
        • Holder介绍
        • DTO介绍
      • 基于Redis实现
        • 登录拦截器的优化
    • 缓存
      • Redis缓存
      • 缓存练习
      • 缓存更新策略
        • 主动更新策略
        • 操作缓存和数据库需要考虑的问题
        • 缓存更新策略的最佳实践方案
        • 给查询商铺的缓存添加超时剔除和主动更新的策略
      • 缓存穿透
        • 解决商户查询的缓存穿透问题
      • 缓存雪崩
      • 缓存击穿
        • 基于互斥锁方式解决缓存击穿问题
        • 基于逻辑过期解决缓存击穿问题
      • 缓存工具类
    • 优惠券秒杀
      • 全局唯一ID
      • 实现优惠券秒杀下单
      • 超卖问题
        • 乐观锁
        • 一人一单
          • 集群并发问题
        • idea开启本地集群
      • 分布式锁
        • 基于Redis的分布式锁
          • 基于Redis实现分布式锁初级版本
            • **初级版本的极端情况的bug1**
            • 初级版本的极端情况的bug2
          • Redis的Lua脚本
            • eval命令(在redis中调用Lua脚本)
            • bug2解决方案的实现
            • 总结
            • 基于Redis的分布式锁优化
      • Redisson
        • Redisson入门
        • Redisson可重入锁原理
        • Redisson重试、超时续约原理
        • Redisson分布式锁主从一致性问题
    • Redis秒杀优化(阻塞队列异步秒杀)
      • Redis优化分析
      • 改进秒杀业务,提高并发性能
    • Redis消息队列实现异步秒杀
      • 消息队列
      • Redis三种方式模拟消息队列
        • 总结
      • 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
    • 探店笔记
      • 发布探店笔记
        • 实现查看笔记详细信息
      • 点赞
      • 点赞排行榜(top5)
    • 好友关注
      • 关注和取关
      • 共同关注
      • 关注推送(Feed流)
        • Feed流的模式
        • Feed流的实现方案(Timeline模式)
        • 基于推模式实现关注推送功能
        • 实现关注推送页面的分页查询
      • 附近商户
        • 附近商户搜索
    • 用户签到
    • UV统计

实战篇-黑马点评项目

导入黑马点评项目

导入sql文件

导入后端项目

导入前端项目

短信登陆

基于Session实现

需求分析

实现发送验证码

@Override
public Result sendCode(String phone, HttpSession session) {// 1.校验手机号if (RegexUtils.isPhoneInvalid(phone)) {// 2.不符合,返回错误信息return Result.fail("手机号码格式错误!");}// 3.符合,生成验证码String code = RandomUtil.randomNumbers(6);// 4.保存验证码到sessionsession.setAttribute("code",code);// 5.发送验证码,这里是模拟的,生产环境要使用第三方服务商接口log.debug("发送短信验证码成功,验证码:{}",code);// 6.返回成功信息return Result.ok();
}

实现短信验证码登录和注册

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {// 1.校验手机号String phone = loginForm.getPhone();if (RegexUtils.isPhoneInvalid(phone)) {// 2.不符合,返回错误信息return Result.fail("手机号码格式错误!");}// 2.校验验证码String code = loginForm.getCode();Object cacheCode = session.getAttribute("code");if (cacheCode == null || cacheCode.toString().equals(code)){// 3.不一致,报错return Result.fail("验证码错误!");}// 4.一致,根据手机号查询用户User user = query().eq("phone", phone).one();// 5.判断用户是否存在if (user == null){// 6.不存在,创建新用户并保存user = createUserWithPhone(phone);}// 7.保存用户信息到session中session.setAttribute("user",BeanUtil.copyProperties(user, UserDTO.class));return Result.ok();
}private User createUserWithPhone(String phone) {User user = new User();user.setPhone(phone);user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));save(user);return user;
}

实现登录校验拦截器

public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1. 获取sessionHttpSession session = request.getSession();// 2.获取session中的用户Object user = session.getAttribute("user");// 3.判断用户是否存在if (user == null) {// 4.不存在,拦截response.setStatus(401);return false;}// 5.存在,保存用户信息到ThreadLocalUserHolder.saveUser((UserDTO) user);// 6.放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {// 销毁user对象,防止内存泄露UserHolder.removeUser();}
}// 让拦截器生效,以及设置放行路径
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/shop/**","/voucher/**","/shop-type/**","/upload/**","/blog/hot","/user/code","/user/login");}
}

Holder介绍

 在Web开发中,service层或者某个工具类中需要获取到HttpServletRequest对象还是比较常见的。一种方式是将HttpServletRequest作为方法的参数从controller层一直放下传递,不过这种有点繁琐;还有另一种则是写一个RequestHolder,以下举例:public class UserHolder {private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();public static void saveUser(UserDTO user){tl.set(user);}public static UserDTO getUser(){return tl.get();}public static void removeUser(){tl.remove();}}

DTO介绍

 这里简单介绍以下DTO,以后可以深入了解。出于各种目的,如节省内存空间、保护数据安全、保护用户隐私、保护敏感信息,我们希望保存在内存里的对象数据只需要必须的,一些用不到的就不需要了,反正都在数据库里,用到了再去查询就是了。这个时候就引入了DTO的概念,就是把必须的属性提取出来生成一个DTO对象.以下举例一个DTO案例。同时在查询SQL时,我们一般都是返回一个完整的对象,我们该如何把这个对象转换为DTO对象呢?本办法就是new DTO对象,然后慢慢set,但是我们可以使用快捷的Hutool工具包的BeanUtil.copyProperties(user, UserDTO.class),来快速转换。
public class User {private Long id;private String phone;private String password;private String nickName;private String icon = "";private LocalDateTime createTime;private LocalDateTime updateTime;
}public class UserDTO {private Long id;private String nickName;private String icon;
}

集群的session共享问题

session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。
session的替代方案应该满足:1、数据共享2、内存存储3、key、value结构。无可厚非,那就是Redis了。

基于Redis实现

实现发送验证码

@Override
public Result sendCode(String phone, HttpSession session) {// 1.校验手机号if (RegexUtils.isPhoneInvalid(phone)) {// 2.不符合,返回错误信息return Result.fail("手机号码格式错误!");}// 3.符合,生成验证码String code = RandomUtil.randomNumbers(6);// 4.保存验证码到Redis set key value ex 120stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);// 5.发送验证码log.debug("发送短信验证码成功,验证码:{}",code);// 6.返回成功信息return Result.ok();
}

实现短信验证码登录和注册

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {// 1.校验手机号String phone = loginForm.getPhone();if (RegexUtils.isPhoneInvalid(phone)) {// 2.不符合,返回错误信息return Result.fail("手机号码格式错误!");}// 3.从redis获取验证码并校验String code = loginForm.getCode();String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);if (cacheCode == null || !cacheCode.equals(code)){// 不一致,报错return Result.fail("验证码错误!");}// 4.一致,根据手机号查询用户User user = query().eq("phone", phone).one();// 5.判断用户是否存在if (user == null){// 6.不存在,创建新用户并保存user = createUserWithPhone(phone);}// 7.保存用户信息到redis中// 7.1 随机生成token,作为登录令牌String token = UUID.randomUUID().toString(false);// 7.2 将User对象转换为HashMap存储UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),CopyOptions.create().setIgnoreNullValue(true)// 因为Long无法直接强转为String,使用StringRedisTemplate必须为String,而它底层做的是强转所以报错,我们需要在这里指定一下类型.setFieldValueEditor((fieldName,fieldValue) -> fieldValue.toString()));// 7.3 存储到redis,并设置过期时间String tokenKey = LOGIN_USER_KEY + token;stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);// 8.返回tokenreturn Result.ok(token);
}

实现登录校验拦截器

public class LoginInterceptor implements HandlerInterceptor {private StringRedisTemplate stringRedisTemplate;public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1. 获取请求头中的token,判断token是否存在String token = request.getHeader("authorization");if (StrUtil.isEmpty(token)) {// token不存在,拦截,返回401状态码response.setStatus(401);return false;}// 2.基于token获取redis中的用户String tokenKey = LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);// 3.判断用户是否存在if (userMap.isEmpty()) {// 4.不存在,拦截,返回401状态码response.setStatus(401);return false;}// 5.将查询到的Hash数据转换为UserDTO对象UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap,new UserDTO(),false);// 6.存在,保存用户信息到ThreadLocalUserHolder.saveUser(userDTO);// 7.刷新redis中token有效期stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL, TimeUnit.MINUTES);// 8.放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {// 销毁user对象,防止内存泄露UserHolder.removeUser();}
}

登录拦截器的优化

 由于最开始的方案,刷新token过期时间是在一个只拦截需要登录才能访问的路径,但是有一些不需要登录的路径并不会被拦截,比如首页。那就会出现一个场景,当用户登陆完成以后,只在首页停留,来回刷新首页,就是不访问别的路径,那么这个时候token的过期时间是不会被刷新的,时间一到以后用户就被提出登陆状态了,这是不合理的。所以解决方案就是,在该拦截器前面再加一个拦截一切路径的拦截器,该拦截器只负责刷新token过期时间,不管token存不存在都放行,若token存在且对应的用户存在才刷新token过期时间,并把对应的用户放入ThreadLocal中。然后再第二个拦截器里判断ThreadLocal中的用户是否存在,不存在则拦截,反之放行。
// 第一个拦截器
public class RefreshTokenInterceptor implements HandlerInterceptor {private StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1. 获取请求头中的token,判断token是否存在String token = request.getHeader("authorization");if (StrUtil.isEmpty(token)) {return true;}// 2.基于token获取redis中的用户String tokenKey = LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);// 3.判断用户是否存在if (userMap.isEmpty()) {return true;}// 5.将查询到的Hash数据转换为UserDTO对象UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap,new UserDTO(),false);// 6.存在,保存用户信息到ThreadLocalUserHolder.saveUser(userDTO);// 7.刷新redis中token有效期stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL, TimeUnit.MINUTES);// 8.放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {// 销毁user对象,防止内存泄露UserHolder.removeUser();}
}// 第二个拦截器
public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 判断是否需要拦截(通过ThreadLocal中是否有用户来判断)if (UserHolder.getUser() == null) {// 没有,需要拦截,设置状态码response.setStatus(401);// 拦截return false;}// 有用户,放行return true;}
}// 添加两个拦截器,并设置拦截器顺序
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {// 先后顺序由order属性值决定,默认都是0,可以修改order值,order越小优先级越高,order越大顺序越靠后// 同时order值相同,就通过添加顺序来决定先后顺序。// 登录拦截器registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/shop/**","/voucher/**","/shop-type/**","/upload/**","/blog/hot","/user/code","/user/login").order(1);// 刷新token拦截器registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);}
}

Redis代替session需要考虑的问题

1、选择合适的数据结构      2、选择合适的key      3、选择合适的存储粒度

缓存

    我们日常生活中,经常会接触听到缓存这个词,例如,浏览器清空缓存,处理器缓存大小,磁盘缓存等等。经过分类,可以将缓存分为:1、硬件缓存: 一般指的是机器上的 CPU、硬盘等等组件的缓存区间,一般是利用的内存作为一块中转区域,都通过内存交互信息,减少系统负载,提供传输效率。2、客户端缓存: 一般指的是某些应用,例如浏览器、手机App、视频缓冲等等,都是在加载一次数据后将数据临时存储到本地,当再次访问时候先检查本地缓存中是否存在,存在就不必去远程重新拉取,而是直接读取缓存数据,这样来减少远端服务器压力和加快载入速度。3、服务端缓存: 一般指远端服务器上,考虑到客户端请求量多,某些数据请求量大,这些热点数据经常要到数据库中读取数据,给数据库造成压力,还有就是 IO、网络等原因有一定延迟,响应客户端较慢。所以,在一些不考虑实时性的数据中,经常将这些数据存在内存中(内存速度非常快),当请求时候,能够直接读取内存中的数据及时响应。

Redis缓存

 Redis缓存,属于服务器缓存。其实就是把Redis当作数据库的缓存,因为Redis是在内存上存储的,比在硬盘上存储的数据库运行速度快很多。客户端发起查询数据的请求,都先去Redis中查询,Redis中若有,则直接返回,反之没有,那就再去数据库查询,数据库查询到以后,再存入Redis中,然后返回。以此往复,Redis中的数据越来越多,命中率也越来越大,大大加快了服务端的响应速度。

缓存练习

实现商铺查询缓存

// 这里其实用Hash类型存储也可以(个人认为这样更合适),但是由于前面的案例演示的是Hash类型存储,所以这里就用万能的String了
@Override
public Result queryById(Long id) {// 1.在redis中查询店铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否命中if (StrUtil.isNotBlank(shopJson)) {// 3.命中,直接返回Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 4.未命中,查询数据库Shop shop = getById(id);// 5.判断商铺是否存在if (shop == null) {// 6.不存在,返回错误信息return Result.fail("商铺不存在!");}// 7.存在,将商铺数据写入redisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));// 8.返回return Result.ok(shop);
}

实现店铺类型缓存

// 这里演示使用的是List类型,不得不说,List<?> <====>List<String>转换真的是有点麻烦哈哈哈
@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result queryList() {// 1.在redis中查询店铺类型缓存String key = CACHE_SHOP_KEY + "list";List<String> stringTypeList = stringRedisTemplate.opsForList().range(key, 0, -1);// 2.判断是否命中if (!stringTypeList.isEmpty()) {// 3.命中,直接返回// List<String>  ===> List<?>List<ShopType> shopTypeList = JSONUtil.toList(stringTypeList.toString(), ShopType.class);return Result.ok(shopTypeList);}// 4.没有命中,查询数据库List<ShopType> typeList = query().orderByAsc("sort").list();// 5.判断数据库中是否存在if (typeList == null) {// 6.不存在,返回错误信息return Result.fail("店铺类型不存在!");}// 7.存在,存入redis// List<?>  ===> List<String>stringTypeList = JSONUtil.toList(new JSONArray(typeList), String.class);stringRedisTemplate.opsForList().rightPushAll(key, stringTypeList);// 8.返回return Result.ok(typeList);}
}

缓存更新策略

 缓存更新策略分为 内存淘汰、超时剔除、主动更新策略。内存淘汰策略:一致性差、维护成本无。Redis是默认开启的,我们也可以制定一些内存不足时淘汰数据的规则,且是由Redis管理的,我们不需要去人为维护。超时剔除策略:一致性一般(属于最终一致性),维护成本低。给缓存数据添加TTL时间,到期自动删除,下次查询时再查询数据库写入缓存,一致性不是很好,但维护成本低,一般作为兜底策略,不会作为主策略。主动更新策略:一致性好(任何方案都做不到完美一致),维护成本高。在修改数据库的时候就更新缓存(一般是删除缓存),下次查询时再查询数据库写入缓存,一般把超时剔除策略作为兜底策略,作为出现脏数据时的补救措施。业务场景:低一致性需求:使用内存淘汰策略,并且可以视情况以超时剔除策略作为兜底方案。高一致性需求:使用主动更新策略,并以超时剔除策略作为兜底方案。

主动更新策略

 主动更新策略有三种主流的实现方案:Cache Aside Pattern、Read/Writer Through Pattern、Write Behind Caching Psttern。一般都是使用Cache Aside Pattern实现方案,虽然会稍微复杂一点,但是可以我们自己人为控制,因为Read/Writer Through Pattern实现方案作为调用者无需关心内部实现,但是维护者维护难度高,市面上的这类第三方服务商也不好找;而Write Behind Caching Psttern实现方案只操作缓存,异步将缓存数据持久化,当出现系统宕机时,缓存数据会丢失,而且当缓存被操作了很多次,但是还没有被异步持久化时,会出现很严重的不一致性。

操作缓存和数据库需要考虑的问题

1、更新缓存的操作的实现是 删除缓存还是修改缓存?删除缓存,因为如果是修改缓存的话,每次更新数据库都要更新缓存,但是若这期间若一个缓存值修改多次,期间却没有几次读操作,那么期间的无效写操作过多,浪费系统性能。而删除缓存,在每次更新数据库时都删除缓存,等之后有请求需要查找这条被删除的缓存数据时再去数据库中查找出来并再次写入缓存,那么这样不会有无效写操作,同时也最大节省了系统性能。
2、如何保证缓存与数据库的操作同时成功或失败(即原子性)?单个系统:将缓存与数据库操作放在一个事务内分布式系统:利用TCC等分布式事务方案

3、先操作缓存还是先操作数据库?1、先删除缓存,再操作数据库2、先操作数据库,再删除缓存一般情况下选择2、先操作数据库,再删除缓存。由以下两图可得,不管是先缓存后数据库,还是先数据库后缓存,都会有脏数据的出现,但是因为Redis缓存是基于内存的,数据库是基于磁盘的,所以操作缓存的速度是远大于操作数据库的,出现脏数据概率更小的是先数据库后缓存。同时,这里出现的脏数据问题,可以通过超时剔除兜底,来缓解脏数据带来的影响。

先删除缓存,再操作数据库

 如图所示,在正常流程下,先缓存后数据库的流程完全没问题,但是正如右图,也有出现脏数据的情况。

先操作数据库,再删除缓存

 如图所示,在正常流程下,先数据库后缓存的流程完全没问题,但是正如右图,也有出现脏数据的情况。

总结

 在需要高一致性的场景下,我们一般都会选择主动更新策略以及超时剔除策略兜底,并且更新缓存的操作为删除缓存、先操作数据库再删除缓存。

缓存更新策略的最佳实践方案

低一致性需求:使用Redis自带的内存淘汰机制,视情况选择以超时剔除作为兜底方案
高一致性需求:主动更新,并以超时剔除作为兜底方案读操作:缓存命中则直接返回缓存未命中则查询数据库,并写入缓存,设定超时时间写操作:先写数据库,然后再删除缓存要确保数据库与缓存操作的原子性

给查询商铺的缓存添加超时剔除和主动更新的策略

修改ShopController中的业务逻辑,满足下面的需求:1、根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间2、根据id修改店铺时,先修改数据库,再删除缓存1、@Overridepublic Result queryById(Long id) {// 1.在redis中查询店铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否命中if (StrUtil.isNotBlank(shopJson)) {// 3.命中,直接返回Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 4.未命中,查询数据库Shop shop = getById(id);// 5.判断商铺是否存在if (shop == null) {// 6.不存在,返回错误信息return Result.fail("商铺不存在!");}// 7.存在,将商铺数据写入redis,并设置过期时间stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);// 8.返回return Result.ok(shop);}
2、@Override@Transactional  // 基于方法的事务,不是数据库的事务操作public Result update(Shop shop) {// 1.更新数据库updateById(shop);// 2.删除缓存stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());// 3.返回成功状态码return Result.ok();}

缓存穿透

 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求永远都会打到数据库,占用系统I/O性能。要是有人利用这个原理,恶意多次请求这个不存在的数据,就会使系统宕机。

解决方案

常见的解决方案有两种:1、缓存空对象实现原理:数据库中不存在也缓存到redis中,缓存值为null,可以设置较短的TTL,防止长期的数据不一致优点:实现简单,维护方便缺点:额外的内存消耗可能造成短期的不一致2、布隆过滤器实现原理:基于Hash算法实现的二进制字节数组优点:内存占用较少,没有多余key缺点:自行实现复杂(好在Redis提供了BitMap类型,自带的一种布隆过滤器)存在误判可能(他判断不存在的值一定是不存在的,但是判断存在的值不一定真的存在)

解决商户查询的缓存穿透问题

 这里选择的是缓存空对象的解决方案,因为实现简单。

@Override
public Result queryById(Long id) {// 1.在redis中查询店铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否命中,命中的是null或""这里结果是falseif (StrUtil.isNotBlank(shopJson)) {// 3.命中,直接返回Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 判断命中的是否是空字符串// 因为stringRedisTemplate.opsForValue().get(key)的key不存在,就返回null,不会返回"",所以这里可以用这个判断条件if ("".equals(shopJson)){// 返回错误信息return Result.fail("店铺信息不存在!");}// 4.未命中,查询数据库Shop shop = getById(id);// 5.判断商铺是否存在if (shop == null) {// 为不存在的key设置空字符串值,设置较短的过期时间,缓解缓存穿透stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);// 6.不存在,返回错误信息return Result.fail("店铺信息不存在!");}// 7.存在,将商铺数据写入redis,并设置过期时间stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);// 8.返回return Result.ok(shop);
}

总结

1、缓存穿透产生的原因是什么?用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力。
2、缓存穿透的解决方案有哪些?缓存null值、布隆过滤的解决方案都属于被动解决。我们还有很多主动的解决方案,比如:(1)给id设置一定的格式规律,同时增强id的复杂度,避免被猜测id规律,然后在此基础上每次都先对id进行基础格式校验。,这样就可以做到主动解决了。(2)加强用户权限校验(3)做好热点参数的限流

缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机时,导致大量请求直接到达数据库,带来巨大压力。
解决方案:1、给不同的Key的TTL添加随机值(尤其是缓存预热的时候,同时加入大批量数据)2、利用Redis集群提高服务的可用性(高级篇会讲):多台Redis服务器,可以实现多台宏观调控3、给缓存业务添加降级限流策略(SpringCloud中学):当服务器请求压力大时,直接拦截请求快速返回,不再让服务器承受那么多请求4、给业务添加多级缓存(高级篇会讲):就是多端缓存,利用本地缓存等。

缓存击穿

    缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:1、互斥锁2、逻辑过期(内包含互斥锁)

互斥锁和逻辑过期

 互斥锁:就是在缓存未命中后,线程准备去查询数据库重建缓存数据之前,获取一个互斥锁,除了第一个线程拿到锁以外,其他后来的线程拿互斥锁失败以后会休眠一段时间,然后再从查询缓存从新走流程。逻辑过期:就是设置热点key永不过期,但是把expire过期时间属性写入value中,而key的过期时间是(写入该数据当时时间加上过期时间的时间戳)。这样每次查询缓存的时候都可以拿到缓存数据,然后通过value中的expire属性来判断当前这个缓存是否已经过期。若已经过期,那就先获取互斥锁,然后在这个线程里开启新的线程,让这个新的线程去执行查询数据库重建缓存数据的任务,而原来的线程就直接返回以及过期数据先用着,在新线程释放锁之前(也有可能是写入缓存之前),其他线程来查询缓存,都会因为因为尝试获取互斥锁失败后返回过期数据继续用着先。

实现流程

优缺点

 互斥锁和逻辑过期各有所长,也各有所短。如果我们要保证数据一致性就使用互斥锁,如果要服务可用性,想要性能好,就选择逻辑过期。一切都视情况而定。

基于互斥锁方式解决缓存击穿问题

 注意!!!这里的互斥锁方案只是一个简易的实现,真正生产环境中的互斥锁方案和这个还是略有不用的。同时这里的互斥锁由于需要实现休眠后从头开始,虽然可以用java中的goto(我也忘记java的goto叫什么了)去实现,但是goto是不被推荐使用的,代码太过于乱了。而对普通的互斥锁来说,拿不到这个互斥锁后,是在无限的等待当中的,知道拿到锁为止,所以这里我们需要自己实现一套锁。我们通过利用redis中setnx命令,来实现了自定义的锁,同时由于redis是单线程的,所以并不会出现并发问题,至少在当前的单机环境不会。我们把加锁、释放锁操作封装到了方法里,方法如下:// 加锁方法public boolean tryLock(String key){// 加锁操作,同时这里加过期时间是为了防止拿到锁的进程发生以后或死锁等其他原因// 导致该进程花了超过原本正常可以完成的时间数十倍以上时间还没释放锁,让整个服务停止了,所以加了过期时间// setIfAbsent()就是redis中的setnx,同时redis中的返回值应该只有0或1,分别代表失败和成功// 这里spring data redis进行了封装,直接转换为对应的false或true,分别代表失败和成功Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", LOCK_SHOP_TTL, TimeUnit.SECONDS);// 因为boolean自动拆箱为Boolean可能会产生空指针异常// 所以这里需要判断传入的的Boolean类型的值是不是true,并且做安全的拆箱。return BooleanUtil.isTrue(flag);}// 释放锁方法public void unlock(String key){stringRedisTemplate.delete(key);}
@Override
public Result queryById(Long id) {// 缓存击穿Shop shop = queryWithMutex(id);if (shop == null) {return Result.fail("店铺不存在!");}return Result.ok(shop);
}// 封装的缓存击穿解决方法
public Shop queryWithMutex(Long id){// 1.在redis中查询店铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否命中,命中的是null或""这里结果是falseif (StrUtil.isNotBlank(shopJson)) {// 3.命中,直接返回return JSONUtil.toBean(shopJson, Shop.class);}// 判断命中的是否是空字符串// 因为stringRedisTemplate.opsForValue().get(key)的key不存在,就返回null,不会返回"",所以这里可以用这个判断条件if ("".equals(shopJson)){// 返回错误信息return null;}// 4.实现缓存重建// 4.1.获取互斥锁String lockKey = LOCK_SHOP_KEY + id;Shop shop;try {boolean isLock = tryLock(lockKey);// 4.2.判断是否获取成功if (!isLock) {// 4.3.失败,则休眠并重试Thread.sleep(50);return queryWithMutex(id);}// 4.4.检测此时缓存是否存在了,若存在,直接返回,反之继续走下去shopJson = stringRedisTemplate.opsForValue().get(key);if (shopJson != null) {return JSONUtil.toBean(shopJson, Shop.class);}// 4.5.获取锁成功,根据id查询数据库shop = getById(id);// 5.判断商铺是否存在if (shop == null) {// 6.不存在,为不存在的key设置空字符串值,设置较短的过期时间,缓解缓存穿透stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);// 不存在,返回错误信息return null;}// 7.存在,将商铺数据写入redis,并设置过期时间stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException();}finally {// 8.释放互斥锁unlock(lockKey);}// 9.返回return shop;
}

基于逻辑过期解决缓存击穿问题

 因为逻辑过期一般应用于活动商品的库存,不在活动内的商品,在缓存内肯定找不到的,所以缓存未命中直接返回null。

// 创建线程池
public static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);@Override
public Result queryById(Long id) {// 缓存击穿Shop shop = queryWithLogicalExpire(id);if (shop == null) {return Result.fail("店铺不存在!");}return Result.ok(shop);
}// 封装的缓存击穿解决方法
public Shop queryWithLogicalExpire(Long id){// 1.在redis中查询店铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否不存在if (StrUtil.isBlank(shopJson)) {// 3.不存在,直接返回// 因为逻辑过期一般应用于活动商品的库存,不在活动内的商品,在缓存内肯定找不到的// 并且也不能因为找不到就去数据库中找,这不合逻辑,不存在即代表不参加活动return null;}// 4.存在,需要先把json反序列化为对象RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);LocalDateTime expireTime = redisData.getExpireTime();// 这里要这么做是因为Data属性是Object类型的,而在JsonUtil无法分辨他具体是哪个类// 所以就先转换成了JSONObject类型,这里可以直接强转为JSONObject类型,然后再通过toBean()方法转换为目标类型// 注意!!!不能直接强转为目标类型!Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(),Shop.class);// 5.判断是否过期if (expireTime.isAfter(LocalDateTime.now())) {// 5.1.未过期,直接返回店铺信息return shop;}// 5.2.已过期,需要缓存重建String lockKey = LOCK_SHOP_KEY + id;// 6.缓存重建// 6.1.获取互斥锁boolean isLock = tryLock(lockKey);// 6.2.判断是否获取锁成功if (isLock) {// 做DoubleCheck,防止第一个线程完成缓存重建后,别的线程已经执行到缓存重建这里了// 这里的DoubleCheck是我自己写的,估计有问题,之后复习一下多线程shopJson = stringRedisTemplate.opsForValue().get(key);redisData = JSONUtil.toBean(shopJson, RedisData.class);expireTime = redisData.getExpireTime();shop = JSONUtil.toBean((JSONObject) redisData.getData(),Shop.class);// 5.判断是否过期if (expireTime.isAfter(LocalDateTime.now())) {// 5.1.未过期,直接返回店铺信息return shop;}// 6.3.成功,通过线程池开启独立线程,让该线程实现缓存重建// 使用线程池的原因是,频繁创建和销毁线程很消耗性能CACHE_REBUILD_EXECUTOR.submit(() -> {try {this.saveShop2Redis(id, 20L);}catch (Exception e){throw new RuntimeException(e);}finally {// 释放锁unlock(lockKey);}});}// 7.返回过期的店铺信息return shop;
}public void saveShop2Redis(Long id,Long expireSeconds) throws InterruptedException {// 1.查询店铺数据Shop shop = getById(id);// 2.封装逻辑过期时间RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));// 3.写入redisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(redisData));
}

缓存工具类

 基于StringRedisTemplate封装一个缓存工具类,满足下列需求:方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题普通的key通常使用方法一和方法三来进行维护,热点key通常使用方法二和方法四来进行维护。在工作中,很多操作都是可以封装成工具类来使用的,这里就举例互斥锁、逻辑过期两个方法来写一个工具类。
@Slf4j
@Component
public class CacheClient {// 这里不需要 @Resource,因为使用的有参构造方法注入,只有一个构造方法甚至不需要写@Beanprivate final StringRedisTemplate stringRedisTemplate;// 线程池private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public void set(String key, Object value, Long time, TimeUnit unit) {stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);}public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {// 设置逻辑过期RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));// 写入RedisstringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));}public <R,ID> R queryWithPassThrough(// keyPrefix是redis中key的前缀,id是和前缀拼接在一切的唯一标识// dbFallback是有传参有返回值的查询数据库方法,time、unit分别是时间值和时间单位String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){String key = keyPrefix + id;// 1.从redis查询商铺缓存String json = stringRedisTemplate.opsForValue().get(key);// 2.判断是否存在if (StrUtil.isNotBlank(json)) {// 3.存在,直接返回return JSONUtil.toBean(json, type);}// 判断命中的是否是空值if (json != null) {// 返回一个错误信息return null;}// 4.不存在,根据id查询数据库R r = dbFallback.apply(id);// 5.不存在,返回错误if (r == null) {// 将空值写入redisstringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);// 返回错误信息return null;}// 6.存在,写入redisthis.set(key, r, time, unit);return r;}public <R, ID> R queryWithLogicalExpire(// keyPrefix是redis中key的前缀,id是和前缀凭借在一切的唯一标识// dbFallback是有传参有返回值的查询数据库方法,time、unit分别是时间值和时间单位String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {String key = keyPrefix + id;// 1.从redis查询商铺缓存String json = stringRedisTemplate.opsForValue().get(key);// 2.判断是否存在if (StrUtil.isBlank(json)) {// 3.存在,直接返回return null;}// 4.命中,需要先把json反序列化为对象RedisData redisData = JSONUtil.toBean(json, RedisData.class);R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);LocalDateTime expireTime = redisData.getExpireTime();// 5.判断是否过期if(expireTime.isAfter(LocalDateTime.now())) {// 5.1.未过期,直接返回店铺信息return r;}// 5.2.已过期,需要缓存重建// 6.缓存重建// 6.1.获取互斥锁String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);// 6.2.判断是否获取锁成功if (isLock){// 6.3.成功,开启独立线程,实现缓存重建CACHE_REBUILD_EXECUTOR.submit(() -> {try {// 查询数据库R newR = dbFallback.apply(id);// 重建缓存this.setWithLogicalExpire(key, newR, time, unit);} catch (Exception e) {throw new RuntimeException(e);}finally {// 释放锁unlock(lockKey);}});}// 6.4.返回过期的商铺信息return r;}public <R, ID> R queryWithMutex(// keyPrefix是redis中key的前缀,id是和前缀凭借在一切的唯一标识// dbFallback是有传参有返回值的查询数据库方法,time、unit分别是时间值和时间单位String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {String key = keyPrefix + id;// 1.从redis查询商铺缓存String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判断是否存在if (StrUtil.isNotBlank(shopJson)) {// 3.存在,直接返回return JSONUtil.toBean(shopJson, type);}// 判断命中的是否是空值if (shopJson != null) {// 返回一个错误信息return null;}// 4.实现缓存重建// 4.1.获取互斥锁String lockKey = LOCK_SHOP_KEY + id;R r = null;try {boolean isLock = tryLock(lockKey);// 4.2.判断是否获取成功if (!isLock) {// 4.3.获取锁失败,休眠并重试Thread.sleep(50);return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);}// 4.4.获取锁成功,根据id查询数据库r = dbFallback.apply(id);// 5.不存在,返回错误if (r == null) {// 将空值写入redisstringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);// 返回错误信息return null;}// 6.存在,写入redisthis.set(key, r, time, unit);} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 7.释放锁unlock(lockKey);}// 8.返回return r;}// 获取锁方法private boolean tryLock(String key) {Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}// 释放锁方法private void unlock(String key) {stringRedisTemplate.delete(key);}
}

优惠券秒杀

全局唯一ID

在黑马点评项目中每个店铺都可以发布优惠券。当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:1、id的规律性太明显,容易被人通过id号猜出数据信息,如一天销售量多少等2、受单表数据量的限制,以后分库分表自增id就会出现问题,因为mysql的自增都是每张表独立的。正因为以上的原因,所以产生了一个概念叫全局ID生成器。
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

Redis很适合实现全局ID生成器,除了安全性(纯自增的值安全性不高),所以我们需要对redis中自增的id值进行拼接。
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:符号位:1bit,永远为0时间戳:31bit,以秒为单位,可以使用69年序列号:32bit,秒内的计数器,支持每秒产生2^32个不同IDPS:这里的ID规则是自定义的,个人认为可以其实可以直接使用雪花算法生成的id的,mp默认的雪花算法id生成就很好用,这里就先当作学习了,之后再评价。

实现Redis全局ID生成器

@Component
public class RedisIdWorker {/***  开始时间戳,可以自定义* */public static final long BEGIN_TIMESTAMP = 1640995200L;/***  序列号位数,可以自定义* */public static final long COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;// 使用构造方法注入public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix){// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1.获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 2.2.自增长long count = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + date);// 3.拼接并返回// 这里通过位运算来实现 字符串的拼接效果,但结果又不是字符串,是long类型return timestamp << COUNT_BITS | count;}
}

使用

// 直接注入调用nextId()方法即可。
@Resource
private RedisIdWorker redisIdWorker;@Test
void test(){long id = redisIdWorker.nextId("order");System.out.println("id = " + id);
}

总结

全局唯一ID生成策略:UUID:生成的是16进制的字符串,也没有递增性Redis自增:采用数值类型,存储空间占用少。可以应用在分布式体系中,不依赖于机器码,不需要人员维护snowflake算法(雪花算法):不依赖Reids,性能强于Redis,依赖机器自身的机器码,需要人员去维护数据库自增:即创建一个专门用于存储全局唯一ID的表,每次批量创建,然后供查询使用,性能低于Redis自增方案
Redis自数据库增ID策略:每天一个key,方便统计订单量,同时这样可以限制key对应的value值不过于太大,因为value最大为2的64次方,看起来很大没什么问题,但是像淘宝这样的电商,几天可能就占满value最大值了,采用每天一个key可以有效解决这个问题。ID构造是 时间戳 + 计数器

实现优惠券秒杀下单

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:表关系如下:tb_voucher:优惠券的基本信息,优惠金额、使用规则等tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息

添加优惠券

请求路径:http://localhost:8081/voucher/seckill
请求方式:POST
请求体:{"shopId":1,"title":"100元代金券","subTitle":"周一至周五均可使用","rules":"全场通用\\n无需预约\\n课无限叠加\\不兑现、不找零\\n仅限堂食","payValue":8000,"actualValue":10000,"type":1,"stock":100,"beginTime":"2022-05-31T16:00:00","endTime":"2022-05-31T20:00:00"}

实现下单

@Override
@Transactional // 这里添加了事务,因为扣减库存和保存订单要保持原子性
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("优惠券不存在!");}// 2.判断秒杀是否开始if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀活动还未开始!");}// 3.判断秒杀是否结束if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀活动已经结束!");}// 4.判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("库存不足!");}// 5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);// 6.4.保存订单到数据库中去save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);
}

超卖问题

 如上图所示,如果不进行任何措施,那么就很容易在并发情况下出现超卖现象,这是电商平台绝对不允许出现的,一定要解决这个问题。超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:悲观锁or乐观锁。关于悲观锁,其实没什么好讲的,就是第一个拿到锁的线程,先执行完,再释放锁,以此往返,保证线程串行执行,实现简单但是性能影响大。这里我们主要讲乐观锁,这个“锁”并不是真正的锁,只是通过版本号来判断是否数据被修改过(不一定要存在一个版本号属性,具有版本号功能的也是可以的)。

乐观锁

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种: 1、版本号法2、CAS法(Compare and swap,比较与交换)

版本号法

 版本号法,就是给表中添加一个version字段,作为每次修改后的版本,用于判断在自己拿到数据后,是否已经被别的线程修改过了。这个方法适用于任何情况下的任何表,只是会造成一定的数据存储,如果可以使用CAS法最好就用CAS法,节省数据存储空间。

CAS法

 CAS法,也叫比较与交换法,其实就是在表中要是存在这次业务流程拥有version功能一样的字段,那就把这个字段同时作为version来用,节省存储空间。当然要是有五个业务场景都需要使用了乐观锁,但是只要有一个不能使用CAS法,那就必须使用版本号法。

@Override
@Transactional
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("优惠券不存在!");}// 2.判断秒杀是否开始if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀活动还未开始!");}// 3.判断秒杀是否结束if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀活动已经结束!");}// 4.判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("库存不足!");}// 5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId)// .eq("voucher_id", voucherId)    这样才是乐观锁,但是要考虑成功率的原因,// 明明库存还足够明确因为同时有几个共同并发的线程,导致不应该的失败,过于浪费性能,所以下面的 stock > 0,才是最优方案.gt("stock",0)  // 这里是一个巧妙的设计但是我感觉这一句不算乐观锁了。.update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);// 6.4.保存订单到数据库中去save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);
}

总结

超卖这样的线程安全问题,解决方案有哪些?悲观锁:添加同步锁,让线程串行执行优点:简单粗暴缺点:性能一般乐观锁:不加锁,在更新时判断是否目标数据是否已经被修改优点:性能好缺点:存在成功率低的问题因此,我们在生产环境中,只是用乐观锁或悲观锁,是远远不够,后面会继续优化。

一人一单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("优惠券不存在!");}// 2.判断秒杀是否开始if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀活动还未开始!");}// 3.判断秒杀是否结束if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀活动已经结束!");}// 4.判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();// 这里使用字符串常量池中的userId作为锁,缩小了一个锁的范围,// 可以保证只有同一个对象的请求才会互斥加锁,不是同一个对象就可以并行// 同时不把事务放在createVoucherOrder方法里面是因为这样会造成锁释放的时候,事务还未提交// 事务未提交的话,那么做的修改都不会被查询到,下一个进程就还会再次进行修改,就会出现超卖问题// 一定要注意Spring的事务和锁的先后关系。synchronized (userId.toString().intern()){// 获取代理对象(实现事务的代理对象)。要获取这个对象需要做两步// 因为Spring的事务是基于代理类的,原理其实我还不是很懂,需要继续努力!/*** 1、*    <dependency>*         <groupId>org.aspectj</groupId>*         <artifactId>aspectjweaver</artifactId>*       </dependency>**  2、@EnableAspectJAutoProxy(exposeProxy = true)*/// 1.在pom.xml中引入aspectjweaver包 2.使用@EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}
}@Transactional
public Result createVoucherOrder(Long voucherId) {// 限制一人一单Integer count = query().eq("user_id", UserHolder.getUser().getId()).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("一人一单!不能重复购买!");}// 5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0).update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);// 6.4.保存订单到数据库中去save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);
}
集群并发问题

单机系统

 如图所示,在一个单机系统中,我们上面实现的synchronized锁已经可以实现一人一单的限制,不会出现超卖问题。但是令人担忧的是集群下的并发问题。

分布式系统

 如图所示,在分布式系统中,由于JVM不同,导致的多个锁监视器,而我们又会用nginx开启负载均衡功能,多次访问的地址很有可能不是一个地址。如果出现一个时刻一个用户发起一个请求多次,那么就很容易出现一人一单限购要求下的的超卖现象。这种时候,基于JVM的锁机制已经无法解决了,那么就轮到了分布式锁的登场的时候了。

idea开启本地集群

idea

 idea在服务小窗口中右击复制配置或快捷键Crtl+D进行复制配置,然后先修改服务名称,再在虚拟机环境(VM options)中输入
-Dserver.port=8082,就成功修改第二个SpringBoot服务的端口号为8082,然后重启两个服务,这样两个SpringBoot服务就在本地组成了一个集群。

nginx

 在老师已经配置好的nginx.conf中,去掉到8082端口的注释,并在命令行输入nginx.exe -s reload命令来重启nginx服务,让修改的配置生效,让nginx反向代理这两个端口的地址,实现集群的负载均衡。此时我们就可以通过访问8080端口,然后通过nginx负载均衡的访问8081、8082端口。

分布式锁

 分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。简单来说,就是提高了原来在JVM中锁的作用域,变为全局作用域。分布式锁的基本性要求有:多线程可见、互斥、高可用、高性能、安全性。当然还有很多别的功能性要求,但是那是功能性要求,可选的。

分布式锁的实现

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
PS:因为是Redis的课程,所以后面只讲Redis分布式锁的实现,其他实现可以自行学习

基于Redis的分布式锁

实现分布式锁时需要实现的两个基本方法:获取锁:互斥:确保只能有一个线程获取锁,且设置超时时间,防止因死锁或系统宕机导致锁无法被释放非阻塞:尝试一次,成功返回true,失败返回false //  其实还有阻塞式的,因为阻塞式实现复杂,并且十分占用CPU性能,所以就不演示阻塞式的了,只演示非阻塞式,拿到锁就返回true继续执行,没拿到锁就返回false结束执行。释放锁:手动释放超时释放:获取锁时添加一个超时时间

基于Redis实现分布式锁初级版本
以下只是redis分布式锁的初级版本,还有一些极端情况无法应对,还是会出现错误,所以后面还会升级这个redis分布式锁,让他可以应对极端情况。
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
// 接口
public interface ILock {/*** 尝试获取锁* @param timeoutSec 锁持有的超时时间,过期后自动释放* @return true表示取锁成功,false表示获取锁失败* */boolean tryLock(long timeoutSec);/*** 释放锁* */void unLock();
}// 实现类,用于实现Redis分布式锁功能
public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public static final String KEY_PREFIX = "lock:";public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标识long threadId = Thread.currentThread().getId();// 尝试获取锁,同时一起给代表锁的key设置超时时间,保证他们俩的原子性,// 防止出现设置key后,还没设置key的超时时间系统宕机了,导致key永不过期,进而造成死锁等问题。Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);// 返回获取锁结果,同时不直接返回是因为拆箱容易出现空指针异常。return Boolean.TRUE.equals(success);}@Overridepublic void unLock() {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}
}

使用Redis分布式锁实现集群下一人一单限购

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券信息SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("优惠券不存在!");}// 2.判断秒杀是否开始if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀活动还未开始!");}// 3.判断秒杀是否结束if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀活动已经结束!");}// 4.判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();// 创建锁对象SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);// 尝试获取锁,超时时间根据正常执行业务所需时间决定boolean isLock = lock.tryLock(10);// 判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试,这里是非阻塞式,所以直接返回错误,阻塞式应该是重试或等待。return Result.fail("不允许重复下单!");}try {// 获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {// 释放锁lock.unLock();}
}@Transactional
public Result createVoucherOrder(Long voucherId) {// 限制一人一单Integer count = query().eq("user_id", UserHolder.getUser().getId()).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("一人一单!不能重复购买!");}// 5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0).update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);// 6.4.保存订单到数据库中去save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);
}
初级版本的极端情况的bug1
 上面的redis分布式锁初级版本还是有小bug的,假设有三个线程,分别是线程1、线程2、线程3,他们都是同一个用户发起的下单请求,带着相同的userId发起请求,所以他们是互斥同一个锁的。线程1率先拿到了锁,然后发生了不明所以的业务阻塞,导致业务还没执行完成就已经到了redis分布式锁设置的超时时间,那么这个时候就等于锁被释放了,此时线程2发起请求,尝试获取锁,当然是获取成功的。然后在线程2执行业务的过程中,线程1突然完成了业务,然后进行了释放锁的操作,导致线程2的锁被释放了,这时线程3又正好发起了请求,尝试获取锁,当然也是成功的,然后可以一直这样错下去。很显然这样的错误流程是不允许的,虽然就算两个线程同时拿到锁概率低,并且这两个线程就算在执行同一业务造成一人一单限购的超卖现象概率比也很低,两者重叠发生的概率微乎其微,但是有概率发生的事情就必须解决,超卖是电商平台绝对不允许的事情。

解决方案

    修改之前的分布式锁实现,满足:1、在获取锁时存入线程标示(可以用UUID表示)2、在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致如果一致则释放锁如果不一致则不释放锁就是在尝试获取锁的时候,给锁的value设置成(UUID+当前线程的id)。然后再释放锁的时候,判断锁的value是不是(UUID+当前线程的id),如果是就代表这个锁是该线程的,就要进行释放操作,如果不是就代表这个锁不是该线程的,什么都不做,不进行释放锁操作即可。注意!!!是当前JVM生成的UUID+线程id,在不同的JVM中,不同JVM生成的UUID不同,所以就算不同JVM中的线程id相同,也不会丧失唯一性。在相同的JVM中,就算不同线程的UUID是相同的,他们的线程id也不是相同的,所以完美塑造了不管是相同JVM还是不同JVM的唯一性value。

// 根据以上需求,只需要修改获取锁和释放锁的方法即可。
// 注意!!!这样还是有很极端情况下的bug的,后续会继续完善的。
public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public static final String KEY_PREFIX = "lock:";// 使用Hutool工具包生成没有斜杠的UUIDpublic static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {// UUID拼接线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 尝试获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);// 返回获取锁结果,同时不直接返回是因为拆箱容易出现空指针异常。return Boolean.TRUE.equals(success);}@Overridepublic void unLock() {// 获取UUID拼接线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁的value唯一标识String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标识是否一致if (threadId.equals(id)) {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
}
初级版本的极端情况的bug2
 在修改完bug1以后,还是会存在一个极端的bug。就是在线程1成功获取锁以后,执行完业务并获取锁标识判断为一致了,准备执行释放锁的操作了,这个时候因为JVM的垃圾回收机制导致了阻塞,从而使锁key的超时时间到了,锁自动被释放了。线程2在这个时候获取锁成功,去执行业务了,但是执行到一半,线程1不阻塞了,进行了释放锁操作,从而导致线程2的锁被释放了,这时线程3就可以成功获取锁了,但是线程2还是在执行业务的,此时就有两个线程同时在执行业务了。

解决方案

 造成以上的bug,是因为 1、获取锁的标识并判断是否一致 和 2、释放锁 ,这两个操作并没有做到原子性操作,一起成功或一起失败。那么要保证这两个操作的原子性,有两个方案:1、使用redis的事务 2、使用Lua脚本语言写一个脚本调用1、使用redis的事务可以满足原子性但是没办法满足一致性,个人认为redis的事务和mysql的一样,毕竟都只是数据库不是编程语言,只能负责执行很多命令,却不能像编程语言一样,根据传入的参数,做一些if判断语句的操作,来决定每次操作要不要执行一些特定的操作。就拿这次的案例来说,我们每次都要执行get语句来获取锁key中的value,但是要根据value是否和(UUID + 线程id)相同,再决定是否执行del语句来释放锁,我们使用redis事务只能选择多条命令进行执行,但是却不能进行编程语言的if语言等操作来动态的选择要执行的语句,所以使用redis事务实现,只能每次都执行get和del语句,这很显然不符合我们的要求,但是黑马老师说有实现方案,但是很麻烦,之后有机会再了解把。2、使用Lua脚本,可以同时实现原子性和一致性。因为redis调用Lua脚本时,把一个Lua脚本整体看作一个语句,所以原子性得以满足;Lua是一种脚本语言,可以实现if判断语句以及其他编程语言可以实现的操作,从而可以判断操作,然后再选择是否执行del语句操作,所以一致性得以满足。关于原子性和一致性的举例:转账:张三给李四转账100元。那数据库假设需要张三扣100,李四加100,记录一条流水。如果流水没记录成功,那整体回滚,张三也没转账成功,李四也没多钱。这就是原子性的体现。而张三必须扣100,李四必须加100,这个就是一致性了,如果因为某些逻辑原因,导致张三扣了100,流水记录100转账,而李四只加了60。然后这3条操作都成功了,那原子性就符合了,但是一致性就不符合了其实在实际应用中肯定不是这么简单的例子的。往往是类似,买东西扣库存这类的逻辑,主表里有库存,库存表里有库存,然后就因为设计缺陷,就算加了事务还是出现了主表库存对不上库存表库存的问题,这个就是一致性不满足的了。
Redis的Lua脚本
    Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性,即把一个Lua脚本中的多个Redis命令以及语言流程看作一句语句,如set、get这样的语句。通过把一个Lua脚本看成一句来保证原子性,通过Lua语言在脚本中的判断语句等编程语言的操作来保证一致性。。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis给Lua语言提供的调用函数,语法如下:

eval命令(在redis中调用Lua脚本)
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:语法:eval script numkeys [key ...] [arg ...]
功能描述:在redis中调用Lua脚本
选项描述:scrpit:脚本文件/脚本字符串numkeys:必填参数,脚本需要传入的KEYS数组的个数[key ...]:可选参数,依次放入Lua脚本中自带的KEYS数组中,注意Lua语言中数组的下标从1开始[arg ...]:可选参数,依次放入Lua脚本中自带的ARGV数组中,注意Lua语言中数组的下标从1开始[key ...]/[arg ...]:两者可以视为传参的一个整体,但是怎么区分哪些参数是放在KEYS数组中,哪些参数是放在ARGV数组中的呢?答案很简单,这时候numkeys选项就起作用了,他代表在后面的可选参数中,前n个参数是放在KEYS数组中的,其余的都放在ARGV数组中。举例:eval 脚本字符串 0      -----> KEYS数组和ARGV数组均为空eval 脚本字符串 0 1 2 3 4 ----->    KEYS数组为空,ARGV数组为[1,2,3,4]eval 脚本字符串 1 1 2 3 4 ----->  KEYS数组为[1],ARGV数组为[2,3,4]eval 脚本字符串 2 1 2 3 4 -----> KEYS数组为[1,2],ARGV数组为[3,4]eval 脚本字符串 4 1 2 3 4 -----> KEYS数组为[1,2,3,4],ARGV数组为空

bug2解决方案的实现

实现的Lua脚本

-- 该Lua脚本满足了释放锁操作的原子性和一致性
-- 获取锁的key
local key = KEYS[1]
-- 获取当前线程标识(UUID + 线程id)
local threadId = ARGV[1]-- 获取锁中的线程标识 get key
local id = redis.call('get',key)
-- 比较线程标识与锁中的标识是否一致
if(id == threadId) then-- 一致,释放锁,并返回1return redis.call('del',key)
end
-- 不一致,返回0
return 0

Java内操控redis调用Lua脚本

public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;public static final String KEY_PREFIX = "lock:";// 使用Hutool工具包生成没有斜杠的UUIDpublic static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";// 创建读取Lua脚本的对象,因为给每个线程都创建io流很浪费资源和性能,所以这里使用静态属性的方式public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {// 创建RedisScript接口的实现类UNLOCK_SCRIPT = new DefaultRedisScript<>();// 读取类路径下的Lua脚本UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));// 指定返回值泛型,其实在创建对象的时候就可以指定泛型了UNLOCK_SCRIPT.setResultType(Long.class);}public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {// UUID拼接线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 尝试获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);// 返回获取锁结果,同时不直接返回是因为拆箱容易出现空指针异常。return Boolean.TRUE.equals(success);}@Overridepublic void unLock() {// 在reids中调用Lua脚本,保证原子性情况下进行释放锁操作stringRedisTemplate.execute(UNLOCK_SCRIPT,  // Lua脚本的io对象Collections.singletonList(KEY_PREFIX + name), // List的长度决定了numkeys,所以不需要我们手动指定ID_PREFIX + Thread.currentThread().getId());}
}
总结
基于Redis的分布式锁实现思路:利用set nx ex获取锁,并设置过期时间,保存线程标示(UUID + 线程id)释放锁时先判断线程标示是否与自己一致,一致则删除锁(采用Lua脚本)
特性:利用set nx满足互斥性利用set ex保证故障时锁依然能释放,避免死锁,提高安全性采用Lua脚本,保证原子性和一致性,避免因阻塞而产生的误删锁key利用Redis集群保证高可用和高并发特性即使给出了这样的解决方案,这样线程1和线程2之间,还是有几率出现超卖现象,虽然概率很低,就是当线程1阻塞后导致锁的超时时间到了自动被释放,然后线程2拿到锁,此时线程1和线程2都是可以拿到在业务当中的,当他们都拿到count=0时,那么就会出现超卖现象,还是需要继续改进!我觉得应该是概率很低,所以没有被拿出来说,但是有概率发生就是有个概率发生。我目前的想法是把以下判断一人一单和扣减库存作为一个原子性操作。原来的代码:// 限制一人一单Integer count = query().eq("user_id", UserHolder.getUser().getId()).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("一人一单!不能重复购买!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0).update();if (!success) {// 库存不足return Result.fail("库存不足!");}修改后的代码:// 5.判断一人一单以及扣减库存的原子性操作// UPDATE tb_seckill_voucher SET stock = stock - 1 WHERE (voucher_id = ? AND stock > ? // AND (select count(*) from tb_voucher_order where user_id = 1013 AND voucher_id = 2) = 0)boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0).apply("(select count(*) from tb_voucher_order where user_id = " + UserHolder.getUser().getId() +" AND voucher_id = " + voucherId + ") = 0").update();if (!success) {// 限制一人一单Integer count = query().eq("user_id", UserHolder.getUser().getId()).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("一人一单!不能重复购买!");}// 库存不足return Result.fail("库存不足!");}
基于Redis的分布式锁优化
 基于setnx实现的分布式锁还有很多问题需要优化,但是在一般的生产环境中以及足够用了,如果你的要求更高,那么就需要对下面的问题进行优化,但是由于自己实现很复杂很难,而且我们也没必要自己优化,因为市面上已经有很多这样的分布式框架解决方案了,我们使用这些框架就可以了,其中最为出名的就是Redisson。上面的学习是为了让我们了解分布式锁实现的原理,也很重要。

Redisson

    Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。说通俗点,Redisson是一个分布式工具框架,它里面有很多分布式工具,分布式锁只是其中一种。官网地址: https://redisson.orgGitHub地址: https://github.com/redisson/redisson

Redisson入门

1、引入依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>

2、配置Redission客户端(放入IOC容器中)

// 注意!!!其实也可以通过yaml的方式配置Redisson客户端,因为Redisson提供了SpringBoot的start启动器,听起来这样很方便,但是yaml配置方式会替代Spring官方对redis的配置和实现,建议不要使用这种方式。
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redisClient(){// 配置类Config config = new Config();// 添加redis地址,这里添加了单点的地址(redis集群可以用useClusterServers()),也可以使用config.useClusterServers()添加集群地址config.useSingleServer().setAddress("redis://192.168.182.154:6379").setPassword("mima");// 创建连接客户端return Redisson.create(config);}
}

3、使用Redisson的分布式锁

@Resource   // 要注入这个对象来使用
private RedissonClient redissonClient;@Test
void testRedisson() throws InterruptedException {// 创建锁对象(可重入锁),并给锁的key赋值(此时并未获取锁)RLock lock = redissonClient.getLock("anyLock");// 尝试获取锁,这里的锁key就是上面构造方法传入的属性// tryLock(long, long, TimeUnit),参数分别是获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位// tryLock(),即最大等待时间默认值为-1获取锁失败就直接返回(非阻塞式),锁自动释放时间为30秒boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);// 判断是否获取成功if(isLock){try {System.out.println("执行业务");}finally {// 释放锁lock.unlock();}}
}

Redisson可重入锁原理

 Redisson可重入锁,是基于redis的Hash类型实现的,分别有三个参数:key、field、value。其中key还是和原来我们自定义的string类型分布式锁一样,就是前面的锁key,类似于lock:order:userId这种;而field就是原来我们自定义的string类型分布式锁的value,类似于(UUID + 线程id)的组合;而Hash类型中的value,就是该锁的重入数量。根据可重入锁的原理可知,为了保证原子性和一致性,以上的操作必须使用Lua脚本来实现。

获取可重入锁的Lua脚本

-- 以下脚本在小细节上和Redisson实现的稍有出入,但原理和结果是一样的。
-- Redisson把这段Lua脚本以硬编码的字符串形式写在类中加载。
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then-- 不存在, 获取锁redis.call('hset', key, threadId, '1'); -- 设置有效期redis.call('expire', key, releaseTime);return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then-- 锁是自己线程的, 获取锁,重入次数+1redis.call('hincrby', key, threadId, '1'); -- 刷新有效期redis.call('expire', key, releaseTime);return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败

释放锁的Lua脚本

-- 以下脚本在小细节上和Redisson实现的稍有出入,但原理和结果是一样的。
-- Redisson把这段Lua脚本以硬编码的字符串形式写在类中加载。
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) thenreturn nil; -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then-- 大于0说明不能释放锁,重置有效期然后返回redis.call('EXPIRE', key, releaseTime);return nil;
else  -- 等于0说明可以释放锁,直接删除redis.call('DEL', key);return nil;
end;

Redisson重试、超时续约原理

 这一部分黑马老师带着看了一遍源码,但是其实没怎么看懂,稍微有点晕,之后回来深入了解一下。

总结

Redisson分布式锁原理:可重入:利用hash结构记录线程id和重入次数可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制。因为采用了订阅发布功能,所以线程在获取锁失败以后的重试并不是一直尝试,而是先睡眠,等待释放锁的信号以后再被唤醒去尝试获取锁,这样很好的做到了不浪费CPU性能。超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间。利用watchDog只有在未显式指定加锁时间(leaseTime)时才会生效。(这点很重要)若显式指定leaseTime参数,则看门狗会失效,不会自动为超时key续约。所以当我们不指定Redisson的tryLock()里的leaseTime时,那么这个锁只有在业务结束之后线程主动释放可以被释放,并不会因为线程业务阻塞而导致锁超时被释放,被其他线程获取到锁从而导致线程不安全问题。当时我们显式指定leaseTime时,WatchDog就不会生效,就会出现超时释放的情况,所以这一切都视情况来使用。

Redisson分布式锁主从一致性问题

    Redisson对redis集群主从一致性问题的解决方案就是采用联锁。联锁(RedissonMultiLock)对象可以将多个RLock对象关联为一个联锁,实现加锁和解锁功能。每个RLock对象实例可以来自于不同的Redisson实例。

总结

1)不可重入Redis分布式锁:原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示缺陷:不可重入、无法重试、锁超时失效
2)可重入的Redis分布式锁:原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待缺陷:redis宕机引起锁失效问题
3)Redisson的multiLock(联锁,主从一致性解决方案):原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功缺陷:运维成本高、实现复杂

Redis秒杀优化(阻塞队列异步秒杀)

同步流程

 在我们之前的优惠券秒杀中,关于优惠券库存、一人一单的购买资格等对数据库大量的IO操作,我们全部都是直接对数据库进行操作。要知道数据库的并发性本来就很差,这么多数据库请求打过来,肯定会造成我们对用户的请求响应很慢。同时用户下单请求是一个同步流程,服务器只有在处理完一切流程以后才会返回数据给用户,这样一套流程下来,响应速度可想而知会很慢。

异步流程

 我们完全可以对优惠券秒杀流程进行拆分操作,把库存数据以及创建一个Set类型用于存放已经下单的用户id写入到redis中,然后通过判断库存是否充足和订单Set中是已经有该用户了来判断用户是否可以下单。若可以下单,把库存减去1,订单Set中加入用户id,然后在Java中根据Redis的返回结果以及优惠券id、用户id生成订单id放入一个阻塞队列中,等待之后异步线程来读取进行写入数据库操作,对于用户的请求,我们只需要把这次生成的订单id等信息直接返回该订单信息给用户(此时并未写入数据库);若不可以下单,就直接返回错误信息给用户即可。最后,redis创建一个线程,来读取订单Set中的订单信息,去数据库中进行插入订单和扣减库存操作。以上就是优惠券秒杀的异步流程,把判定流程和对数据库进行操作的流程分离出来,判定流程交给redis实现,对数据库操作可以之后异步实现,先把结果返回给用户才是最重要的。因为我们会对下单成功的用户先生成一些必须的订单信息返回,但是先不写入数据库,把它们放在阻塞队列中,等待异步线程来读取数据,然后进行写入数据库操作。由于以上的异步流程,所以对用户的感知来说是完全没有差别的,响应速度反而快了很多。

Redis优化分析

 采用异步流程后,需要在redis存储两个内容:库存、下单成功用户。库存没有什么特殊要求,我们可以直接使用string类型作为库存的数据类型;对于下单成功用户,由于一人一单的要求,我们选取Set类型最合适不过。同时由于要求对数据的原子性和一致性要求,我们要把判断用户是否有下单资格以及为可以下单的用户记录下单信息的操作封装成Lua脚本来执行。Lua脚本如图和Java中的流程如图。

改进秒杀业务,提高并发性能

需求:1、新增秒杀优惠券的同时,将优惠券信息保存到Redis中2、基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功3、如果抢购成功,将优惠券id和用户id封装后存入阻塞队列4、开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能注意!!!这里并没有非常严谨,没有判断优惠券的开始时间和结束时间,只要了解结合redis进行异步操作就可以了,不要纠结这些小问题。

1、添加优惠券时添加到redis中

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒杀库存到Redis中,不需要设置过期时间stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString());
}

2、Lua脚本

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then-- 库存不足,返回1return 1
end-- 3.2.判断用户是否下单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then-- 存在,说明是重复下单,返回2return 2
end-- 3.3.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5.下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 下单成功,返回0
return 0

3、异步流程代码

3.1、主线程判断是否有下单资格以及把下单数据放入阻塞队列

// 用于创建下单数据
@Resource
private RedisIdWorker redisIdWorker;// 创建调用Lua脚本所需对象,因为给每个线程都创建io流很浪费资源和性能,所以这里使用静态属性的方式
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {// 创建RedisScript接口的实现类SECKILL_SCRIPT = new DefaultRedisScript<>();// 读取类路径下的Lua脚本SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));// 指定返回值泛型,其实在创建对象的时候就可以指定泛型了SECKILL_SCRIPT.setResultType(Long.class);
}// 阻塞队列:即当一个线程尝试从该阻塞队列中读取数据时,若当前阻塞队列中没有数据,那么该线程就会阻塞,直到阻塞队列中有数据为止
// 1024 * 1024 是用于指定阻塞队列的大小,我猜测是bit为单位,不然这个就太大了,容量不能过大,过大会造成内存溢出
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>( 1024 * 1024);// 提前获取主线程的的代理对象,不然在异步线程无法获取到
private IVoucherOrderService proxy;@Override
public Result seckillVoucher(Long voucherId) {// 获取用户idLong userId = UserHolder.getUser().getId();// 1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),    // 如果没有KEYS数组的参数就给一个空的List,不能给nullvoucherId.toString(), userId.toString());// 2.判断结果是否为0int r = result.intValue();if (r != 0) {// 不为0,库存不足 或 不能重复下单return Result.fail(result == 1 ? "库存不足!" : "不能重复下单!");}// 3.结果为0,有购买资格,把下单信息保存到阻塞队列VoucherOrder voucherOrder = new VoucherOrder();// 3.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 3.2.用户idvoucherOrder.setUserId(userId);// 3.3.代金券idvoucherOrder.setVoucherId(voucherId);// 3.4.放入阻塞队列orderTasks.add(voucherOrder);// 赋值主线程的代理类proxy = (IVoucherOrderService) AopContext.currentProxy();// 4.结果为0,有购买资格,返回订单idreturn Result.ok(orderId);
}

3.2、异步线程读取阻塞对象进行写入数据库操作

@Resource
private ISeckillVoucherService seckillVoucherService;@Resource
private RedisIdWorker redisIdWorker;@Resource
private StringRedisTemplate stringRedisTemplate;@Resource
private RedissonClient redissonClient;// 阻塞队列:即当一个线程尝试从该阻塞队列中读取数据时,若当前阻塞队列中没有数据,那么该线程就会阻塞,直到阻塞队列中有数据为止
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>( 1024 * 1024);// 创建只有单个线程的线程池,因为异步用户无感知,所以我们可以慢慢写入,没必要浪费服务器资源
public static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 提前获取主线程的的代理对象,不然在异步线程无法获取到
private IVoucherOrderService proxy;/*什么时候执行这个任务呢,肯定是用户秒杀抢购之前开始,因为用户一旦开始秒杀,他就会向队列里添加新的订单,那我们任务就要去队列取出订单信息,所以他必须在队列之前执行。事实上,这个项目一启动,用户随时可能来抢购,所以应该在这个类初始化之后赶紧的执行这个任务。可以通过spring提供的注解来做:@PostConstruct,这个注解是在当前类初始化完毕后,去执行init方法。
*/
@PostConstruct  // Spring注解,在该类加载完成以后,马上执行该方法,因为读取阻塞队列的线程应该在秒杀之前就就绪了
private void init(){// 提交线程任务,异步读取阻塞队列并写入数据库SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}// 内部类,线程要执行的读取阻塞队列任务
private  class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while (true) {try {// 因为BlockingQueue.take()是专门用于读取阻塞队列的方法// 所以阻塞队列为空时会自动阻塞,所以不用担心while循环给CPU带来压力// 1.获取并删除队列中的头部(最上面一条)订单信息VoucherOrder voucherOrder = orderTasks.take();// 2.创建订单handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}
}// 异步线程执行写入数据库的任务之前要判断以及获取锁、释放锁,还有通过主线程的代理类(为了让事务生效),调用执行写入数据库的方法
// 其实我觉得下面的锁在单线程要求下,其实没有意义,而且在主线程的Lua脚本中以及进行了判断,能下单的肯定都是有购买资格且库存充足的,并不惧怕并发引发问题,但是为了保险起见还是要写一下,同时万一以后要用到多个异步线程来处理呢
private void handleVoucherOrder(VoucherOrder voucherOrder) {// 1.获取用户id// 这里把 UserHolder.getUser().getId()换成了voucherOrder.getUserId(),// 是因为这一步操作是有异步线程来完成的,他独立于主线程的,所以他的Threadlocal中不会有User对象Long userId = voucherOrder.getUserId();// 2.创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);// 3.获取锁boolean isLock = lock.tryLock();// 4.判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试log.error("不允许重复下单");return;}try {// 获取锁成功,把订单信息写入数据库proxy.createVoucherOrder(voucherOrder);} finally {// 释放锁lock.unlock();}
}// 异步线程要执行的把订单信息写入数据库任务
// 并且这里不需要返回值,因为是异步线程在做写入数据库操作,给用户返回的信息早就返回了
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {// 5.扣减库存,并和判断一人一单为原子操作boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0)      // 这里把 UserHolder.getUser().getId()换成了voucherOrder.getUserId(),// 是因为这一步操作是有异步线程来完成的,他独立于主线程的,所以他的Threadlocal中不会有User对象.apply("(select count(*) from tb_voucher_order where user_id = " + voucherOrder.getUserId() +" AND voucher_id = " + voucherOrder.getVoucherId() + ") = 0").update();// 6.判断是否扣减库存成功if (!success) {// 限制一人一单Integer count = query().eq("user_id", voucherOrder.getUserId()).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.error("一人一单!不能重复购买!");}// 库存不足log.error("库存不足!");return;}// 扣减库存成功,保存订单到数据库中去save(voucherOrder);
}

总结

秒杀业务的优化思路是什么?先利用Redis完成库存余量、一人一单判断,完成抢单业务再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?内存限制问题:我们使用的阻塞队列是JDK的阻塞队列,不加以限制的话,在高并发情况下容易造成内存溢出,所以我们给阻塞队列限制了大小,那么在阻塞队列满了的情况下,就添加不了数据了,这也是个问题。数据安全问题:第一种由于Redis是基于内存的(虽然可以持久化,但是持久化不是每时每刻的,是每一段时间持久化一次的),要是用户收到返回的订单信息后马上进行了付款,但是这个时候异步进程还没有写入数据库,而是放在redis中,此时redis突然断电或宕机了,用户付款的数据就没有了;第二种情况是,由于我们使用的是JDK的阻塞队列,是基于JVM内存的,JVM内存是没有持久化机制的,如果这个时候java进程断电了或者宕机了,那么JVM内存里的东西就没有了,也就是阻塞队列里原本的数据就没有了,也造成了数据的丢失;第三种情况就是,异步线程刚从阻塞队列中取出订单数据,准备写入数据库,在处理的过程中出现了异常或者断电、宕机等问题,依旧会导致用户付款的数据丢失。

Redis消息队列实现异步秒杀

消息队列

消息队列,简单来说就是存放消息的队列。一个最简单但完整的消息队列模型中包括三个角色:生产者、消息队列(自身)、消费者生产者:负责发送消息(数据)给消息队列,然后让消息队列接收到这个消息去进行存储。消费者:负责从消息队列中读取消息(数据)来使用。消息队列:负责存储和管理消息,就是接收生产者发送过来的消息并进行存储,以及消费者从自己拿走存储的消息(数据)以后不会马上删除这个消息,而是再给消费者发消息,让消费者回复(以及收到之前的消息)的消息,然后才会真正把被拿走的消息删除,有效防止消息被拿走以后但是断电、宕机等情况导致任务未执行、且执行任务需要的数据丢失的情况。市面上消息队列的产品有很多,如RabbitMQ 、 ActiveMQ 、RocketMQ等,之后可以单独去学习。

    其实就是上面的Redis使用阻塞队列来进行异步下单,加快服务器响应速度是一样的。而这个消息队列担任的其实就是阻塞队列的角色,但是和原来的阻塞队列又有什么区别呢?原来的阻塞队列是基于JVM内存的,不限制内存容量容易导致内存溢出,所以我们限制了他的内存,那么就会导致这个到达这个限制以后存不进去数据,并且这个阻塞队列也因为基于JVM内存,而JVM内存是没有持久化功能的,一断电原来阻塞队列中的东西就丢失了。现在所讲的消息队列,则是一个独立的服务,就像mysql服务一样独立占用一个端口,不是内嵌在JVM中的,这就解决了内存溢出的问题。同时该消息队列是支持持久化的,拿到的数据他都会进行持久化,这就解决了断电数据丢失的安全问题了。同时他还有类似于“淘宝商品用户确认收货以后才会结束订单”的功能,就是消费者拿走消息队列中的数据以后,完成数据的消费然后返回确认消息给消息队列以后,消息队列才会删除自身中的消息。这样就可以保证,从消息队列中拿走数据的进程死锁或宕机了,但是数据已经不在消息队列中,无法再执行的问题了。

Redis三种方式模拟消息队列

 由于消息队列的服务需要单独去学习且需要重新起一个服务,对于小公司来说成本较高,所以小公司一般使用现成的Redis服务来模拟实现消息队列功能。Redis提供了三种不同的方式来实现消息队列:list结构:基于List结构模拟消息队列PubSub:基本的点对点消息模型Stream:比较完善的消息队列模型注意!!!这里了解一下就可以了,之后肯定还是要去学习RabbitMQ等专门的服务的。

基于List结构模拟消息队列

    消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。基于List的消息队列有哪些优缺点?
优点:1、利用Redis存储,不受限于JVM内存上限2、基于Redis的持久化机制,数据安全性有保证3、可以满足消息有序性
缺点:1、无法避免消息丢失:消息被线程拿走以后,无法做到保留,线程要是出错导致任务没有执行,那么数据就彻底丢失了2、只支持单消费者

基于PubSub的消息队列

    PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
命令:SUBSCRIBE channel [channel] :订阅一个或多个频道PUBLISH channel msg :向一个频道发送消息PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道基于PubSub的消息队列有哪些优缺点?
优点:1、采用发布订阅模型,支持多生产、多消费
缺点:2、不支持数据持久化3、无法避免消息丢失4、消息堆积有上限,超出时数据丢失

基于Stream的消息队列(单消费者)

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:

读取消息的命令方式之一:XREAD

优缺点(单消费者)

STREAM类型消息队列的XREAD单消费者
优先:消息可回溯一个消息可以被多个消费者读取可以阻塞读取
缺点:有消息漏读的风险

基于Stream的消息队列(消费者组)

 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。虽然都是由Stream实现,但是上面实现的是单消费者,这里实现的是消费组。把前者缺点弥补了。具备下列特点:

创建消费者组命令

语法:XGROUP CREATE  key groupName ID [MKSTREAM]
参数key:队列名称groupName:消费者组名称ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息MKSTREAM:队列不存在时自动创建队列
其它常见命令:# 删除指定的消费者组XGROUP DESTORY key groupName# 给指定的消费者组添加消费者XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息命令

语法:XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
参数:group:消费组名称consumer:消费者名称,如果消费者不存在,会自动创建一个消费者count:本次查询的最大数量BLOCK milliseconds:当没有消息时最长等待时间NOACK:无需手动ACK,获取到消息后自动确认STREAMS key:指定队列名称ID:获取消息的起始ID:">":从下一个未消费的消息开始其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路

STREAM类型消息队列的XREADGROUP命令优缺点

优点:1、消息可回溯2、可以多消费者争抢消息,加快消费速度3、可以阻塞读取4、没有消息漏读的风险5、有消息确认机制,保证消息至少被消费一次
缺点:1、只能做到消费者消息确认,无法做到生产者消息确认,即生产者发送数据给消息队列时出现问题了,数据就丢失了2、无法和专业的MQ服务比,如果下公司要求不严苛,可以使用这种方式来节约成本。

总结

 如果小公司为了节约成本,可以使用Stream消费组来实现消息队列。但是对消息队列有硬性要求还是需要使用专业的MQ服务。

基于Redis的Stream结构作为消息队列,实现异步秒杀下单

需求:1、创建一个Stream类型的消息队列,名为stream.orders(无需在Java中创建,我们使用命令行创建即可,XGROUP CREATE stream.orders g1 0 MKSTREAM)2、修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId3、项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单注意!!!下面的代码可能有点问题,我把主线程的proxy赋值提前了,因为不提前,会先执行Lua脚本,往Stream消息队列中放数据,会被异步线程监测到,调用proxy.createVoucherOrder()方法,会报空指针异常,但是大部分时间都不会报错,因为主线程调用Lua脚本后只执行一个if判断语句以后就马上给proxy赋值,所以大概率是因为主线程给proxy赋值快于异步线程调用proxy.createVoucherOrder()的,但是把主线程给proxy赋值放在Lua脚本后面执行就是会概率报错的,所以最好放到前面。

1、Lua脚本

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then-- 库存不足,返回1return 1
end-- 3.2.判断用户是否下单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then-- 存在,说明是重复下单,返回2return 2
end-- 3.3.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.5.下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId)
-- 3.6.发送消息到队列中,XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
-- 下单成功,返回0
return 0

2、主线程代码

@Override
public Result seckillVoucher(Long voucherId) {// 获取用户idLong userId = UserHolder.getUser().getId();// 获取订单idlong orderId = redisIdWorker.nextId("order");// 赋值主线程的代理类,这里提前赋值了,不然Lua脚本会先执行,会handleVoucherOrder()中调用proxy.createVoucherOrder(),所以我觉得要在主线程方法seckillVoucher()中提早赋值,否则proxy是null会报错proxy = (IVoucherOrderService) AopContext.currentProxy();// 1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),    // 如果没有KEYS数组的参数就给一个空的List,不能给nullvoucherId.toString(), userId.toString(),String.valueOf(orderId)    // 因为传入的参数必须为String,在里面强转会报错);// 2.判断结果是否为0int r = result.intValue();if (r != 0) {// 不为0,库存不足 或 不能重复下单return Result.fail(result == 1 ? "库存不足!" : "不能重复下单!");}// 4.结果为0,有购买资格,返回订单idreturn Result.ok(orderId);
}

3、异步线程

@Resource
private ISeckillVoucherService seckillVoucherService;@Resource
private RedisIdWorker redisIdWorker;@Resource
private StringRedisTemplate stringRedisTemplate;@Resource
private RedissonClient redissonClient;// 创建调用Lua脚本所需对象,因为给每个线程都创建io流很浪费资源和性能,所以这里使用静态属性的方式
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {// 创建RedisScript接口的实现类SECKILL_SCRIPT = new DefaultRedisScript<>();// 读取类路径下的Lua脚本SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));// 指定返回值泛型,其实在创建对象的时候就可以指定泛型了SECKILL_SCRIPT.setResultType(Long.class);
}// 创建只有单个线程的线程池
public static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 提前获取主线程的的代理对象,不然在异步线程无法获取到
private IVoucherOrderService proxy;/*什么时候执行这个任务呢,肯定是用户秒杀抢购之前开始,因为用户一旦开始秒杀,他就会向队列里添加新的订单,那我们任务就要去队列取出订单信息,所以他必须在队列之前执行。事实上,这个项目一启动,用户随时可能来抢购,所以应该在这个类初始化之后赶紧的执行这个任务。可以通过spring提供的注解来做:@PostConstruct,这个注解是在当前类初始化完毕后,去执行init方法。
*/
@PostConstruct  // Spring注解,在该类加载完成以后,马上执行该方法
private void init(){// 提交线程任务,异步读取阻塞队列并写入数据库SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}// 内部类,线程要执行的读取阻塞队列任务
private  class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2.判断消息获取是否成功if (list == null || list.isEmpty()) {// 如果获取失败,说明没有消息,继续下一次循环continue;}// 3.如果获取成功,可以下单// <String, Object, Object> 分别代表 消息的唯一标识id、<Object,Object>为消息的键值对MapRecord<String, Object, Object> record = list.get(0);// Map<Object,Object> 就是消息里的键值对,封装成了Map而已Map<Object, Object> values = record.getValue();// map转为订单对象,true的意思是忽略转换产生的错误VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 4.如果获取成功,可以下单handleVoucherOrder(voucherOrder);// 5.ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);// 如果处理过程中,出现异常,导致消息没有被确认,调用该方法,去pending-list中重新进行消息确认handlePendingList();}}}private void handlePendingList(){while (true) {try {// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 2.判断消息获取是否成功if (list == null || list.isEmpty()) {// 如果获取失败,说明pending-list没有消息,即已经被上一次循环完成了消息确认,结束循环break;}// 3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 4.如果获取成功,可以下单handleVoucherOrder(voucherOrder);// 5.ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending-list订单异常",e);try {// 这里只是为了防止线程一直获取不成功,进入死循环,占用CPU,所以每次稍微睡眠一下Thread.sleep(20);} catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}}}}
}private void handleVoucherOrder(VoucherOrder voucherOrder) {// 1.获取用户idLong userId = voucherOrder.getUserId();// 2.创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);// 3.获取锁boolean isLock = lock.tryLock();// 4.判断是否获取锁成功if (!isLock) {// 获取锁失败,返回错误或重试log.error("不允许重复下单");return;}try {// 获取锁成功,把订单信息写入数据库System.out.println("handleVoucherOrder.proxy = " + proxy);// 由于这里会调用proxy,所以我得在主线程方法seckillVoucher()中提早赋值,否则proxy是null会报错proxy.createVoucherOrder(voucherOrder);} finally {// 释放锁lock.unlock();}}@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {// 5.扣减库存,并和判断一人一单为原子操作boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0)      // 这里把 UserHolder.getUser().getId()换成了voucherOrder.getUserId(),// 是因为这一步操作是有异步线程来完成的,他独立于主线程的,所以他的Threadlocal中不会有User对象.apply("(select count(*) from tb_voucher_order where user_id = " + voucherOrder.getUserId() +" AND voucher_id = " + voucherOrder.getVoucherId() + ") = 0").update();// 6.判断是否扣减库存成功if (!success) {// 限制一人一单Integer count = query().eq("user_id", voucherOrder.getUserId()).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.error("一人一单!不能重复购买!");return;}// 库存不足log.error("库存不足!");return;}// 扣减库存成功,保存订单到数据库中去save(voucherOrder);
}

探店笔记

发布探店笔记

 发布探店笔记的功能,其实就是和发小红书一样,主要分为两张表,tb_blog表(探店笔记表)和tb_blog_comments表(其他用户评价表)。同时要注意,由于上传图片的功能别的地方也会用到,所以这里把上传图片的功能抽离了出现,作为了一个独立的接口。而且上传图片的功能一般都是上传到阿里云OSS中,最差也是上传到服务器中,这里就先放在本地nginx目录下了,所以要记得在SYstemContants类下修改保存地址,上传的图片名字会被Hash算法处理并重命名,所以不用担心重名问题。注意!!!由于这一部分和Redis没有关系,所以由黑马老师提供的代码直接实现了,我们不用去管,了解一下就可以,之后别的实战项目中会遇到的。

实现查看笔记详细信息

 添加笔记功能,已经由黑马老师提供了,但是详细查看笔记内容的功能,我们需要实现,显示如下:

Bean:
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_blog")
public class Blog implements Serializable {/*** 一下三个属性,不是blog表的字段,但是由于业务需求,需要一起返回这些信息给前端,所以在Blog类中加了这些属性,但*   是不能让他们加入Mybatis-Plus自带的方法中,所以要使用@TableField注解,这样MyBatis-Plus就不会把他们作为字段了*    这里把其他主要属性省略掉没复制过来*//*** 用户图标*/@TableField(exist = false)private String icon;/*** 用户姓名*/@TableField(exist = false)private String name;
}Controller:
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id){return blogService.queryBlogById(id);
}ServiceImpl:
@Override
public Result queryBlogById(Long id) {// 1.查询blog笔记Blog blog = getById(id);if (blog == null) {return Result.fail("笔记不存在!");}// 2.查询发布笔记的作者queryBlogUser(blog);return Result.ok(blog);
}private void queryBlogUser(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());
}

点赞

 在首页的热点笔记显示和详细笔记显示中,都会显示用户是否点赞图标。针对这个需求,我们设计了Redis中的Set结构用来存储点赞的用户id,以及在数据库中的表记录具体点赞数量。做到了Redis+数据库的结构。

需求:1、同一个用户只能点赞一次,再次点击则取消点赞2、如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)实现步骤:1、给Blog类中添加一个isLike字段,表示是否被当前用户点赞2、修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-13、修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段4、修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
Bean:
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_blog")
public class Blog implements Serializable {/*** 一下三个属性,不是blog表的字段,但是由于业务需求,需要一起返回这些信息给前端,所以在Blog类中加了这些属性,但*   是不能让他们加入Mybatis-Plus自带的方法中,所以要使用@TableField注解,这样MyBatis-Plus就不会把他们作为字段了*    这里把其他主要属性省略掉没复制过来*//*** 是否点赞过了*/@TableField(exist = false)private Boolean isLike;
}Controller:
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {return blogService.likeBlog(id);
}ServiceImpl:
@Override
public Result likeBlog(Long id) {// 1.获取登录用户Long userId = UserHolder.getUser().getId();// 2.判断当前用户是否已经点赞String key = "blog:liked:" + id;Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());if (BooleanUtil.isFalse(isMember)) {// 3.如果未点赞,可以点赞// 3.1.数据库点赞数 + 1boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();// 3.2.保存用户到Redis的set集合if (isSuccess) {stringRedisTemplate.opsForSet().add(key,userId.toString());}}else {// 4.如果已点赞,取消点赞// 4.1.数据库点赞数-1boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();// 4.2.把用户从Redis的set集合移除if (isSuccess) {// 注意,Redis中的Set集合,要是最后一个元素被移除了,那么这个Set就会消失。stringRedisTemplate.opsForSet().remove(key,userId.toString());}}return Result.ok();
}最后还要设置,查询Blog的方法里,判断该用户是否点赞某笔记,并设置Blog对象的isLike属性
// 判断用户是否点赞某笔记
private void isBlogLiked(Blog blog){// 1.获取登录用户UserDTO user = UserHolder.getUser();// 这里需要做是否登录的判断是因为,热点笔记的查询不登录也能查看,所以在登录拦截其中放行了这条请求,// 如果其他被拦截的请求有这种情况,是不需要判断的,因为只有登录了才能请求到// 这里先判断用户是否登录,未登录不需要调用UserHolder.getUser().getId(),不然会报空指针异常if (user == null) {return;}Long userId = user.getId();// 2.判断当前用户是否已经点赞String key = "blog:liked:" + blog.getId();Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());// 3.设置是否点赞结果blog.setIsLike(BooleanUtil.isTrue(isMember));
}// 应用isLike属性后,修改的查询热点笔记方法
@Override
public Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog -> {this.queryBlogUser(blog);// 查询该用户点赞此博客,并赋值isLike属性this.isBlogLiked(blog);});return Result.ok(records);
}// 应用isLike属性后,修改的查询笔记方法
@Override
public Result queryBlogById(Long id) {// 1.查询blog笔记Blog blog = getById(id);if (blog == null) {return Result.fail("笔记不存在!");}// 2.查询发布笔记的作者queryBlogUser(blog);// 3.查询该用户点赞此博客,并赋值isLike属性isBlogLiked(blog);return Result.ok(blog);
}

点赞排行榜(top5)

    在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜。

 经过分析最终选择Zset作为Redis中存储点赞用户信息的数据结构,key:(value:score)。1、其中key,value不变,score为插入时的时间戳,使用System.currentTimeMillis()即可。2、由于Zset数据类型中,没有判断某一数据是否存在的方法,这里使用zscore key命令来代替,若key存在会返回score值,不存在则返回nil,Spring在java中会封装为null。3、sql语句中使用in(?,?),排序会默认为升序,需要使用order by 或者order by field(字段,字段值1,字段值2,...)来指定排序顺序。4、下面的语句里,用了很多api操作,现在先理解是目的是什么就可以了,以后可以专门学习一下api操作。

1、改造存储结构为Zet

Controller:
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {return blogService.likeBlog(id);
}ServiceImpl:
@Override
public Result likeBlog(Long id) {// 1.获取登录用户Long userId = UserHolder.getUser().getId();// 2.判断当前用户是否已经点赞   zscore keyString key = BLOG_LIKED_KEY + id;Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());if (score == null) {// 3.如果未点赞,可以点赞// 3.1.数据库点赞数 + 1boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();// 3.2.保存用户到Redis的set集合if (isSuccess) {stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());}}else {// 4.如果已点赞,取消点赞// 4.1.数据库点赞数-1boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();// 4.2.把用户从Redis的set集合移除if (isSuccess) {stringRedisTemplate.opsForZSet().remove(key,userId.toString());}}return Result.ok();
}private void isBlogLiked(Blog blog){// 1.获取登录用户UserDTO user = UserHolder.getUser();// 这里先判断用户是否登录,未登录不需要调用UserHolder.getUser().getId(),不然会报空指针异常if (user == null) {return;}Long userId = user.getId();// 2.判断当前用户是否已经点赞String key = BLOG_LIKED_KEY + blog.getId();Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());// 3.设置是否点赞结果blog.setIsLike(score != null);
}

2、查询点赞排行榜top5(按时间升序)

Controller;
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id){return blogService.queryBlogLikes(id);
}ServiceImpl:
@Override
public Result queryBlogLikes(Long id) {String key = BLOG_LIKED_KEY + id;// 1.查询top5的点赞用户 zrang key 0 4 默认按score升序,前包后包Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 == null || top5.isEmpty()) {// 若top5为空,即没人点赞,应当返回空列表,而不是nullreturn Result.ok(Collections.emptyList());}// 2.解析出其中的用户idList<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());// 把top5的id转换成(?,?,...)的格式,用于sql语句中拼接String idStr = StrUtil.join(",", ids);// 3.根据用户id查询用户 where id in (5,1) order by field(id,5,1)List<UserDTO> userDTOS = userService.query().in("id", ids).last("order by field(id," + idStr + ")").list()// 这里一段看不懂的操作,是用来把List<User>转换为List<UserDTO>的。.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());// 4.返回return Result.ok(userDTOS);
}

好友关注

关注和取关

需求:基于该表数据结构,实现两个接口:1、关注和取关接口2、判断是否关注的接口关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来标示:CREATE TABLE `tb_follow`  (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',`user_id` bigint(20) UNSIGNED NOT NULL COMMENT '用户id',`follow_user_id` bigint(20) UNSIGNED NOT NULL COMMENT '关联的用户id',`create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = COMPACT;注意: 这里需要把主键修改为自增长,简化开发。同时要注意字段`create_time`的属性为NOT NULL DEFAULT CURRENT_TIMESTAMP(0),即不指定该字段时。默认为当前时间的时间戳。所以后面添加关注记录不需要指定`create_time`属性。

实现代码

Bean;
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_follow")
public class Follow implements Serializable {private static final long serialVersionUID = 1L;/*** 主键*/@TableId(value = "id", type = IdType.AUTO)private Long id;/*** 用户id*/private Long userId;/*** 关联的用户id*/private Long followUserId;/*** 创建时间*/private LocalDateTime createTime;
}Controller:
// isFollow为true代表关注,false代表取关
// 这里的isFollow和下面方法返回的是相反的,这里由前端操作,我们不需要管
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow){return followService.follow(followUserId,isFollow);
}// 判断该用户是否以及目标用户,已关注返回true,未关注返回false
@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId){return followService.isFollow(followUserId);
}Serviceimpl:
@Override
public Result follow(Long followUserId, Boolean isFollow) {// 1.获取登录用户Long userId = UserHolder.getUser().getId();// 2.判断是要关注还是取关if (isFollow) {// 3.关注,新增数据// 这里的Follow对象不需要指定createTime属性,因为字段`create_time`的属性为NOT NULL DEFAULT CURRENT_TIMESTAMP(0)// 即不指定该字段时。默认为当前时间的时间戳。所以后面添加关注记录不需要指定`create_time`属性。Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);save(follow);} else {// 4.取关,删除数据remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));}return Result.ok();
}@Override
public Result isFollow(Long followUserId) {// 1.获取登录用户Long userId = UserHolder.getUser().getId();// 2.查询是否关注 select count(*) from tb_follow where user_id = ? and follow_user_id = ?Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();// 3.判断并返回return Result.ok(count > 0);
}

共同关注

 想要查看自己与博主(别的用户)的共同关注,就得先点到别的用户简介界面里,但是这两个接口和redis没有什么关系,就是简单的CRUD,所以直接使用黑马老师提供代码就可以。进入博主个人界面主要依赖的两个接口:1、根据id查询user信息:UserController下:// 用于查询用户个人信息@GetMapping("/{id}")public Result queryUserById(@PathVariable("id") Long userId){User user = userService.getById(userId);if (user == null) {return Result.ok();}UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);return Result.ok(userDTO);}2、根据id查询博主的探店笔记:BlogController下:// 用于获取某用户的全部博客@GetMapping("/of/user")public Result queryBlogByUserId(@RequestParam(value = "current", defaultValue = "1") Integer current,@RequestParam("id") Long id) {Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));return Result.ok(page.getRecords());}

实现共同关注功能

需求:利用Redis中Set数据类型,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友。原理:其实就是把数据库中的用户关注信息同步到Redis的Set数据类型中,然后使用Set类型的(sinter key1 key2 ...)命令,来快速获取交集返回,即共同关注。

1、修改关注和取关部分代码(同步存储在Redis的Set数据类型中)

// 关注和取关部分代码只修改了这部分,其余代码均为改动
@Override
public Result follow(Long followUserId, Boolean isFollow) {// 1.获取登录用户Long userId = UserHolder.getUser().getId();String key = "follows:" + userId;// 2.判断是要关注还是取关if (isFollow) {// 3.关注,新增数据Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);boolean isSuccess = save(follow);// 将数据同步到redis中,用于获得共同关注if (isSuccess) {stringRedisTemplate.opsForSet().add(key,followUserId.toString());}} else {// 4.取关,删除数据boolean isSuccess = remove(new QueryWrapper<Follow>().eq("user_id", userId).eq("follow_user_id", followUserId));// 将数据同步到redis中,用于获得共同关注if (isSuccess) {stringRedisTemplate.opsForSet().remove(key,followUserId.toString());}}return Result.ok();
}

2、共同关注代码(求两个用户的关注Set的交集)

Controller:
@GetMapping("/common/{id}")
public Result followCommons(@PathVariable("id") Long id){return followService.followCommons(id);
}ServiceImpl:
@Override
public Result followCommons(Long id) {// 1.获取当前登录用户Long userId = UserHolder.getUser().getId();String key1 = "follows:" + userId;String key2 = "follows:" + id;// 2.求交集Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);if (intersect == null || intersect.isEmpty()) {// 无交集,返回空列表return Result.ok(Collections.emptyList());}// 3.解析id集合,把Set<String>交集转换为List<Long>,方便下面使用List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());// 4.查询用户,同时把List<User>转为List<UserDTO>,由于共同关注不需要有序性,所以这里可以直接使用listByIds()List<UserDTO> userDTOS = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());// 返回return Result.ok(userDTOS);
}

关注推送(Feed流)

关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。

Feed流的模式

Feed流产品有两种常见模式:Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈优点:信息全面,不会有缺失。并且实现也相对简单缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷缺点:如果算法不精准,可能起到反作用本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:1、拉模式2、推模式3、推拉结合

Feed流的实现方案(Timeline模式)

拉模式

 拉模式:也叫做读扩散。拿B站up主发视频举例。如果一个用户关注了某个UP主,这个UP主发视频了,那么这个UP主的视频会先发送到一个一个名为“发件箱”的存储空间,当用户主动打开“推荐”页面的时候,这个时候服务器才会取发件箱里读取数据,拉取到用户的"收件箱"存储空间里。

推模式

    推模式:也叫做写扩散。拿B站up主发视频举例。如果一个用户关注了某个UP主,这个UP主发视频了,服务器会直接把这个视频发送到粉丝的“收件箱”中,不再存在“发件箱”。

推拉结合模式

 推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。拿B站up主发视频举例。UP主发布视频,对于粉丝量少的UP主,不区分活跃粉丝和普通粉丝,都直接发送到粉丝的收件箱中,不存在“发件箱”存储空间;对于粉丝多的UP主,我们区分活跃粉丝和普通粉丝,对于活跃粉丝,直接发送视频到其“收件箱”,而对于普通粉丝,先把UP主的视频存放在"发件箱"存储空间,等普通粉丝主动去读取了再发送到其“收件箱”。

总结

基于推模式实现关注推送功能

需求:1、修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱(Redis中的Zset类型)2、收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现3、查询收件箱数据时,可以实现分页查询基于以上分析,最终决定采用推模式实现该项目的关注推送功能。同时可以实现这个需求的有列表List、有序集合集合Zset,最终选择了Zset作为收件箱的数据结构,因为使用List会出现数据重复读取的问题,以下说明。
 采用List数据类型作为收件箱,在分页情况下,当读取完第一页以后,如是在读取第二页区间,有新的消息插入了,会导致角标数据发生偏移,由于List采用的角标记录分页片段,所以会造成重复读取数据的情况。

 采用Zset数据类型作为收件箱,可以把score(如:时间戳等)作为代替角标的方案,可以完美解决重复读取数据的问题,但是我在想的是,用户多起来的时候,并发情况也多起来了,那要是出现了相同的时间戳不就还是会发生重复读取数据的问题么,但是我觉得和List方案比起来是小概率事件,或者尝试用(雪花算法、UUID等代替时间戳)。

实现代码

 其实实际上,就是采用了Redis+数据库的存储结构,使用Redis作为收件箱,达到快速读取消息的目的,每次都访问数据库获取数据将会很慢,且影响性能。所以在这里,我们需要改变的就是,修改用户发布博客的方法,除了原本就有的保存到数据库,还要新增一个把Blog对象的id属性(博客id)保存到Redis的Zset中去(每个用户的收件箱)。Zeet中为什么只保存博客id,不保存整条博客呢?答案是显而易见的,节省Redis的内存空间。
Controller:
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {return blogService.saveBlog(blog);
}ServiceImpl:
@Override
public Result saveBlog(Blog blog) {// 1.获取登录用户UserDTO user = UserHolder.getUser();blog.setUserId(user.getId());// 2.保存探店博文boolean isSuccess = save(blog);if (!isSuccess) {return Result.fail("新增笔记失败!");}// 3.查询笔记作者的所有粉丝List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();// 4.推送笔记id给所有粉丝for (Follow follow : follows) {// 4.1.获取粉丝idLong userId = follow.getUserId();// 4.2.推送String key = "feed:" + userId;stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());}// 5.返回idreturn Result.ok(blog.getId());
}

实现关注推送页面的分页查询

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息
PS:以滚动分页的形式展示,意思就是就像微信朋友圈,你点到朋友圈里以后,你向下滚动查看之前好友发的朋友圈,这时候只会向下更新数据,当前页面的最顶部数据是不会更新的,只有你退出朋友圈重新再点进来或者你在最顶部的时候才会更新顶部的最新数据展示。暂时先这么理解,不知道对不对。分析:
1、第一次发起请求,lastId值为当前时间戳,且前端不需要传offset参数,因为固定为0,但是也可以传过来,这个无所谓;其次返回值为查询出来的分页数据、本次最小时间戳、偏移量,把这些参数返回给前端,让前端下次再传过来,第一次不带offset参数的问题让前端去解决就可以了,我们在后端做个判断即可。
2、首先,可以确认的是,使用的是zrevrangebyscore命令来查找需要的分页数据的,完整用到的命令格式为zrevrangebyscore key max min withscores limit offset count参数:max:要查询score的最大值,和min组成一个区间,前包后包min:要查询score的最小值,和max组成一个区间,前包后包witscores:返回内容加上key的scorelimit:分页操作,和mysql里的用法一致offset:偏移量,对按照[min,max]查询的结果,进行分页处理,代表从第几个参数开始获取,0代表第一个count:总量,即从offset开始获取几个元素。
3、对滚动分页查询参数的要求:max: 第一次查询时为当前时间戳(即获取最新的数据,当前时间戳就是最大值了) | 上次查询的最小时间戳(为了分页到下一页)min: 0(固定写死为0即可,因为对最小值时间没有限制,时间戳最小也不会是负数)offset:第一次查询时为0(因为第一次获取就是要从开头获取) | 在上次查询的结果中,与最小值一样的元素个数count: 固定写死每页需要的几条记录即可注意!!!offset即使这样设置了,还是会有bug存在,因为Zset中存在大于固定值每个记录数count的重复元素时,就会出现Bug。
举例:count为3,存在值为5重复元素6个,Zset的score为 5 5 5 5 5 5 4 3 2第一次获取为 5 5 5,第二次获取为 5 5 5,第三次获取为 5 5 5,之后也全都是 5 5 5
很显然,上面就出现了Bug,一直在重复获取5 5 5,想要缓解和降低出现的概率就加大count的值,或者采用别的办法解决(可以尝试雪花id、UUID等)。

实现代码

 原理就是根据传入的lastId和offset,查询出收件箱中的分页数据,然后再根据存放在List<Long>的blogId们查询出List<Blog> blogs,同时要想办法把最新的minTime、offset得出来并设置好,等最后一起封装返回给前端。这里不能忘记的是,还要查询并设置blogs中每一个Blog对象的作者、以及该用户是否点赞属性isLiked。最后要把blogs、minTime(时间戳)、offset封装到ScrollResult对象中返回。
Bean:
@Data  // 创建一个通用的Bean,用来方便返回这样类型的数据给前端
public class ScrollResult {// 泛型为?,为了通用private List<?> list;private Long minTime;private Integer offset;
}@Controller:  BlogController下
@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset
){return blogService.queryBlogOfFollow(max,offset);
}@ServiceImpl:
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {// 1.获取当前用户Long userId = UserHolder.getUser().getId();// 2.查询收件箱 zrevrangebyscore key max min withscores limit offset countString key = FEED_KEY + userId;Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,0,max,offset,2);// 3.非空判断if (typedTuples == null || typedTuples.isEmpty()) {// 收件箱没有数据,关注的人没有发笔记return Result.ok();}// 4.解析数据:blogId、minTime(时间戳)、offset// 注意!!!一开始就要根据typedTuples的长度来指定ArrayList的长度,// 防止内存空间浪费(默认长度为16),以后工作中肯恶搞要经常考虑这个问题ArrayList<Long> ids = new ArrayList<>(typedTuples.size());long minTime = 0; // 局部变量必须初始化赋值,好像是为了防止野指针泛滥int os = 1; // 因为limit前包后包,所以上一次的最小值元素个数最起码为1个for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {// 4.1.获取idids.add(Long.valueOf(tuple.getValue()));// 4.2.获取score(时间戳)long time = tuple.getScore().longValue();if (time == minTime) {os++;}else {minTime = time;os = 1;}}// 5.根据id查询blog,注意mysql中in的升序问题String idStr = StrUtil.join(",", ids);List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();for (Blog blog : blogs) {// 5.1.查询发布笔记的作者queryBlogUser(blog);// 5.2.查询该用户点赞此博客,并赋值isLike属性isBlogLiked(blog);}// 6.封装并返回ScrollResult r = new ScrollResult();r.setList(blogs);r.setOffset(os);r.setMinTime(minTime);return Result.ok(r);
}

附近商户

附近商户搜索

在首页中点击某个频道,即可看到频道下的商户:

需求:点击分类图标后,根据当前位置按照商铺离我的距离,升序排序

实现代码

原理:按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可1、SpringDataRedis的2.3.9版本并不支持Redis 6.2提供的GEOSEARCH命令,因此我们需要提示其版本,修改自己的POM文件,内容如下:
2、由于GEOSEARCH命令不存在分页功能,所以我们后面使用的是Spring提供 [end]方式的分页,然后自己再处理一下,才是 [from,end]的分页数据。所以整段代码,其实查询id数据并排序 和 根据查询出来的id去数据库中查询出List<Shop>返回给前端这两部是不难的,主要难的是使用Spring提供的分页方法实现分页功能,以及分页后的数据处理!!!

1、首先把原本在数据库中的店铺数据导入到redis的GEO数据类型里(为了节省内存空间,只导入[经度,维度,店铺id])@Testvoid loadShopData(){// 1.查询店铺信息List<Shop> list = shopService.list();// 2.把店铺分组,按照typeId分组,typeId一致的放到一个集合Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));// 3.分批完成写入Redisfor (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {// 3.1.获取类型idLong typeId = entry.getKey();String key = "shop:geo:" + typeId;// 3.2.获取同类型的店铺的集合List<Shop> value = entry.getValue();// 创建该对象,用于批量插入ArrayList<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());// 3.2.写入redis GEOADD key 经度 维度 memberfor (Shop shop : value) {// 这个方法是每一条数据添加一次,性能损耗大,建议后面的批量方法// stringRedisTemplate.opsForGeo().add(key,new Point(shop.getX(),shop.getY(),shop.getId().toString())// 先添加到这个List中,后面一次性批量插入locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(),shop.getY())));}// 批量插入locations里的数据// 3.3.写入redis GEOADD key 经度 维度 memberstringRedisTemplate.opsForGeo().add(key,locations);}}2、修改Controller、ServiceImpl方法,实现从redis的GEO中读取[经度,维度,商铺id]数据进行距离升序排序,然后使用排序后的List<Long>shopid去查询数据库中的详细数据,封装成List<Shop>返回给前端。Controller:   ShopController下@GetMapping("/of/type")public Result queryShopByType(@RequestParam("typeId") Integer typeId,@RequestParam(value = "current", defaultValue = "1") Integer current,@RequestParam(value = "x",required = false) Double x,@RequestParam(value = "y",required = false) Double y) {return shopService.queryShopByType(typeId,current,x,y);}ServiceImpl:@Overridepublic Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {// 1.判断是否需要根据坐标查询if (x == null || y == null) {// 根据类型分页查询// 不需要坐标查询,直接走数据库查询(一般来说,还会按照别的规律查询,但是目前重点不是这里,所以统一为走数据库查询)Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));// 返回数据return Result.ok(page.getRecords());}// 2.计算分页参数// 采用 [form,end] 区间的格式,而不是采用 limit 的格式,是因为GEOSEARCHSTORE命令只支持前者,而不支持后者,我猜是这样的// from 代表的是从第一个参数开始,并不是从第几页开始!!!int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;// end 代表的是第几个参数结束,和from组成 [from,end]区间int end = current * SystemConstants.DEFAULT_PAGE_SIZE;// 3.查询redis,按照距离升序排序、分页。结果:shopId、distance(经度纬度)String key = SHOP_GEO_KEY + typeId;// GEOSEARCH key FROMLONLAT x y BYRADIUS 5000 m WITHDIST 注意!!!GEOSEARCH命令没有分页功能// GeoResults<RedisGeoCommands.GeoLocation<String>> search 对象是一个封装结构的对象// 它有一个重要的属性:results,他才是真正的封装查询结果的东西,获取它的方法是getContent(),不是getResults()!!!// results是一个List,它的每个元素都是一条记录,有属性:name、point,分别代表 number、经度纬度GeoResults<RedisGeoCommands.GeoLocation<String>> search = stringRedisTemplate.opsForGeo().search(key,GeoReference.fromCoordinate(x, y),// 不指定单位的话,默认单位为mnew Distance(5000),// 这个分页是Spring实现的,GEOSEARCH不存在分页功能,所以我们只用使用Spring提供 [0,end]方式的分页RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));// 4.解析出idif (search == null) {// results不存在,返回空列表return Result.ok(Collections.emptyList());}// list代表的是List<results>,每个元素都是一条GEO记录List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = search.getContent();// 判断 form 的位置是否超过了list中存在的个数if (list.size() <= from) {// 没有下一页了,结束return Result.ok(Collections.emptyList());}// 4.1.截取 [from,end]部分,同时指定大小,不浪费内存空间ArrayList<Long> ids = new ArrayList<>(list.size());Map<String, Distance> distanceMap = new HashMap<String, Distance>(list.size());// skip()方法,跳过from位置之前的元素,// 因为GEOSEARCH不存在分页功能,所以我们前面使用的是Spring提供 [0,end]方式的分页,然后自己再处理一下,才是 [from,end]的分页数据list.stream().skip(from).forEach(result -> {// 4.2.获取店铺idString shopIdStr = result.getContent().getName();// 添加到ids.add(Long.valueOf(shopIdStr));// 4.3.获取距离(point对象,距离存储在里面,后面会取出来)Distance distance = result.getDistance();distanceMap.put(shopIdStr,distance);});// 5.根据id去数据库中查询ShopString idStr = StrUtil.join(",", ids);// 注意mysql中in的排序问题List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();for (Shop shop : shops) {// getValue()获取的就是返回的Point对象中存储的距离shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());}// 6.返回return Result.ok(shops);}

用户签到

需求:实现签到接口,将当前用户当天签到信息保存到Redis中提示:因为BitMap底层是基于String数据结构,因此其操作也都封装在(opsForValue)字符串相关操作中了,所以直接使用opsForValue()操作即可。

Controller:
@PostMapping("/sign")
public Result sign(){return userService.sign();
}ServiceImpl:
@Override
public Result sign() {// 1.获取当前登录用户Long userId = UserHolder.getUser().getId();// 2.获取日期LocalDateTime now = LocalDateTime.now();// 3.拼接keyString keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;// 4.获取今天是本月的第几天int dayOfMonth = now.getDayOfMonth();// 5.写入Redis setbit key offset 1stringRedisTemplate.opsForValue().setBit(key,dayOfMonth - 1,true);return Result.ok();
}注意!!!以上代码,在黑马店铺的项目里并没有前端,所以我们使用ApiPost等工具手动发起请求测试即可,请求路径:localhost:8080/api/user/sign,记得在Header中加入登录信息authorization=token。

签到统计

UV统计

 由于我们没有那么多用户量来做UV统计,所以这里就做一个Test方法来做UV统计,反正原理懂了,知道怎么实现的就可以了

实现代码

@Test
void testHyperLogLog(){String[] values = new String[1000];int j = 0;for (int i = 0; i < 1000000; i++) {j = i % 1000;values[j] = "user_" + i;if (j == 999) {// 发送到RedisstringRedisTemplate.opsForHyperLogLog().add("hll1",values);}}// 统计数量Long count = stringRedisTemplate.opsForHyperLogLog().size("hll1");System.out.println("count = " + count);
}结果:
count = 997593    // 和100w的实际误差非常小

Redis笔记-实战篇(黑马视频教程)相关推荐

  1. Redis笔记-基础篇(黑马视频教程)

    写在开头 这是我在观看黑马Redis视频教程中根据PPT和上课内容,个人写的笔记,中间有部分来源于百度,如有侵权,联系我删除. 文章目录 写在开头 NoSQL数据库简介 技术发展 NoSQL数据库 R ...

  2. 2022黑马Redis跟学笔记.实战篇(二)

    2022黑马Redis跟学笔记.实战篇 二 实战篇Redis 开篇导读 4.1短信登录 4.1.1. 搭建黑马点评项目 一.导入黑马点评项目 二.导入SQL 三.有关当前模型 四.导入后端项目 相关依 ...

  3. Redis学习笔记②实战篇_黑马点评项目

    若文章内容或图片失效,请留言反馈.部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 资料链接:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA( ...

  4. 黑马Redis笔记高级篇 | 多级缓存

    黑马Redis笔记高级篇 | 多级缓存(黑马教程云服务器踩坑记录) 1.JVM进程缓存(tomcat服务内部) 1.1 导入商品案例 1.2 初识Caffeine 1.3 实现进程缓存 2.Lua语法 ...

  5. 【Redis】实战篇:优惠卷秒杀 (库存超卖问题、一人一单问题)

    文章目录 3.1 全局唯一ID 3.2 -Redis实现全局唯一Id 3.3 添加优惠卷 3.4 实现秒杀下单 3.5 库存超卖问题分析 3.6 乐观锁解决超卖问题 3.7 优惠券秒杀-一人一单 3. ...

  6. 黑马Redis学习——实战篇(4)

    目录 4.分布式锁 4.1 .基本原理和实现方式对比 4.2 .Redis分布式锁的实现核心思路 4.3 实现分布式锁版本一 4.4 Redis分布式锁误删情况说明 4.5 解决Redis分布式锁误删 ...

  7. Redis笔记基础篇:6分钟看完Redis的八种数据类型

    目录 一.常识补充 二.安装 三.启动redis 四.常用基础命令 五.Redis五大基本数据类型 5.1.String 5.2.Hash 5.3.List 5.4.Set 5.5.Zset 六.三大 ...

  8. Redis笔记-入门篇

    Nosql概述 为什么要用Nosql 大数据时代, 大数据 一般的数据库无法进行分析处理了! 1.单机MySQL的年代 90年代,一个基本的访问量一般不会太大,单个数据库完全足够! 那个时候,更多的去 ...

  9. DDD学习笔记 - 实战篇(Ⅱ)

    14 | 代码模型(下):如何保证领域模型与代码模型的一致性? 课程链接:https://time.geekbang.org/column/article/166147 DDD 强调先构建领域模型然后 ...

  10. Redis学习笔记①基础篇_Redis快速入门

    若文章内容或图片失效,请留言反馈.部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 资料链接:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA( ...

最新文章

  1. 汇编: 描述内存长度
  2. 手写体识别(数据挖掘入门与实践-实验11)
  3. kibana 查看索引库中文档个数_百度索引量是什么意思?和百度收录量的区别。...
  4. [C#.Net]判断文件是否被占用的两种方法
  5. #{}不自动改参数类型_我是干流动补胎的,想让我的柴油机气泵自动打气,怎么改装。谢谢。...
  6. ip电话系统设计和实现
  7. 后缀数组2.0--Height数组(bzoj 1717: [Usaco2006 Dec]Milk Patterns 产奶的模式)
  8. 数据结构严蔚敏版课后答案
  9. python句柄无效_python免注册调用大漠出现错误句柄无效
  10. LeetCode 1818. 绝对差值和 [java实现]
  11. 运放的输入失调电压、输入偏置电流和输入失调电流以及电阻匹配的作用之一
  12. 解决Windows 10环境下 Realtek声卡 台式机前面板插孔没有声音输出问题
  13. 【系统运维-raid5】HW5885V3下挂4块2T硬盘如何做RAID5
  14. 强智教务系统验证码识别 java
  15. C2. Pokémon Army (hard version)
  16. 恋与实习生服务器维护,恋与制作人设计实习生事件选什么好?恋与制作人设计实习生事件选择推荐...
  17. 小虎电商浏览器:淘数据聚划算功能有什么用?
  18. Shell第三天-讲义
  19. Android像素单位dp,sp,px,pt的区别和比较
  20. 为什么python性能差

热门文章

  1. 关于FL Studio ASIO驱动不工作的一个解决方案
  2. 求2020 CFA二级notes资源,谢谢!
  3. Qualcomm MSM8937 dual DSI 笔记
  4. c语言运算符大全极其意义,C语言运算符大全
  5. web前端笔试试题二(含答案)
  6. c#获取中国三级行政区域划分(省市县)以及县级经纬度demo
  7. js实现分页并请求ajax,js实现ajax分页完整实例
  8. 图解机器学习算法(14) | PCA降维算法详解(机器学习通关指南·完结)
  9. 【限流算法】java实现滑动时间窗口算法
  10. SAXReader解析