文章目录

  • 全局唯一ID
    • 设计
  • 添加秒杀卷
  • 实现秒杀下单
    • 超卖问题
    • 一人一单
    • 分布式集群
  • 分布式锁
    • Redis实现方案
      • Lua 脚本解决多条命令原子性问题
  • redission分布式锁
    • 不可重入问题
    • 不可重试问题
    • 超时释放
      • 流程图
    • 主从一致性问题
  • 秒杀优化
    • 6.2 秒杀优化 - Redis 完成秒杀资格判断
  • Redis 消息队列(基于 Stream 的消息队列)
    • 基于 Stream 的消息队列 - 消费者组
    • 优化步骤
      • `秒杀业务:判断是否有购买资格,有则操作redis保存的数据,不等订单创建直接返回结果给客户`
      • 创建线程池,项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单

全局唯一ID

当用户抢购时,就会生成订单并保存到订单表中,而订单表如果使用数据库自增 id 就存在一些问题:

  • id 的规律性太明显
  • 采用数据库自增ID受单表数据量的限制,订单表采用分库分表时候,ID在不同表会重复

设计

序列号为了在并发下高可用和唯一性,可采用Redis的自增来构成序列号,不过要考虑以下问题:

  • Redis的自增值要在2的64次方的范围内
package com.hmdp.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 序列号的位数*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}// keyPrefix业务字段,订单业务可用传order,也可生成其他业务的全局唯一idpublic long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1.获取当前日期,精确到天,一天一个key,使得value从0自增不会超出大小范围String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// Redis incrby 命令将 key 中储存的数字加上指定的增量值,如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接并返回,左移并进行或操作return timestamp << COUNT_BITS | count;}public static void main(String[] args) {LocalDateTime of = LocalDateTime.of(2022, 1, 1, 0, 0, 0);long l = of.toEpochSecond(ZoneOffset.UTC);// LocalTime类的toEpochSecond()方法用于将此LocalTime转换为自1970-01-01T00:00:00以来的秒数System.out.println(l);}}

测试

@Test
void testIdWorker() throws InterruptedException {CountDownLatch latch = new CountDownLatch(300);Runnable task = () -> {for (int i = 0; i < 100; i++) {long id = redisIdWorker.nextId("order");System.out.println("id = " + id);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {es.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("time = " + (end - begin));
}

await 方法 是阻塞方法,我们担心分线程没有执行完时,main 线程就先执行,所以使用 await 可以让 main 线程阻塞,那么什么时候 main 线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为 0 时,就不再阻塞,直接放行,那么什么时候 CountDownLatch 维护的变量变为 0 呢,我们只需要调用一次 countDown ,内部变量就减少 1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是 0,此时 await 就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。


添加秒杀卷

  • 普通代金卷表:优惠券的基本信息,优惠金额、使用规则等
  • 秒杀卷表:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息(普通代金卷拓展)
// 要加事务,同时保存到redis,提供更好的性能
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒杀库存到Redis中stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

实现秒杀下单

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询相关店铺的具体优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}//5,扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣减库存return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2.用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}

超卖问题

如果并发多个线程,在仅剩一张优惠劵的同时判断库存都有剩余,则会并发减库存导致超卖,对于该问题可用乐观锁解决

// 每次更新时候带上之前查询库存的版本号,如果更新时候字段被改过则放弃此次更新
boolean success = seckillVoucherService.update().setSql("stock= stock -1") //set stock = stock -1.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?

但该优化是错误,因为如果库存为99,多个线程并发,判断版本号为98,与上次查询到的99不一样,放弃更新,但实际上还有库存,我们可继续改进

boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update().gt("stock",0); //where id = ? and stock > 0

一人一单

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}// 5.一人一单逻辑// 5.1.用户idLong userId = UserHolder.getUser().getId();int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}//6,扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update().gt("stock", 0); //where id = ? and stock > 0if (!success) {//扣减库存return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);return Result.ok(orderId);}

现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,插入数据本不存在,所以难以用乐观锁解决,所以我们需要使用悲观锁操作

如果把synchronized加到方法上,锁的粒度太粗了,在使用锁过程中,控制锁粒度是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,所以我们需要去控制锁的粒度,封装要锁住的代码段:

@Transactional
public  Result createVoucherOrder(Long voucherId) {Long userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()){// 5.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了return Result.fail("用户已经购买过一次!");}// 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣减失败return Result.fail("库存不足!");}// 7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1.订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用户idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回订单idreturn Result.ok(orderId);}
}

intern () 这个方法是从常量池中拿到数据,如果我们直接使用 userId.toString () 他拿到的对象实际上是不同的对象,new 出来的对象,我们使用锁必须保证锁必须是同一把,所以我们需要使用 intern () 方法

但是以上代码还是存在问题,问题的原因在于当前方法被 spring 的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放,这样会出现有线程释放了锁,但事务没有提交,同一用户的其他线程获取锁查询数据库发现还是没有购买记录,所以也新增了一个数据,这时就会导致两个事务提交了,有两条记录,不符合一人一单,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题

@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}// 5.一人一单逻辑// 5.1.用户idLong userId = UserHolder.getUser().getId();synchronized(userId.toString().intern()) {return createVoucherOrder(voucherId)}}

