优惠券秒杀

  • 全局ID生成器
  • 优惠券秒杀
    • 秒杀实现
    • 库存超卖
      • 乐观锁实现
    • 一人一单
  • 分布式锁
    • 分布式锁版本一
    • Redis分布式锁误删情况
      • 解决分布式锁误删
    • 分布式锁原子性问题
      • 解决原子性问题
      • 利用Java代码调用Lua脚本改造分布式锁
  • 分布式锁-redission
    • Redission快速入门
    • 可重入锁原理-(不可重入)
    • 锁重试Watchdog机制-(不可重试&超时释放)
      • 重试获取锁
      • 释放锁
    • MutiLock原理-(主从一致性)
  • 总结
  • 秒杀优化
  • 消息队列
    • 基于List实现消息队列
    • 基于PubSub实现消息队列
    • 基于Stream实现消息队列
      • 消费者组
  • 异步秒杀下单

redis缓存参考:https://blog.csdn.net/weixin_43994244/article/details/127527201

全局ID生成器

数据库自增ID存在问题:
1.id的规律性太明显
2.受单表数据量的限制

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,需要满足:唯一性,高性能(随时可用),安全性(不容易被猜测),递增性(便于创建索引,提高速度),高可用(速度快)

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 序列号位数*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}/*** keyPredix前缀区分不同的业务*/public long nextId(String keyPredix){//1.生成时间戳(当前时间减去初始时间)LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timeStamp = nowSecond-BEGIN_TIMESTAMP;//2.生成序列号(随着订单数增多,redis自增长的数值最大为2^64,id的序列号只有32位--key自增的上限是今天的订单量)//2.1获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));//2.2自增长Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPredix + ":" + date);//3.拼接并返回:时间戳移动32位,填充序列号return timeStamp << COUNT_BITS | count;}
}

测试:

@SpringBootTest
class HmDianPingApplicationTests {@Autowiredprivate RedisIdWorker redisIdWorker;private ExecutorService es = Executors.newFixedThreadPool(500);@Testvoid testId() throws InterruptedException {//计数器,参考下文链接CountDownLatch latch = new CountDownLatch(300);//lambda创建线程,参考下文链接Runnable task = () ->{for (int i = 0; i < 100; i++) {long id = redisIdWorker.nextId("order");System.out.println("id="+id);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {es.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("time="+(end-begin));}}

参考:
CountDownLatch详解以及用法示例,线程创建方式

优惠券秒杀

秒杀实现

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下
@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {return Result.fail("库存不足!");}//5,扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣减库存return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);
}

库存超卖

假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:

悲观锁:添加同步锁,让线程串行执行,简单粗暴但是性能一般
乐观锁:不加锁,在更新时怕判断是否有其他线程在修改,性能好但是成功率低

乐观锁实现

乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过。

不需要在额外设置一个version来判断,只要扣减库存时的库存和之前查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全,就可以扣减,但是测试后会发现很多失败,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败。

所以我们的乐观锁需要变一下,改成stock大于0 即可

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {return Result.fail("库存不足!");}//5,扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId)//eq("stock",voucher.getStock()).gt("stock",0).update();if (!success) {//扣减库存return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}

一人一单

让一个用户只能下一个单,而不是让一个用户下多个单。
根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单。

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {return Result.fail("库存不足!");}// 5.一人一单判断// 5.1.用户idLong userId = UserHolder.getUser().getId();int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {return Result.fail("用户已经购买过一次!");}//6,扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId)    .gt("stock",0).update();if (!success) {//扣减库存return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);
}

乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作:

1.封装了一个createVoucherOrder方法(封装一人一单的判断,成功之后再创建订单),为了确保他线程安全,在方法上添加了一把synchronized 锁,但是会导致每个订单都会串行执行,粒度太大

   @Override@Transactionalpublic Result seckillVoucher(Long voucherId) {//1.查询优惠券//2.判断秒杀是否开始//3.判断秒杀是否结束//4.判断库存是否重组return createVoucherOrder(voucherId);}}
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();// 5.1.查询订单// 5.2.判断是否存在// 6.扣减库存// 7.创建订单// 8.返回订单idreturn Result.ok(orderId);
}

2.用userId作为锁,保证是对每个用户加锁,userId.toString() 拿到的对象也是new出来的对象,使用锁必须保证锁必须是同一把,所以需要使用intern()方法(intern() 从常量池中拿到数据)

   @Override@Transactionalpublic Result seckillVoucher(Long voucherId) {//1.查询优惠券//2.判断秒杀是否开始//3.判断秒杀是否结束//4.判断库存是否重组return createVoucherOrder(voucherId);}}
@Transactional
public Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()){// 5.1.查询订单// 5.2.判断是否存在// 6.扣减库存// 7.创建订单// 8.返回订单idreturn Result.ok(orderId);}
}

