0、分布式锁的常见面试题

  • Redis除了拿来做缓存,你还见过基于Redis的什么用法?
  • Redis做分布式锁的时候有需要注意的问题?
  • 如果是Redis是单点部署的,会带来什么问题? 那你准备怎么解决单点问题呢?
  • 集群模式下,比如主从模式,有没有什么问题呢?
  • 那你简单的介绍一下Redlock吧? 你简历上写redisson,你谈谈
  • Redis分布式锁如何续期?看门狗知道吗?

为回答如上关于分布式锁的问题,我搭建了一个超卖服务,模拟电商平台业务中出现的商品超卖现象,分析其中可能会遇到的各种问题,并予以解决方案

1、单机版的实现

1.1、工程环境搭建

  • 新建 Module 或者 Maven 子工程
    模块名为distributeLock_redis01
  • 编写 pom.xml 管理工程依赖
 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.3.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><modelVersion>4.0.0</modelVersion><groupId>com.xyl</groupId><artifactId>distributeLock_redis01</artifactId><version>0.0.1-SNAPSHOT</version><name>distributeLock_redis01</name><description>distributeLock_redis01</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-actuator --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><!-- https://mvnrepository.com/artifact/redis.clients/jedis --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.1.0</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-aop --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.redisson/redisson --><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
  • 编写 application.yml 配置文件(或者 application.properties 配置文件)
