基于Consul的分布式锁实现
我们在构建分布式系统的时候,经常需要控制对共享资源的互斥访问。这个时候我们就涉及到分布式锁(也称为全局锁)的实现,基于目前的各种工具,我们已经有了大量的实现方式,比如:基于Redis的实现、基于Zookeeper的实现。本文将介绍一种基于Consul 的Key/Value存储来实现分布式锁以及信号量的方法。
分布式锁实现
基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操作来实现。acquire和release操作是类似Check-And-Set的操作:
- acquire操作只有当锁不存在持有者时才会返回true,并且set设置的Value值,同时执行操作的session会持有对该Key的锁,否则就返回false
- release操作则是使用指定的session来释放某个Key的锁,如果指定的session无效,那么会返回false,否则就会set设置Value值,并返回true
具体实现中主要使用了这几个Key/Value的API:
- create session:https://www.consul.io/api/session.html#session_create
- delete session:https://www.consul.io/api/session.html#delete-session
- KV acquire/release:https://www.consul.io/api/kv.html#create-update-key
基本流程
具体实现
public class Lock { private static final String prefix = "lock/"; // 同步锁参数前缀 private ConsulClient consulClient; private String sessionName; private String sessionId = null; private String lockKey; /** * * @param consulClient * @param sessionName 同步锁的session名称 * @param lockKey 同步锁在consul的KV存储中的Key路径,会自动增加prefix前缀,方便归类查询 */ public Lock(ConsulClient consulClient, String sessionName, String lockKey) { this.consulClient = consulClient; this.sessionName = sessionName; this.lockKey = prefix + lockKey; } /** * 获取同步锁 * * @param block 是否阻塞,直到获取到锁为止 * @return */ public Boolean lock(boolean block) { if (sessionId != null) { throw new RuntimeException(sessionId + " - Already locked!"); } sessionId = createSession(sessionName); while(true) { PutParams putParams = new PutParams(); putParams.setAcquireSession(sessionId); if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) { return true; } else if(block) { continue; } else { return false; } } } /** * 释放同步锁 * * @return */ public Boolean unlock() { PutParams putParams = new PutParams(); putParams.setReleaseSession(sessionId); boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue(); consulClient.sessionDestroy(sessionId, null); return result; } /** * 创建session * @param sessionName * @return */ private String createSession(String sessionName) { NewSession newSession = new NewSession(); newSession.setName(sessionName); return consulClient.sessionCreate(newSession, null).getValue(); } } |
单元测试
下面单元测试的逻辑:通过线程的方式来模拟不同的分布式服务来竞争锁。多个处理线程同时以阻塞方式来申请分布式锁,当处理线程获得锁之后,Sleep一段随机事件,以模拟处理业务逻辑,处理完毕之后释放锁。
public class TestLock { private Logger logger = Logger.getLogger(getClass()); @Test public void testLock() throws Exception { new Thread(new LockRunner(1)).start(); new Thread(new LockRunner(2)).start(); new Thread(new LockRunner(3)).start(); new Thread(new LockRunner(4)).start(); new Thread(new LockRunner(5)).start(); Thread.sleep(200000L); } class LockRunner implements Runnable { private Logger logger = Logger.getLogger(getClass()); private int flag; public LockRunner(int flag) { this.flag = flag; } @Override public void run() { Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key"); try { if (lock.lock(true)) { logger.info("Thread " + flag + " start!"); Thread.sleep(new Random().nextInt(3000L)); logger.info("Thread " + flag + " end!"); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } } |
单元测试执行结果如下:
2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner - Thread 1 start!2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner - Thread 1 end!2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner - Thread 3 start!2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner - Thread 3 end!2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner - Thread 2 start!2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner - Thread 2 end!2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner - Thread 5 start!2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner - Thread 5 end!2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner - Thread 4 start!2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner - Thread 4 end! |
从测试结果我们可以看到,通过分布式锁的形式来控制并发时,多个同步操作只会有一个操作能够被执行,其他操作只有在等锁释放之后才有机会去执行,所以通过这样的分布式锁,我们可以控制共享资源同时只能被一个操作进行执行,以保障数据处理时的分布式并发问题。
优化建议
本文我们实现了基于Consul的简单分布式锁,但是在实际运行时,可能会因为各种各样的意外情况导致unlock操作没有得到正确地执行,从而使得分布式锁无法释放。所以为了更完善的使用分布式锁,我们还必须实现对锁的超时清理等控制,保证即使出现了未正常解锁的情况下也能自动修复,以提升系统的健壮性。那么如何实现呢?请持续关注我的后续分解!
参考文档
Key/Value的API:https://www.consul.io/api/kv.html
选举机制:https://www.consul.io/docs/guides/leader-election.html
实现代码
- GitHub:https://github.com/dyc87112/consul-distributed-lock
- 开源中国:http://git.oschina.net/didispace/consul-distributed-lock
基于Consul的分布式锁实现相关推荐
- java B2B2C源码电子商务平台-基于Consul的分布式锁实现
分布式锁实现 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码:壹零叁八柒柒肆六二六 基于Consul的分布式锁主要利用Key/Value存储API中的ac ...
- java如何保证redis设置过期时间的原子性_redis专题系列22 -- 如何优雅的基于redis实现分布式锁
几个概念 线程锁:主要用来给方法.代码块加锁.当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段.线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比 ...
- 基于Consul的分布式信号量实现
本文将继续讨论基于Consul的分布式锁实现.信号量是我们在实现并发控制时会经常使用的手段,主要用来限制同时并发线程或进程的数量,比如:Zuul默认情况下就使用信号量来限制每个路由的并发数,以实现不同 ...
- 基于Redis的分布式锁真的安全吗?
说明: 我前段时间写了一篇用consul实现分布式锁,感觉理解的也不是很好,直到我看到了这2篇写分布式锁的讨论,真的是很佩服作者严谨的态度, 把这种分布式锁研究的这么透彻,作者这种技术态度真的值得我好 ...
- etcd 笔记(08)— 基于 etcd 实现分布式锁
1. 为什么需要分布式锁? 在分布式环境下,数据一致性问题一直是个难点.分布式与单机环境最大的不同在于它不是多线程而是多进程.由于多线程可以共享堆内存,因此可以简单地采取内存作为标记存储位置.而多进程 ...
- 基于 Redis 实现分布式锁思考
以下文章来源方志朋的博客,回复"666"获面试宝典 来源:blog.csdn.net/xuan_lu/article/details/111600302 分布式锁 基于redis实 ...
- nx set 怎么实现的原子性_基于Redis的分布式锁实现
前言 本篇文章主要介绍基于Redis的分布式锁实现到底是怎么一回事,其中参考了许多大佬写的文章,算是对分布式锁做一个总结 分布式锁概览 在多线程的环境下,为了保证一个代码块在同一时间只能由一个线程访问 ...
- 基于Redis的分布式锁和Redlock算法
来自:后端技术指南针 1 前言 今天开始来和大家一起学习一下Redis实际应用篇,会写几个Redis的常见应用. 在我看来Redis最为典型的应用就是作为分布式缓存系统,其他的一些应用本质上并不是杀手 ...
- redis系列:基于redis的分布式锁
一.介绍 这篇博文讲介绍如何一步步构建一个基于Redis的分布式锁.会从最原始的版本开始,然后根据问题进行调整,最后完成一个较为合理的分布式锁. 本篇文章会将分布式锁的实现分为两部分,一个是单机环境, ...
最新文章
- 十大流行AI框架和库的优缺点对比
- 提高工作效率:15个有用的项目管理工具
- 微信公众号自定义菜单跳转小程序
- [Java]Annotation元数据的几个应用
- 对我影响最大的三个老师
- 错误: 找不到或无法加载主类 com.leyou.LeyouItemApplication Process finished with exit code 1...
- [Leedcode][JAVA][第739题][每日温度][暴力][单调栈]
- 立足国产自主可控技术 达梦DM8数据库新品化繁为简
- python里面print是什么意思_python里print是什么意思
- 2017华为机试题--Floyd算法
- asp.net文本编辑器FCKeditor使用方法详解 - 橙色大地 - 博客园
- Netty工作笔记0035---Reactor模式图剖析
- java地铁最短,地铁最短路径
- python 生成nc文件_Python生成器处理大文本文件的代码
- ORA-02049: 超时: 分布式事务处理等待锁
- 奔图P3305DN安装官网windows驱动 打印乱码解决方法
- 常用计算机硬件故障检查方法,电脑硬件常见的故障检测及处理方法
- 对字符串按“红黄蓝”进行排序,如“蓝黄红红黄”,输出结果为“红红黄黄蓝”
- warning: implicit declaration of function ‘XXX’; did you mean ‘YYY’? [-Wimplicit-function-declarati
- jq和js的关系_jquery与js的区别是什么?