3.当前方法被spring的事务控制,如果在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以选择将当前方法整体包裹起来,确保事务不会出现问题

 @Override@Transactionalpublic Result seckillVoucher(Long voucherId) {//1.查询优惠券//2.判断秒杀是否开始//3.判断秒杀是否结束//4.判断库存是否重组synchronized(userId.toString().intern()){return createVoucherOrder(voucherId);}}}
@Transactional
public  Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();// 5.1.查询订单// 5.2.判断是否存在// 6.扣减库存// 7.创建订单// 8.返回订单idreturn Result.ok(orderId);
}

4.调用的方法,是this.的方式调用的,事务想要生效,还得利用代理来生效,所以需要获得原始的事务对象来操作事务

    @Override@Transactionalpublic Result seckillVoucher(Long voucherId) {//1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if(voucher.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀尚未开始");}//3.判断秒杀是否结束if(voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已经开始");}//4.判断库存是否重组if(voucher.getStock()< 1){return Result.fail("库存不足");}Long userId = UserHolder.getUser().getId();//在方法上加锁,每一单都串行效率低,只用userId加锁保证是一个用户订单串行synchronized (userId.toString().intern()) {//获取代理对象(事务)//本类的普通方法不能调用同类的事务方法,所以需要一个代理对象来调用,不然事务会失效IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}@Overridepublic Result createVoucherOrder(Long voucherId) {//6一人一单判断--多线程Long userId = UserHolder.getUser().getId();//6.1查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//6.2判断订单是否存在if (count > 0) {return Result.fail("用户已经购买过一次!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {return Result.fail("库存不足");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();//订单id,用户id,代金券idlong orderId = idWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(orderId);}

启动类上加注解 @EnableAspectJAutoProxy(exposeProxy = true, proxyTargetClass = true)

@EnableAspectJAutoProxy(exposeProxy = true, proxyTargetClass = true)
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);}
}

添加依赖:

 <dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>

但是在集群环境下,每个tomcat都有一个属于自己的jvm,在服务器A的tomcat内部,有两个线程,这两个线程使用的是同一份代码,他们的锁对象是同一个,可以实现互斥,但是在服务器B的tomcat内部,又有两个线程,他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,和线程1线程2无法实现互斥,所以需要使用分布式锁来解决问题。
分布式锁:

分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

满足条件:

  • 可见性:多个线程都能看到相同的结果,多个JVM进程可以看到。
  • 互斥:互斥是分布式锁的最基本的条件,使得程序串行执行,只能有一个人拿到,其他都失败。
  • 高可用:程序不易崩溃,时时刻刻都保证较高的可用性。
  • 高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能。
  • 安全性:获取锁要考虑安全性问题,比如死锁。

分布式锁的核心是实现多进程之间互斥,满足这一点的方式很多,常见有三种:

  • mysql本身就带有锁机制,但是mysql性能本身一般,很少使用mysql作为分布式锁。
  • redis经常作为企业级开发中的分布式锁,利用setnx(存在才新增),如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
  • Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,在此不过多阐述。

基于redis实现分布式锁的方法

  • 获取锁:
    互斥:确保只能有一个线程获取锁
    非阻塞:尝试一次,成功返回true,失败返回false
    SET lock thread1 NX EX 10
  • 释放锁:
    手动释放
    超时释放:获取锁时添加一个超时时间
    DEL lock

分布式锁版本一

锁的基本接口

public interface ILock {/***  尝试获取锁* @param timeoutSec 锁持有的超时时间,过期后自动释放* @return true获取锁成功,false获取锁失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();}

接口实现:

public class SimpleRedisLock implements ILock{//锁名称private String name;//锁名称前缀private static final String KEY_PREFIX = "lock:";private StringRedisTemplate stringRedisTemplate;/*** 实例化锁时,传入锁名称和stringRedisTemplate*/public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}/*** 加锁:利用setnx方法加锁,增加过期时间,防止死锁,保证加锁和增加过期时间具有原子性 */@Overridepublic boolean tryLock(long timeoutSec) {//获取当前线程idLong threadId = Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId+"", timeoutSec, TimeUnit.SECONDS);//自动拆箱会有空指针异常return Boolean.TRUE.equals(success);}/***释放锁:防止删除别人的锁*/@Overridepublic void unlock() {//通过del删除锁stringRedisTemplate.delete(KEY_PREFIX + name);
}

业务代码:

  @Overridepublic Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象(新增代码)SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//获取锁对象boolean isLock = lock.tryLock(1200);//加锁失败if (!isLock) {return Result.fail("不允许重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}

Redis分布式锁误删情况

线程1获取到锁之后业务阻塞,超时释放锁,线程2获取到锁后开始执行业务,此时线程1阻塞结束,执行业务完成,释放锁,将原本属于线程2的锁释放掉,线程3获取到锁开始执行,会有两个线程获取到锁同时执行。

解决分布式锁误删

在存入锁时,放入自己线程的标识(UUID标识),在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。

