一、什么是分布式锁?

分布式锁是相对于单体单机应用而言的一种锁机制。在单机应用时由于共享一个jvm,可以使用同一个java Lock对象进行获取锁,解锁操作。当为分布式集群时存在跨机器请求执行,无法共享同一个java对象锁,但又需要对需要加锁保护的代码逻辑进行执行,此时分布式锁就相应而出现了。

百度百科这这样介绍到:分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,这个时候,便需要使用到分布式锁。

如上图:同一商品在同一时间有多个用户进行下单操作,在负载均衡服务器的地址路由之后可能会将请求分发到不同的节点上面。不同的节点进行出库操作。如果在没有并发的情况下,某一条节点接收订单请求之后,执行完出库操作,以及数据库将数据进行同步之完成之后,下一个订单请求再进来判断库存执行出库都毫无问题。但是如果当请求并发比较高时,在其中一个节点在操作数据库时,另一个节点也执行到了操作数据库过程,那么有可能虽然都下订单成功,实际上下订单数量已经大于实际库存数量,将导致下单问题出现。

那么,分布式锁是如果解决上述问题的,实际上分布式锁主要依靠所有集群节点操作一个共享数据实现锁逻辑。比如通过读写同一个数据库,读写同一个redis等等来实现在一个时间内只有获得锁的请求可以去操作库存数据库,等数据同步完成之后,在进行解锁操作。如下图:如果共享存储中存在lock_name 时,其他请求等待。当获取锁的请求执行完成之后,则删除lock_name。其他某一条线程在获得锁再在共享存储中设置上lock_name。从而达到同一时间只有一条请求执行加锁逻辑代码块。

二、使用Redis的手写实现分布式锁

在使用Redis手写实现主要是借助于setnx(set if not exists) 命令特性(相同key值只有一个执行命令可以返回正确,其他失败),客户端效果如下:

借此特性,我们在并发情况下可以根据此特性保证只有一个请求获得锁(返回1的)。当该请求需要执行加锁逻辑时,首先去执行redis setnx 方法,获得成功返回后,执行加锁逻辑代码。此时其他并发请求在执行到加锁逻辑时,执行setnx 则返回失败,此时可以不间断(间隔时间越短效率越高,但是耗费资源)去执行setnx操作。直到当前获得锁的请求执行完加锁逻辑后,执行删除key操作进行解锁后,其他线程才可以获得锁。setnx 我们在java代码中可以通过如下函数实现:

redisTemplate.opsForValue().setIfAbsent(LOCK_NAME, "1")

首先我们定义一个库存对象Stock.java,包含库存当前数量以及下订单操作。在实际中可以是多台集群机器中的sql执行减一修改操作。

package com.xiaohui.bean;public class Stock {//库存当前数量public static int count = 1;/*** 下订单,减少库存操作* @return true 下单成功,false 下单失败*/public static boolean reduceStock(){if(count <= 0){return false;}try {Thread.sleep(2000);}catch (Exception e){e.printStackTrace();}count-- ;return  true;}
}

接下来我们执行一段没有加锁的测试代码 LockMain.java

package com.xiaohui.web;import com.xiaohui.bean.Stock;public class LockMain {public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {boolean b = Stock.reduceStock();System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));}}).start();new Thread(new Runnable() {@Overridepublic void run() {boolean b = Stock.reduceStock();System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));}}).start();new Thread(new Runnable() {@Overridepublic void run() {boolean b = Stock.reduceStock();System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));}}).start();try {Thread.sleep(3000);}catch (Exception e){e.printStackTrace();}System.out.println("Stock.count = " +Stock.count);}
}

打印如下:我们可以看到在没有锁的情况下我们将库存更新为了负数。这不是我们预期的小于0时下订单失败的效果的。

接下来我们使用redisTemplate对象通过操作redis 实现分布式锁。

代码环境使用SpringBoot 工程 + Redis +web工程测试

1,pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xiaohui</groupId><artifactId>zklockdemo</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.7.RELEASE</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies></project>

2, application.properties  主要配置web模块访问端口以及redis的链接信息

server.port=8888# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# 连接超时时间(毫秒)
spring.redis.timeout=5000# 连接池设置
spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.max-wait=
spring.redis.jedis.pool.min-idle=0
spring.redis.jedis.pool.max-active=8

3,Redis 配置类RedisConfig.java

