前言

作为这一段时间学习分布式锁的总结,本文总结了四种Java分布式锁的实现方式,简单编写了代码,进行模拟实现。相关代码存放在我的github仓库。

为什么要用锁

系统内,有多个消费者,需要对同一共享数据并发访问和消费时,会有线程安全问题。例如在秒杀、抢优惠券等场景下,商品库存的数量是有限的,在高并发下,会有"超买"或"超卖"的问题。因此我们需要使用锁,解决多线程对共享数据并发访问的线程安全问题。

我们可以这样来模拟并发访问:

1、模拟商品库存只有一件,并且提供一个减少库存数量的方法,当库存数量大于0时,可以减少库存,返回true。当库存不大于0时,不减少库存,返回false。

public class Stock {//库存数量private static int num=1;// 减少库存数量的方法public boolean reduceStock(){if(num>0){try {//一些逻辑处理Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}num--;return true;}return false;}
}

2、模拟两个线程,同时进行减少库存操作。

@SpringBootTest
public class SampleTest {static class StockThread implements Runnable{public void run() {//调用减少库存的方法boolean b = new Stock().reduceStock();if (b) {System.out.println(Thread.currentThread().getName()+"减少库存成功");}else {System.out.println(Thread.currentThread().getName()+"减少库存失败");}}}@Testpublic void test() throws InterruptedException {new Thread(new StockThread(),"线程1").start();new Thread(new StockThread(),"线程2").start();Thread.sleep(3000);}

3、从运行结果可以看出,在并发时,虽然库存数量为1,本应该只有一个线程减少库存成功,另一个线程减少库存失败,但运行时两个线程都操作成功了,出现了线程安全问题。

单机下的锁使用方式

如果单机版的系统,我们有很多种解决方案。我们可以使用JDK提供的ReentrantLock,在减少库存前加锁,减少库存后释放锁。这样就能避免线程安全问题。代码示例如下:

@SpringBootTest
public class ReentrantLockTest {private static Lock lock = new ReentrantLock();static class StockThread implements Runnable{public void run() {//减少库存前加锁lock.lock();//调用减少库存的方法boolean b = new Stock().reduceStock();//减少库存后释放锁lock.unlock();if (b) {System.out.println(Thread.currentThread().getName()+"减少库存成功");}else {System.out.println(Thread.currentThread().getName()+"减少库存失败");}}}@Testpublic static void main(String[] args) throws InterruptedException {new Thread(new StockThread(),"线程1").start();new Thread(new StockThread(),"线程2").start();Thread.sleep(4000);}}

为什么要用分布式锁

当系统使用分布式架构时,服务会有多个实例存在。不同服务实例内的线程,使用ReentrantLock是无法感知彼此是否对同一资源进行了加锁操作的。当多个请求进入到不同实例时,使用ReentrantLock则依然有线程安全问题,因为我们需要使用分布式锁,保证一个资源在同一时间内只能被同一个线程执行。

使用数据库实现分布式锁

数据库分布式锁,是通过操作一张锁表来实现的。对该锁表设置唯一索引,不同请求对该表插入同一个主键的数据,如果插入成功,则认为成功获取到锁;如果插入失败,则判断获取锁失败。

代码实现

1、创建锁表。

CREATE TABLE `lock_record` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键', `lock_name` varchar(50) DEFAULT NULL COMMENT '锁名称', PRIMARY KEY (`id`), UNIQUE KEY `lock_name` (`lock_name`)
)ENGINE=InnoDB AUTO_INCREMENT=38 DEFAULT CHARSET=utf8

2、在项目内引入相关依赖,这里使用Mybatis做持久化映射。

<!--数据库驱动-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version>
</dependency>
<!--引入mybatis plus 就不需要引入mybatis了-->
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.0</version>
</dependency>

3、配置数据库连接属性。

spring:datasource:url: jdbc:mysql://${mysqlAddress}:3306/${dbName}?useUnicode=true&characterEncoding=utf-8&useSSL=falseusername: ${userName}password: ${password}driver-class-name: com.mysql.jdbc.Driver

4、在启动类添加mybatis包扫描注解。

@SpringBootApplication
@MapperScan("com.haha.mapper")
public class LockDemoApplication {public static void main(String[] args) {SpringApplication.run(LockDemoApplication.class, args);}}

4、编写entity和mapper。

@Data
public class LockRecord {private Integer id;private String lockName;
}
public interface LockRecordMapper extends BaseMapper<LockRecord> {}

5、编写数据库锁类,实现Lock接口。

@Service
public class DbLock implements Lock {private static final String LOCK_NAME = "db_lock";@Autowiredprivate LockRecordMapper lockRecordMapper;/*** 上锁*/public synchronized void lock() {while (true){boolean b = tryLock();if(b){//添加记录LockRecord lockRecord = new LockRecord();lockRecord.setLockName(LOCK_NAME);lockRecordMapper.insert(lockRecord);return;}else{try {Thread.sleep(1500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("等待中");}}}/*** 尝试获取锁,根据指定的名称,在数据库表中发起查询* @return*/public boolean tryLock() {QueryWrapper<LockRecord> queryWrapper = new QueryWrapper<>();queryWrapper.eq("lock_name",LOCK_NAME);List<LockRecord> lockRecords = lockRecordMapper.selectList(queryWrapper);if(lockRecords.size()==0){return true;}return false;}/*** 解锁 删除指定名称的记录*/public void unlock() {QueryWrapper<LockRecord> queryWrapper = new QueryWrapper<>();queryWrapper.eq("lock_name",LOCK_NAME);lockRecordMapper.delete(queryWrapper);}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}public void lockInterruptibly() throws InterruptedException {}public Condition newCondition() {return null;}
}

6、模拟两个线程,使用数据库锁,同时进行减少库存操作。

@SpringBootTest
public class DbLockTest {@Autowiredprivate DbLock dbLock;static class StockThread implements Runnable{private DbLock dbLock;public StockThread(DbLock dbLock){this.dbLock = dbLock;}public void run() {dbLock.lock();//调用减少库存的方法boolean b = new Stock().reduceStock();dbLock.unlock();if (b) {System.out.println(Thread.currentThread().getName()+"减少库存成功");}else {System.out.println(Thread.currentThread().getName()+"减少库存失败");}}}@Testpublic void test() throws InterruptedException {new Thread(new StockThread(this.dbLock),"线程1").start();new Thread(new StockThread(this.dbLock),"线程2").start();Thread.sleep(4000);}
}

数据库锁缺陷

1、数据库锁强依赖数据库的可用性,一旦数据库宕机,会导致业务系统不可用,因此需要对数据库做HA。

2、数据库锁没有失效时间,一旦获取该锁的请求所在的服务实例宕机,会导致该资源被长期锁住,其他请求无法获取该锁。

使用redis实现分布式锁

redis分布式锁,是通过代码调用setnx命令,只在键不存在的情况下, 将键的值设置为某个值。若键已经存在, 则setnx命令不做任何动作。为了能处理"获取该锁的请求所在的服务实例宕机,会导致该资源被长期锁住,其他请求无法获取该锁"这种情况,我们还需要设置超时时间。

代码实现

1、在项目内引入redis相关依赖。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.3.3.RELEASE</version>
</dependency>

2、在项目中配置redis。

spring:redis:host: 127.0.0.1port: 6379

3、编写redis分布式锁。

@Component
public class RedisLock implements Lock {private static final String LOCK_NAME = "redis_lock";@Autowiredprivate RedisTemplate redisTemplate;public void lock() {while (true){Boolean b = redisTemplate.opsForValue().setIfAbsent("lockName", LOCK_NAME,10,TimeUnit.SECONDS);if (b) {return;}else{System.out.println("循环等待中");}}}public void lockInterruptibly() throws InterruptedException {}public boolean tryLock() {return false;}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}public void unlock() {redisTemplate.delete("lockName");}public Condition newCondition() {return null;}
}

4、模拟两个线程,使用redis分布式锁,同时进行减少库存操作。

@SpringBootTest
public class RedisLockTest {@Autowiredprivate RedisLock redisLock;static class StockThread implements Runnable{private RedisLock redisLock;public StockThread(RedisLock redisLock){this.redisLock= redisLock;}public void run() {redisLock.lock();//调用减少库存的方法boolean b = new Stock().reduceStock();redisLock.unlock();if (b) {System.out.println(Thread.currentThread().getName()+"减少库存成功");}else {System.out.println(Thread.currentThread().getName()+"减少库存失败");}}}@Testpublic void test() throws InterruptedException {new Thread(new StockThread(redisLock),"线程1").start();new Thread(new StockThread(redisLock),"线程2").start();Thread.sleep(4000);}}

redis分布式锁缺陷

1、强依赖redis的可用性,一旦redis宕机,会导致业务系统不可用,因此最好搭建redis集群。

2、因为对锁设置了超时时间,如果某次请求不能在该次限制时间内完成操作,也会导致在某些时刻,多个请求获取到锁。

解决方案也很简单,我们在调用setnx时,将值设置为该次请求线程的id,并且在服务实例内,设置一个守护线程,当锁快要超时时,判断请求是否完成,如果未完成,延长超时时间。

使用redission实现分布式锁

redisson是一个常用的Redis Java客户端,为Java上的分布式应用程序提供了基于Redis的对象,集合,锁,同步器和服务的分布式实现。使用redisson,我们可以实现基于redis集群的分布式锁。

代码实现

1、启动redis集群(此处用单机模拟)。

2、在项目内引入redisson相关依赖。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.3.3.RELEASE</version>
</dependency>
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.6.5</version>
</dependency>

3、在项目中配置redisson,在配置文件添加redis配置和配置类。

spring:redis:host: 127.0.0.1port: 6379
@Configuration
public class RedissonConfig {@Autowiredprivate RedisProperties redisProperties;@Beanpublic RedissonClient redissonClient(){Config config = new Config();String redisUrl = String.format("redis://%s:%s",redisProperties.getHost()+"",redisProperties.getPort()+"");config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());config.useSingleServer().setDatabase(3);return Redisson.create(config);}
}

4、模拟两个线程,使用redisson,同时进行减少库存操作。

@SpringBootTest
public class RedissionLockTest {@Autowiredprivate Redisson redisson;static class StockThread implements Runnable{private RLock mylock;public StockThread(RLock lock){this.mylock = lock;}public void run() {mylock.lock();//调用减少库存的方法boolean b = new Stock().reduceStock();mylock.unlock();if (b) {System.out.println(Thread.currentThread().getName()+"减少库存成功");}else {System.out.println(Thread.currentThread().getName()+"减少库存失败");}}}@Testpublic void test() throws InterruptedException {RLock lock = redisson.getLock("redisson_lock");new Thread(new StockThread(lock),"线程1").start();new Thread(new StockThread(lock),"线程2").start();Thread.sleep(4000);}
}

使用zookeeper实现分布式锁

zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,天生就适合用于实现分布式锁。

使用zookeeper实现分布式锁的步骤

1、设置锁的根节点。判断锁的根节点是否存在,如果不存在,首先创建根节点,例如叫“/locks”。

2、客户端如果需要占用锁,则在“/locks”下创建临时且有序的子节点,然后判断自己创建的子节点是否为当前子节点列表中序号最小的子节点。如果是则认为获得锁,否则监听前一个子节点变更消息。

3、当监听到前一个子节点已经从zookeper上被删除,则认为获得锁。

4、当客户端获取锁并完成业务流程后,则删除对应的子节点,完成释放锁的工作,以便后面的节点获得分布式锁。

5、如果客户端获取锁之后,因为某些原因宕机,此时由于客户端与zookeeper断开连接,该客户端创建的临时有序节点也会自动从zookeeper上移除,从而让后面的节点获得分布式锁。

代码实现

1、在项目内引入zookeeper相关依赖。

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.12</version><!-- 排除冲突jar --><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions>
</dependency>

2、在项目中配置zookeeper,在配置文件添加redis配置和编写配置类。

zookeeper:address: 127.0.0.1:2181timeout: 4000
@Configuration
public class ZookeeperConfig {@Value("${zookeeper.address}")private String connectString;@Value("${zookeeper.timeout}")private Integer timeout;@Bean(name = "zkClient")public ZooKeeper zkClient(){ZooKeeper zooKeeper=null;try {final CountDownLatch countDownLatch = new CountDownLatch(1);zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected==event.getState()){//如果收到了服务端的SyncConnected响应事件,表示连接成功countDownLatch.countDown();}}});countDownLatch.await();}catch (Exception e){e.printStackTrace();}return  zooKeeper;}
}

3、编写zookeeper分布式锁。

public class ZkLock implements Lock {//zk客户端private ZooKeeper zk;//锁的根节点private String root ="/locks";//当前锁的名称private String lockName;//当前线程创建的临时有序节点private ThreadLocal<String> nodeId = new ThreadLocal<>();private final static byte[] data = new byte[0];public ZkLock(ZooKeeper zooKeeper, String lockName){this.zk = zooKeeper;this.lockName = lockName;try {Stat stat = zk.exists(root, false);if(stat==null) {//创建根节点zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}/*** 监听器,监听临时节点的删除*/class LockWatcher implements Watcher{private CountDownLatch latch;public LockWatcher(CountDownLatch latch){this.latch = latch;}public void process(WatchedEvent watchedEvent) {if(watchedEvent.getType()== Event.EventType.NodeDeleted){latch.countDown();}}}public void lock() {try {//在根节点下创建临时有序节点String myNode = zk.create(root + "/" + lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(Thread.currentThread().getName()+myNode+" create");//获取根节点下的所有根节点下List<String> subNodes = zk.getChildren(root, false);//排序TreeSet<String> sortedNodes = new TreeSet<String>();for(String node:subNodes){sortedNodes.add(root+"/"+node);}//获取序号最下的子节点String smallNode = sortedNodes.first();//如果该次创建的临时有序节点是最小的子节点,则表示取得锁if(myNode.equals(smallNode) ){System.out.println(Thread.currentThread().getName()+myNode+" get lock");this.nodeId.set(myNode);return;}//否则,取得当前节点的前一个节点String preNode = sortedNodes.lower(myNode);CountDownLatch latch = new CountDownLatch(1);//查询前一个节点,同时注册监听Stat stat = zk.exists(preNode, new LockWatcher(latch));// 如果比自己小的前一个节点查询时,已经不存在则无需等待,如果存在则监听if(stat!=null){System.out.println(Thread.currentThread().getName()+myNode+" waiting for "+ root+"/"+preNode+" release lock");latch.await();//等待}nodeId.set(myNode);}catch (Exception e){throw new RuntimeException(e);}}public void unlock() {try{//释放锁时,只需要将本次创建的临时有序节点移除掉System.out.println(Thread.currentThread().getName()+" unlock");if(nodeId!=null){zk.delete(nodeId.get(),-1);}nodeId.remove();}catch (InterruptedException e){e.printStackTrace();}catch (KeeperException e){e.printStackTrace();}}public void lockInterruptibly() throws InterruptedException {}public boolean tryLock() {return false;}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}public Condition newCondition() {return null;}
}

4、模拟两个线程,使用zookeeper分布式锁,同时进行减少库存操作。

@SpringBootTest
public class ZookeeperLockTest {@Autowiredprivate ZooKeeper zooKeeper;private static final String LOCK_NAME = "zk_lock";static class StockThread implements Runnable{private ZkLock zkLock;public StockThread(ZkLock zkLock){this.zkLock= zkLock;}public void run() {zkLock.lock();//调用减少库存的方法boolean b = new Stock().reduceStock();zkLock.unlock();if (b) {System.out.println(Thread.currentThread().getName()+"减少库存成功");}else {System.out.println(Thread.currentThread().getName()+"减少库存失败");}}}@Testpublic void test() throws InterruptedException {ZkLock zkLock = new ZkLock(zooKeeper,LOCK_NAME);new Thread(new ZookeeperLockTest.StockThread(zkLock),"线程1").start();new Thread(new ZookeeperLockTest.StockThread(zkLock),"线程2").start();Thread.sleep(4000);}
}

zookeeper分布式锁缺陷

1、强依赖zookeeper的可用性,一旦zookeeper宕机,会导致业务系统不可用,因此最好搭建zookeeper集群。

java分布式_分布式锁的四种JAVA实现方式相关推荐