但是以上做法依然有问题,因为你调用的方法,其实是 this. 的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务

synchronized(userId.toString().intern()) {// 要添加相关依赖和暴露接口IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId)
}

分布式集群

不同机器不同JVM实例,所以会导致锁失效


分布式锁

原理

要素设计

Redis实现方案

redis 作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用 redis 作为分布式锁,利用 setnx 这个方法,如果插入 key 成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

要点和难点

  • 线程互斥:两个线程只能一个获取锁
  • 死锁:当业务执行完成,应该释放该锁,给其他线程获取
  • 锁误删:就是线程1业务阻塞导致锁超时释放了,其他业务获取锁后被线程1业务完成后给执行释放了,我们要对锁加标识
  • 锁续期:当线程1超时释放了,业务没有完成,第二个锁获取了该锁去执行业务,这是不应该的,因为线程1的业务没有提交,第二个线程获取锁了去数据库查询本人订单并没有下单,所以下了一单,第一个线程也重新获取cpu时间片也去下了单,这显然没有满足需求。所以当第一个业务的事务没有提交时,锁应该不被释放(这是给redission解决的了
import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Collections;
import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}public void unlock() {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁中的标示String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标示是否一致if(threadId.equals(id)) {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}}
@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();// 创建锁对象(新增代码)SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);// 获取锁对象boolean isLock = lock.tryLock(1200);// 加锁失败if (!isLock) {return Result.fail("不允许重复下单");}try {// 获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {// 释放锁lock.unlock();}
}

由于获取锁、锁判断和释放锁不是原子性(也就是释放锁业务不是原子性的),所以极端情况下:

if(threadId.equals(id)) {// 判断完要释放锁,cpu切换阻塞,导致锁超时,第二个线程获取了锁,然后重新获取cpu时间片释放了第二个线程刚刚获取的锁,这还是会出现锁误删情况stringRedisTemplate.delete(KEY_PREFIX + name);
}

Lua 脚本解决多条命令原子性问题

释放锁的业务流程是这样的

  • ​ 1、获取锁中的线程标示
  • ​ 2、判断是否与指定的标示(当前线程标示)一致
  • 3、如果一致则释放锁(删除)
  • 4、如果不一致则什么都不做
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then-- 一致,则删除锁return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

利用 Java 代码调用 Lua 脚本改造分布式锁

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);
}public void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}

redission分布式锁

依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");// 创建RedissonClient对象return Redisson.create(config);}
}

简单入门

@Resource
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");/* 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位 */boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务");          }finally{//释放锁lock.unlock();}}
}

基于setnx实现的分布式锁存在下面的问题:

不可重入问题

采用了hash结构,锁标识由业务名称+线程id组成,如果是同一把锁重入,则value值+1,表示第二次获取了锁,如果为0则表示锁没有人持有。

# 获取锁成功都是返回nil
# 判断锁是否存在,是进入创建并获取锁逻辑
"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +# 再判断锁标识是否自己,是自己的进入锁重入逻辑"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +# 否则获取锁失败,返回锁过期时间"return redis.call('pttl', KEYS[1]);"

不可重试问题

/*第一个参数是等待时间,如果没有给也是默认是-1,redission不会等待,获取失败则马上返回结果第二个参数是超时时间,如果没有给默认是-1,redission会给一个默认时间第三个参数是时间单位
*/
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime); // 传化时间单位long current = System.currentTimeMillis(); // 获取当前时间long threadId = Thread.currentThread().getId(); // 线程idLong ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 尝试获取锁,也就是进入上面lua脚本重入脚本逻辑,返回null则表示获取锁成功// lock acquiredif (ttl == null) {return true;}// 失败,计算尝试获取锁花费的时间,并和等待时间比较time -= System.currentTimeMillis() - current;// 如果超过剩余等待时间,则表示等待时间内获取锁失败if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();// 订阅释放锁的消息RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);// 订阅等待时间如果超过剩余等待时间,则也是获取锁失败if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {// 失败后要取消订阅unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {// 再次计算剩余等待时间time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();// 满足上述条件了,就是还有剩余等待时间且收到了消息订阅的通知,可用再次重试获取锁了ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {// 重试获取锁成功return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for message// 获取锁还是失败了,但还有剩余时间currentTime = System.currentTimeMillis();// 也要等,不过要采用信号量,如果ttl过期时间 < 剩余等待时间,则等待ttl时间后再唤醒该线程if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// 其实这里看得不是很懂了,可以查阅相关资料深入了解subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
//        return get(tryLockAsync(waitTime, leaseTime, unit));
}

其实就是采用消息订阅和信号量解决不可重入问题

超时释放

如果不设置过期时间,redission会采用看门狗机制

private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();// putIfAbsent:没有该key则加入,有则返回null// 为了保证如果是同一把锁重入,返回的是同一个实例,不同线程则返回不同的ExpirationEntry实例ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {// 不为null,则证明是一个新的线程来获取锁entry.addThreadId(threadId);// 给这把锁加上一个定时任务,每隔一段时间重新刷新过期时间,直到业务完成释放锁后才把定时任务接触renewExpiration();}
}
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 定时任务Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}

看门狗机制就是新线程如果获取锁成功后加一个定时任务,每隔一段时间去更新过期时间,直到业务完成释放锁才解除

流程图

主从一致性问题

假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个 slave 变成 master,而此时新的 master 中实际上并没有锁信息,此时锁信息就已经丢掉了。

为了解决这个问题,redission 提出来了 MutiLock 锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

当我们去设置了多个锁时,redission 会将多个锁添加到一个集合中,然后用 while 循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有 3 个锁,那么时间就是 4500ms,假设在这 4500ms 内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在 4500ms 有线程加锁失败,则会再次去进行重试


秒杀优化

优化方案:我们将耗时比较长的逻辑判断放入到 redis 中(库存判断,一人一单),我们只需要进行快速的逻辑判断,不用等下单逻辑走完,我们直接给用户返回成功,后台去异步把成功下单的数据记录到mysql即可。

库存和一人一单的业务解决分别采用了string和set数据结构

6.2 秒杀优化 - Redis 完成秒杀资格判断

需求:

1.新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中

stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());

2.基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2.库存不足,返回1return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,说明是重复下单,返回2return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

3.如果抢购成功,将优惠券 id 和用户 id 封装后存入阻塞队列

4.开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

基于阻塞队列的异步秒杀存在哪些问题?

  • 内存限制问题(会损耗jvm内存)
  • 数据安全问题(不能持久化可能会丢失)

