文章目录

  • 一、全局唯一ID
    • 1.1 为啥需要全局唯一ID
    • 1.2 全局ID生成器
    • 1.3 全局唯一ID生成策略
  • 二、实现优惠卷秒杀下单
    • 2.1 优惠卷表结构
    • 2.2 秒杀功能实现
  • 三、超卖问题
  • 四、实现优惠券一人一单
  • 五、集群下的线程并发安全问题(一人一单)
  • 六、分布式锁
    • 6.1 分布式锁具备哪些特性:
    • 6.2 分布式锁误删问题
    • 6.3 分布式锁的原子性问题
    • 6.4 Redis的Lua脚本
    • 6.5 基于Redis的分布式锁实现思路:
  • 七、Redisson
    • 7.1 Redisson上手使用
    • 7.2 Redisson可重入锁原理
    • 7.3 Redisson的锁充实和WatchDog机制
    • 7.4 Redisson分布式锁主从一致性问题
      • 7.4.1 不可重入的Redis分布式锁:
      • 7.4.2 可重入的Redis分布式锁:
      • 7.4.3 Redisson的multiLock:
  • 八、秒杀优化-异步秒杀思路
  • 九、Redis消息队列实现异步秒杀
    • 9.1 基于List结构实现消息队列
    • 9.2 基于PubSub实现消息队列
    • 9.3 基于Stream的消息队列
      • 9.3.1 单消费者模式
      • 9.3.2 消费者组模式
    • 9.3.3 消费者监听消息的基本思路
    • 9.3.4 Redis消息队列对比
    • 9.3.5 代码实现Stream结构作为消息队列,实现异步秒杀下单
      • 9.3.5.1 创建消费者组与消息队列
      • 9.3.5.2 修改秒杀下单Lua脚本。修改代码下单逻辑
      • 9.3.5.2 开启一个线程,消费队列中的消息,完成下单

一、全局唯一ID

1.1 为啥需要全局唯一ID

系统唯一id是我们在设计阶段常常遇到的问题。在复杂的分布式系统中,几乎都需要对大量的数据和消息进行唯一标识。在设计初期,我们需要考虑日后数据量的级别,如果可能会对数据进行分库分表,那么就需要有一个全局唯一id来标识一条数据或记录。

1.2 全局ID生成器

全局ID生成器,是一种在分布式系统下用来生成全局ID的工具,一般要满足下列特性:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tILHZT7J-1677636574276)(https://note.youdao.com/yws/api/personal/file/77C7E49B4493489682CD25E42B677FBF?method=download&shareKey=cf310bbd4dd3b523de98338bb4275831)]

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

序列号生成方法:

@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 序列号的位数*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}/*** 生成全局唯一id* @param keyPrefix 缓存前缀* @return long 唯一id*/public long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();// 当前秒数long nowSecond = now.toEpochSecond(ZoneOffset.UTC);// 当前时间-开始时间戳long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1.获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 2.2.自增长,记录对应key+日期 记录生成多少个唯一id 可用于统计long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接并返回  时间戳向左移32位 拼接自增长数值return timestamp << COUNT_BITS | count;}public static void main(String[] args) {// 2022年1月1日0时0分0秒LocalDateTime dateTime = LocalDateTime.of(2022, 1, 1, 0, 0,0);long second = dateTime.toEpochSecond(ZoneOffset.UTC);System.out.println(second);}
}

1.3 全局唯一ID生成策略

  • UUID
  • Redis自增
  • snowflake算法(雪花算法)
  • 数据库自增

Redis自增ID策略:

  • 每天一个key,方便统计订单量
  • ID构造是时间戳+计数器

二、实现优惠卷秒杀下单

2.1 优惠卷表结构

