我们在构建分布式系统的时候,经常需要控制对共享资源的互斥访问。这个时候我们就涉及到分布式锁(也称为全局锁)的实现,基于目前的各种工具,我们已经有了大量的实现方式,比如:基于Redis的实现、基于Zookeeper的实现。本文将介绍一种基于Consul 的Key/Value存储来实现分布式锁以及信号量的方法。

分布式锁实现

基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操作来实现。acquire和release操作是类似Check-And-Set的操作:

  • acquire操作只有当锁不存在持有者时才会返回true,并且set设置的Value值,同时执行操作的session会持有对该Key的锁,否则就返回false
  • release操作则是使用指定的session来释放某个Key的锁,如果指定的session无效,那么会返回false,否则就会set设置Value值,并返回true

具体实现中主要使用了这几个Key/Value的API:

  • create session:https://www.consul.io/api/session.html#session_create
  • delete session:https://www.consul.io/api/session.html#delete-session
  • KV acquire/release:https://www.consul.io/api/kv.html#create-update-key

基本流程

具体实现

public class Lock {

    private static final String prefix = "lock/";  // 同步锁参数前缀

    private ConsulClient consulClient;    private String sessionName;    private String sessionId = null;    private String lockKey;

    /** * * @param consulClient * @param sessionName 同步锁的session名称 * @param lockKey 同步锁在consul的KV存储中的Key路径,会自动增加prefix前缀,方便归类查询 */    public Lock(ConsulClient consulClient, String sessionName, String lockKey) {        this.consulClient = consulClient;        this.sessionName = sessionName;        this.lockKey = prefix + lockKey;    }

    /** * 获取同步锁 * * @param block 是否阻塞,直到获取到锁为止 * @return */    public Boolean lock(boolean block) {        if (sessionId != null) {            throw new RuntimeException(sessionId + " - Already locked!");        }        sessionId = createSession(sessionName);        while(true) {            PutParams putParams = new PutParams();            putParams.setAcquireSession(sessionId);            if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {                return true;            } else if(block) {                continue;            } else {                return false;            }        }    }

    /** * 释放同步锁 * * @return */    public Boolean unlock() {        PutParams putParams = new PutParams();        putParams.setReleaseSession(sessionId);        boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();        consulClient.sessionDestroy(sessionId, null);        return result;    }

    /** * 创建session * @param sessionName * @return */    private String createSession(String sessionName) {        NewSession newSession = new NewSession();        newSession.setName(sessionName);        return consulClient.sessionCreate(newSession, null).getValue();    }

}

单元测试

下面单元测试的逻辑:通过线程的方式来模拟不同的分布式服务来竞争锁。多个处理线程同时以阻塞方式来申请分布式锁,当处理线程获得锁之后,Sleep一段随机事件,以模拟处理业务逻辑,处理完毕之后释放锁。

public class TestLock {

    private Logger logger = Logger.getLogger(getClass());

    @Test    public void testLock() throws Exception {        new Thread(new LockRunner(1)).start();        new Thread(new LockRunner(2)).start();        new Thread(new LockRunner(3)).start();        new Thread(new LockRunner(4)).start();        new Thread(new LockRunner(5)).start();        Thread.sleep(200000L);    }

    class LockRunner implements Runnable {

        private Logger logger = Logger.getLogger(getClass());        private int flag;

        public LockRunner(int flag) {            this.flag = flag;        }

        @Override        public void run() {            Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");            try {                if (lock.lock(true)) {                    logger.info("Thread " + flag + " start!");                    Thread.sleep(new Random().nextInt(3000L));                    logger.info("Thread " + flag + " end!");                }            } catch (Exception e) {                e.printStackTrace();            } finally {                lock.unlock();            }        }    }

}

单元测试执行结果如下:

2017-04-12 21:28:09,698 INFO  [Thread-0] LockRunner - Thread 1 start!2017-04-12 21:28:12,717 INFO  [Thread-0] LockRunner - Thread 1 end!2017-04-12 21:28:13,219 INFO  [Thread-2] LockRunner - Thread 3 start!2017-04-12 21:28:15,672 INFO  [Thread-2] LockRunner - Thread 3 end!2017-04-12 21:28:15,735 INFO  [Thread-1] LockRunner - Thread 2 start!2017-04-12 21:28:17,788 INFO  [Thread-1] LockRunner - Thread 2 end!2017-04-12 21:28:18,249 INFO  [Thread-4] LockRunner - Thread 5 start!2017-04-12 21:28:19,573 INFO  [Thread-4] LockRunner - Thread 5 end!2017-04-12 21:28:19,757 INFO  [Thread-3] LockRunner - Thread 4 start!2017-04-12 21:28:21,353 INFO  [Thread-3] LockRunner - Thread 4 end!

从测试结果我们可以看到,通过分布式锁的形式来控制并发时,多个同步操作只会有一个操作能够被执行,其他操作只有在等锁释放之后才有机会去执行,所以通过这样的分布式锁,我们可以控制共享资源同时只能被一个操作进行执行,以保障数据处理时的分布式并发问题。

优化建议

本文我们实现了基于Consul的简单分布式锁,但是在实际运行时,可能会因为各种各样的意外情况导致unlock操作没有得到正确地执行,从而使得分布式锁无法释放。所以为了更完善的使用分布式锁,我们还必须实现对锁的超时清理等控制,保证即使出现了未正常解锁的情况下也能自动修复,以提升系统的健壮性。那么如何实现呢?请持续关注我的后续分解!

参考文档

Key/Value的API:https://www.consul.io/api/kv.html

选举机制:https://www.consul.io/docs/guides/leader-election.html

实现代码

