分布式锁

  • 1、分布式锁
  • 2、传统锁
    • 2.1、经典问题——卖票
    • 2.2、并发导致超卖现象
    • 2.3、JVM锁
    • 2.4、事务与JVM锁
    • 2.5、MySql锁
      • 2.5.1、一个SQL
      • 2.5.2、悲观锁
      • 2.5.3、乐观锁
    • 2.6、Redis锁
      • 2.6.1、解决方案1 - JVM本地锁
      • 2.6.2、解决方案2 - 乐观锁
  • 3、Redis分布式锁
    • 3.1、实现一个redis分布式锁
    • 3.2、递归优化
    • 3.3、防止死锁
    • 3.4、防误删
    • 3.5、Redis当中的LUA脚本
    • 3.6、可重入锁
      • 3.6.1、加锁
      • 3.6.2、解锁
      • 3.6.3、代码实现
    • 3.7、自动续期
    • 3.8、Redisson
      • 3.8.1、Redisson Hello World
      • 3.8.2、Redisson可重入锁
      • 3.8.3、公平锁
      • 3.8.4、联锁
      • 3.8.5、红锁
      • 3.8.6、读写锁
      • 3.8.7、信号量
      • 3.8.8、闭锁
  • 4、ZooKeeper分布式锁
    • 4.1、使用java Api操作ZooKeeper
    • 4.2 实现一个ZK分布式锁
      • 4.2.1、思路分析
      • 4.2.2、代码实现
  • 5、MySql分布式锁
    • 5.1、实现MySql分布式锁
    • 5.2、缺陷

1、分布式锁

在应用开发中,特别是web工程开发,通常都是并发编程,不是多进程就是多线程。这种场景下极易出现线程并发性安全问题,此时不得不使用锁来解决问题。在多线程高并发场景下,为了保证资源的线程安全问题,jdk为我们提供了synchronized关键字和ReentrantLock可重入锁,但是它们只能保证一个工程内的线程安全。在分布式集群、微服务、云原生横行的当下,如何保证不同进程、不同服务、不同机器的线程安全问题,jdk并没有给我们提供既有的解决方案。此时,我们就必须借助于相关技术手动实现了。目前主流的实现有以下方式:

  • 基于mysql关系型实现
  • 基于redis非关系型数据实现
  • 基于zookeeper/etcd实现

2、传统锁

2.1、经典问题——卖票

多线程并发安全问题最典型的代表就是超卖现象。库存在并发量较大情况下很容易发生超卖现象,一旦发生超卖现象,就会出现多成交了订单而发不了货的情况。

场景:

车票余量为5时,用户A和B同时来购买,此时查询余数都为5,余票充足则开始卖票:

用户A:update db_stock set stock = stock - 1 where id = 1

用户B:update db_stock set stock = stock - 1 where id = 1

并发情况下,更新后的结果可能是4,而实际的最终余票应该是3才对

2.2、并发导致超卖现象

首先我们构建好一个springboot程序,连接好数据库,新增建表语句,如下

CREATE TABLE `db_stock` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`product_code` varchar(255) DEFAULT NULL COMMENT '商品编号',`stock_code` varchar(255) DEFAULT NULL COMMENT '仓库编号',`count` int(11) DEFAULT NULL COMMENT '库存量',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

而且我们直接实例化一个对象,并且给上初始值5000,我们单个直接调用模拟卖票操作。

    public synchronized void reduce() {stock.setStocks(stock.getStocks() - 1);System.out.println("剩余 =====>>>>> " + stock.getStocks());}

而之后我们直接使用JMeter压力测试工具来进行模拟多线程操作,直接执行5000次查看最后操作之后的结果


可以看到我们直接并发5000个请求,到最后并不会将余数5000减为0,这就是并发导致的数据问题,

2.3、JVM锁

上面2.2存在的并发问题这该如何进行解决呢?我们只需要给这一段代码上锁即可,有以下两种方式,分别是直接加上synchronized 将方法变为原子操作,以及通过ReentrantLock给代码上锁。

    public synchronized void reduce() {stock.setStocks(stock.getStocks() - 1);System.out.println("剩余 =====>>>>> " + stock.getStocks());}private ReentrantLock lock = new ReentrantLock();public void reduceLock() {lock.lock();try {stock.setStocks(stock.getStocks() - 1);System.out.println("剩余 =====>>>>> " + stock.getStocks());} finally {lock.unlock();}}