server.port=5100spring.redis.database=0
spring.redis.host=192.168.40.132
spring.redis.port=6379
#连接池最大连接数(使用负值表示没有限制)默认8
spring.redis.lettuce.pool.max-active=8
#连接池最大阻塞等待时间(使用负值表示没有限制)默认-1
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接默认8
spring.redis.lettuce.pool.max-idle=8
#连接池中的最小空闲连接默认0
spring.redis.lettuce.pool.min-idle=0
  • 编写主启动类
  • 编写配置类
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory){// 新建 RedisTemplate 对象,key 为 String 对象,value 为 Serializable(可序列化的)对象RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();// key 值使用字符串序列化器redisTemplate.setKeySerializer(new StringRedisSerializer());// value 值使用 json 序列化器,不建议使用FastJson的序列化器redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());// 传入连接工厂redisTemplate.setConnectionFactory(connectionFactory);// 返回 redisTemplate 对象return redisTemplate;}}
  • 编写业务类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/goods")
public class GoodsController {@Autowiredprivate StringRedisTemplate redisTemplate;@Value("${server.port}")  // application.properties配置文件中的server.port属性,方便知道请求访问的是哪一个服务private String serverPort;@GetMapping("/buy")public String buyGoods(){String count = redisTemplate.opsForValue().get("goods:001");int goodsNum = count == null ? null : Integer.parseInt(count);String ansStr;if(goodsNum > 0){int realNum = --goodsNum;redisTemplate.opsForValue().set("goods:001",String.valueOf(realNum));ansStr = "购买成功,商品库存还有的数量:" + realNum + ",\t 服务提供端口:"  + serverPort;}else {ansStr = "商品已售完/活动结束/库存不够/请求超时,欢迎下次光临!" + ",\t 服务提供端口:"  + serverPort;System.out.println(ansStr);}System.out.println(ansStr);return ansStr;}
}
  • 功能测试
    在Linux系统中启动Redis服务,启动当前搭建的服务,发送请求localhost:5100/goods/buy,可以正常地访问,请求发送一次,页面显示的库存就减1,形象地模拟了商品的售卖情况,
    Redis服务连接失败,可以移步 2.2、Redis连接的问题解决
  • 同如上的方式搭建第二个模块的服务,distributeLock_redis02,与distributeLock_redis01唯一不同的是端口号(port为5200),测试方式同上

1.2、单机版存在没加锁的问题

问题:单机版程序没有加锁,在并发测试下数字不对,会出现超卖现象
解决:加锁,那么问题又来了,加 synchronized 锁还是 ReentrantLock 锁呢?

synchronized:不见不散,等不到锁就会死等
ReentrantLock:过时不候,lock.tryLock() 提供一个过时时间的参数,时间一到自动放弃锁
如何选择:根据业务需求来选,如果非要抢到锁不可,就使用 synchronized 锁;如果可以暂时放弃锁,等一下会再来争抢锁,就使用 ReentrantLock 锁。

2.0 版本的代码:使用 synchronized 锁保证单机版程序在并发下的安全性

Ctrl + Alt + T ,用 synchronized或者lock将方法体中的代码包裹起来

  • synchronized加锁实现:
 @GetMapping("/buy")public String buyGoods(){synchronized (this) {   // 让用户线程一直等待,直到成功获取到锁String count = redisTemplate.opsForValue().get("goods:001");int goodsNum = count == null ? null : Integer.parseInt(count);String ansStr;if(goodsNum > 0){int realNum = --goodsNum;redisTemplate.opsForValue().set("goods:001",String.valueOf(realNum));ansStr = "购买成功,商品库存还有的数量:" + realNum + ",\t 服务提供端口:"  + serverPort;}else {ansStr = "商品已售完/活动结束/库存不够/请求超时,欢迎下次光临!" + ",\t 服务提供端口:"  + serverPort;System.out.println(ansStr);}System.out.println(ansStr);return ansStr;}
  • lock代码实现:
 @GetMapping("/buy")public String buyGoods() {Lock lock = new ReentrantLock();try {// 尝试获取锁,等待3秒钟,如果获取不到,就放弃等待if (lock.tryLock(3L, TimeUnit.SECONDS)) {lock.lock();String count = redisTemplate.opsForValue().get("goods:001");int goodsNum = count == null ? null : Integer.parseInt(count);String ansStr;if (goodsNum > 0) {int realNum = --goodsNum;redisTemplate.opsForValue().set("goods:001", String.valueOf(realNum));ansStr = "购买成功,商品库存还有的数量:" + realNum + ",\t 服务提供端口:" + serverPort;} else {ansStr = "商品已售完/活动结束/库存不够/请求超时,欢迎下次光临!" + ",\t 服务提供端口:" + serverPort;}System.out.println("lock --> "+ansStr);return "lock --> "+ansStr;}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}return "系统繁忙,请稍后再访问!"}

总结

  • 在单机环境下,可以使用 synchronized 锁或 Lock 锁来实现。
  • 但是在分布式系统中,因为竞争的线程可能不在同一个节点上,所以需要一个让所有进程都能访问到的锁来实现,比如 redis 或者 zookeeper 来构建;
  • 不同进程 jvm 层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程

1.3、分布式部署之后,单机版的锁失效

问题:

在分布式系统部署之后,单机版的锁将会失效,会导致超卖现象(即一件商品被售卖多次的情况),这时就需要需要分布式锁。
如下,在我们的两个微服务之上,挡了一个 nginx 服务器,用于实现负载均衡的功能

(图片来源于OnebyWang的Blog)

2、分布式锁的实现

2.1、Nginx实现负载均衡

1)安装Nginx

  • 安装 nginx之前,需要配置好依赖环境:
yum install gcc-c++
yum install -y pcre pcre-devel
yum install -y zlib zlib-devel
yum install -y openssl openssl-devel
## 使用 wget 命令下载 nginx 安装包,确保系统已经安装了wget,如果没有安装,执行 yum install wget 安装
wget -c https://nginx.org/download/nginx-1.12.0.tar.gz

如上操作后,将nginx的安装包下载到本机,需要在上传到Linux系统中

  • 使用 tar -zxvf 指令解压下载好的安装包,并进入解压后的目录:
tar -zxvf nginx-1.12.0.tar.gz
cd nginx-1.12.0
  • 配置 nginx

在 nginx-1.12.0 版本中你就不需要去配置相关东西,默认就可以了。当然,如果你要自己配置目录也是可以的。我这里采用默认配置,在 nginx 安装包目录下执行如下指令:

./configure

执行完 ./configure 命令之后会生成 Makefile 文件,我们编译安装程序就需要它

  • 编译 + 安装
make
make install
  • 查看 nginx 安装目录
whereis nginx
  • 启动Nginx
    首先进入 nginx 安装目录下的可执行文件存放的目录,再启动Nginx服务
cd /usr/local/nginx/sbin/
./nginx                # 启动 nginx 服务器
./nginx -s reload      # 重启nginx服务

如果Linux是图形化界面,可访问 http://localhost/(nginx 默认是 80 端口)

也可以通过命令来查看服务是否启动成功:

ps -ef | grep nginx

2)Nginx配置负载均衡

进入 /usr/local/nginx/conf/,再修改该文件中包含的nginx 配置文件:nginx.conf

cd /usr/local/nginx/conf/
vim nginx.conf

  • nginx.conf添加配置如下(192.168.1.4是我Windows本机IP,5100、5200分别对应搭建的两个服务,两个服务的权重为1,则轮询访问):
 upstream mynginx{server 192.168.1.4:5100 weight=1;server 192.168.1.4:5200 weight=1;}server {listen       80;server_name  localhost;#charset koi8-r;#access_log  logs/host.access.log  main;location / {proxy_pass http://mynginx;index  index.html index.htm;}}
  • 编辑好nginx.conf后,重新启动nginx服务:
## 重新启动nginx服务
./nginx -s reload
## 启动nginx服务
./nginx

再启动搭建前面的两个服务,Windows本机访问,连续发送多个请求:
http://192.168.40.132/goods/buy,成功用nginx实现服务间的负载均衡(轮询访问)

2.2、Redis连接的问题解决

可以先检查防火墙是否关闭,如果没有关闭,则关闭防火墙,命令如下:

## 检查防火墙是否关闭
systemctl status firewalld
## 关闭防火墙,但重启虚拟机,防火墙还是处于开启状态
systemctl stopfirewalld
## 永久关闭防火墙,重启后,虚拟机不会开启防火墙
systemctl disable firewalld

如果还是无法在本机连接虚拟机中的Redis服务,则可以修改redis解压后对应目录下的redis.conf配置文件:

vim redis-6.2.1/redis.conf

protected-mode字段设置为 no,将 bind字段的配置注释掉,将 stop-writes-on-bgsave-error 字段的值设置为 no,然后重启 redis 服务,如此,我在本机连接上了虚拟机中的Redis服务,项目测试成功!


2.3、JMeter压测


在【Test Plan】上右击,选择【Add】–>【Threads】–>【Thread Group】,添加线程组:

设置如下四个参数:

  • Name:线程组的名称
  • Number of Threads(users):打出去的线程数量
  • Ramp-up period(seconds):在多长时间内需要将这些线程打出去
  • Loop Count:循环次数,选择 Infinite 表示无限重复执行

    在线程组之上右击,选择【Add】–>【Sampler】–>【HTTP Request】,添加 HTTP 请求

    设置如下三个参数:
  • Server Name or IP:服务器名称或者 IP 地址
  • Port Number:访问的端口号
  • Path:访问的路径

    save保存HTTP Request 后,再进行压测(绿色按钮):

    如此,可以IDEA控制台可以看到相同的商品被售卖两次,出现超卖现象。
    下面使用分布式锁来进行解决商品超卖的问题

2.4、使用redis分布式锁

  • Redis具有极高的性能,且其命令对分布式锁支持友好,借助 SET 命令即可实现加锁处理

The SET command supports a set of options that modify its behavior:
EX seconds – Set the specified expire time, in seconds.
PX milliseconds – Set the specified expire time, in milliseconds.
EXAT timestamp-seconds – Set the specified Unix time at which the key will expire, in seconds.
PXAT timestamp-milliseconds – Set the specified Unix time at which the key will expire, in milliseconds.
NX – Only set the key if it does not already exist.
XX – Only set the key if it already exist.
KEEPTTL – Retain the time to live associated with the key.
GET – Return the old value stored at key, or nil when key did not exist.

  • 使用当前请求的 UUID + 线程名作为分布式锁的 value,执行 setIfAbsent(REDIS_LOCK_KEY, value) 方法尝试抢占锁,如果抢占失败,则返回值为 false;如果抢占成功,则返回值为 true,最后使用delete(REDIS_LOCK_KEY) 方法释放分布式锁
  public static final String REDIS_LOCK = "redisLock";@Autowiredprivate StringRedisTemplate redisTemplate;@Value("${server.port}")  // application.properties配置文件中的server.port属性,方便知道请求访问的是哪一个服务private String serverPort;@GetMapping("/buy")public String buyGoods() {// 生成用户的唯一标识,来抢占锁String userId = UUID.randomUUID().toString() + Thread.currentThread().getName();// 加锁操作:  加锁失败,flag为false,加锁成功,flag为trueBoolean flag = redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, userId);  // 相当于 setnx命令if(!flag){return "抢锁失败!!";}String count = redisTemplate.opsForValue().get("goods:001");  // 查看商品的库存数量int goodsNum = count == null ? null : Integer.parseInt(count);String ansStr;if (goodsNum > 0) {int realNum = --goodsNum;redisTemplate.opsForValue().set("goods:001", String.valueOf(realNum));ansStr = "购买成功,商品库存还有的数量:" + realNum + ",\t 服务提供端口:" + serverPort;} else {ansStr = "商品已售完/活动结束/库存不够/请求超时,欢迎下次光临!" + ",\t 服务提供端口:" + serverPort;}System.out.println(ansStr);redisTemplate.delete(REDIS_LOCK);  // 释放分布式锁return ansStr;}

2.5、finally确保分布式锁的释放

  • 如果代码在执行的过程中出现异常,那么就可能无法释放锁,因此必须要在代码层面加上 finally 代码块,保证锁的释放
  public static final String REDIS_LOCK = "redisLock";@Autowiredprivate StringRedisTemplate redisTemplate;@Value("${server.port}")  // application.properties配置文件中的server.port属性,方便知道请求访问的是哪一个服务private String serverPort;// 1)确保加锁后,出现异常也能保证锁的释放,释放锁的操作 放在finally语句中// 2)如果服务宕机,则可以设置锁的过期时间,即使没有执行到finally语句来释放锁,也可以保证锁的释放@GetMapping("/buy")public String buyGoods() {// 生成用户的唯一标识,来抢占锁String userId = UUID.randomUUID().toString() + Thread.currentThread().getName();try {// 加锁操作:  加锁失败,flag为false,加锁成功,flag为true//  相当于 setnx命令,如果不存在就新建锁Boolean flag = redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, userId);// 如果出现某些故障(如微服务宕机等),导致没有执行finally中释放锁的语句,redis会根据设置的过期时间自动删除锁/释放锁redisTemplate.expire(REDIS_LOCK,10L,TimeUnit.SECONDS);if(!flag){return "抢锁失败!!";}String count = redisTemplate.opsForValue().get("goods:001");  // 查看商品的库存数量int goodsNum = count == null ? null : Integer.parseInt(count);String ansStr;if (goodsNum > 0) {int realNum = --goodsNum;redisTemplate.opsForValue().set("goods:001", String.valueOf(realNum));ansStr = "购买成功,商品库存还有的数量:" + realNum + ",\t 服务提供端口:" + serverPort;} else {ansStr = "商品已售完/活动结束/库存不够/请求超时,欢迎下次光临!" + ",\t 服务提供端口:" + serverPort;}System.out.println(ansStr);return ansStr;} finally {redisTemplate.delete(REDIS_LOCK);  // 释放分布式锁}}

2.6、服务宕机,锁无法释放(需要保证原子性)

问题描述:

  • 1)加锁操作后,如果出现某些故障(如微服务宕机等),导致没有执行finally中释放锁的语句,导致锁无法释放。
  • 2)假设仅仅只是执行了加锁操作后,服务就宕机了,而加锁与设置过期时间的操作分开了(两者不是原子操作,也一样会导致无法正确释放锁

解决方案:

  • 1)redisTemplate.expire(REDIS_LOCK,10L,TimeUnit.SECONDS),设置锁的过期时间,如果用户线程没有正常释放锁,redis自动到点释放锁

  • 2)执行 redisTemplate.expire(REDIS_LOCK_KEY, 10L, TimeUnit.SECONDS); 方法为分布式锁设置过期时间,使加锁和设置过期时间是原子操作,从而保证锁的释放

Boolean flag = redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, order);
//  如果出现某些故障(如微服务宕机等),导致没有执行finally中释放锁的语句,redis会根据设置的过期时间自动删除锁/释放锁
redisTemplate.expire(REDIS_LOCK,10L,TimeUnit.SECONDS);