public class SimpleRedisLock implements ILock{//锁名称private String name;//锁名称前缀private static final String KEY_PREFIX = "lock:";//线程id前缀private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";private StringRedisTemplate stringRedisTemplate;/*** 实例化锁时,传入锁名称和stringRedisTemplate*/public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}/*** 加锁:利用setnx方法加锁,增加过期时间,防止死锁,保证加锁和增加过期时间具有原子性 */@Overridepublic boolean tryLock(long timeoutSec) {//获取当前线程idString threadId = ID_PREFIX + Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);//自动拆箱会有空指针异常return Boolean.TRUE.equals(success);}/***释放锁:防止删除别人的锁*/@Overridepublic void unlock() {//释放锁//获取线程标识,与锁中标识判断是否一致String threadId = ID_PREFIX + Thread.currentThread().getId();String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if (threadId.equals(id)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}
}

在修改完此处代码后,启动两个线程,第一个线程持有锁后,手动释放锁,第二个线程此时进入到锁内部,再放行第一个线程,此时第一个线程由于锁的value值并非是自己,所以不能释放锁,也就无法删除别人的锁,此时第二个线程能够正确释放锁,通过这个案例初步说明我们解决了锁误删的问题。

分布式锁原子性问题

更极端的误删逻辑:
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,要防止此情况发生。

解决原子性问题

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了。
Redis提供的调用函数:

redis.call(‘命令名称’, ‘key’, ‘其它参数’, …)

执行set name Jack:

redis.call(‘set’, ‘name’, ‘jack’)

#先执行 set name jack
redis.call(‘set’, ‘name’, ‘Rose’)
#再执行 get name
local name = redis.call(‘get’, ‘name’)
#返回
return name

Redis命令来调用脚本:

执行 redis.call(‘set’, ‘name’, ‘jack’) 脚本:

如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

释放锁Lua脚本:

-- 获取锁的key
-- local key = KEYS[1]
-- 获取当前线程标示
-- local threadId = ARGV[1]-- 获取所属线程标识
-- local id = redis.call('get',KEYS[1])
--比较线程标识与锁中的标识是否一致
if(redis.call('get',KEYS[1])==ARGV[1]) then--释放锁return redis.call('del',KEYS[1])
end
return 0

利用Java代码调用Lua脚本改造分布式锁

RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图:

public class SimpleRedisLock implements ILock{//锁名称private String name;//锁名称前缀private static final String KEY_PREFIX = "lock:";//线程id前缀private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//lua脚本private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;//加载lua脚本static{UNLOCK_SCRIPT = new DefaultRedisScript<>();//设置脚本位置ClassPathResource就是resource位置UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {//获取当前线程idString threadId = ID_PREFIX + Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);//自动拆箱会有空指针异常return Boolean.TRUE.equals(success);}/*** 调用lua脚本,释放锁变为原子性操作*/@Overridepublic void unlock() {//脚本,锁的Key,线程IDstringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX + Thread.currentThread().getId());}
}

测试结果:
第一个线程进来,得到了锁,手动删除锁,模拟锁超时了,其他线程会执行lua来抢锁,当第一天线程利用lua删除锁时,lua能保证他不能删除他的锁,第二个线程删除锁时,利用lua同样可以保证不会删除别人的锁,同时还能保证原子性。

分布式锁-redission

基于setnx实现的分布式锁存在下面的问题:

  • 不可重入:重入指获取锁的线程可以再次进入到相同的锁的代码块中,可重入锁可以防止死锁。目前同一线程无法多次获取同一把锁。
  • 不可重试:目前的分布式只能尝试一次,合理的情况是:当线程在获得锁失败后,能再次尝试获得锁。
  • 超时释放:锁超时释放可以避免死锁,但如果业务执行耗时较长,会导致锁释放,存在安全隐患。
  • 主从一致性:当向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

Redisson提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

Redission快速入门

1.添加依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>

2.配置客户端

@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");// 创建RedissonClient对象return Redisson.create(config);}
}

3.使用Redission的分布式锁

@Resource
private RedissonClient redissonClient;@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象,用用户id创建,保证是一个用户用一个锁//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId,stringRedisTemplate);//获取锁RLock lock = redissonClient.getLock("order:" + userId);//三个参数:重试最大等待时间,超时释放时间,时间单位//无参数:默认-1不等待失败立即返回,超过30s无响应自动释放//两参数:重试最大等待时间,时间单位boolean isLock = lock.tryLock();//判断是否获取锁成功//加锁失败if (!isLock) {return Result.fail("不允许重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}

可重入锁原理-(不可重入)

在Lock锁中,借助于底层的一个voaltile的一个state变量来记录重入的状态,比如当前没有人持有这把锁,state=0,假如有人持有这把锁,state=1,如果持有这把锁的人再次持有这把锁,state就会+1 。
synchronized在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。