Redis 消息队列(基于 Stream 的消息队列)

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

# 往名为users的队列中发送一个内容为{name=jacks, age=21}的消息,并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jacks age 34
"1663310029278-0"# 在名为users的队列中一次读取1条消息
# 0代表从第一条消息开始
# $代表从最新一条消息开始
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "users"2) 1) 1) "1663310029278-0"2) 1) "name"2) "jacks"3) "age"4) "34"

注意:当我们指定起始 ID 为 $ 时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

基于 Stream 的消息队列 - 消费者组

特点

# 给队列steam:orders创建一个消费者组g1,从第一个消息开始消费(其实就是为了消费之前保留的消息,如果是从最新的消息消费,那以前的消息则不会被消费)
XGROUP CREATE steam:orders g1 0

优化步骤

秒杀业务:判断是否有购买资格,有则操作redis保存的数据,不等订单创建直接返回结果给客户

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);
}@Override
public Result seckillVoucher(Long voucherId) {// 获取当前登录用户idLong userId = UserHolder.getUser().getId();// 生成全局唯一id给订单实体类long orderId = redisIdWorker.nextId("order");// 1.执行lua脚本,/*进行库存判断,库存不足返回1再进行一人一单判断,如果set集合已经存在用户id,表明不能重复购买,返回2有购买资格则扣减库存,往set集合添加用户id标记用户购买过该券发送订单消息给队列stream.orders,给之后异步生成订单*/Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(), // 没有key,给一个空数组voucherId.toString(), userId.toString(), String.valueOf(orderId));int r = result.intValue();// 2.判断结果是否为0if (r != 0) {// 2.1.不为0 ,代表没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}// 3.返回订单idreturn Result.ok(orderId);
}

lua脚本

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2.库存不足,返回1return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,说明是重复下单,返回2return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

创建线程池,项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >// 如果消费者c1不存在,往g1组加入c1消费者List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),// >:从下一个未消费的消息开始,确保每一个消息都能被消费掉StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有消息,继续下一次循环continue;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);handlePendingList();}}}// 处理异常消息private void handlePendingList() {while (true) {try {// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),// 0:从第一个开始,如果确认消费了会移除出pending-list,从第一个开始也是为了确保异常消息都能被消费StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有异常消息,结束循环break;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {// 出异常了不用递归调用了,因为有外循环log.error("处理订单异常", e);}}}
}