将上面的两行代码变成下面的原子语句:

 redisTemplate.expire(REDIS_LOCK,10L,TimeUnit.SECONDS);

2.7、如何保证当前线程不删除其他线程的锁

存在的问题:

  • 张冠李戴,删除了别人的锁:我们无法保证一个业务的执行时间,有可能是 10s,有可能是 20s,也有可能更长。因为执行业务的时候可能会调用其他服务,我们并不能保证其他服务的调用时间。如果设置的锁过期了,当前业务还正在执行,那么就有可能出现超卖问题,并且还有可能出现当前业务执行完成后,释放了其他业务的锁.

  • 如下图,假设进程 A 在 T2 时刻设置了一把过期时间为 30s 的锁,在 T5 时刻该锁过期被释放,在 T5 和 T6 期间,Test 这把锁已经失效了,并不能保证进程 A 业务的原子性了。于是进程 B 在 T6 时刻能够获取 Test 这把锁,但是进程 A 在 T7 时刻删除了进程 B 加的锁,进程 B 在 T8 时刻删除锁的时候就懵逼了,我 TM 锁呢?

解决方案:

  • 在前面代码的基础上,修改finally语句中释放锁的代码,即释放锁之前,需要判断当前的锁是否为自己的锁,如果是,释放锁(redisTemplate.delete(REDIS_LOCK)
finally {// redisTemplate.delete(REDIS_LOCK);  // 释放分布式锁,可能会出现张冠李戴的bug// 每个线程只能删除自己的锁,假设线程A获取到锁,程序执行到一半阻塞了(还没有阻塞),但等到 过了锁的有效时间后,锁会自动释放// 此时,线程B获取到锁,如果线程A又继续运行了,执行完后,会错误地删除掉线程B的锁// 解决: 释放锁之前,先判断当前的锁是不是自己持有的锁if (redisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(order)){redisTemplate.delete(REDIS_LOCK);}}

2.8、解锁需要保证原子性

问题描述:

  • 在 finally 代码块中的判断与删除并不是原子操作,假设执行 if 判断的时候,这把锁还是属于当前业务,但是有可能刚执行完 if 判断,这把锁就被其他业务给释放了,还是会出现误删锁的情况
try {// ...}finally {// 判断加锁与解锁是不是同一个客户端if (redisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(order)){//  若在此时,这把锁突然不是这个客户端的,则会误解锁redisTemplate.delete(REDIS_LOCK);}}

解决:

  • 使用 redis 自身事务保证原子性操作
  • 使用 lua 脚本保证原子性操作(推荐)

1)使用 redis 自身事务保证原子性

1)redis 事务介绍

  • Redis的事务是通过MULTl,EXEC,DISCARD和WATCH这四个命令来完成。
  • Redis的单个命令都是原子性的,所以这里确保事务性的对象是命令集合
  • Redis将命令集合序列化确保处于同一事务的命令集合连续且不被打断地执行
  • Redis不支持回滚的操作

2)相关命令

  • MULTI
    用于标记事务块的开始。
    Redis会将后续的命令逐个放入队列中,然后使用EXEC命令原子化地执行这个命令序列。
    语法:MULTI

  • EXEC
    在一个事务中执行所有先前放入队列的命令,然后恢复正常的连接状态。
    语法:EXEC

  • DISCARD
    清除所有先前在一个事务中放入队列的命令,然后恢复正常的连接状态。
    语法:DISCARD

  • WATCH
    当某个事务需要按条件执行时,就要使用这个命令将给定的键设置为受监控的状态。
    语法:WATCH key[key……]注:该命令可以实现redis的乐观锁,即希望没有其他线程去修改数据,但如果有其他线程去提交了修改操作,事务的提交就会失败。

  • UNWATCH
    清除所有先前为一个事务监控的键。
    语法:UNWATCH

  • 3)事务的使用案例


    客户端thA执行完set k3 333后,还没有执行EXEC来提交事务,此时,开启客户端thB来修改k1的值,thA端在开启事务前,WATCH k3来监控k3,如果k3在事务提交前,有别的线程去修改k3,就会导致thA中的事务执行失败,即事务中的所有命令不会执行,如果没有别的线程去修改,thA的事务就执行成功

  • 开启客户端thB,并且修改k3

  • 再去提交thA中的事务,则事务不会执行成功(k1、k2、k3还是一开始加入的值)