在redission中,采用hash结构用来存储锁:
KEYS[1] :锁名称,ARGV[1]:锁失效时间,ARGV[2]:id + “:” + threadId; 锁标识
获取锁:判断锁是否存在,存在是自己的锁,锁计数+1,不是自己的获取锁失败;锁不存在,则创建锁添加标识,设置有效期,执行任务。
释放锁:判断是否是自己的锁,是锁计数-1,否锁已释放,判断锁计数是否为0,否重新设置有效期,是彻底释放锁。
保证执行语句的原子性在lua脚本中执行。

获取锁的脚本:

local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then
-- 不存在, 获取锁redis.call('hset', key, threadId, '1');
-- 设置有效期redis.call('expire', key, releaseTime);
return 1;-- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
-- 存在, 获取锁,重入次数+1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1;
-- 返回结果
end;
return 0;
-- 代码走到这里,说明获取锁的不是自己,获取锁失败

释放锁的脚本:

local key = KEYS[1];
-- 锁的key
local threadId = ARGV[1];
-- 线程唯一标识
local releaseTime = ARGV[2];
-- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
-- 不是自己的,直接返回
return nil;
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else
-- 等于0说明可以释放锁,直接删除
redis.call('DEL', key);
return nil;
end;

锁重试Watchdog机制-(不可重试&超时释放)

重试获取锁

 @Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();//1.尝试获取锁,成功返回null,失败返回ttlLong ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}//3.判断剩余等待时间是否>0,否获取锁失败,time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}//是订阅锁释放信号current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);//在time内没有等到锁释放通知,取消订阅,获取锁失败if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {//取消订阅unsubscribe(subscribeFuture, threadId);}});}//获取锁失败acquireFailed(waitTime, unit, threadId);return false;}//4.获取到锁释放信息,判断等待锁释放时间是否超时,是获取锁失败try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}//否重新尝试获取锁while (true) {//第一次重试long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}//判断剩余时间是否充足time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {//订阅释放锁信号,尝试获取,剩余等待时间ttl(ttl<time没必要再等)subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {//订阅释放锁信号,尝试获取,剩余等待时间timesubscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}//判断时间是否充足,是再尝试time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
//        return get(tryLockAsync(waitTime, leaseTime, unit));}

1.尝试获取锁,成功返回null,失败返回ttl

 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

2.如果超时释放时间(leaseTime)为-1,开启Watchdog机制定时更新锁有效期,不为1获取锁成功
2.1判断leaseTime是否为-1

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}//开启Watchdog机制RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}//获取锁成功,定时更新有效期if (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}

2.2更新有效期代码

 private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}//延时任务,在delay(WatchdogTime-30s/3)到期之后才会执行Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {//取出entryExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}//取出线程idLong threadId = ent.getFirstThreadId();if (threadId == null) {return;}//执行更新有效期脚本RFuture<Boolean> future = renewExpirationAsync(threadId);//异常抛出future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}//递归更新有效期if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}

2.3更新有效期脚本

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,//判断锁是否是当前线程,是刷新有效期"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));}

3.判断剩余等待时间是否>0,否获取锁失败,是订阅锁释放信号(有锁释放信号才去尝试,不是无休止的尝试,节约cpu缓存)

 public RFuture<E> subscribe(String entryName, String channelName) {AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));RPromise<E> newPromise = new RedissonPromise<>();semaphore.acquire(() -> {if (!newPromise.setUncancellable()) {semaphore.release();return;}E entry = entries.get(entryName);if (entry != null) {entry.acquire();semaphore.release();entry.getPromise().onComplete(new TransferListener<E>(newPromise));return;}E value = createEntry(newPromise);value.acquire();E oldValue = entries.putIfAbsent(entryName, value);if (oldValue != null) {oldValue.acquire();semaphore.release();oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));return;}RedisPubSubListener<Object> listener = createListener(channelName, value);service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);});return newPromise;}

释放锁

1.尝试释放锁成功,发送释放锁消息,取消watchDog
2.失败,记录异常结束

 @Overridepublic RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();//1.执行释放锁脚本RFuture<Boolean> future = unlockInnerAsync(threadId);//2.释放锁成功,取消更新任务future.onComplete((opStatus, e) -> {cancelExpirationRenewal(threadId);//异常信息if (e != null) {result.tryFailure(e);return;}//异常信息if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result;}

1.取消watchDog更新锁有效期任务

void cancelExpirationRenewal(Long threadId) {//获取当前锁的任务ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}//移除线程Idif (threadId != null) {task.removeThreadId(threadId);}//取消任务if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}//删除EntryEXPIRATION_RENEWAL_MAP.remove(getEntryName());}}

2.释放锁脚本

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " + //推送释放锁消息"return 1; " +"end; " +"return nil;",Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}

总结:

  1. 可重入:利用hash结构记录线程id和重入次数
  2. 可重试:利用信号量和PubSub功能实现等待,唤醒,获取锁失败的重试机制
  3. 超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间

MutiLock原理-(主从一致性)

当向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
**解决思路:**不使用主从,每个节点都是独立节点,没有主从,都可以读写,把锁加锁的逻辑需要写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
1.配置多个redis

@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");// 创建RedissonClient对象return Redisson.create(config);}@Beanpublic RedissonClient redissonClient2(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.150.102:6379").setPassword("123321");// 创建RedissonClient对象return Redisson.create(config);}......
}