JVM锁原理:
添加synchronized关键字之后,StockService就具备了对象锁,由于添加了独占的排他锁,同一时刻只有一个请求能够获取到锁,并减库存。此时,所有请求只会one-by-one执行下去,也就不会发生超卖现象。

2.4、事务与JVM锁

开启事务也会导致锁失效,当每一次操作进入代码之后,由于开启了事务,当有一个请求获取锁之后,其余请求都会进行阻塞住,之后当请求操作完成之后释放锁。但是这个时候还没有提交事务,另外一个请求就已经重新获取锁了,这是这个请求获取的值还是没有进行改变的值。这样就会导致锁失效

    @Transactionalpublic synchronized void reduceByMysqlAndTransactional() {StockMysql stockMysql = stockMysqlMapper.selectOne(new QueryWrapper<StockMysql>().eq("product_code", "DN001"));if (stockMysql.getCount() > 0) {stockMysql.setCount(stockMysql.getCount() - 1);System.out.println("剩余 ====>>>>>> " + stockMysql.getCount());stockMysqlMapper.updateById(stockMysql);}}

如何解决事务导致锁失效的问题:我们只需要指定事务的方式

@Transactional(isolation = Isolation.READ_UNCOMMITTED)

2.5、MySql锁

除了使用jvm锁之外,还可以使用数据锁:悲观锁 或者 乐观锁

  • 一个sql:直接更新时判断,在更新中判断库存是否大于0
    update table set surplus = (surplus - buyQuantity) where id = 1 and (surplus - buyQuantity) > 0 ;

  • 悲观锁:在读取数据时锁住那几行,其他对这几行的更新需要等到悲观锁结束时才能继续 。
    select … for update

  • 乐观锁:读取数据时不锁,更新时检查是否数据已经被更新过,如果是则取消当前更新进行重试。
    version 或者 时间戳(CAS思想)。

2.5.1、一个SQL

在这里部分场景我们可以使用一个SQL解决数据并发问题,但是很多业务场景并不能直接通过一条sql来进行实现

    /*** 一个SQL解决数据并发问题* @param  reduce 每次减少数* @param productCode 商品编号* @return int* */@Update("update t_stock set count = count - #{reduce} where product_code = #{productCode} and count >=1")int updateByLock(@Param("reduce") Integer reduce, @Param("productCode") String productCode);

2.5.2、悲观锁

2.5.2.1、表级锁

在mysql当中我们开启事务,进行更新,这时我们的表是一个最简单表,只有一个主键,其他的都是普通字段,在一个终端开启事务更新数据,但是这时我们不提交也不回滚不进行任何操作,并且同时新开一个终端也来进行更新操作,这时到这里更新会被锁住,不会直接更新,当我们将事务结束之后,这个更新才会执行,可以看到这一条更新语句是十几秒之后才被执行的,由此我们可以得到一个结论,该事务锁住了整张表。


2.5.2.2、行级锁

而当我们给表增加索引之后,并且来根据这个索引来进行查询的时候,这个时候就不会将整个表给锁住,只会锁住对应的行

表结构:

CREATE TABLE `t_stock` (`id` int(16) unsigned NOT NULL AUTO_INCREMENT,`product_code` varchar(16) NOT NULL,`wear_house` varchar(16) NOT NULL,`count` int(16) NOT NULL,PRIMARY KEY (`id`),UNIQUE KEY `pcode` (`product_code`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8

只会锁住对应的行,这时更新另外的数据可以直接更新。

select * from t_stock where id =1 for update;
select * from t_stock where id =2;

2.5.3、乐观锁

乐观锁( Optimistic Locking ) 相对悲观锁而言,乐观锁假设认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则重试。那么我们如何实现乐观锁呢

使用数据版本(Version)记录机制实现,这是乐观锁最常用的实现 方式。一般是通过为数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录 的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新。

给表添加version字段(用来记录和控制版本):

ALTER TABLE `t_stock` ADD VERSION INT(32) NOT NULL

添加代码实现:

    public void reduceByMysqlLockOptimistic() throws InterruptedException {StockMysql stock = stockMysqlMapper.selectOne(new QueryWrapper<StockMysql>().eq("product_code", "DN001"));Integer version = stock.getVersion();stock.setCount(stock.getCount() - 1);stock.setVersion(stock.getVersion() + 1);if(stockMysqlMapper.update(stock,new UpdateWrapper<StockMysql>().eq("id",stock.getId()).eq("version",version)) == 0){// 在更新之前来判断version字段是否被改变Thread.sleep(20);this.reduceByMysqlLockOptimistic();}}

在这里我们使用JMeter来进行压力测试,并且我们分别并发500次和5000次来进行控制变量测试吞吐量。



说明乐观锁在并发量越大的情况下,性能越低(因为需要大量的重试);并发量越小,性能越高。

乐观锁存在的问题:
当mysql服务是部署的主从复制(读写分离)这个时候读和写的分开进行操作的,而在进行mysql数据同步的时候也需要时间,这个时候我们每次读取的数据不是最新的概率就会更大,就会导致乐观锁重试的次数增多,cpu占用率飙升

2.6、Redis锁

redis分布式缓存 参考这篇文章,这里不做过多阐述了。单击前往

在redis当中我们添加一个key,之后我们通过JMeter进行压力测试,压测之后,我们再看redis当中的stock的值,可以看到在进行并发之后还是会存在数据问题。

    public void redisStock(){String stock = stringRedisTemplate.opsForValue().get("stock");Integer integer = Integer.valueOf(stock);if(stock != null && integer > 0){stringRedisTemplate.opsForValue().set("stock",String.valueOf(--integer));}}

还是一样我们使用JMeter来进行压力测试,并发2000个请求之后来看一下redis当中的stock数据是不是预想的减了2000。很显然不会,并发修改还是会存在问题,那如何解决呢?再后文会提供解决方案。

2.6.1、解决方案1 - JVM本地锁

这里直接使用synchronized关键字和ReentrantLock上锁即可。

2.6.2、解决方案2 - 乐观锁

首先我们熟悉一下Redis的事务:

在Redis中开启事务的命令是 multi 命令, 而执行事务的命令是 exec 命令。multi 到 exec 命令之间的 Redis 命令将采取进入队列的形式,直至 exec 命令的出现,才会一次性发送队列里的命令去执行,而在执行这些命令的时候其他客户端就不能再插入任何命令了。如果回滚事务,可以使用 discard 命令取消事务中所有命令,使事务中的方法不会被执行了。

代码实现:

    public void redisStockOptimistic() {stringRedisTemplate.execute(new SessionCallback<Object>() {@Overridepublic Object execute(RedisOperations redisOperations) throws DataAccessException {redisOperations.watch("stock");String stock = (String) redisOperations.opsForValue().get("stock");Integer integer = Integer.valueOf(stock);if (stock != null && integer > 0) {redisOperations.multi();redisOperations.opsForValue().set("stock", String.valueOf(--integer));List exec = redisOperations.exec();// 执行事务之后判断是否成功以及是否需要递归if (exec == null || exec.size() == 0) {try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}redisStockOptimistic();}return exec;}return null;}});}

在开启Redis的事务之后我们同样的来进行2000条并发测试,注意对应的吞吐量以及最后的stock数据是否和预期的一样。可以看到吞吐量很明显的下降了,这是因为多并发会导致事务执行时,其余操作都会阻塞。


使用乐观锁之后普遍会存在的问题,和mysql乐观锁一样当并发量上去之后,重试的次数会越来越多导致性能问题

3、Redis分布式锁

3.1、实现一个redis分布式锁

借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。

    public void redisDistributedLock() {Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");if (!lock) {try {Thread.sleep(100);redisDistributedLock();} catch (InterruptedException e) {throw new RuntimeException(e);}} else {try {String stock = stringRedisTemplate.opsForValue().get("stock");Integer integer = Integer.valueOf(stock);if (stock != null && integer > 0) {stringRedisTemplate.opsForValue().set("stock", String.valueOf(--integer));}} finally {stringRedisTemplate.delete("lock");}}}

3.2、递归优化

在进行递归的时候,每次都会往栈里面加一个方法,并发过大的时候可能会导致栈内存溢出,我们可以通过while循环来进行优化

        while(lock){try {String stock = stringRedisTemplate.opsForValue().get("stock");Integer integer = Integer.valueOf(stock);if (stock != null && integer > 0) {stringRedisTemplate.opsForValue().set("stock", String.valueOf(--integer));}} finally {stringRedisTemplate.delete("lock");}}

3.3、防止死锁

首先我们了解一下什么是死锁。setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法锁无法释放(死锁)。那如何解决死锁问题呢?只需要给锁设置过期时间,自动释放锁即可。

首先我们熟悉一下在redis当中设置锁的超时时间的命令

set lock 1
expire lock 20set lock 1 ex 3 nx
# 查看过期时间
ttl lock

然后既然我们知道了,那就好办了,我们直接在代码当中加锁的超时,我们直接在这里加上一个超时的设置。


但是我们代码直接加在这里还是可以继续优化的,我们不能保证加锁和设置锁的超时时间的原子性,所以我们可以直接在创建锁的时候一起设置超时时间

Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1", 3, TimeUnit.SECONDS);

3.4、防误删

这一个存在的问题就是,当我们第一次请求进来之后设置一个lock锁,此时锁的销毁时间是3秒,但是后续的业务处理却花费了5秒,这时在3秒的时候就释放了锁,这时第一个请求的业务还没执行完,第二个请求也获取锁来开始执行业务了,就会导致业务混乱

解决方案:给每一个锁设置一个uuid来进行标识,释放锁时只释放自己的锁

    public void redisLockUUID() {String uuid = UUID.randomUUID().toString();Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);while (!lock) {try {Thread.sleep(100);redisLockUUID();} catch (InterruptedException e) {throw new RuntimeException(e);}}try {// 做业务} finally {if (StringUtils.equals(stringRedisTemplate.opsForValue().get("lock"), uuid)) {stringRedisTemplate.delete("lock");}}}

3.5、Redis当中的LUA脚本

redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。如果redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。 这和使用 MULTI/ EXEC 包围的事务很类似。

    public void redisLua() {String uuid = UUID.randomUUID().toString();Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);while (!lock) {try {Thread.sleep(100);redisLua();} catch (InterruptedException e) {throw new RuntimeException(e);}}try {String stock = stringRedisTemplate.opsForValue().get("stock");Integer integer = Integer.valueOf(stock);if (stock != null && integer > 0) {stringRedisTemplate.opsForValue().set("stock", String.valueOf(--integer));}} finally {String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid);}}

3.6、可重入锁

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。

用以下代码来进行简单说明:首先代码进来之后执行a方法,a方法直接加上一个锁,此时加锁之后再执行b方法,而b方法这里又同样的需要设置一个同名锁a,这个时候锁是不可重入的,b永远无法获取锁,这就很奇怪,我要自己释放我自己?

    public void a(){// 设置锁 锁名称 ab();}public void b(){// 设置锁 锁名称 a}

3.6.1、加锁

使用lua脚本来进行实现可重入锁,这里用到的是redis当中的set数据类型,先使用exists用来判断锁是否存在,之后通过hset命令往hash里面添加键值数据,并且同时通过expire设置(更新)过期时间,之后再加判断,使用hsxists判断锁的hash是否存在,存在即重入锁,并且更新过期时间

if redis.call('exists','lock') == 0
then redis.call('hset','lock',uuid,1) redis.call('expire','lock',30)return 1
elseif redis.call('hsxists','lock',uuid) == 1
thenredis.call('hsxists','expirelock',uuid,1)redis.call('expire','lock',30)return 1
elsereturn 0
end

将这一段脚本的判断优化一下,并且将对应的值更换成keys和argv用来后续动态传递,脚本就变成了这样

if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)
thenredis.call('hincrby', KEYS[1], ARGV[1], 1);redis.call('expire', KEYS[1], ARGV[2]);return 1;
elsereturn 0;
end

如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1。

3.6.2、解锁

解锁操作,首先通过uuid来判断该锁是否存在,不存在直接返回-1用来标识解锁失败,而该锁存在的只需对这个锁-1,减到0的时候将锁给释放掉。

if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return -1;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0;
else redis.call('del', KEYS[1]); return 1;
end;

3.6.3、代码实现

首先我们定义一个工具类用来对redis分布式锁进行加锁解锁统一管理。首先继承Lock接口实现对应方法,在构造方法当中需要由外部传入对应的三个参数,分别为操作redis、锁名称、uuid。在这里主要方法为tryLock(time,unit)在该方法中首先对入参时间进行判断,之后直接通过lua脚本来进行做加锁操作,在unlock解锁操作也如此。

public class DistributedRedisLock implements Lock {private StringRedisTemplate stringRedisTemplate;private String lockName;private String uuid;private long expire = 30;DistributedRedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuid = uuid;}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** @deprecated 加锁方法*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1) {this.expire = unit.toSeconds(time);}String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[1], 1);redis.call('expire', KEYS[1], ARGV[2]);return 1;else return 0; end";while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {Thread.sleep(100);}return true;}@Overridepublic void unlock() {String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return -1;  elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0;  else redis.call('del', KEYS[1]); return 1 end";Long execute = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);if (execute == -1) {throw new IllegalMonitorStateException("恶意释放锁");}}@Overridepublic Condition newCondition() {return null;}public String getUuid() {return uuid + Thread.currentThread().getId();}
}

并且我们加入一个统一管理redisLock的公共类来进行管理,后续使用只需要注入该类即可

@Component
public class DistributedLockClient {@Autowiredprivate StringRedisTemplate stringRedisTemplate;private String uuid;/*** 由于该类是单例对象,所以每次进来生产的UUID都是一样的,解决方案:再后续使用UUID的时候拼接一个线程id* */public DistributedLockClient() {this.uuid = UUID.randomUUID().toString();}public DistributedRedisLock getRedisLock(String lockName) {return new DistributedRedisLock(stringRedisTemplate, lockName, uuid);}
}

最后直接注入该类进行加锁解锁操作

    public void reentrantLock() {DistributedRedisLock lock = distributedLockClient.getRedisLock("stock_lock");lock.tryLock();try {String stock = stringRedisTemplate.opsForValue().get("stock").toString();if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {stringRedisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {lock.unlock();}}

3.7、自动续期

lua脚本

if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1;
else return 0;
end

新增自动续期代码:

    public void renewExpire() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +"   return redis.call('expire', KEYS[1], ARGV[2]) " +"else " +"   return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getUuid(), String.valueOf(expire))) {renewExpire();}}}, this.expire * 1000 / 3);}

并且再设置uuid的时候,需要修改一下,应该在构造函数当中就设置对应的uuid,而不是在后续通过方法拼接

    DistributedRedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {this.stringRedisTemplate = stringRedisTemplate;this.lockName = lockName;this.uuid = uuid + Thread.currentThread().getId();}

3.8、Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

Github 托管地址:https://github.com/redisson/redisson

3.8.1、Redisson Hello World

SpringBoot整合Redisson,首先先整合redisson的依赖

        <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.17.6</version></dependency>

而后我们采用统一配置加载redisson的配置

    @Beanpublic RedissonClient redisConfig() {Config config = new Config();config.setTransportMode(TransportMode.EPOLL);config.useClusterServers()//可以用"rediss://"来启用SSL连接.addNodeAddress("redis://127.0.0.1:7181");return Redisson.create(config);}

而我们可以直接使用redisson提供的方法来进行操作加锁与解锁

    @Autowiredprivate RedissonClient redissonClient;public void stockByRedisson(){RLock lock = redissonClient.getLock("lock");lock.lock();try{String stock = stringRedisTemplate.opsForValue().get("stock");Integer integer = Integer.valueOf(stock);if (stock != null && integer > 0) {stringRedisTemplate.opsForValue().set("stock", String.valueOf(--integer));}}finally {lock.unlock();}}

3.8.2、Redisson可重入锁

如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。

RLock lock = redissonClient.getLock("lock");
lock.lock(10, TimeUnit.SECONDS);

3.8.3、公平锁

基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

RLock fairLock = redissonClient.getFairLock("fairLock");

3.8.4、联锁

基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。同时加锁:lock1 lock2 lock3,所有的锁都上锁成功才算成功。

     // 需要给多个不同的redisson实例来进行获取锁RLock lock1 = redissonClient.getLock("lock1");RLock lock2 = redissonClient.getLock("lock2");RLock lock3 = redissonClient.getLock("lock3");RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1, lock2, lock3);

3.8.5、红锁

和联锁一样,在这里还提供了一个RedissonRedLock红锁类,这个和联锁的区别在于,红锁只需要当大部分锁获取成功即为成功

RedissonRedLock redissonRedLock = new RedissonRedLock(lock1, lock2, lock3);

3.8.6、读写锁

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

    public void readLock(){RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock");rwLock.readLock().lock(10,TimeUnit.SECONDS);System.out.println("read lock");}public void writeLock(){RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock");rwLock.writeLock().lock(10,TimeUnit.SECONDS);System.out.println("write lock");}
  • 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始
  • 同时访问读:不用等待
  • 先写后读:读要等待(约10s)写完成
  • 先读后写:写要等待(约10s)读完成

3.8.7、信号量

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。而使用信号量就适用于服务器容量不够承受大并发请求,就可以通过信号量还进行限流操作

先康康Semaphore这个类占用与释放的一段demo代码

    public static final int max = 6;public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);for (int i = 0; i < max; i++) {new Thread(() -> {try {semaphore.acquire();System.out.println(Thread.currentThread().getName() + "占用 ====  " + new Date());TimeUnit.SECONDS.sleep(new Random().nextInt(30));semaphore.release();System.out.println(Thread.currentThread().getName() + "释放 ====  " + new Date());} catch (Exception e) {throw new RuntimeException(e);}}, i + "").start();}}

而在redisson当中给我们封装好了对应的信号量,我们直接使用即可

    public void semaphoreLock(){RSemaphore semaphore = redissonClient.getSemaphore("semaphore");semaphore.trySetPermits(3);try {semaphore.acquire();TimeUnit.SECONDS.sleep(5);System.out.println(new Date());semaphore.release();}catch (Exception e){e.printStackTrace();}}

3.8.8、闭锁

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。适用于一个线程等待一组线程执行

    public static final int max = 6;public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 0; i < max; i++) {new Thread(() -> {try {TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 3);System.out.println(Thread.currentThread().getName() + "递减释放 ====  " + new Date());countDownLatch.countDown();} catch (Exception e) {throw new RuntimeException(e);}}, i + "").start();}countDownLatch.await();System.out.println("over");}

而在redisson当中给我们封装好了对应的闭锁,我们直接使用即可

    public void countDownLatchLock() throws InterruptedException {RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");countDownLatch.trySetCount(10);countDownLatch.await();System.out.println("over");}public void countDown(){RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");countDownLatch.countDown();}