4)代码优化

  • 开启事务不断监视(watch) REDIS_LOCK 这把锁有没有被别人动过,如果已经被别人动过了,那么继续重新执行删除操作(重新尝试去释放锁),否则就解除监视(unwatch)
finally {// redisTemplate.delete(REDIS_LOCK);  // 释放分布式锁// 每个线程只能删除自己的锁,假设线程A获取到锁,程序执行到一半阻塞了(还没有阻塞),但等到 过了锁的有效时间后,锁会自动释放// 此时,线程B获取到锁,如果线程A又继续运行了,执行完后,会错误地删除掉线程B的锁// 解决: 释放锁之前,先判断当前的锁是不是自己持有的锁
//            if (redisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(order)){//                redisTemplate.delete(REDIS_LOCK);
//            }// 如上释放锁的操作,并不能保证 判断锁的拥有者和释放锁 是原子操作,现用Redis的事务来保证原子性while(true){redisTemplate.watch(REDIS_LOCK);  // 开启哨兵监控if (redisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(order)){redisTemplate.setEnableTransactionSupport(true); // 开启 事务支持的配置redisTemplate.multi();   // 开启事务redisTemplate.delete(REDIS_LOCK); // 释放当前线程的锁List<Object> exec = redisTemplate.exec(); // 事务可以是多个命令的集合,命令集合放在队列中,最后exec统一执行if (exec == null){continue;}}redisTemplate.unwatch();break;}}

2)使用 lua 脚本保证原子性(推荐)