2.创建连锁

@Slf4j
@SpringBootTest
public class redisClientTest {@Resourceprivate RedissonClient redissonClient1;@Resourceprivate RedissonClient redissonClient2;@Resourceprivate RedissonClient redissonClient;private RLock lock;//获取三个节点对应锁@Beforevoid setUp(){RLock lock1 = redissonClient1.getLock("order");RLock lock2 = redissonClient2.getLock("order");RLock lock3 = redissonClient.getLock("order");//创建连锁lock = redissonClient.getMultiLock(lock1,lock2,lock3);}/*** 业务代码* ......*/
}

获取锁源码:重试最大等待时间,超时释放时间,时间单位

   @Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//        try {//            return tryLockAsync(waitTime, leaseTime, unit).get();
//        } catch (ExecutionException e) {//            throw new IllegalStateException(e);
//        }long newLeaseTime = -1;//超时释放时间是否为-1if (leaseTime != -1) {if (waitTime == -1) {//不想重试,获取一次newLeaseTime = unit.toMillis(leaseTime);} else {//想重试,增加释放时间newLeaseTime = unit.toMillis(waitTime)*2;}}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {//剩余等待时间remainTime = unit.toMillis(waitTime);}//锁等待时间=waitTimelong lockWaitTime = calcLockWaitTime(remainTime);//锁失败限制0int failedLocksLimit = failedLocksLimit();//获取成功的锁List<RLock> acquiredLocks = new ArrayList<>(locks.size());//遍历独立的锁for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {//空参锁,获取一次 lockAcquired = lock.tryLock();} else {//获取锁,返回结果long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}//获取锁成功,添加到已经成功锁集合里if (lockAcquired) {acquiredLocks.add(lock);//获取失败} else {//锁数量-已获得锁数量=0---都获取到锁才结束if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {//释放掉已经拿到锁的unlockInner(acquiredLocks);//waitTime == -1,不想重试,直接失败if (waitTime == -1) {return false;}//想重试failedLocksLimit=0failedLocksLimit = failedLocksLimit();//获取到的锁清空acquiredLocks.clear();// reset iterator//迭代器往前,从第一把锁开始获取while (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}//判断剩余等待时间是否充足if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();//剩余时间<0,获取锁消耗掉剩余等待时间,获取锁超时,返回falseif (remainTime <= 0) {//已经获取到的锁,释放掉unlockInner(acquiredLocks);return false;}}}//leaseTime = -1时就会自动触发watchDog机制,不需手动设置if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());//遍历拿到的每一把锁,重新设置有效期,避免锁的有效期不一样for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}return true;}

总结

  1. 不可重入redis分布式锁
    原理:利用setnx的互斥性,利用es避免死锁,释放锁时刻判断线程标识
    缺陷:不可重入,无法重试,锁超时失效
  2. 可重入的Redisson分布式锁
    原理:利用hash结构,记录线程标示和重入次数,利用watchDog延续锁时间,利用信号量控制锁重试等待
    缺陷:redis宕机引起锁失效问题
  3. redisson的multiLock
    原理:多个独立的redis节点,必须在所有节点都获取重入锁,才算获取锁成功
    缺陷:运维成本高,实现复杂

秒杀优化

秒杀过程:https://blog.csdn.net/weixin_43994244/article/details/127560350
下单流程:1.查询优惠卷 2.判断秒杀库存是否足够 3.查询订单 4.校验是否是一人一单 5.扣减库存 6.创建订单
在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,可以考虑使用异步线程来做,然后再统一做返回,但是如果访问的人很多,线程池中的线程可能一下子就被消耗完了,而且时效性差,比如只要确定他能做这件事,后边慢慢做就可以了,并不需要他一口气做完这件事,所以采用类似消息队列的方式来完成需求,而不是使用线程池或者是异步编排的方式来完成这个需求。

优化思路:
1.利用reedis完成库存余量,一人一旦判断,完成抢单业务
2.将下单业务放入阻塞队列,利用对立线程异步下单

优化方案:将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,是否一人一单这样的操作,只要这些逻辑可以完成,就意味着一定可以下单完成,只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,可以直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序响应超快,而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池。

当用户下单之后,判断库存是否充足只需要到redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,可以使用lua来操作。

当以上判断逻辑走完之后,可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。

  1. 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
 @Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);//保存秒杀信息到redisstringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK + voucher.getId(),voucher.getStock().toString());}
  1. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