4、ZooKeeper分布式锁

在这里对于ZooKeeper的安装、使用就不过多赘述了,直接参考该文章:ZooKeeper详解

4.1、使用java Api操作ZooKeeper

首先我们直接导入官方提供的依赖。

        <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version></dependency>

使用api操作zk节点

    public static void main(String[] args) throws InterruptedException, KeeperException {ZooKeeper zooKeeper = null;CountDownLatch countDownLatch = new CountDownLatch(1);try {zooKeeper = new ZooKeeper("localhost:2181", 30000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected.equals(event.getState())&& Event.EventType.None.equals(event.getType())) {System.out.println("get link ====>>>> " + event.getState());} else {System.out.println("event listen === >>>>> " + event.getType());}}});/*** 新增节点* 入参说明 节点目录、节点值、节点的访问权限、节点类型* */String s = zooKeeper.create("/dia", "huangdiaomao".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("create node dia ====>>>> " + s);/*** 判断节点是否存在* */Stat exists = zooKeeper.exists("/dia", true);System.out.println("exists node ====>>>>  " + exists.toString());/*** 获取节点数据* */byte[] data = zooKeeper.getData("/dia", false, null);System.out.println("dia node res ===== >>>>  " + new String(data));/*** 获取节点下所有字节但* */List<String> children = zooKeeper.getChildren("/zookeeper", true);for (String child : children) {System.out.println("child ==== >>>> " + child);}/*** 更新数据* */zooKeeper.setData("/dia", "change data".getBytes(), exists.getVersion());/*** 删除节点* */// zooKeeper.delete("/qing", -1);countDownLatch.await();} catch (Exception e) {e.printStackTrace();}}

4.2 实现一个ZK分布式锁

4.2.1、思路分析

分布式锁的步骤:

  1. 获取锁:create一个节点
  2. 删除锁:delete一个节点
  3. 重试:没有获取到锁的请求重试

参照redis分布式锁的特点

  1. 互斥 排他
  2. 防死锁:
    1. 可自动释放锁(临时节点) :获得锁之后客户端所在机器宕机了,客户端没有主动删除子节点;如果创建的是永久的节点,那么这个锁永远不会释放,导致死锁;由于创建的是临时节点,客户端宕机后,过了一定时间zookeeper没有收到客户端的心跳包判断会话失效,将临时节点删除从而释放锁。
    2. 可重入锁:借助于ThreadLocal
  3. 防误删:宕机自动释放临时节点,不需要设置过期时间,也就不存在误删问题。
  4. 加锁/解锁要具备原子性
  5. 单点问题:使用Zookeeper可以有效的解决单点问题,ZK一般是集群部署的。
  6. 集群问题:zookeeper集群是强一致性的,只要集群中有半数以上的机器存活,就可以对外提供服务。

4.2.2、代码实现

  1. 多个请求同时添加一个相同的临时节点,只有一个可以添加成功。添加成功的获取到锁
  2. 执行业务逻辑
  3. 完成业务流程后,删除节点释放锁。

首先我们创建一个DistributedZkLock,该类实现Lock接口,直接在该类当中进行加锁和解锁操作

public class DistributedZkLock implements Lock {private ZooKeeper zooKeeper;private String lockName;private final static String ROOT_PATH = "/locks";public DistributedZkLock(ZooKeeper zooKeeper, String lockName) throws InterruptedException, KeeperException {this.zooKeeper = zooKeeper;this.lockName = lockName;if (zooKeeper.exists(ROOT_PATH, false) != null) {zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {this.zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (Exception e) {e.printStackTrace();try {Thread.sleep(80);} catch (InterruptedException ex) {throw new RuntimeException(ex);}this.tryLock();}return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}@Overridepublic void unlock() {try {this.zooKeeper.delete(ROOT_PATH + "/" + this.lockName, -1);} catch (Exception e) {e.printStackTrace();}}@Overridepublic Condition newCondition() {return null;}
}

并且在这里同样的我们在项目启动的时候连接上Zk

@Component
public class ZkClient {public static final String ZK_SERVER_IP = "localhost:2181";public ZooKeeper zooKeeper;/*** @PostConstruct 注解的方法在项目启动的时候执行这个方法,也可以理解为在spring容器启动的时候执行,可作为一些数据的常规化加载,比如数据字典之类的。*/@PostConstructpublic void init() {try {zooKeeper = new ZooKeeper(ZK_SERVER_IP, 30000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (Event.KeeperState.SyncConnected.equals(event.getState())&& Event.EventType.None.equals(event.getType())) {System.out.println("get link ====>>>> " + event.getState());} else {System.out.println("event listen === >>>>> " + event.getType());}}});} catch (IOException e) {throw new RuntimeException(e);}}/*** 被@PreDestroy修饰的方法会在服务器卸载Servlet的时候运行,并且只会被服务器调用一次,类似于Servlet的destroy()方法。被@PreDestroy修饰的方法会在destroy()方法之后运行,在Servlet被彻底卸载之前。*/@PreDestroypublic void destroy() {try {zooKeeper.close();} catch (InterruptedException e) {throw new RuntimeException(e);}}public DistributedZkLock getLock(String lockName) throws InterruptedException, KeeperException {return new DistributedZkLock(zooKeeper, lockName);}
}

最后我们对ZK实现的分布式锁进行测试

    @Autowiredprivate ZkClient zkClient;public void zkLock() throws InterruptedException, KeeperException {DistributedZkLock lock = zkClient.getLock("lock");lock.lock();// 直接操作mysql进行减库存lock.unlock();}

5、MySql分布式锁

不管是jvm锁还是mysql锁,为了保证线程的并发安全,都提供了悲观独占排他锁。所以独占排他也是分布式锁的基本要求。在前面对于Redis和Zk都是利用对应的这个特性进行实现的。而对于MySql来说,我们可以利用其唯一索引(UNIQUE KEY)即可。

5.1、实现MySql分布式锁

思路:

  1. 通过insert插入进行加锁
  2. 再通过delete进行解锁
  3. 加锁失败进行递归重试

这里就不贴代码了,直接通过mybatis-plus进行实现对应的方法就行了

5.2、缺陷

  1. 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
    解决方案:给 锁数据库 搭建主备

  2. 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
    解决方案:只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。

  3. 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
    解决方案:记录获取锁的主机信息和线程信息,如果相同线程要获取锁,直接重入。

  4. 受制于数据库性能,并发能力有限。
    解决方案:无法解决。

分布式锁?我一手synchronized 什么高并发,什么秒杀通通拿下(狗头)相关推荐

  1. SpringBoot实战实现分布式锁一之重现多线程高并发场景

    实战前言:上篇博文我总体介绍了我这套视频课程:"SpringBoot实战实现分布式锁" 总体涉及的内容,从本篇文章开始,我将开始介绍其中涉及到的相关知识要点,感兴趣的小伙伴可以关注 ...

  2. Redisson分布式锁实战(适用于Redis高并发场景)

    实现方式一:存在抛异常后lock值无法归0的问题 @Autowired private StringRedisTemplate stringRedisTemplate;@RequestMapping( ...

  3. PHP高并发商品秒杀问题的解决方案

    前言 秒杀会产生一个瞬间的高并发,使用数据库会增加数据库的访问压力,也会降低访问速度,所以我们应该使用缓存,来降低数据库的访问压力: 可以看出这里的操作和原来的下单是不一样的:产生的秒杀预订单不会马上 ...

  4. 如何解决高并发,秒杀问题

    相信不少人会被这个问题困扰,分享大家一篇这样的文章,希望能够帮到你! 一.秒杀业务为什么难做? 1)im系统,例如qq或者微博,每个人都读自己的数据(好友列表.群列表.个人信息): 2)微博系统,每个 ...

  5. 【高并发】秒杀系统架构解密,不是所有的秒杀都是秒杀(升级版)!!

    写在前面 很多小伙伴反馈说,高并发专题学了那么久,但是,在真正做项目时,仍然不知道如何下手处理高并发业务场景!甚至很多小伙伴仍然停留在只是简单的提供接口(CRUD)阶段,不知道学习的并发知识如何运用到 ...

  6. php redis下单,redis 队列简单实现高并发抢购/秒杀

    redis 队列简单实现高并发抢购/秒杀 2019-03-21 14:34 阅读数 82 前提为每人限购1件 <>开抢前 把秒杀商品库存存进 Redis 队列中 $redis = new ...

  7. 电商网站50W-100W高并发,秒杀功能是怎么实现的?

    电商网站50W-100W高并发,秒杀功能是怎么实现的? 在淘宝.天猫.京东等国内大型电商平台"造节"的带领下,国内各电商平台纷纷跟进,双十一.双十二.618等电商专属节日也吸引了大 ...

  8. redis解决“高并发定时秒杀”库存误差问题

    前言:高并发的秒杀活动中,通过查询数据库判断是否还有库存,然后对库存字段进行增减,极易出现库存超出或者库存为负的情况,一般来说有3中解决办法(数据库表加锁,memche缓存,redis队列): 我们这 ...

  9. 如何解决高并发,秒杀问题 1

    如何解决高并发,秒杀问题 参考文章: (1)如何解决高并发,秒杀问题 (2)https://www.cnblogs.com/apollo1616/articles/10166870.html 备忘一下 ...

最新文章

  1. c语言开发游戏趋势,都9012年了,为何我还坚持用C语言开发游戏
  2. oracle中的not in和not exists注意事项
  3. 图数据的攻与防:智谱AI和biendata联合组织KDD Cup 2020
  4. 图像缩放--OpenCV cvResize函数
  5. asp.net session 如何知道是哪个浏览器客户端_小弟该如何复习 Java?
  6. gradle普通项目构建外部依赖jar的终极方法gradle瘦身
  7. java 怎么启动线程_线程如何正确的启动
  8. 当一个人把一个行业说得特别容易赚钱的时候
  9. MATLAB中三维曲面命令
  10. POJ1979(DFS)
  11. 设备管理系统未来发展的四大趋势
  12. science图表_Science和Nature大部分图表都出自这款绘图软件,了解一下?
  13. Hi3519使用·记录
  14. 【期末大作业】公益网站ps平面设计
  15. 酷睿i7 12700k核显相当于什么显卡 i712700k参数 i7 12700k什么水平
  16. AD domain 环境下VBS自动生成邮件签名
  17. vue项目,h5图片放大后,支持手指缩放功能
  18. wps英文文档排版小技巧
  19. 实时内核μC/OS-II下的网络监控系统的设计
  20. android中的ImageView,ImageView加载网络图片

热门文章

  1. 千与千寻,真是一部给大人看的动画片
  2. 从多个Word文档中批量取值,整理到Excel表中。
  3. 计算机二级不能使用快捷键,你不可不知的几个Office2010另类快捷键_计算机二级_Office快捷键_Office考试_课课家...
  4. animation-delay负值妙用,你不来了解一下吗
  5. Python爬虫进阶--js逆向-某天下与某某二手房密码加密分析
  6. “有幸笔墨是小舟 任我自在游”在林曦的小世界里每日滋养自己
  7. 色彩系列教程(3):实际运用
  8. 人类计划软件测试,人类分裂了16种人格,测测你是哪一种?
  9. vue2.0生命周期数据共享
  10. 深度揭密SSD中的原片-白片-黑片:莫贪小便宜