lua脚本详情见 官网redis命令手册

  • redis 可以通过 eval 命令保证代码执行的原子性

    代码完善:
  • RedisUtils 工具类
    getJedis() 方法用于从 jedisPool 中获取一个连接块对象
public class RedisUtils {private static JedisPool jedisPool;private static String hostAddr = "192.168.40.132";  // 提供服务的IPstatic {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPool = new JedisPool(jedisPoolConfig, hostAddr, 6379);}public static Jedis getJedis() throws Exception {if (null != jedisPool) {return jedisPool.getResource();}throw new Exception("Jedispool is not ok");}}
  • 修改finally语句中释放锁的代码
    使用lua脚本来保证判断锁的所属者和释放锁是原子操作
  @GetMapping("/buy")public String buyGoods() throws Exception {// 生成用户的唯一标识,来抢占锁String order = UUID.randomUUID().toString() + Thread.currentThread().getName();try {// 加锁操作:  加锁失败,flag为false,加锁成功,flag为true//  相当于 setnx命令,如果不存在就新建锁// 加锁和设置锁的过期时间这两个操作需要是原子操作Boolean flag = redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, order, 10L, TimeUnit.SECONDS);if (!flag) {return "抢锁失败!!";}String count = redisTemplate.opsForValue().get("goods:001");  // 查看商品的库存数量int goodsNum = count == null ? null : Integer.parseInt(count);String ansStr;if (goodsNum > 0) {int realNum = --goodsNum;redisTemplate.opsForValue().set("goods:001", String.valueOf(realNum));ansStr = "购买成功,商品库存还有的数量:" + realNum + ",\t 服务提供端口:" + serverPort;} else {ansStr = "商品已售完/活动结束/库存不够/请求超时,欢迎下次光临!" + ",\t 服务提供端口:" + serverPort;}System.out.println(ansStr);return ansStr;} finally {// redisTemplate.delete(REDIS_LOCK);  // 释放分布式锁// 每个线程只能删除自己的锁,假设线程A获取到锁,程序执行到一半阻塞了(还没有阻塞),但等到 过了锁的有效时间后,锁会自动释放// 此时,线程B获取到锁,如果线程A又继续运行了,执行完后,会错误地删除掉线程B的锁// 解决: 释放锁之前,先判断当前的锁是不是自己持有的锁
//            if (redisTemplate.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(order)){//                redisTemplate.delete(REDIS_LOCK);
//            }// 如上释放锁的操作,并不能保证 判断锁的拥有者和释放锁 是原子操作,如下用lua脚本来保证原子性// 获取连接对象Jedis jedis = RedisUtils.getJedis();// lua 脚本,摘自官网String script = "if redis.call('get', KEYS[1]) == ARGV[1]" + "then "+ "return redis.call('del', KEYS[1])" + "else " + "  return 0 " + "end";try {// 执行 lua 脚本 并返回执行结果//  Collections.singletonList(REDIS_LOCK): 将变量 变成只有一个元素(单例)的list集合Object evalAns = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(order));if ("1".equals(evalAns.toString())){System.out.println("delete REDIS_LOCK successfully !");}else {System.out.println("delete REDIS_LOCK error!");}}finally {if (jedis != null){jedis.close();}}}}

经过如上的完善和优化,已经完成单机版的分布式锁,当前的超卖程序基本可以去应对并发不是太大的情景,但是还需要继续优化。

2.9、Redis分布式锁如何续期,如何保证数据一致性

  • 无法判断该给锁设置多长的过期时间,加锁操作后,可能需要去调用诸多业务,其他业务如果出现故障,就会阻塞当前服务直到锁过期,会出现RedisLock过期时间大于业务执行时间的情况,或者,举例来说,一个锁设置了1分钟超时释放,如果拿到这个锁的线程在一分钟内没有执行完毕,那么这个锁就会被其他线程拿到,可能会导致严重的线上问题,所以需要使用一种给锁延期的机制(如果没有问题,就正常地释放锁,类似于FutrueTask,主业务流程向下执行,后台启用线程来保证主业务正常执行)

1)看门狗机制:

  • Redisson 锁的加锁机制如上图所示,线程去获取锁,获取成功则执行lua脚本,保存数据到redis数据库。如果获取失败: 一直通过while循环尝试获取锁(可自定义等待时间,超时后返回失败),获取成功后,执行lua脚本,保存数据到redis数据库。

  • Redisson提供的分布式锁是支持锁自动续期的,也就是说,如果线程仍旧没有执行完,那么redisson会自动给redis中的目标key延长超时时间,这在Redisson中称之为 Watch Dog 机制,即看门狗机制,同时 redisson 还有公平锁、读写锁的实现。

  • 如果拿到分布式锁的节点宕机,且这个锁正好处于锁住的状态时,会出现锁死的状态,为了避免这种情况的发生,锁都会设置一个过期时间。这样也存在一个问题,加入一个线程拿到了锁 且设置了30s超时,但30s后这个线程还没有执行完毕,锁超时释放了,就会导致问题,Redisson给出了自己的答案,就是 watch dog 自动延期机制

  • Redisson提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。
    默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。另外Redisson 还提供了可以指定leaseTime参数的加锁方法来指定加锁的时间。超过这个时间后锁便自动解开了,不会延长锁的有效期。

2)redis 与 zookeeper 在 CAP 方面的对比:

  • Redis(AP)
    Redis 异步复制造成的锁丢失, 比如:主节点没来的及把刚刚 set 进来这条数据给从节点,就挂了,那么主节点和从节点的数据就不一致。此时如果集群模式下,就需要使用Redisson 来解决

  • Zookeeper (CP)
    Zookeeper 保持强一致性原则,对于集群中所有节点来说,要么同时更新成功,要么失败,即只有当主节点中的数据全部同步到所有从节点后再向客户端返回信息,即使主节点宕机,由于选举机制,从节点转变为主节点后,可以照样保证数据一致性地提供服务,因此使用 zookeeper 集群并不存在主从节点数据丢失的问题,但丢失了速度方面的性能

3)集群环境下,Redisson实现分布式锁