--1.参数列表
--优惠券id
local vouchId = ARGV[1]
--用户id
local userId = ARGV[2]--2.数据Key
-- 库存..拼接
local stockKey = 'seckill:stock:' .. vouchId
-- 订单
local orderKey = 'seckill:order:' .. vouchId--3.脚本业务
--3.1判断库存是否充足 get stockKey
if (tonumber(redis.call('get',stockKey)) <=0) then--3.2库存不足,返回1return 1
end
--3.2判断用户是否下单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then--存在,重复下单,返回2return 2
end
--3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5 下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0
  1. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  @Overridepublic Result seckillVoucher(Long voucherId) {//获取用户idLong userId = UserHolder.getUser().getId();//1.执行lua脚本--脚本名称,集合,参数Long result = stringRedisTemplate.execute(SECKILL, Collections.emptyList(), voucherId.toString(), userId.toString());//2.判断结果是否为0int r = result.intValue();if(r != 0){//2.1不为0,没有购买资格return Result.fail(r == 1?"库存不足":"不能重复下单");}//2.2为0,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");//2.3创建订单VoucherOrder voucherOrder = new VoucherOrder();//订单id,用户id,代金券idvoucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);//2.4放入阻塞队列orderTask.add(voucherOrder);//3.获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//4.返回订单Idreturn Result.ok(orderId);}
  1. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
  //lua脚本private static final DefaultRedisScript<Long> SECKILL;//加载lua脚本static{SECKILL = new DefaultRedisScript<>();//设置脚本位置ClassPathResource就是resource位置SECKILL.setLocation(new ClassPathResource("seckill.lua"));SECKILL.setResultType(Long.class);}//阻塞队列-数组实现的阻塞队列,指定初始化大小private BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<>(1024*1024);//线程池private static ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();private IVoucherOrderService proxy;//类初始化时执行线程池@PostConstruct //在类加载之后,就加载private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}//异步线程任务,获取队列中的订单信息,创建订单private class VoucherOrderHandler implements  Runnable{@Overridepublic void run() {while(true){try {//获取队列中的订单信息VoucherOrder voucherOrder = orderTask.take();//创建订单handVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.error("处理订单异常",e);}}}}//创建订单private void handVoucherOrder(VoucherOrder voucherOrder) {//1.获取用户Long userId = voucherOrder.getUserId();//2.创建锁对象RLock lock = redissonClient.getLock("order:" + userId);//3.获取值boolean isLock = lock.tryLock();//4.判断是否获取锁成功if(!isLock){//获取锁失败,返回错误重试log.error("获取锁失败");return;}try {proxy.createVoucherOrder(voucherOrder);}finally {//释放锁lock.unlock();}}@Override@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {log.error("-----异步线程开始执行-----");//6一人一单判断--多线程Long userId = voucherOrder.getUserId();//6.1查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//6.2判断订单是否存在if (count > 0) {log.error("用户已经购买过一次!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {log.error("库存不足");return;}//7.保存订单save(voucherOrder);//8.返回订单id}

存在问题
内存限制–阻塞队列在jvm中创建,高并发情况下会导致内存溢出。
数据安全–服务宕机内存信息丢失,与数据库信息不一致。

消息队列

字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

    秒杀下单业务和完成下单任务分离,异步操作,接触耦合。消息队列是在jvm之外的独立服务,不受jvm内存的限制,消息队列不仅仅做数据存储,还确保数据安全,数据都会持久化,不管服务宕机还是重启,数据不会丢失,而且在消息投递给消费者以后,要求消费者确认,如果消息没有确认会依然在消息队列中存在,下次仍然会投递给消费者让他处理,直到消息被处理,确保消息至少被消费一次。

可以使用一些现成的mq,比如kafka,rabbitmq等等,也可以直接使用redis提供的mq方案。

基于List实现消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列效果。队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。但是当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此使用BRPOP或者BLPOP来实现阻塞效果。
优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub实现消息队列

消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
pattern 通配符,?一个字符,*多个字符,[qwer]指定字符

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream实现消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息:

举例:

读取消息的方式一:XREAD

使用XREAD读取第一个消息:

XREAD阻塞方式,读取最新的消息: BLOCK 0 永久阻塞,读取最新消息,未被读过的最新一条消息。注意:当我们指定起始ID为读取最新消息,未被读过的最新一条消息。 注意:当我们指定起始ID为读取最新消息,未被读过的最新一条消息。注意:当我们指定起始ID为时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。


在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯,消息持久化保存
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点

  • 消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,加快消息处理的速度。
  • 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每个一消息都会被消费。(类似书签)
  • 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list,当处理完成后需要通过XACK来确认消息,标记消息为己处理,才会从pending-list移除。

创建消费者组:

XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列

删除指定的消费者组

XGROUP DESTORY key groupName

给指定的消费者组添加消费者(一般不需要手动添加,创建组时不存在会自动添加)

XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认,不会进入pending-list,建议不设置
STREAMS key:指定队列名称
ID:获取消息的起始ID:1.“>”:从下一个未消费的消息开始 2. 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

确认消息读取:从pending-list移除消息

XACK key group ID [ID…]

查看pending-list

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
XPENDING key group - + 10
key:队列名称
group:消费者组名称
start:开始标记 -最小
end:结束标记 +最大
count:读取数量

消费者监听消息的基本思路:

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯(不同组消费者)
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险(会标记上次消费位置)
  • 有消息确认机制,保证消息至少被消费一次(ACK确认机制)

对比:

异步秒杀下单

需求:

  • 创建一个Stream类型的消息队列,名为stream.orders

    XGROUP CREATE stream.orders g1 0 MKSTREAM

  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId,减少java与redis交互

--1.参数列表
--优惠券id
local vouchId = ARGV[1]
--用户id
local userId = ARGV[2]
--+++订单id+++
local orderId = ARGV[3]--2.数据Key
-- 库存..拼接
local stockKey = 'seckill:stock:' .. vouchId
-- 订单
local orderKey = 'seckill:order:' .. vouchId--3.脚本业务
--3.1判断库存是否充足 get stockKey
if (tonumber(redis.call('get',stockKey)) <=0) then--3.2库存不足,返回1return 1
end
--3.2判断用户是否下单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId) == 1) then--存在,重复下单,返回2return 2
end
--3.4 扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5 下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId)
--3.6+++发送消息到队列中 XADD stream.orders * k1 v1...+++
redis.call("xadd","stream.orders","*","userId",userId,"vouchId",vouchId,"id",orderId)
return 0
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;//lua脚本private static final DefaultRedisScript<Long> SECKILL;//加载lua脚本static{SECKILL = new DefaultRedisScript<>();//设置脚本位置ClassPathResource就是resource位置SECKILL.setLocation(new ClassPathResource("seckill.lua"));SECKILL.setResultType(Long.class);}//线程池private static ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();private IVoucherOrderService proxy;//类初始化时执行线程池@PostConstruct //在类加载之后,就加载private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}/*** 1.开启新的线程获取消息队列中的消息* 2.获取消息成功,完成ACK确认,证明消息被消费* 3.有异常消息,消息存入pending-list*/private class VoucherOrderHandler implements  Runnable {//队列名称String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//2.判断消息获取是否成功if(list == null || list.isEmpty()){//2.1获取失败,说明没有消息,继续下一次循环continue;}//2.2有消息,下单//解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//下单handVoucherOrder(voucherOrder);//3.ACK确认 SACK stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());log.error("订单信息"+voucherOrder.getVoucherId()+"==="+voucherOrder.getUserId()+"==="+voucherOrder.getId());} catch (Exception e) {log.error("处理订单异常", e);//处理pending-list中消息handlePendingList();}}}//处理pending-list中消息private void handlePendingList() {while (true) {try {//1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1  STREAMS streams.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));//2.判断消息获取是否成功if(list == null || list.isEmpty()){//2.1获取失败,说明pending-list没有异常消息,结束循环break;}//2.2有消息,下单//解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);log.error("订单信息"+voucherOrder.getVoucherId()+"==="+voucherOrder.getUserId()+"==="+voucherOrder.getId());//下单handVoucherOrder(voucherOrder);//3.ACK确认 SACK stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending-list订单异常", e);try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}//分布式锁操作数据库中下单操作private void handVoucherOrder(VoucherOrder voucherOrder) {//1.获取用户Long userId = voucherOrder.getUserId();//2.创建锁对象RLock lock = redissonClient.getLock("order:" + userId);//3.获取值boolean isLock = lock.tryLock();//4.判断是否获取锁成功if(!isLock){//获取锁失败,返回错误重试log.error("获取锁失败");return;}try {//数据库中校验一人一单,扣减库存proxy.createVoucherOrder(voucherOrder);}finally {//释放锁lock.unlock();}}//数据库中校验一人一单,扣减库存@Override@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {log.error("-----异步线程开始执行-----");//6一人一单判断--多线程Long userId = voucherOrder.getUserId();//6.1查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//6.2判断订单是否存在if (count > 0) {log.error("用户已经购买过一次!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {log.error("库存不足");return;}//7.保存订单save(voucherOrder);}/***  1.在redis中判断库存是否有秒杀资格0(库存不足1,校验一人一单2)*  2.有秒杀资格,直接返回订单id*  3.没秒杀资格,返回错误信息*/@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户idLong userId = UserHolder.getUser().getId();//获取订单idlong orderId = redisIdWorker.nextId("order");//1.执行lua脚本--脚本名称,集合,参数(券id,用户id,订单id)Long result = stringRedisTemplate.execute(SECKILL,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));//2.判断结果是否为0int r = result.intValue();if(r != 0){//2.1不为0,没有购买资格return Result.fail(r == 1?"库存不足":"不能重复下单");}//3.获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//4.返回订单Idreturn Result.ok(orderId);}
}