  1. java 中lock,java中lock获取锁的四种方法

    在java接口中会存放着许多方法,方便线程使用时的直接调用.对于lock接口大家都不陌生,我们已经初步对概念进行了理解.那么在获取锁的方法上想必还不是很清楚.下面我们就lock获取锁的四种方法分别进行 ...

  2. java 按钮 监听_Button的四种监听方式

    Button按钮设置点击的四种监听方式 注:加粗放大的都是改变的代码 1.使用匿名内部类的形式进行设置 使用匿名内部类的形式,直接将需要设置的onClickListener接口对象初始化,内部的onC ...

  3. java中的json_JAVA中的四种JSON解析方式详解

    JAVA中的四种JSON解析方式详解 我们在日常开发中少不了和JSON数据打交道,那么我们来看看JAVA中常用的JSON解析方式. 1.JSON官方 脱离框架使用 2.GSON 3.FastJSON ...

  4. java定时14点30分_单机定时任务的四种基本实现方式

    引言 在实际项目开发中,定时任务调度是经常会出现的一类需求. 定时任务的场景可以说非常广泛,例如: 购买某些视频网站的会员后,每天给会员送成长值,每月给会员送电影券 在保证最终一致性的场景中,利用定时 ...

  5. 【Java 并发编程】线程锁机制 ( 锁的四种状态 | 无锁状态 | 偏向锁 | 轻量级锁 | 重量级锁 | 锁竞争 | 锁升级 )