使用 Redisson 实现自动续期功能并且保证数据一致性

代码实现(附上完整代码):

  • 注入 Redisson 对象
    在 RedisConfig 配置类中注入 Redisson 对象
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.io.Serializable;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory){RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(connectionFactory);redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());return redisTemplate;}@Value("${spring.redis.host}")private String redisHost;@Value("${spring.redis.port}")private String redisPort;@Beanpublic Redisson redisson() {Config config = new Config();config.useSingleServer().setAddress("redis://" + redisHost + ":"+redisPort).setDatabase(0);return (Redisson) Redisson.create(config);}}
  • 业务类
import com.xyl.redis01.config.RedisConfig;
import com.xyl.redis01.utils.RedisUtils;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;@RestController
@RequestMapping("/goods")
public class GoodsController {public static final String REDIS_LOCK = "redisLock";@Autowiredprivate StringRedisTemplate redisTemplate;@Value("${server.port}")  private String serverPort;@Autowiredprivate Redisson redisson;// 在集群环境下,给已经过期的锁续期,并且保证数据一致性,可以使用Redisson// Redisson实现分布式锁@GetMapping("/buy")public String buyGoods() throws Exception {// 生成用户的唯一标识,来抢占锁String order = UUID.randomUUID().toString() + Thread.currentThread().getName();// 获取分布式锁RLock redissonLock = redisson.getLock(REDIS_LOCK);redissonLock.lock();  // 加锁try {// 从Redis中获取商品的库存数量String count = redisTemplate.opsForValue().get("goods:001");  // 查看商品的库存数量int goodsNum = count == null ? null : Integer.parseInt(count);String ansStr;if (goodsNum > 0) {int realNum = --goodsNum;redisTemplate.opsForValue().set("goods:001", String.valueOf(realNum));ansStr = "redissonLock --> 购买成功,商品库存还有的数量: " + realNum + ",\t 服务提供端口:" + serverPort;} else {ansStr = "redissonLock -->商品已售完/活动结束/库存不够/请求超时,欢迎下次光临!" + ",\t 服务提供端口:" + serverPort;}System.out.println(ansStr);return ansStr;} finally {// redissonLock.unlock();   // 释放锁// 按照如上方式直接释放锁,在超高并发情况下,还是会有万分之一的概率有问题,为保证程序的健壮性,完善如下:if (redissonLock.isLocked()   // 确定是否还处在锁定状态&& redissonLock.isHeldByCurrentThread())  //    // 判断锁 是否被当前线程所持有{redissonLock.unlock();   // 释放锁}}}}
  • 注意:不能直接使用redissonLock.unlock();来释放锁,因为在超高并场景下,会出现illegalMonitorStateException,为保证程序的健壮性,需要进行锁是否还处于锁定状态,锁是否被当前线程所持有,异常信息如下图:

  • 两个服务的所有代码实现相同(只有服务的端口号不同)

  • 测试
    启动前面完善的两个服务和Linux端的服务(包括Linux系统的redis服务、nginx服务),手动发送请求:
    http://192.168.40.132/goods/buy,或者使用JMeter压测均没有问题,控制台输出如图所示,两个服务以轮询的方式提供服务,没有出现商品超卖的情况,库存减为0后,就不能再进行售卖,一切正常!

3、总结

  • 1)在单机环境下,使用synchronized或lock就可保证线程安全,如果是分布式微服务,单机锁就会出现许多问题
  • 2)使用Redis实现分布式锁(setnx --> setIfAbsent方法,设置过期时间),
  • 3)加锁后,业务逻辑执行完,一定要在finally代码块中释放锁,以免出现异常导致锁无法释放的情况
  • 4)如果服务宕机,无法执行finally中释放锁的操作,导致REDIS_LOCK无法删除,所以需要设置锁的过期时间,且加锁和设置过期时间必须是原子操作 --> redisTemplate.opsForValue().setIfAbsent(REDIS_LOCK, order, 10L, TimeUnit.SECONDS)
  • 5)释放锁之前,需要判断锁的拥有者,用户线程只能删除自己的锁,可使用 lua 脚本或者Redis事务实现
  • 6)在集群环境下,如果Redis主机出现宕机,但还没有异步复制到Redis从机,经过选举机制,从机转变为主机后,继续提供服务会导致数据不一致,则可以使用Redisson来解决商品的超卖现象,但要注意:需要确定是否还处在锁定状态和锁是否还被当前线程所持有,即添加判断
    redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()

4、参考资料

1)尚硅谷周阳讲解Redis分布式锁
2)Redis使用总结
3)redisson中的看门狗机制总结
4)Redis官网
5)Redisson官方文档