Redis(4)优惠券秒杀相关推荐

  1. Redis 基础 - 优惠券秒杀《初步优化(异步秒杀)》

    Redis基础 - 基本类型及常用命令 Redis基础 - Java客户端 Redis 基础 - 短信验证码登录 Redis 基础 - 用Redis查询商户信息 Redis 基础 - 优惠券秒杀< ...

  2. Redis 基础 - 优惠券秒杀《分布式锁(初级)》

    参考 Redis基础 - 基本类型及常用命令 Redis基础 - Java客户端 Redis 基础 - 短信验证码登录 Redis 基础 - 用Redis查询商户信息 Redis 基础 - 优惠券秒杀 ...

  3. Redis 基础 - 优惠券秒杀《非集群》

    参考 Redis基础 - 基本类型及常用命令 Redis基础 - Java客户端 Redis 基础 - 短信验证码登录 Redis 基础 - 用Redis查询商户信息 摘要 用Redis生成保证唯一性 ...

  4. SpringBoot整合Redis实现优惠券秒杀服务(笔记+优化思路版)

    本文属于看黑马的redis的学习笔记,记录了思路和优化流程,精简版最终版请点击这里查看. 文章目录 一.全局ID生成器 1.1 理论 1.1.1 全局唯一ID生成策略 1.2 代码(Redis自增) ...

  5. SpringBoot整合Redis实现优惠券秒杀服务

    文章目录 一.服务整体流程 二.具体业务逻辑实现 1. 新增秒杀券 1.1. Controller层 1.2. Service层 2. 秒杀优惠券 2.1. 具体业务流程以及细节说明 2.2. Con ...

  6. Redis实现优惠券秒杀

    优惠券秒杀 全局唯一ID 问题 当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题 id的规律性太明显 受单表数据量的限制 解决办 ...

  7. Redis解决优惠券秒杀

    虽然本文是针对黑马点评的优惠券秒杀业务的实现,但是是适用于各种抢购活动,保证线程安全. 摘要:本文先讲了抢购问题,指出其中会出现的多线程问题,提出解决方案采用悲观锁和乐观锁两种方式进行实现,然后发现在 ...

  8. Redis实战——优惠券秒杀(超卖问题)

    1 实现优惠券秒杀功能 下单时需要判断两点:1.秒杀是否开始或者结束2.库存是否充足 所以,我们的业务逻辑如下 1. 通过优惠券id获取优惠券信息 2.判断秒杀是否开始,如果未返回错误信息 3.判断秒 ...

  9. Redis 实现优惠券秒杀、分布式锁、消费队列

    文章目录 一.全局唯一ID 1.1 为啥需要全局唯一ID 1.2 全局ID生成器 1.3 全局唯一ID生成策略 二.实现优惠卷秒杀下单 2.1 优惠卷表结构 2.2 秒杀功能实现 三.超卖问题 四.实 ...