优惠劵秒杀优化-分布式锁相关推荐

  1. Redis 实现优惠券秒杀、分布式锁、消费队列

    文章目录 一.全局唯一ID 1.1 为啥需要全局唯一ID 1.2 全局ID生成器 1.3 全局唯一ID生成策略 二.实现优惠卷秒杀下单 2.1 优惠卷表结构 2.2 秒杀功能实现 三.超卖问题 四.实 ...

  2. Redis 基础 - 优惠券秒杀《分布式锁(初级)》

    参考 Redis基础 - 基本类型及常用命令 Redis基础 - Java客户端 Redis 基础 - 短信验证码登录 Redis 基础 - 用Redis查询商户信息 Redis 基础 - 优惠券秒杀 ...

  3. 视频教程- 19年录制Redis实战教程 高可用秒杀分布式锁布隆过滤器实战 SpringBoot教程整合-Java

    19年录制Redis实战教程 高可用秒杀分布式锁布隆过滤器实战 SpringBoot教程整合 7年的开发架构经验,曾就职于国内一线互联网公司,开发工程师,现在是某创业公司技术负责人, 擅长语言有nod ...

  4. 每秒上千订单场景下的分布式锁高并发优化实践!

    本文授权转自石杉的架构笔记 背景引入 首先,我们一起来看看这个问题的背景? 前段时间有个朋友在外面面试,然后有一天找我聊说:有一个国内不错的电商公司,面试官给他出了一个场景题: 假如下单时,用分布式锁 ...

  5. Java架构-每秒上千订单场景下的分布式锁高并发优化实践!

    "上一篇文章我们聊了聊Redisson这个开源框架对Redis分布式锁的实现原理,如果有不了解的兄弟可以看一下:<拜托,面试请不要再问我Redis分布式锁实现原理>. 今天就给大 ...

  6. 用分布式锁来防止库存超卖,但是是每秒上千订单的高并发场景,如何对分布式锁进行高并发优化来应对这个场景?

    用分布式锁来防止库存超卖,但是是每秒上千订单的高并发场景,如何对分布式锁进行高并发优化来应对这个场景? 转载 codeing_doc 最后发布于2018-11-23 09:44:41 阅读数 1073 ...

  7. 22-09-20 西安 谷粒商城(04)Redisson做分布式锁、布隆过滤器、AOP赋能、自定义注解做缓存管理、秒杀测试

    Redisson 1.Redisson做分布式锁  分布式锁主流的实现方案: 基于数据库实现分布式锁 基于缓存(Redis),性能最高 基于Zookeeper,可靠性最高 Redisson是一个在Re ...

  8. 【分布式】红包秒杀系统、高并发安全分布式锁

    分布式 内容管理 业务Intro 业务模块划分 数据库表设计 开发流程 红包金额随机生成算法 ---- Monte Carlo 方法 发红包模块 @EnableAsync 多线程异步 抢红包模块 并发 ...

  9. redisson的锁的类型_再有人问你分布式锁是什么,就把这个丢给他!

    [小宅按]现在面试都会聊聊分布式系统,通常面试官都会从服务框架(Spring Cloud.Dubbo),一路聊到分布式事务.分布式锁.ZooKeeper 等知识.今天就来聊聊分布式锁这块的知识,先具体 ...

最新文章

  1. 深度学习时间序列预测:GRU算法构建多变量时间序列预测模型+代码实战
  2. 2021年春季学期-信号与系统-第十二次作业参考答案-第六小题
  3. 【AOP 面向切面编程】Android Studio 中配置 AspectJ ( 下载并配置AS中 jar 包 | 配置 Gradle 和 Gradle 插件版本 | 配置 Gradle 构建脚本 )
  4. futuretask java 并发请求_图文并茂理解 Java 多线程
  5. R学习_multitaper包解析2:子函数spec.mtm.dpss,dpssHelper
  6. 大厂面试都爱问这4个问题,.NET开发必看!
  7. Maven 核心原理
  8. Java并发编程:4种线程池和缓冲队列BlockingQueue
  9. 数据建模的的参考工具EZDML
  10. Serial Interface之I2C:关于DS1624 2线通信SDA保持时间的说明
  11. 第二次结对编程作业——毕业导师智能匹配
  12. 算法设计与分析基础 第八章谜题
  13. 学习总结:Handler机制
  14. 使用 processon 画 UML 图
  15. 转置矩阵,矩阵的行列式,伴随矩阵,逆矩阵的概念及C#求解
  16. 编译原理常用简称或英文原称(思维导图形式)
  17. 虚拟机安装panabit详细图解
  18. html5学习计划,关于学习计划模板汇编5篇
  19. 骑士游历问题【JAVA板】代码详细流
  20. 微信小程序|使用小程序制作一个时间管理小工具

热门文章

  1. 就绪函数的定义_准备就绪的定义被认为是有害的
  2. cookie实现单点登录
  3. Redis——狂聊教程笔记
  4. linux 杂记 怎么解决 cuda 10.1 跑 cuda8下的代码 error
  5. Maven环境搭建及配置
  6. Python利用PyQt5制作一个获取网络实时数据NBA数据播报GUI
  7. cannot use message (variable of type protoreflect.ProtoMessage) as type protoiface.MessageV1 in argu
  8. 细说字体 Sans Serif 与 Serif
  9. 【Android实现返回主页,禁止返回上一层等功能】
  10. 分享2020年线上支付接口产品讲解