从青铜到王者,带你完成Redis分布式锁的实现和优化相关推荐

  1. redis 分布式锁 看门狗_带你研究Redis分布式锁,源码走起

    前言 前阵子我们讲了分布式锁的实现方式之一:zookeeper,那么这次我们来讲讲同样流行,甚至更胜一筹的Redis. 除了这两种其实还有数据库实现分布式锁啊,但是这种方式是非主流,所以咱这里就不讲了 ...

  2. 这才叫细:带你深入理解Redis分布式锁

    什么是分布式锁 说到Redis,我们第一想到的功能就是可以缓存数据,除此之外,Redis因为单进程.性能高的特点,它还经常被用于做分布式锁. 锁我们都知道,在程序中的作用就是同步工具,保证共享资源在同 ...

  3. 电商项目实战之缓存与Redis分布式锁

    电商项目实战之缓存与Redis分布式锁 缓存失效 缓存穿透 缓存雪崩 缓存击穿 分布式缓存 分布式锁 SpringBoot整合Redisson实现分布式锁 实现过程 缓存和数据库一致性 场景分析 解决 ...

  4. Redis 分布式锁没这么简单,网上大多数都有 bug

    Redis 分布式锁这个话题似乎烂大街了,不管你是面试还是工作,随处可见,「码哥」为啥还写? 因为看过很多文章没有将分布式锁的各种问题讲明白,所以准备写一篇,也当做自己的学习总结. 在进入正文之前,我 ...

  5. 深度剖析:Redis分布式锁到底安全吗?看完这篇文章彻底懂了!

    ‍‍‍‍‍‍‍‍‍‍‍‍阅读本文大约需要 20 分钟. 大家好,我是 Kaito. 这篇文章我想和你聊一聊,关于 Redis 分布式锁的「安全性」问题. Redis 分布式锁的话题,很多文章已经写烂了 ...

  6. **Java有哪些悲观锁的实现_80% 人不知道的 Redis 分布式锁的正确实现方式(Java 版)...

    点击上方"小哈学Java",选择"星标" 回复"资源",领取全网最火的Java核心知识总结 来源:http://sina.lt/gfZU 前 ...

  7. Redis分布式锁【正确实现方式】

    前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于Redis的分布式锁:3. 基于ZooKeeper的分布式锁.本篇博客将介绍第二种方式,基于Redis实现分布式锁.虽然网上已经有各种介 ...

  8. Redis分布式锁(Redlock官方文档的理解)

    Redis分布式锁(Redlock官方文档的理解) 我github博客原文 官网解释 分布式锁在许多不同进程下需要对共享资源进行互斥操作的环境下,十分需要 Redis作者提出了 Redlock 算法 ...

  9. Redis分布式锁的正确实现方式(Java版)

    转自:https://wudashan.cn/2017/10/23/Redis-Distributed-Lock-Implement/ 前言 分布式锁一般有三种实现方式:1. 数据库乐观锁:2. 基于 ...

最新文章

  1. Go 学习笔记(40)— Go 标准库之 log
  2. 记录rewrite url我之前不知道的地方
  3. 常用的上网 发帖技巧
  4. 线程的切入和切出(切入: 一个线程被系统选中占用处理器开始或继续运行)
  5. 1.6 多项式回归-机器学习笔记-斯坦福吴恩达教授
  6. 为什么阿里工程师纷纷在内网晒代码?
  7. tar/gzip/zip文件打包、压缩命令
  8. SQLite轻量级数据库,操作数据常用语句
  9. OCR完整技术栈10天掌握!教程完全开源,更有产学研大佬们联合授课!
  10. 复杂场景下的多目标跟踪 --心得
  11. pyspider 安装及问题处理(pyspider一直卡在result_worker starting的解决办法)
  12. Express の 文件下载
  13. win7蓝牙驱动的使用方法
  14. 通过云计算机管理档案,云计算下档案信息管理研究
  15. 【工具使用】GPU的各项参数说明
  16. linux搭建帝国CMS网站,帝国CMS CentOS7 服务器搭建
  17. 数组,异质结构以及指针的详解
  18. Ubuntu下wps英文界面切换成中文界面的方法
  19. 调试spi转can芯片MCP2518和can芯片MCP2542FD
  20. Github十大深度学习项目

热门文章

  1. java数字转换字母_java-将数字转换为字母的程序
  2. 体验服服务器更新维护,《武林外传》体验服12月27日更新维护公告
  3. 始终差半截,严监管给微粒贷带来反扑蚂蚁借呗契机?
  4. 《探索着传说》(The Sword of Truth改编) 偶喜欢的奇幻剧(小说) 简介
  5. 小米性能服务器设置介绍,全新的小型家用服务器 小米路由器产品特点综述
  6. 计算机专业学习的核心是什么?
  7. C语言 fabs()函数的作用
  8. Windows下搭建Mysql集群
  9. nginx实现双向认证
  10. 联通出4G iPhone 5S(A1528)用不了