  • GitHub:https://github.com/dyc87112/consul-distributed-lock
  • 开源中国:http://git.oschina.net/didispace/consul-distributed-lock

基于Consul的分布式锁实现相关推荐

  1. java B2B2C源码电子商务平台-基于Consul的分布式锁实现

    分布式锁实现 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码:壹零叁八柒柒肆六二六 基于Consul的分布式锁主要利用Key/Value存储API中的ac ...

  2. java如何保证redis设置过期时间的原子性_redis专题系列22 -- 如何优雅的基于redis实现分布式锁

    几个概念 线程锁:主要用来给方法.代码块加锁.当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段.线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比 ...

  3. 基于Consul的分布式信号量实现

    本文将继续讨论基于Consul的分布式锁实现.信号量是我们在实现并发控制时会经常使用的手段,主要用来限制同时并发线程或进程的数量,比如:Zuul默认情况下就使用信号量来限制每个路由的并发数,以实现不同 ...

  4. 基于Redis的分布式锁真的安全吗?

    说明: 我前段时间写了一篇用consul实现分布式锁,感觉理解的也不是很好,直到我看到了这2篇写分布式锁的讨论,真的是很佩服作者严谨的态度, 把这种分布式锁研究的这么透彻,作者这种技术态度真的值得我好 ...

  5. etcd 笔记(08)— 基于 etcd 实现分布式锁

    1. 为什么需要分布式锁? 在分布式环境下,数据一致性问题一直是个难点.分布式与单机环境最大的不同在于它不是多线程而是多进程.由于多线程可以共享堆内存,因此可以简单地采取内存作为标记存储位置.而多进程 ...

  6. 基于 Redis 实现分布式锁思考

    以下文章来源方志朋的博客,回复"666"获面试宝典 来源:blog.csdn.net/xuan_lu/article/details/111600302 分布式锁 基于redis实 ...

  7. nx set 怎么实现的原子性_基于Redis的分布式锁实现

    前言 本篇文章主要介绍基于Redis的分布式锁实现到底是怎么一回事,其中参考了许多大佬写的文章,算是对分布式锁做一个总结 分布式锁概览 在多线程的环境下,为了保证一个代码块在同一时间只能由一个线程访问 ...

  8. 基于Redis的分布式锁和Redlock算法

    来自:后端技术指南针 1 前言 今天开始来和大家一起学习一下Redis实际应用篇,会写几个Redis的常见应用. 在我看来Redis最为典型的应用就是作为分布式缓存系统,其他的一些应用本质上并不是杀手 ...

  9. redis系列:基于redis的分布式锁

    一.介绍 这篇博文讲介绍如何一步步构建一个基于Redis的分布式锁.会从最原始的版本开始,然后根据问题进行调整,最后完成一个较为合理的分布式锁. 本篇文章会将分布式锁的实现分为两部分,一个是单机环境, ...

最新文章

  1. 十大流行AI框架和库的优缺点对比
  2. 提高工作效率:15个有用的项目管理工具
  3. 微信公众号自定义菜单跳转小程序
  4. [Java]Annotation元数据的几个应用
  5. 对我影响最大的三个老师
  6. 错误: 找不到或无法加载主类 com.leyou.LeyouItemApplication Process finished with exit code 1...
  7. [Leedcode][JAVA][第739题][每日温度][暴力][单调栈]
  8. 立足国产自主可控技术 达梦DM8数据库新品化繁为简
  9. python里面print是什么意思_python里print是什么意思
  10. 2017华为机试题--Floyd算法
  11. asp.net文本编辑器FCKeditor使用方法详解 - 橙色大地 - 博客园
  12. Netty工作笔记0035---Reactor模式图剖析
  13. java地铁最短,地铁最短路径
  14. python 生成nc文件_Python生成器处理大文本文件的代码
  15. ORA-02049: 超时: 分布式事务处理等待锁
  16. 奔图P3305DN安装官网windows驱动 打印乱码解决方法
  17. 常用计算机硬件故障检查方法,电脑硬件常见的故障检测及处理方法
  18. 对字符串按“红黄蓝”进行排序,如“蓝黄红红黄”,输出结果为“红红黄黄蓝”
  19. warning: implicit declaration of function ‘XXX’; did you mean ‘YYY’? [-Wimplicit-function-declarati
  20. jq和js的关系_jquery与js的区别是什么?

热门文章

  1. jquery中怎么删除ul中的整个li包括节点
  2. Google C++编程风格指南(一):背景
  3. 胃部不适,原来好辛苦!
  4. jsp 连接MS server 数据库的例子
  5. 被LCD调戏睡不着了
  6. c html联调,JS与native 交互简单应用
  7. 每日一题(18)—— 指定地址存入数据
  8. 一、express 路由 todos案例
  9. LeetCode 907. 子数组的最小值之和(单调栈)
  10. LeetCode 659. 分割数组为连续子序列(哈希)