package com.xiaohui.cfg;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();// 配置连接工厂template.setConnectionFactory(factory);//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和publicom.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jacksonSeial.setObjectMapper(om);// 值采用json序列化template.setValueSerializer(jacksonSeial);//使用StringRedisSerializer来序列化和反序列化redis的key值template.setKeySerializer(new StringRedisSerializer());// 设置hash key 和value序列化模式template.setHashKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(jacksonSeial);template.afterPropertiesSet();return template;}
}

4.Redis锁实现类(重要)ReadisLock.java

package com.xiaohui.bean;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;@Component
public class ReadisLock implements Lock {@AutowiredRedisTemplate redisTemplate;private static String LOCK_NAME = "lock_name";@Overridepublic void lock() {while (true){Boolean name = redisTemplate.opsForValue().setIfAbsent(LOCK_NAME, "222");//为防止加锁后报错无法解锁,可以给缓存设置失效时间,解决死锁问题。
//            Boolean name = redisTemplate.opsForValue().setIfAbsent(LOCK_NAME, "1",15,TimeUnit.SECONDS);if(name){ // true 未加锁 加锁成功System.out.println(Thread.currentThread().getName()+" 加锁成功。。");break;}else{// false 已加锁System.out.println(Thread.currentThread().getName()+"继续等待....");}}}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {return false;}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return false;}@Overridepublic void unlock() {redisTemplate.delete(LOCK_NAME);System.out.println(Thread.currentThread().getName()+" 完成解锁。。");}@Overridepublic Condition newCondition() {return null;}
}

再此类中我们实现了java包中的Lock接口。主要实现其 lock加锁方法,以及unlock解锁方法。

在lock方法中我们通过循环redisTemplate.opsForValue().setIfAbsent(LOCK_NAME, "222") 对redis进行操作,如果设置成功则表示取得锁。退出lock函数。其他没有获取成功的将继续执行等待获取。。。

在解锁函数中主要就是删除其锁过程。

注意:为了避免在取得锁后,在解锁之前报错,导致无法解锁出现死锁状态,我们可以通过setIfAbsent(LOCK_NAME, "1",15,TimeUnit.SECONDS);四个参数的方法为缓存设置失效时间,来保证在无法解锁的情况下 过期锁自动解开。

5,web测试代码

package com.xiaohui.web;import com.xiaohui.bean.ReadisLock;
import com.xiaohui.bean.Stock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class WebController {@AutowiredReadisLock lock;@GetMapping("/startReduce")public String startReduce(){new Thread(new Runnable() {@Overridepublic void run() {lock.lock();boolean b = Stock.reduceStock();System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));lock.unlock();}}).start();new Thread(new Runnable() {@Overridepublic void run() {lock.lock();boolean b = Stock.reduceStock();System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));lock.unlock();}}).start();new Thread(new Runnable() {@Overridepublic void run() {lock.lock();boolean b = Stock.reduceStock();System.out.println(Thread.currentThread().getName()+"下单:"+( b ? "成功":"失败"));lock.unlock();}}).start();try {Thread.sleep(3000);}catch (Exception e){e.printStackTrace();}System.out.println("Stock.count = " +Stock.count);return  "set ok!";}}

在该接口中我们模拟了三个并发操作下订单逻辑。并在每一个下订单前后都加上了锁操作。

6,SpringBoot启动类代码

package com.xiaohui;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class MainApplication {public static void main(String[] args) {SpringApplication.run(MainApplication.class,args);}
}

7,启动访问web 测试地址 http://127.0.0.1:8888/startReduce 控制台打印如下:我们可以看到 第一个加锁成功的线程13 下单成功,后面在渠道锁的都下单失败,和预期下单锁效果一致。

Thread-13 加锁成功。。
Thread-15继续等待....
Thread-14继续等待....
Thread-14继续等待....
//若干重复打印14 15 线程等待
Thread-14继续等待....
Thread-13下单:成功
Thread-15继续等待....
Thread-13 完成解锁。。
Thread-15 加锁成功。。
Thread-15下单:失败
Thread-14继续等待....
Thread-15 完成解锁。。
Thread-14 加锁成功。。
Thread-14下单:失败
Thread-14 完成解锁。。

项目工程结构如下:

ZooKeeper(三) 什么是分布式锁以及使用Redis手写实现相关推荐

  1. 什么是分布式锁?redis、zookeeper、etcd实现分布式锁有什么不同之处?

    目录 分布式锁定义 目的 基于redis分布式锁 基于zookeeper实现的分布式锁 edis.zookeeper.etcd实现分布式锁的比较 建议选择etcd实现分布式锁 分布式锁定义 分布式环境 ...

  2. 分布式锁(基于redis和zookeeper)详解

    分布式锁(基于redis和zookeeper)详解 https://blog.csdn.net/a15835774652/article/details/81775044 为什么写这篇文章? 目前网上 ...

  3. zookeeper 分布式锁_关于redis分布式锁,zookeeper分布式锁原理的一些学习与思考

    编辑:业余草来源:https://www.xttblog.com/?p=4946 首先分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法 ...

  4. 纠结!分布式锁到底用Redis好还是ZooKeeper好?

    前言 1. 什么是分布式锁 分布式锁是控制分布式系统之间同步访问共享资源的一种方式.在分布式系统中,常常需要协调他们的动作.如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些 ...

  5. 阿里面试官:分布式锁到底用Redis好?还是Zookeeper好?

    首先,分布式锁和我们平常讲到的锁原理基本一样,目的就是确保,在多个线程并发时,只有一个线程在同一刻操作这个业务或者说方法.变量. 在一个进程中,也就是一个jvm 或者说应用中,我们很容易去处理控制,在 ...

  6. 分布式锁实现:Redis

    前言 单机环境下我们可以通过JAVA的Synchronized和Lock来实现进程内部的锁,但是随着分布式应用和集群环境的出现,系统资源的竞争从单进程多线程的竞争变成了多进程的竞争,这时候就需要分布式 ...

  7. python redis分布式锁_Python 使用 Redis 实现分布式锁

    前言 随着互联网技术的不断发展,用户量的不断增加,越来越多的业务场景需要用到分布式系统.而在分布式系统中访问共享资源就需要一种互斥机制,来防止彼此之间的互相干扰,以保证一致性,这个时候就需要使用分布式 ...

  8. 分布式锁 哨兵模式_手撕redis分布式锁,隔壁张小帅都看懂了!

    前言 上一篇老猫和小伙伴们分享了为什么要使用分布式锁以及分布式锁的实现思路原理,目前我们主要采用第三方的组件作为分布式锁的工具.上一篇运用了Mysql中的select -for update实现了分布 ...

  9. Apache ZooKeeper - 使用ZK实现分布式锁(非公平锁/公平锁/共享锁 )

    文章目录 什么是分布式锁 分布式死锁 分类 排他锁 共享锁 实现 创建锁 获取锁 释放锁 Demo Jmeter配置 方案零 缺陷版本 方案一 非公平锁方案 缺陷 (羊群效应) 方案二 公平锁方案 方 ...

最新文章

  1. vue 启动时卡死_使用 Vue 两年后
  2. 【创新应用】未来10年,这些黑科技必将颠覆我们的生活
  3. 解决linux下无线网卡被物理禁用问题
  4. Mysql 中如何创建触发器
  5. php代码格式化工具 php-cs-fixer的使用
  6. python安装wxpython库_wxPython:python 首选的 GUI 库
  7. 错误解析 error:unable to find numeric literal operator ‘operator““a/b/c/...‘
  8. paip.快捷方式分组管理最佳实践ObjectDock
  9. 实验一 SNMP网络管理架构的验证
  10. J-Flash使用方法
  11. .net core6 简单控制台读取数据库操作封装
  12. 怎么关闭Deep Freeze (冰点还原精灵单机版)
  13. 局部边缘保持滤波(LEP)高动态范围图像HDR压缩 matlab程序(二)
  14. 《黑客帝国》说的是什么?
  15. 阿里云服务器ECS有哪些功能特性?
  16. ICC 图文学习——LAB3:Placement 布局
  17. Hive数据仓库数据分析
  18. Activiti的学习
  19. 杰奇python采集器_极速杰奇采集器
  20. 华为c语言机试题库及答案,华为C语言机试题面试题汇总.doc

热门文章

  1. Eclipse远程调试Java代码的三种方法
  2. Elasticsearch安装X-Pack插件
  3. Android服务的通信方式,android客户端与服务器通信的HTTP通信
  4. if函数判断单元格颜色_excel中的if函数,实现自动判断
  5. ubuntu12.04编译rtems doc目录
  6. 防止误删的神器-ECS实例删除保护
  7. App-V轻量级应用程序虚拟化之三客户端测试
  8. 适合于小团队产品迭代的APP测试流程 1
  9. Java基础——数组应用之StringBuilder类和StringBuffer类
  10. DOM(二)使用DOM