    文章目录 一.悲观锁示例 ( ReentrantLock ) 二.重量级锁弊端 三.锁的四种状态 ( 无锁状态 | 偏向锁 | 轻量级锁 | 重量级锁 ) 四.锁的四种状态之间的转换 ( 无锁状态 - ...

  6. 对象头、锁的四种状态、Java和处理器实现原子操作的方式(CAS、锁机制;总线锁定、缓存锁定)

    1.对象头 Java对象头里的Mark Word里默认存储对象的HashCode.分代年龄和锁标记位. 32位JVM的Mark Word的默认存储结构如下图所示: 在运行期间,Mark Word里存储 ...

  7. Java并发—锁的四种状态

    目录 无锁 偏向锁 轻量级锁 重量级锁 总结 锁的四种状态:无锁.偏向锁.轻量级锁和重量级锁 无锁 无锁就是没有真正意义上的上锁,所有的线程还是能访问并修改同一个资源,但是通过算法控制,实现同时只有一 ...

  8. java将一个整数按字节输出_在java中的整数类型有四种,分别是 byte  short int long 其中byte只有一个字节 0或1,在此不详细讲解。其他的三种类型如下:1、...

    在java中的整数类型有四种,分别是 byte  short int long 其中byte只有一个字节 0或1,在此不详细讲解. 其他的三种类型如下: 1. 基本类型:short 二进制位数:16 ...

  9. java按钮权限控制_详解Spring Security 中的四种权限控制方式

    Spring Security 中对于权限控制默认已经提供了很多了,但是,一个优秀的框架必须具备良好的扩展性,恰好,Spring Security 的扩展性就非常棒,我们既可以使用 Spring Se ...

最新文章

  1. 洛谷P2746 [USACO5.3]校园网Network of Schools
  2. python代码用c语言封装_使用C语言扩展Python程序的简单入门指引
  3. LeetCode:Unique Binary Search Trees
  4. 数组copyWithin()方法以及JavaScript中的示例
  5. Windows平台下go编译器LiteIDE的安装和使用
  6. leanote 支持php,Leanote source leanote源码导读
  7. 数据库基础:什么是列?数据类型是什么?
  8. 解决上传窗口弹不出的问题
  9. Pytorch中的强化学习
  10. JS字符串截取(获取指定字符后面的所有字符内容)
  11. AngularJs依赖注入的研究
  12. AD18的一些简单操作
  13. Docker-07:Docker网络管理
  14. confluence编辑文件和文字_知识管理Confluence:常用基本操作
  15. linux下的系统监控软件,管理员必备的20个Linux系统监控工具
  16. word中批量修改图片大小的两个方法
  17. PCB工程的BOM表设置隔行隔列显示不同颜色操作方法
  18. 论文中出现的 cf. i.e. s.t. e.g. w.r.t. et al. etc等英文缩写是什么意思
  19. Hadoop完全分布式搭建全过程
  20. 联想G40进入BIOS

热门文章

  1. html5中新增的语义化的标签
  2. java对象引用传递和值传递的一些总结
  3. SQL Server 2016/2014/2012/2008/2005/2000简体中文企业版下载地址
  4. 虚拟机VM10装Mac OS X 10.9.3
  5. 一个老程序员对数据库的一点纠结
  6. 模拟电子技术不挂科学习笔记3(放大电路的分析方法)
  7. mongodb创建数据库用户名和密码_Linux运维老司机,教你MongoDB 的不同连接方式
  8. Python+django网页设计入门(3):使用SQLite数据库
  9. 几行Python代码生成饭店营业额模拟数据并保存为CSV文件
  10. java条件触发_java – 当给定75:android时,条件不会触发