最新文章

  1. 模拟实现: strstr strcpy strlen strcat strcmp memcpy memmove
  2. element引入的组件大小高度不对_试水 elementplus ui 组件库
  3. 一步一步SharePoint 2007之十七:解决实现Form认证后无法再用SharePoint Designer编辑网站的问题...
  4. 8.5-7 mkfs、dumpe2fs、resize2fs
  5. 前后端分离之JWT用户认证
  6. 搭建Maven私有仓库
  7. 【php7扩展开发五】函数调用
  8. 保持SVN仓库结构只checkout部分子目录
  9. vnc连接linux颜色灰色,VNC 灰色的屏幕解决方法
  10. 铁大Facebook——十天冲刺(1)
  11. django 1.8 官方文档翻译: 2-3-2 关联对象参考
  12. mysql的int最大值_MySQL中int最大值深入讲解
  13. mongodb由于目标计算机积极拒绝无法连接失败
  14. Android 逆向流程
  15. 党建活动献爱心,达飞云贷冬日送温暖
  16. 单龙芯3A3000-7A1000PMON研究学习-(15)撸起袖子干-先来一杯代码吧
  17. 解决flume整合kafka报错Attempt to heart beat failed since member id is not valid, reset it and try to re-jo
  18. java实现光盘摆渡_一种光盘摆渡机的制作方法
  19. browserquest php安装,H5多人联机网游《Browserquest》源码 node.js版本+php版本
  20. vipkid怎么样?来自家长的真实评价

热门文章

  1. Invalid Host/Origin header vue项目
  2. springboot整合POI导出word(文字加图片)
  3. 树莓派配置https://www.raspberrypi.org/documentation/configuration/中的一个单词翻译:
  4. https://start.spring.io‘ 的初始化失败请检查 URL、网络和代理设置
  5. 蒙特卡洛算法及简单应用
  6. qq邮箱 android,QQ邮箱(com.tencent.androidqqmail) - 6.2.1 - 应用 - 酷安
  7. 跟着小马哥学系列之 Spring AOP(Pointcut 组件详解)
  8. 贝叶斯算法 — 朴素贝叶斯分类器— 过滤垃圾邮件 — 流失用户 — 用户画像
  9. 三星会在泰泽大会上展示meego系统的新机么?
  10. mysql 开启 thread pool_MySQL線程池(THREAD POOL)的處理