CREATE TABLE `tb_voucher` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`shop_id` bigint unsigned DEFAULT NULL COMMENT '商铺id',`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '代金券标题',`sub_title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '副标题',`rules` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '使用规则',`pay_value` bigint unsigned NOT NULL COMMENT '支付金额,单位是分。例如200代表2元',`actual_value` bigint NOT NULL COMMENT '抵扣金额,单位是分。例如200代表2元',`type` tinyint unsigned NOT NULL DEFAULT '0' COMMENT '0,普通券;1,秒杀券',`status` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '1,上架; 2,下架; 3,过期',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT COMMENT='优惠券表';CREATE TABLE `tb_seckill_voucher` (`voucher_id` bigint unsigned NOT NULL COMMENT '关联的优惠券的id',`stock` int NOT NULL COMMENT '库存',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`begin_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '生效时间',`end_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '失效时间',`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`voucher_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT COMMENT='秒杀优惠券表,与优惠券是一对一关系';CREATE TABLE `tb_voucher_order` (`id` bigint NOT NULL COMMENT '主键',`user_id` bigint unsigned NOT NULL COMMENT '下单的用户id',`voucher_id` bigint unsigned NOT NULL COMMENT '购买的代金券id',`pay_type` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '支付方式 1:余额支付;2:支付宝;3:微信',`status` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下单时间',`pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间',`use_time` timestamp NULL DEFAULT NULL COMMENT '核销时间',`refund_time` timestamp NULL DEFAULT NULL COMMENT '退款时间',`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT COMMENT='优惠券的订单表';

2.2 秒杀功能实现

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Q5cg5RuH-1677636574280)(https://note.youdao.com/yws/api/personal/file/8017510A8FFF4BA98B9B43BB3B4C8ECA?method=download&shareKey=c2b2b761b856ca0fa389efa059ee9fd0)]

普通正常流程优惠卷抢购

/*** 优惠卷秒杀功能* @param voucherId 优惠卷id* @return Result*/@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {// 查询优惠卷SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);// 判断秒杀是否开始if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 判断秒杀是否已经结束if (seckillVoucher.getBeginTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("库存不足!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if (!success) {return Result.fail("库存不足!");}// 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 订单id,生成唯一idlong orderId = redisIdWorker.nextId("order");// 用户idLong userId = UserHolder.getUser().getId();// 代金券idvoucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);// 保存订单save(voucherOrder);// 返回订单idreturn Result.ok(orderId);}

三、超卖问题

在高并发的情况下,会有多个线程去同时执行优惠卷抢购操作,线程执行顺序差异,可能在库存未扣减时,查询库存充足,同时进行扣减操作,导致线程安全问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ByHWEumv-1677636574284)(https://note.youdao.com/yws/api/personal/file/23EE9E382B304444B7F189AC0C90E1F3?method=download&shareKey=2fdff8f59ecc5428d07dcbb624048553)]

常见解决方案:

  • 悲观锁

认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。

例如:Synchronized、Lock都属于悲观锁

// ReentrantLock(可重入锁)
Lock lock = new ReentrantLock();// 两种方式 lock()方法  tryLock()方法// 获取锁lock.lock();try {System.out.println(thread.getName()+"得到了锁");for(int i=0;i<5;i++) {arrayList.add(i);}} catch (Exception e) {// TODO: handle exception}finally {System.out.println(thread.getName()+"释放了锁");// 释放锁lock.unlock();}// 尝试获取锁,成功则执行逻辑if(lock.tryLock()) {try {System.out.println(thread.getName()+"得到了锁");for(int i=0;i<5;i++) {arrayList.add(i);}} catch (Exception e) {// TODO: handle exception}finally {System.out.println(thread.getName()+"释放了锁");// 释放锁lock.unlock();}} else {System.out.println(thread.getName()+"获取锁失败");// 执行其他业务逻辑,或者直接返回失败}
  • 乐观锁

认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其他线程对数据做了修改。

1、如果没有修改则认为是安全的,自己才更新数据。

2、如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:

  • 版本号法

扣减库存的同时,修改增加版本号+1,同时判断版本号是否与原本查询所得一致,一致则修改成功。

id stock version
1 1 1

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WKbrfk9x-1677636574286)(https://note.youdao.com/yws/api/personal/file/F9EF9686FBA5486299A0284B8A617F14?method=download&shareKey=533a1481977c553aabe66e7ce950979a)]

  • CAS法

CAS全称CompareAndSet或者CompareAndSwap,其中【比较-交换】的操作是原子的,每次修改前,比较原值是否已经发生了变化,变化则不修改,反之修改。但是会存在ABA问题,可以使用版本号法或者使用原子类AtomicMarkableReference和AtomicStampedReference来解决

id stock
1 1

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bItW78Ol-1677636574287)(https://note.youdao.com/yws/api/personal/file/3A92B879843F4205AFD930A0438870AE?method=download&shareKey=51131248a225a62b90c0c38dc74d150c)]

采用CAS法来解决超卖问题

// 修改内容,sql条件增加只有当库存>0时才扣减库存
// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();

四、实现优惠券一人一单

利用悲观锁实现一人一单逻辑

  • synchronized
 /*** 悲观锁 采用Synchronized** @param voucherId 优惠券id* @return Result*/@Transactionalpublic Result createOrderSync(Long voucherId) {Long userId = UserHolder.getUser().getId();// 一人一单// 查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 判断是否存在if (count > 0) {// 用户已经购买过该优惠券return Result.fail("用户已经购买一次了!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {return Result.fail("库存不足!");}// 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 订单id,生成唯一idLong orderId = redisIdWorker.nextId("order");// 代金券idvoucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);// 保存订单save(voucherOrder);return Result.ok(orderId);}/*** 优惠卷秒杀功能** @param voucherId 优惠卷id* @return Result*/@Overridepublic Result seckillVoucher(Long voucherId) {// 查询优惠卷SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);// 判断秒杀是否开始if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 判断秒杀是否已经结束if (seckillVoucher.getBeginTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {// 获取代理对象IVoucherOrderService1 proxy = (IVoucherOrderService1) AopContext.currentProxy();return proxy.createOrderSync(voucherId);}}// 在当前类中用this调用当前类的其他事务方法,事务会失效,可以获取当前类的代理对象后再调用当前类的事务方法
// 使用代理对象需要添加依赖<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>// 启动类增加注解,暴露代理对象
@EnableAspectJAutoProxy(exposeProxy = true)
  • Lock锁
 /*** 悲观锁 采用Lock锁** @param voucherId 优惠券id* @return Result*/private Result createOrderLock(Long voucherId) {Lock lock = new ReentrantLock();long orderId = 0;// 尝试获取锁if (lock.tryLock()) {try {Long userId = UserHolder.getUser().getId();// 一人一单// 查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 判断是否存在if (count > 0) {// 用户已经购买过该优惠券return Result.fail("用户已经购买一次了!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {return Result.fail("库存不足!");}// 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 订单id,生成唯一idorderId = redisIdWorker.nextId("order");// 代金券idvoucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);// 保存订单save(voucherOrder);} catch (Exception e) {throw new RuntimeException(e);} finally {// 释放锁lock.unlock();}} else {return Result.fail("不允许重复下单!");}// 返回订单idreturn Result.ok(orderId);}

五、集群下的线程并发安全问题(一人一单)

集群部署情况下,每一个项目都有自己的JVM,在JVM内部维护了一个锁监视器,锁监视器保证了同一时刻只有一个线程获取到锁。

但是并发请求的情况下面,多个请求通过负载均衡到不同服务上,每个JVM上都可以获取各自的锁,就会导致两个请求在不同服务上出现同时扣减库存成功的情况,造成并发安全问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-k1QeP3zv-1677636574289)(https://note.youdao.com/yws/api/personal/file/E2B7171CDE2E4115B81EF91D928509A9?method=download&shareKey=46c52658350e55bfd4887442de3faaf9)]

六、分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PhFCYFnn-1677636574291)(https://note.youdao.com/yws/api/personal/file/618CBFDBD0C14532B9B0CBBD017B91DF?method=download&shareKey=1be0df5b6cd9a06d29166010aa2a7d2c)]

6.1 分布式锁具备哪些特性:

  • 互斥:在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
  • 高可用:高可用的获取锁与释放锁
  • 高性能:高性能的获取锁与释放锁
  • 可重入性:具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)
  • 安全性:具备锁失效机制,即自动解锁,防止死锁
  • 非阻塞性:具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

\ MySQL Redis Zookeeper
互斥 利用mysql本身的互斥锁机制 利用setnx这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

实现分布所时需要实现两个基本方法:

  • 获取锁:

    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
// 添加锁,利用setnx的互斥特性
setnx lock thread1
// ex 设置超时时间 秒为单位 有效期10秒 NX:互斥
SET key value [NX|XX] [GET] [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|KEEPTTL]set lock thread1 EX 10 NX// 添加锁过期时间,避免服务宕机引起的死锁
EXPIRE lock 5
  • 释放锁:

    • 手动释放
    • 超时释放:获取锁时添加一个超时时间
// 释放锁,删除即可
del lock

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sJHVknH7-1677636574292)(https://note.youdao.com/yws/api/personal/file/91E49C05F410497D883B307707D1A356?method=download&shareKey=f120065cfe3b7b01e70108b8544b30af)]

代码实现:

// Redis分布式锁接口
public interface ILock1 {/*** 尝试获取锁* @param timeoutSec 锁持有的超时时间,过期后自动释放* @return true代表获取锁成功; false代表获取锁失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
}// Redis分布式锁实现类
public class SimpleRedisLock1 implements ILock1 {private StringRedisTemplate stringRedisTemplate;// 分布式锁名称private String name;public SimpleRedisLock1(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}// 锁固定前缀private static final String KEY_PREFIX = "lock:";/*** 获取锁* @param timeoutSec 锁持有的超时时间,过期后自动释放* @return boolean 是否上锁成功*/@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标识long threadId = Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return BooleanUtil.isTrue(success);}/*** 释放锁*/@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX + name);}
}// 优惠券下单调用部分代码// 创建锁对象SimpleRedisLock1 redisLock = new SimpleRedisLock1(stringRedisTemplate,"order:" + userId);// 获取锁boolean isLock = redisLock.tryLock(1200);// 判断是否获取锁成功if (!isLock) {return  Result.fail("不允许重复下单");}try {// 获取代理对象IVoucherOrderService1 proxy = (IVoucherOrderService1) AopContext.currentProxy();return proxy.createOrderSync(voucherId);} finally {redisLock.unlock();}

6.2 分布式锁误删问题

问题描述:

1、线程1获取锁,因为其他原因导致业务阻塞,造成锁超时自动释放。
2、线程2进入获取到锁,开始执行业务,线程1业务阻塞完成,执行释放锁操作,这时就会导致误删掉线程2获取的锁。
3、线程3进入获取到锁,开始执行业务,就出现了线程安全问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ME0DKj9l-1677636574293)(https://note.youdao.com/yws/api/personal/file/4D31DF531BC843C796873CA5B6AE376E?method=download&shareKey=4dab1a2994f63df3ff2d1e97504dd48b)]

解决思路:获取锁时,存入线程标识,业务执行完成之后,判断锁中线程标识是否为当前线程标识,判断一致则释放锁,不一致则不释放锁。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-unKMaxsO-1677636574294)(https://note.youdao.com/yws/api/personal/file/EE8E413911B94DD78C140764CEF2EE02?method=download&shareKey=5ebdf4626d17322204b02587bde7bcfc)]

代码实现:

    // UUID 锁 前缀private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";/*** 获取锁* @param timeoutSec 锁持有的超时时间,过期后自动释放* @return boolean 是否上锁成功*/@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标识 UUID + 线程idString threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return BooleanUtil.isTrue(success);}/*** 释放锁*/@Overridepublic void unlock() {// 当前线程idString threadId = ID_PREFIX + Thread.currentThread().getId();// 从缓存里获取当前线程idString id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if (threadId.equals(id)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}

6.3 分布式锁的原子性问题

问题描述:

1、线程1获取锁,执行业务完成后,开始释放锁操作,出现了阻塞操作,比如jvm FullGC操作。释放锁阻塞,导致超时释放锁。

2、线程1的锁已经释放。这时线程2获取锁,开始执行业务,这时线程1阻塞完成,执行释放锁操作,释放了线程2的锁,造成了误删。

3、线程2的锁被误删,线程3进入获取锁,开始执行业务,就还是出现了线程安全问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KFqdjO31-1677636574295)(https://note.youdao.com/yws/api/personal/file/ADBE659027EB4CDDA00A4D7E217A2E46?method=download&shareKey=324bc0d8af835017fbccb4d556332cb7)]

6.4 Redis的Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua参考网站:https://www.runoob.com/lua/lua-tutorial.html

Redis 提供了两种方式调用Lua脚本:

  • redis.call()

call()方法是遇到就停止执行后面的内容并直接返回错误

  • redis.pcall()

pcall遇到异常会忽略掉继续执行

// 执行命令 set name cy 脚本就是如下:
redis.call('set','name','cy')// 先执行 set name cy,再执行get name,脚本如下:
redis.call('set','name','cy')
// 再执行get name
local name = redis.call('get','name')
// 返回
return nameEVAL script numkeys [key [key ...]] [arg [arg ...]]summary: Execute a Lua script server sidesince: 2.6.0group: scripting// 控制台调用脚本
EVAL "redis.call('set','name','cy')" 0  // 0代表脚本需要的key类型的参数个数

如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数

// 调用脚本
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name cy// redis下标默认从1开始  1 代表参数个数 name放入到KEYS[1]中,cy放入到ARGV[1]中

用脚本实现释放锁的业务流程

1、在resource目录下,创建unlock.lua脚本

-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) ==  ARGV[1]) then-- 释放锁 del keyreturn redis.call('del', KEYS[1])
end
return 0

2、修改释放锁代码逻辑

// Spring提供了RedisScript接口,方便开发者调用Lua脚本。private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();// 获取脚本文件UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));// 设置返回值UNLOCK_SCRIPT.setResultType(Long.class);}/*** 释放锁*/public void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}

6.5 基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并且设置过期时间,保存线程标识
  • 释放锁时先判断线程标识是否与自己一致,一致则删除锁

特性:

  • 利用set nx满足互斥性
  • 利用set nx保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用Redis集群保证高可用和高并发特性

七、Redisson

Redisson官方文档

Redisson是一个Redis的基础上实现的Java驻内存数据网络(In-Memory Data Frid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kp1cCb3a-1677636574297)(https://note.youdao.com/yws/api/personal/file/43CC4A08D9794A2D803A18CE9E4EDB02?method=download&shareKey=c3fe38bb66e51537b4940c2605b2f702)]

基于setnx实现的分布式锁存在下面的问题:

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只尝试一次就返回false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患。
  • 主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从未同步主中锁数据,则会出现锁失效

7.1 Redisson上手使用

  1. 引入依赖
        <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.19.3</version></dependency>

2.配置Redisson客户端:

@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){// 配置类Config config = new Config();// 添加redis地址,setAddress添加单节点地址,也可以使用 config.useClusterServers().setNodeAddresses() 添加集群地址config.useSingleServer().setAddress("redis://111.22.268.127:6379").setPassword("123456");// 创建客户端return Redisson.create(config);}
}
  1. 使用Redisson的分布式锁
    // 引入Redisson客户端@ResourceRedissonClient redissonClient;// 创建锁RLock lock = redisson.getLock("lockKey");// 最常见的使用方法lock.lock();// 加锁以后10秒钟自动解锁// 无需调用unlock方法手动解锁lock.lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);if (res) {try {...} finally {lock.unlock();}}

7.2 Redisson可重入锁原理

在原有判断是否是同一线程标识的基础上,增加可重入逻辑,同一线程执行业务获取锁,增加锁计数器+1,执行完毕锁计数器-1,直到锁计数器为0,则释放锁。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jZdRtXjq-1677636574297)(https://note.youdao.com/yws/api/personal/file/151A609E1613476AB4B383CC9575CDD6?method=download&shareKey=db981c6d893000a385adc3dcd333f007)]

// RedissonLock 源码中的获取锁 lua脚本if ((redis.call('exists', KEYS[1]) == 0) or
(redis.call('hexists', KEYS[1], ARGV[2]) == 1)) thenredis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;return redis.call('pttl', KEYS[1]);
// RedissonLock 源码中的释放锁 lua脚本
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;

7.3 Redisson的锁充实和WatchDog机制

获取锁流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JMoGQjIj-1677636574299)(https://note.youdao.com/yws/api/personal/file/F4A4F4540E6C4F75B8B7A49C6C7892F2?method=download&shareKey=c94727231fff7307ca6d699e1f1bb0cc)]

释放锁流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-emyv6VJk-1677636574299)(https://note.youdao.com/yws/api/personal/file/F5BD911795624469A97F1674DF191645?method=download&shareKey=ae1e4f5978e0f76545193c5f99f9a4e0)]

Redisson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间

7.4 Redisson分布式锁主从一致性问题

7.4.1 不可重入的Redis分布式锁:
  • 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标识
  • 缺陷:不可重入、无法重试、锁超时失效
7.4.2 可重入的Redis分布式锁:
  • 原理:利用hash结构,记录线程标识和冲入次数;利用watchDog机制延续锁时间;利用信号量控制锁重试等待
  • 缺陷:redis宕机引起锁失效问题
7.4.3 Redisson的multiLock:
  • 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才能算获取锁成功
  • 缺陷:运维成本高、实现复杂

八、秒杀优化-异步秒杀思路

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kVadEqoJ-1677636574301)(https://note.youdao.com/yws/api/personal/file/D88FB04DBE954AE5B3E3FAB86888EFA9?method=download&shareKey=d78c86b7fbaeb596821b1a5cb9c3d2c2)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KQ3bOLkR-1677636574302)(https://note.youdao.com/yws/api/personal/file/594D73530875433AB01357C1F82B1697?method=download&shareKey=d2e089d6ca661c804e555c9c62c5f19b)]

具体流程:

  1. 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  2. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  3. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

代码实现:

  1. 生成优惠券时,保存优惠券库存信息到redis中
    @Overridepublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 将优惠券库存信息存入redis中stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());}
  1. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功,脚本存放在resource目录下

-- 参数列表
-- 1 优惠券id
local voucherId = ARGV[1]
-- 2 用户id
local userId = ARGV[2]-- 数据key
-- 1 库存key
local stockKey = 'seckill:stock' .. voucherId
-- 2 订单key
local orderKey = 'seckill:order' .. voucherId-- 脚本业务
-- 1 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 库存不足,返回1return 1
end
-- 2 判断用户是否下单
if (redis.call('SISMEMBER', orderKey, userId) == 1) then-- 存在,说明重复下单,返回2return 2
end-- 扣库存,缓存中-1
redis.call('incrby', stockKey, -1)
-- 下单,阻塞队列中添加用户id
redis.call('sadd', orderKey, userId)
return 0
  1. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  2. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
    // 阻塞队列private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);// 单个线程执行下单操作即可private static final ExecutorService ORDER_EXECUTOR = Executors.newSingleThreadExecutor();private IVoucherOrderService1 proxy;@PostConstructprivate void init() {ORDER_EXECUTOR.submit(new VoucherOrderHandler());}// 开启线程private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();handlerVoucher(voucherOrder);} catch (Exception e) {log.error("创建订单异常", e);}}}}// 生成订单逻辑private void handlerVoucher(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order" + userId);// 获取锁boolean isLock = lock.tryLock();// 判断是否获取锁成功if (!isLock) {log.error("不允许重复下单");return;}try {// 获取代理对象,这里是利用子线程去调用,获取不到当前代理对象,只能通过传参或者设置成员变量proxy.createOrderSync(voucherOrder);} finally {lock.unlock();}}@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();// 执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());// 判断结果是否为0// 不为0,没有购买资格if (result != null && result != 0) {return Result.fail(result == 1 ? "库存不足" : "不允许重复下单");}// 为0,有购买资格,把下单信息保存到阻塞队列当中// 生成唯一订单idlong orderId = redisIdWorker.nextId("order");// 创建订单信息VoucherOrder voucherOrder = new VoucherOrder();// 代金券idvoucherOrder.setId(orderId).setUserId(userId).setVoucherId(voucherId);// 保存阻塞队列orderTasks.add(voucherOrder);// 异步线程获取队列中信息进行下单操作// 获取代理对象proxy = (IVoucherOrderService1) AopContext.currentProxy();// 返回订单idreturn Result.ok(orderId);}/*** 乐观锁 cas* @param voucherOrder 优惠券id* @return Result*/@Transactionalpublic void createOrderSync(VoucherOrder voucherOrder) {Long userId = UserHolder.getUser().getId();// 一人一单// 查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();// 判断是否存在if (count > 0) {// 用户已经购买过该优惠券log.error("用户已经购买一次了!");}// 扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {log.error("库存不足");}// 保存订单save(voucherOrder);}

阻塞队列实现秒杀存在问题:

  • Jvm内存存在上限,当有大量的并发请求来同时创建订单的时候,就有可能超出Jvm阻塞队列上限。
  • 数据安全问题,Jvm内存是没有持久化机制的,如果当服务重启或者宕机,阻塞队列中所有订单任务都会丢失,或者当我们从阻塞队列中拿到一个任务时,这时候出现了异常,那么这个订单任务就没有机会处理,相当于丢失了。

九、Redis消息队列实现异步秒杀

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SL5WqQrh-1677636574303)(https://note.youdao.com/yws/api/personal/file/0F3D2ED2FA2F4D2992DDB952100D724B?method=download&shareKey=552e9ffe3c2bdb962f4a6a431cd18271)]

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

9.1 基于List结构实现消息队列

队列是入口和出口不在一边,因此可以使用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。

不过当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者LRPOP来实现阻塞效果。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6Pwfbpy5-1677636574304)(https://note.youdao.com/yws/api/personal/file/70F5C7C268B743A2A4ABD57C2B2BD9C1?method=download&shareKey=94066f800af0c3a8b3b6a8b932494651)]

基于List的消息队列优缺点:

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息时序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

9.2 基于PubSub实现消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者像对应的channel发送消息后,所有订阅者都能收到相关消息。

常见命令:

  • SUBSCRIBE channel [channel…]:订阅一个或多个频道
  • PUBLISH channel msg:像一个频道发送消息
  • PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vuZqo2mH-1677636574304)(https://note.youdao.com/yws/api/personal/file/9CDA035D2C124614A9454611F42CE435?method=download&shareKey=a936c26e20b8f9fe77a82614d90de89a)]

基于PubSub的消息队列有哪些优缺点:

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

9.3 基于Stream的消息队列

Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。

9.3.1 单消费者模式

发送消息的命令:

[NOMKSTREAM]:如果队列不存在,是否自动创建队列,默认是自动创建,填了就是不创建
[MAXLEN|MINID [=|~] threshold [LIMIT count]]:设置消息队列的最大消息数量
*|id:消息的唯一id,*代表是Redis自己生成。格式是“时间戳-递增数字”,例如“1664804662707-0”
field value [field value ...]:发送到队列中的消息,称为Entry。格式就是多个key-value键值对XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]summary: Appends a new entry to a streamsince: 5.0.0group: stream## 创建名为users的队列,并向其中发送一个消息,内容是:{name=cy,age=25},并且使用redis自动生成IDXADD users * name cy age 25

读取消息的方式之一:XREAD

[COUNT count]:每次读取消息的最大数量
[BLOCK milliseconds]:当没有消息时,是否阻塞,阻塞时长
STREAMS key [key ...]:要从哪个队列读取消息,key就是队列名
id [id ...]:起始id,只返回大于该ID的消息
0:代表从第一个消息开始
$:代表从最新的消息开始XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]summary: Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.since: 5.0.0group: stream## 读取1条消息,队列名为users 从第一个消息开始读
XREAD count 1 Streams users 0结果:
1) 1) "users"2) 1) 1) "1677200622178-0"2) 1) "name"2) "cy"3) "age"4) "25"## 阻塞读取队列中的消息  block 0 代表一直阻塞,单位毫秒
XREAD count 1 block 0 Streams users $## 另一边发送消息
xadd users * jj GG## 得到消息
1) 1) "users"2) 1) 1) "1677201143631-0"2) 1) "jj"2) "GG"

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sY70SFcF-1677636574305)(https://note.youdao.com/yws/api/personal/file/28FB9CF710AB495A80B0AF9663917F77?method=download&shareKey=3f9f4135da37420e910beae6873b9167)]

Stream类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

9.3.2 消费者组模式

消费者组:将多个消费者划分到一个组中,监听同一个队列。具备以下特点:

  1. 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  2. 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费。
  3. 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除

创建消费者组:

XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]summary: Create a consumer group.since: 5.0.0group: stream# 创建消费组 u1 队列为users
XGROUP CREATE users u1 0key:队列名称
groupName:消费者组名称
ID:起始ID标识,$代表队列中的最后一个消息,0代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列其他常见命令:
# 删除指定的消费者组
XGROUP DESTROY key groupname# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]summary: Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.since: 5.0.0group: stream group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
STREAMS key:指定消息队列名称
ID:获取消息的起始ID">":从下一个未消费的消息开始其他:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

确认消息:

XACK key group id [id ...]summary: Marks a pending message as correctly processed, effectively removing it from the pending entries list of the consumer group. Return value of the command is the number of messages successfully acknowledged, that is, the IDs we were actually able to resolve in the PEL.since: 5.0.0group: stream# 确认对应的消息后,才会从pending-list中移除
XACK users u1 1677200622178-0 1677201143631-0

查看消费组指定队列未确认消息:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]summary: Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.since: 5.0.0group: stream# 获取最小到最大时间范围10条未消费消息
XPENDING users u1 - + 10

9.3.3 消费者监听消息的基本思路

实现思路

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FaZmV7nW-1677636574307)(https://note.youdao.com/yws/api/personal/file/077BACA6DF0D41998FCC3B3D4819E4AB?method=download&shareKey=f778bf99c900cf16f161352b9a098fac)]

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

9.3.4 Redis消息队列对比

\ List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间,可以利用多消费者加快处理 受限于消费者缓冲区 受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持

9.3.5 代码实现Stream结构作为消息队列,实现异步秒杀下单

实现步骤:

  1. 创建一个Stream类型的消息队列,名为stream.orders
  2. 修改之前的秒杀下单Lua脚本,在认定有抢够资格后,直接向stream.orders中添加消息,内容包含vocherId、userId、orderId
  3. 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

9.3.5.1 创建消费者组与消息队列

XGROUP CREATE stream.orders g1 0 MKSTREAM

9.3.5.2 修改秒杀下单Lua脚本。修改代码下单逻辑

# 增加一个参数
-- 3 订单id
local orderId = ARGV[3]# 结尾向消息队列中发送消息
-- 发送消息到消息队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)# 完整脚本
-- 参数列表
-- 1 优惠券id
local voucherId = ARGV[1]
-- 2 用户id
local userId = ARGV[2]
-- 3 订单id
local orderId = ARGV[3]-- 数据key
-- 1 库存key
local stockKey = 'seckill:stock' .. voucherId
-- 2 订单key
local orderKey = 'seckill:order' .. voucherId-- 脚本业务
-- 1 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 库存不足,返回1return 1
end
-- 2 判断用户是否下单
if (redis.call('SISMEMBER', orderKey, userId) == 1) then-- 存在,说明重复下单,返回2return 2
end-- 扣库存,缓存中-1
redis.call('incrby', stockKey, -1)
-- 下单,阻塞队列中添加用户id
redis.call('sadd', orderKey, userId)
-- 发送消息到消息队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

修改代码逻辑:

/*** 使用stream结构 消息队列实现* @param voucherId 优惠卷id* @return Result*/@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();// 生成唯一订单idlong orderId = redisIdWorker.nextId("order");// 执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));// 判断结果是否为0// 不为0,没有购买资格if (result != null && result != 0) {return Result.fail(result == 1 ? "库存不足" : "不允许重复下单");}// 获取代理对象proxy = (IVoucherOrderService1) AopContext.currentProxy();// 返回订单idreturn Result.ok(orderId);}

9.3.5.2 开启一个线程,消费队列中的消息,完成下单

@PostConstructprivate void init() {ORDER_EXECUTOR.submit(new VoucherOrderHandler());}// 开启线程private class VoucherOrderHandler implements Runnable {String queueName = "streams.orders";@Overridepublic void run() {while (true) {try {// 获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.latest()));// 判断消息是否获取成功if (null == mapRecords || mapRecords.isEmpty()) {// 如果获取失败,说明没有消息,则继续下一次循环continue;}// 解析消息中的订单信息MapRecord<String, Object, Object> mapRecord = mapRecords.get(0);// 获取其中订单信息,MapRecord<String, Object, Object>  Object, Object 就是对应的key value键值对Map<Object, Object> values = mapRecord.getValue();// 转成订单对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 获取成功,可以下单handlerVoucher(voucherOrder);// ACK确认, SACK stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", mapRecord.getId());} catch (Exception e) {log.error("处理订单异常", e);// 处理pending-list中的异常消息handPendingList();}}}/*** 处理pending-list中异常订单消息*/private void handPendingList() {while (true) {try {// 获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 判断消息是否获取成功if (null == mapRecords || mapRecords.isEmpty()) {// 如果获取失败,说明没有消息,则继续下一次循环break;}// 解析消息中的订单信息MapRecord<String, Object, Object> mapRecord = mapRecords.get(0);// 获取其中订单信息,MapRecord<String, Object, Object>  Object, Object 就是对应的key value键值对Map<Object, Object> values = mapRecord.getValue();// 转成订单对象VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 获取成功,可以下单handlerVoucher(voucherOrder);// ACK确认, SACK stream.order g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", mapRecord.getId());} catch (Exception e) {log.error("处理pending-list订单异常", e);// 如果担心循环太频繁,可以稍微休眠一下try {Thread.sleep(20);} catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}}}}}

Redis 实现优惠券秒杀、分布式锁、消费队列相关推荐

  1. 好代码实践:基于Redis的轻量级分布式均衡消费队列

    简介: 好代码,给人第一个印象的感觉,就像一篇好文章一样,读起来朗朗上口.不同的文章有不同的风格体裁,不同的代码也有不同的编程风格要求.Python有严格的缩进,像诗歌一样工整对仗:C语言面向过程像散 ...

  2. Redis 基础 - 优惠券秒杀《分布式锁(初级)》

    参考 Redis基础 - 基本类型及常用命令 Redis基础 - Java客户端 Redis 基础 - 短信验证码登录 Redis 基础 - 用Redis查询商户信息 Redis 基础 - 优惠券秒杀 ...

  3. 视频教程- 19年录制Redis实战教程 高可用秒杀分布式锁布隆过滤器实战 SpringBoot教程整合-Java

    19年录制Redis实战教程 高可用秒杀分布式锁布隆过滤器实战 SpringBoot教程整合 7年的开发架构经验,曾就职于国内一线互联网公司,开发工程师,现在是某创业公司技术负责人, 擅长语言有nod ...

  4. Redis如何实现分布式锁延时队列以及限流应用丨Redis源码原理|跳表|B+树|分布式锁|中间件|主从同步|存储原理

    Redis如何实现分布式锁延时队列以及限流应用 视频讲解如下,点击观看: Redis如何实现分布式锁延时队列以及限流应用丨Redis源码原理|跳表|B+树|分布式锁|中间件|主从同步|存储原理|数据模 ...

  5. Redis 基础 - 优惠券秒杀《初步优化(异步秒杀)》

    Redis基础 - 基本类型及常用命令 Redis基础 - Java客户端 Redis 基础 - 短信验证码登录 Redis 基础 - 用Redis查询商户信息 Redis 基础 - 优惠券秒杀< ...

  6. 在 Redis 上实现的分布式锁

    由于近排很忙,忙各种事情,还有工作上的项目,已经超过一个月没写博客了,确实有点惭愧啊,没能每天或者至少每周坚持写一篇博客.这一个月里面接触到很多新知识,同时也遇到很多技术上的难点,在这我将对每一个有用 ...

  7. Redis应用学习——Redis事务与实现分布式锁

    2019独角兽企业重金招聘Python工程师标准>>> 1. Redis事务机制 1. 与MySQL等关系数据库相同,Redis中也有事务机制,Redis的事务实质上是命令的集合,但 ...

  8. 基于ZooKeeper的分布式锁和队列

    分布式锁的几种实现: 1.zookeeper分布式锁,基于自增节点 2.Redis分布式锁,基于setnx命令, 基于Redis实现分布式锁:http://blog.csdn.net/daiyudon ...

  9. Redis使用setnx实现分布式锁及其问题、优化

    最近在工作中用到了分布式锁,然后查了很多分布式锁的实现方式.比较熟悉redis或者说,redis的用法比较简单,所以查了一下redis使用setnx实现分布式锁的方式.其中有一篇文章搜索到的次数最多, ...

  10. Day137-139.尚品汇:制作SKU、商品详情、项目优化:Redis缓存、redssion分布式锁

    目录 Day5  制作SKU 1. 制作SKU 2. 多表查询如何写? 3. 制作SKU 4. Thymeleaf Day06 商品详情 1. 获取分类信息 2. 获取最新价格信息 3. 获取销售信息 ...

最新文章

  1. 转牛人博客 稀疏矩阵定义CSR COO CSC 第一个讲的清晰易懂的人
  2. 实现超长焦梦想的捷径——试用适马150-500毫米F5-6.3 OS镜头
  3. 那些美的让人流连忘返的风景照片
  4. ios开发Base64编码以及加密相关学习
  5. git stash命令的用法
  6. 2018蓝桥杯省赛---java---C---3(字母阵列)
  7. LeetCode 135. 分发糖果(DP)
  8. python依赖注入_如何做依赖注入python方式?
  9. 我们的内存中都放了什么
  10. matlab ifft频率分辨率,[FFT] matlab中关于FFT的使用(理解频率分辨率、补零问题)
  11. wooden sticks
  12. ThinkPHP项目笔记之RBAC(权限)补充篇
  13. 访问服务器显示我被拒绝,连接到服务器localhost:8080被拒绝(The connection to the server localhost:8080 was refused)...
  14. Android使用AIUI快速搭建智能助手
  15. 天耀18期 – 6.面向对象-类和对象【作业】.
  16. switch分支语句注意事项及注册界面的使用思路
  17. OpenCV4 Viz模块使用学习(一)
  18. JS 调试分析 + 字体解析(汽车之家)
  19. Google Alphabet
  20. 四年上册级计算机教学计划,2021年四年级信息技术教学计划集锦5篇

热门文章

  1. Buffer Status Reporting(BSR)
  2. 安装 python 虚拟环境 > pip install virtualenv -i https://pypi.tuna.tsinghua.edu.cn/simple/报错解决办法
  3. 骁龙相机动态设置选项卡
  4. toefl 听力 architecture
  5. Docker三剑客之docker-compose+wordpress的博客搭建
  6. 达梦数据库配置mmp流程手册
  7. android:inputType常用取值
  8. 旅日杂谈——与日本上司谈XP和CMM
  9. 4Fang Web打印组件使用心得
  10. 混合高斯模型及其求解方法