作者 | 刘春龙

责编 | 郭芮

在很多互联网产品应用中,有些场景需要加锁处理,比如秒杀、全局递增ID、楼层生成等等,大部分的解决方案是基于DB实现的,Redis也是较为常见的方案之一。

Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。

Redis命令介绍

使用Redis实现分布式锁,有两个重要函数需要介绍。

SETNX命令(SET if Not Exists)

语法:

SETNX key value

功能:

当且仅当 key 不存在,将 key 的值设为 value ,并返回1; 若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。

GETSET命令

语法:

GETSET key value

功能:

将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。

GET命令

语法:

GET key

功能:

返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。

DEL命令

语法:

DEL key [KEY …]

功能:

删除给定的一个或多个 key,不存在的 key 会被忽略。

兵贵精,不在多。分布式锁,我们就依靠这四个命令。但在具体实现,还有很多细节,需要仔细斟酌,因为在分布式并发多进程中,任何一点出现差错,都会导致死锁,hold住所有进程。

加锁实现

SETNX 可以直接加锁操作,比如说对某个关键词foo加锁,客户端可以尝试 SETNX foo.lock <current unix time>。

  • 如果返回1,表示客户端已经获取锁,可以往下操作,操作完成后,通过 DEL foo.lock 命令来释放锁。

  • 如果返回0,说明foo已经被其他客户端上锁,如果锁是非堵塞的,可以选择返回调用。如果是堵塞调用,就需要进入下一个重试循环,直至成功获得锁或者重试超时。

理想是美好的,现实是残酷的。仅仅使用SETNX加锁带有竞争条件的,在某些特定的情况会造成死锁错误。

处理死锁

在上面的处理方式中,如果获取锁的客户端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时效性检测。

因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和redis中的时间戳进行对比,如果超过一定差值,认为锁已经时效,防止锁无限期的锁下去。

但是,在大并发情况,如果同时检测锁失效,并简单粗暴地删除死锁,再通过SETNX上锁,可能会导致竞争条件的产生,即多个客户端同时获取锁。

情景描述如下:

  • C1获取锁,并崩溃。C2和C3调用SETNX上锁返回0后,获得foo.lock的时间戳,通过比对时间戳,发现锁超时。

  • C2 向foo.lock发送DEL命令。

  • C2 向foo.lock发送SETNX获取锁。

  • C3 向foo.lock发送DEL命令,此时C3发送DEL时,其实DEL掉的是C2的锁。

  • C3 向foo.lock发送SETNX获取锁。

此时C2和C3都获取了锁,产生竞争条件,如果在更高并发的情况,可能会有更多客户端获取锁。

所以,DEL锁的操作,不能直接使用在锁超时的情况下,幸好我们有GETSET方法,假设我们现在有另外一个客户端C4,看看如何使用GETSET方式,避免这种情况产生。

  • C1获取锁,并崩溃。C2和C3调用SETNX上锁返回0后,调用GET命令获得foo.lock的时间戳T1,通过比对时间戳,发现锁超时。

  • C4(调用SETNX上锁返回0后,调用GET命令获得foo.lock的时间戳T1,通过比对时间戳,发现锁超时)向foo.lock发送GESET命令,GETSET foo.lock 并得到foo.lock中老的时间戳T2。

  • 如果T1=T2,说明C4获得锁。

  • 如果T1!=T2,说明C4之前有另外一个客户端C5通过调用GETSET方式获取并更改了时间戳,C4未获得锁。只能进入下次循环中。

时间戳问题

我们看到foo.lock的value值为时间戳,所以要在多客户端情况下,保证锁有效,一定要同步各服务器的时间。

如果各服务器间,时间有差异,时间不一致的客户端,在判断锁超时,就会出现偏差,从而产生竞争条件。锁的超时与否,严格依赖时间戳。

锁覆盖问题

现在唯一的问题是,C4设置foo.lock的新时间戳,是否会对C5获取得锁产生影响?

其实我们可以看到C4和C5只有在调用GET命令获得foo.lock的时间戳,通过比对时间戳,发现锁超时后,几乎同时调用GETSET方式获取锁,执行的时间差值极小,并且写入foo.lock中的都是有效时间戳,所以对锁并没有影响。

为了让这个锁更加强壮,获取锁的客户端应该在调用关键业务时,再次调用GET方法获取T1,和写入的T0时间戳进行对比,以免锁因其他情况被执行DEL意外解开而不知。但是如果遇到上面描述得问题,则T0则会与T1不一致,当然差别一般会很小。这就是锁覆盖问题。

锁覆盖会导致什么问题呢?

当客户端的锁过期时间被覆盖,会造成锁不具有标识性,会造成客户端无法释放锁(客户端只能释放明确自己持有的锁)。

nil 问题

GET返回nil时应该走哪种逻辑?

1、第一种走循环走setnx逻辑

  • C1客户端获取锁,并且处理完后,DEL掉锁。

  • 在DEL锁之前,C2通过SETNX向foo.lock设置时间戳T0失败,发现有客户端获取锁,进入GET操作。C2 向foo.lock发送GET命令,获取返回值T1(nil)(因为此时C1执行DEL删除锁)。

  • C2 循环,进入下一次SETNX逻辑。

2、第二种走超时逻辑

  • C1客户端获取锁,并且处理完后,DEL掉锁。

  • 在DEL锁之前,C2通过SETNX向foo.lock设置时间戳T0发现有客户端获取锁,进入GET操作。C2 向foo.lock发送GET命令,获取返回值T1(nil)(因为此时C1执行DEL删除锁)。

  • C2 通过 `T0 > T1 + expire` 对比,进入GETSET流程。

  • C2调用GETSET向foo.lock发送T0时间戳,返回foo.lock的原值T2,C2判断如果T2=T1相等,获得锁,如果T2!=T1,未获得锁。

两种逻辑貌似都是OK,但是从逻辑处理上来说,当GET返回nil,表示锁是被删除的,而不是超时,应该走SETNX逻辑加锁。

对于"第二种走超时逻辑"是否会造成死锁,尚不清楚,不过推荐采用第一种方式。

GETSET返回nil时应该怎么处理?

前提:假设C4客户端获取锁后由于异常退出等原因未正常释放锁,导致锁超时。此时,C1、C2和C3客户端同时请求获取锁。C1、C2和C3客户端调用GET接口,C1返回T1,此时C3网络情况更好,快速进入获取锁,并执行DEL删除锁,C2返回T2(nil)。C1进入超时处理逻辑。C2面临上面提到「GET返回nil时应该走哪种逻辑?」的两种选择:1. 也进入超时处理逻辑;2. 继续循环走setnx逻辑(推荐)。

  • C1向foo.lock发送GETSET命令,获取返回值T11(nil)。C1比对C1和C11发现两者不同,处理逻辑认为未获取锁,然后继续循环走setnx逻辑。

  • C2有两种选择:

  • 进入超时处理逻辑。C2向foo.lock发送GETSET命令,获取返回值T22(C1写入的时间戳)。C2比对T2和T22发现两者不同,处理逻辑认为未获取锁,然后继续循环走setnx逻辑。

  • 继续循环走setnx逻辑;

  • 很明显,C1和C2最终都会继续循环走setnx逻辑,然后通过SETNX向foo.lock设置时间戳T0会失败,这其实是因为在步骤1中C1执行GETSET命令导致的。此时C1和C2都认为未获取锁,其实C1是已经获取锁了,但是他的处理逻辑没有考虑GETSET返回nil的情况,只是单纯的用GET和GETSET值进行对比。

至于为什么会出现这种情况?就如上面设想的场景那样,多客户端时,每个客户端连接Redis后,发出的命令并不是连续的,导致从单客户端看到的好像连续的命令,到rRedis Server后,这两条命令之间可能已经插入大量的其他客户端发出的命令,比如DEL、SETNX等。

正确的处理方式就是GETSET返回nil时,获取锁成功。

总结

  • 必要的超时机制:获取锁的客户端一旦崩溃,一定要有过期机制,否则其他客户端都降无法获取锁,造成死锁问题。

  • 分布式锁,多客户端的时间戳不能保证严格意义的一致性,所以在某些特定因素下,有可能存在问题。要适度的机制,可以承受小概率的事件产生。

  • 只对关键处理节点加锁,良好的习惯是,把相关的资源准备好,比如连接数据库后,调用加锁机制获取锁,直接进行操作,然后释放,尽量减少持有锁的时间。

  • 在持有锁期间要不要CHECK锁,如果需要严格依赖锁的状态,最好在关键步骤中做锁的CHECK检查机制,但是根据我们的测试发现,在大并发时,每一次CHECK锁操作,都要消耗掉几个毫秒,而我们的整个持锁处理逻辑才不到10毫秒,玩客没有选择做锁的检查。

  • sleep学问,为了减少对Redis的压力,获取锁尝试时,循环之间一定要做sleep操作。但是sleep时间是多少是门学问。需要根据自己的Redis的QPS,加上持锁处理时间等进行合理计算。如果Redis的QPS足够高,也可以考虑循环之间不sleep,循环一定次数/时间执行yeild,提高响应速度。

  • 至于为什么不使用Redis的muti、expire、watch等机制,可以查下参考资料,找下原因。

代码实现

代码库

https://github.com/HuTu92/distributed-lock

源码

package com.github.hutu92.concurrent.locks;

import com.alibaba.fastjson.JSON;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;

/** * Created by liuchunlong on 2018/8/31. * <p> * 基于redis的分布式锁 v1 * * 需要客户端时间同步 */public class DistributedLock {

    private static final long RETRY_BARRIER = 3 * 1000; // 请求锁重试屏障,单位毫秒

    private final JedisPool jedisPool; // redis连接池    private final String lockKey; // lock Key    private final long lockExpiryInNanos; // 锁的过期时长,单位纳秒

    private static final ThreadLocal<Lock> lockThreadLocal = new ThreadLocal<Lock>();

    /**     * 构造方法     *     * @param jedisPool          redis连接池     * @param lockKey            锁的Key     * @param lockExpiryInMillis 锁的过期时长,单位毫秒     */    public DistributedLock(JedisPool jedisPool, String lockKey, long lockExpiryInMillis) {        this.jedisPool = jedisPool;        this.lockKey = lockKey;        this.lockExpiryInNanos = lockExpiryInMillis * 1000;    }

    /**     * 构造方法     * <p>     * 使用锁默认的过期时长Integer.MAX_VALUE,即锁永远不会过期     *     * @param jedisPool redis连接池     * @param lockKey   锁的Key     */    public DistributedLock(JedisPool jedisPool, String lockKey) {        this(jedisPool, lockKey, Integer.MAX_VALUE);    }

    /**     * 获取锁在redis中的Key标记     *     * @return locks key     */    public String getLockKey() {        return this.lockKey;    }

    /**     * 锁的过期时长     *     * @return     */    public long getLockExpiryInNanos() {        return lockExpiryInNanos;    }

    /**     * 请求分布式锁,不会阻塞,直接返回     *     * @param jedis redis 连接     * @return 成功获取锁返回true, 否则返回false     */    private boolean tryAcquire(Jedis jedis) {

        final Lock newLock = new Lock(System.nanoTime() + this.lockExpiryInNanos);

        /**         * 将新锁(newLock)写入redis中。如果成功写入,redis中不存在锁,获取锁成功;否则,redis中已存在锁,获取锁失败;         */        if (jedis.setnx(this.lockKey, newLock.toString()) == 1) {            lockThreadLocal.set(newLock);            return true;        }

        /**         * 至此,说明redis中已存在锁,获取锁失败,则需要进行如下操作:         * 1. 判断redis中已存在的锁是否过期,如果过期则直接获取锁;         * 2. 否则,获取锁失败;         */

        final String currentLockValue = jedis.get(lockKey);        // 特别的,当jedis.get()获取已存在的锁currentLockValue为空时,应该重新SETNX        if (currentLockValue == null || currentLockValue.length() == 0) {            tryAcquire(jedis);        }        final Lock currentLock = Lock.fromJson(currentLockValue); // redis中已存在的锁

        // 如果redis中已存在的锁已超时,则重新获取锁        if (isExpired(currentLock)) {            String originLockValue = jedis.getSet(lockKey, newLock.toString());

            /**             * 这里还有个前置条件:             *      会对已存在的锁进行校验,jedis.get()和jedis.getSet()获取的锁必须是同一锁,重新获取锁才成功             */

            // 特别的,当jedis.getSet()获取已存在的锁originLockValue为空时,则认定获取锁成功            if (originLockValue == null || originLockValue.length() == 0) {                lockThreadLocal.set(newLock);                return true;            }

            if (originLockValue.equals(currentLockValue)) {                lockThreadLocal.set(newLock);                return true;            }        }

        return false;    }

    /**     * 请求分布式锁,不会阻塞,直接返回     *     * @return 成功获取锁返回true, 否则返回false     */    public boolean tryAcquire() {

        Jedis jedis = null;        try {            jedis = jedisPool.getResource();            return tryAcquire(jedis);        } finally {            if (jedis != null) {                jedis.close();            }        }    }

    /**     * 超时请求分布式锁,会阻塞     *     * 采用"自旋获取锁"的方式,直至获取锁成功或者请求锁超时     *     * @param acquireTimeoutInMillis 锁的请求超时时长     * @return     */    public boolean acquire(long acquireTimeoutInMillis) {

        Jedis jedis = null;        try {            jedis = jedisPool.getResource();

            long acquireTime = System.currentTimeMillis();

            // 锁的请求到期时间            long expiryTime = System.currentTimeMillis() + acquireTimeoutInMillis;

            while (expiryTime >= System.currentTimeMillis()) {                boolean result = tryAcquire(jedis);                if (result) { // 获取锁成功直接返回,否则循环重试                    return true;                }

                if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {                    Thread.yield();                }            }

        } finally {            if (jedis != null) {                jedis.close();            }        }        return false;    }

    /**     * 释放锁     */    public void release() {

        Jedis jedis = null;        try {            jedis = jedisPool.getResource();            release(jedis);        } finally {            if (jedis != null) {                jedis.close();            }        }    }

    /**     * 释放锁     *     * @param jedis     */    private void release(Jedis jedis) {        Lock currlock = lockThreadLocal.get();        if (currlock != null) {            final String currentLockValue = jedis.get(lockKey);            if (currentLockValue != null && currentLockValue.length() != 0) {                final Lock currentLock = Lock.fromJson(currentLockValue); // redis中已存在的锁                if (currlock.equals(currentLock)) {                    lockThreadLocal.remove();                    jedis.del(lockKey);                }            }        }    }

    /**     * 判断当前线程是否持有锁     *     * 未持有锁或者锁超时,返回false     *     * @return     */    public boolean isLocked() {        Lock currlock = lockThreadLocal.get();        // 如果当前线程保存的lock不为null,并且未超时,则当前线程必然持有锁,锁未被意外释放        return currlock != null && !currlock.isExpired();    }

    /**     * 判断指定的lock是否是当前线程持有的锁     *     * @return     */    boolean isMine(final Lock lock) {        Lock currlock = lockThreadLocal.get();        return currlock != null && currlock.equals(lock);    }

    /**     * 判断锁是否超时     *     * @param lock     * @return     */    boolean isExpired(final Lock lock) {        return lock.isExpired();    }

    /**     * 锁     */    protected static class Lock {

        private long expiryTime; // 锁的过期时间,注意,不是过期时长,单位纳秒

        Lock(long expiryTime) {            this.expiryTime = expiryTime;        }

        /**         * 解析字符串,根据解析出的过期时间构造Lock         *         * @param json         * @return         */        static Lock fromJson(String json) {            return JSON.parseObject(json, Lock.class);        }

        @Override        public String toString() {            return JSON.toJSONString(this, false);        }

        public long getExpiryTime() {            return expiryTime;        }

        /**         * 判断锁是否超时,如果锁的过期时间小于当前系统时间,则判定锁超时         *         * @return         */        boolean isExpired() {            return this.expiryTime < System.nanoTime();        }

        @Override        public boolean equals(Object obj) {            return obj != null                    && obj instanceof Lock                    && this.expiryTime == ((Lock) obj).getExpiryTime();        }    }}

优化

上面存在的锁覆盖问题是不可避免的,还有就是要求客户端时间同步。下面我们进一步优化这一问题。

Redis命令介绍:SET

1、语法:

SET key value [EX seconds] [PX milliseconds] [NX|XX]

2、功能:

  • 将字符串值 value 关联到 key 。

  • 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。

  • 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时,这个键原有的 TTL 将被清除。

3、可选参数:

从 Redis 2.6.12 版本开始,SET 命令的行为可以通过一系列参数来修改:

  • EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。

  • PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。

  • NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。

  • XX :只在键已经存在时,才对键进行设置操作。

因为 SET 命令可以通过参数来实现和 SETNX 、SETEX 和 PSETEX 三个命令的效果,所以将来的 Redis 版本可能会废弃并最终移除 SETNX 、 SETEX 和 PSETEX 这三个命令。

4、返回值:

  • 在 Redis 2.6.12 版本以前, SET 命令总是返回 OK 。

  • 从 Redis 2.6.12 版本开始, SET 在设置操作成功完成时,才返回 OK。

  • 如果设置了 NX 或者 XX ,但因为条件没达到而造成设置操作未执行,那么命令返回空批量回复(NULL Bulk Reply)。

5、使用模式:

命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。客户端执行以上的命令:

  • 如果服务器返回 OK ,那么这个客户端获得锁。

  • 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。

设置的过期时间到达之后,锁将自动释放。可以通过以下修改,让这个锁实现更健壮:

  • 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。

  • 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。

这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。

以下是一个简单的解锁脚本示例:

if redis.call("get",KEYS[1]) == ARGV[1]then    return redis.call("del",KEYS[1])else    return 0end

6、源码:

package com.github.hutu92;

import com.alibaba.fastjson.JSON;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;

import java.util.Collections;import java.util.UUID;import java.util.concurrent.locks.ReentrantLock;

/** * Created by liuchunlong on 2018/9/4. * <p> * 基于redis的分布式锁 v2 * <p> * 不需要客户端时间同步 */public class DistributedLock {

    private static final long RETRY_BARRIER = 600; // 重试屏障,单位毫秒    private static final long INTERVAL_TIMES = 200; // 下一次重试等待,单位毫秒

    private final JedisPool jedisPool; // redis连接池    private final String lockKey; // lock Key    private final long lockExpiryInMillis; // 锁的过期时长,单位纳秒

    private final ThreadLocal<Lock> lockThreadLocal = new ThreadLocal<Lock>();

    /**     * 构造方法     *     * @param jedisPool          redis连接池     * @param lockKey            锁的Key     * @param lockExpiryInMillis 锁的过期时长,单位毫秒     */    public DistributedLock(JedisPool jedisPool, String lockKey, long lockExpiryInMillis) {        this.jedisPool = jedisPool;        this.lockKey = lockKey;        this.lockExpiryInMillis = lockExpiryInMillis;    }

    /**     * 构造方法     * <p>     * 使用锁默认的过期时长Integer.MAX_VALUE,即锁永远不会过期     *     * @param jedisPool redis连接池     * @param lockKey   锁的Key     */    public DistributedLock(JedisPool jedisPool, String lockKey) {        this(jedisPool, lockKey, Integer.MAX_VALUE);    }

    /**     * 获取锁在redis中的Key标记     *     * @return locks key     */    public String getLockKey() {        return this.lockKey;    }

    /**     * 锁的过期时长     *     * @return     */    public long getLockExpiryInMillis() {        return lockExpiryInMillis;    }

    /**     * can override     *     * @param jedis     * @return     */    private String nextUid(Jedis jedis) {        // 可以考虑雪花算法..        return UUID.randomUUID().toString();    }

    private synchronized Jedis getClient() {        return jedisPool.getResource();    }

    private synchronized void closeClient(Jedis jedis) {        jedis.close();    }

    /**     * 请求分布式锁,不会阻塞,直接返回     *     * @param jedis redis 连接     * @return 成功获取锁返回true, 否则返回false     */    private boolean tryAcquire(Jedis jedis) {

        final Lock nLock = new Lock(nextUid(jedis));        String result = jedis.set(this.lockKey, nLock.toString(), "NX", "PX", this.lockExpiryInMillis);        if ("OK".equals(result)) {            lockThreadLocal.set(nLock);            return true;        }        return false;    }

    /**     * 请求分布式锁,不会阻塞,直接返回     *     * @return 成功获取锁返回true, 否则返回false     */    public boolean tryAcquire() {

        Jedis jedis = null;        try {            jedis = getClient();            return tryAcquire(jedis);        } finally {            if (jedis != null) {                closeClient(jedis);            }        }    }

    /**     * 超时请求分布式锁,会阻塞     *     * 采用"自旋获取锁"的方式,直至获取锁成功或者请求锁超时     *     * @param acquireTimeoutInMillis 锁的请求超时时长     * @return     */    public boolean acquire(long acquireTimeoutInMillis) throws InterruptedException {

        Jedis jedis = null;        try {

            jedis = getClient();

            long acquireTime = System.currentTimeMillis();            long expiryTime = System.currentTimeMillis() + acquireTimeoutInMillis; // 锁的请求到期时间

            while (expiryTime >= System.currentTimeMillis()) {                boolean result = tryAcquire(jedis);                if (result) { // 获取锁成功直接返回,否则循环重试                    return true;                }

                Thread.sleep(INTERVAL_TIMES);            }

        } finally {            if (jedis != null) {                closeClient(jedis);            }        }        return false;    }

    /**     * 释放锁     *     * @return     */    public boolean release() throws InterruptedException {        return release(Integer.MAX_VALUE);    }

    /**     * 释放锁     *     * @return     */    public boolean release(long releaseTimeoutInMillis) throws InterruptedException {

        Jedis jedis = null;        try {            jedis = getClient();            return release(jedis, releaseTimeoutInMillis);        } finally {            if (jedis != null) {                closeClient(jedis);            }        }    }

    /**     * 释放锁     *     * @param jedis     * @param releaseTimeoutInMillis     * @return     */    private boolean release(Jedis jedis, long releaseTimeoutInMillis) throws InterruptedException {        Lock cLock = lockThreadLocal.get();        if (cLock == null) {            System.out.println("lock is null!");        }        if (cLock != null) {            String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

            long releaseTime = System.currentTimeMillis();            long expiryTime = System.currentTimeMillis() + releaseTimeoutInMillis; // 锁的释放到期时间

            while (expiryTime >= System.currentTimeMillis()) {                Object result = jedis.eval(luaScript, Collections.singletonList(this.lockKey),                        Collections.singletonList(cLock.toString()));                if (((Long) result) == 1L) {                    lockThreadLocal.remove();                    return true;                }

                Thread.sleep(INTERVAL_TIMES);            }        }        return false;    }

    /**     * 锁     */    protected static class Lock {

        private String uid; // lock 唯一标识

        Lock(String uid) {            this.uid = uid;        }

        public String getUid() {            return uid;        }

        @Override        public String toString() {            return JSON.toJSONString(this, false);        }    }

}

性能调优

这里我们使用ab性能测试工具来模拟测试。

由于没有使用队列,对高并发请求进行削峰,所以所有的压力都会被打到redis上。为了测试方便我这里只是本地启动了单机redis,没有做其它的调优配置。

我们并发测试场景是1000个并发请求,总共2000个请求。

ab -n 2000 -c 1000 "localhost:8080/lock/v2/seckill"

上述的地址是一个接口,接口代码如下:

@RestController@RequestMapping("/lock")public class LockController {

    private static LongAdder longAdder = new LongAdder();    private static Long ACQUIRE_TIMEOUT_IN_MILLIS = (long) Integer.MAX_VALUE;    private static Long stock = 100000L;    private static DistributedLock lock;

    static {        longAdder.add(stock);    }

    private final JedisPool jedisPool;

    @Autowired    public LockController(JedisPool jedisPool) {        this.jedisPool = jedisPool;        lock = new DistributedLock(jedisPool, "seckillV2_" + UUID.randomUUID().toString());    }

    @GetMapping("/v2/seckill")    public String seckillV2() throws InterruptedException {

        boolean acquireResult = false;        try {            acquireResult = lock.acquire(ACQUIRE_TIMEOUT_IN_MILLIS);

            if (!acquireResult) {                return "人太多了,换个姿势操作一下!";            }

            if (longAdder.longValue() == 0L) {                return "已抢光!";            }

            doSomeThing(jedisPool);

            longAdder.decrement();

            System.out.println("已抢: " + (stock - longAdder.longValue()) + ", 还剩下: " + longAdder.longValue());

        } finally {            if (acquireResult) {                boolean releaseResult = lock.release();                if (!releaseResult) {                    System.out.println("释放锁失败!");                }            }        }

        return "OK";    }

    private void doSomeThing(JedisPool jedisPool) {        Jedis jedis = null;        try {            jedis = jedisPool.getResource();

            jedis.incr("already_bought");        } finally {            if (jedis != null) {                jedis.close();            }        }    }}

那么我们这里说的性能调优指的是什么呢?

仔细分析上面的源码你会发现,获取锁的逻辑是循环获取的,再每次循环之间,应该怎么去处理?如果不做任何处理,直接继续下一个循环,表面上看能够及时的获取锁,但这会给Redis更大的压力,如果Redis扛不住,到最后只会适得其反;而如果sleep等待,那么等待多久呢?等待久了,锁的获取和释放就会不及时;使用yield如何?等等......

1、

if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {    Thread.yield();}

请求获取锁的前600毫秒内直接循环重试,如果超过600毫秒还未获取到锁则每次循环都将线程推迟到下一个时间片执行。

主要参数说明:

  • Failed requests:失败的请求;

  • Time per request:每个请求的平均耗时。

2、

if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {    Thread.sleep(INTERVAL_TIMES);} else {    Thread.yield();}

请求获取锁的前600毫秒内每次循环重试都先将线程推迟到下一个时间片,如果超过600毫秒还未获取到锁则每次循环都将线程休眠200毫秒。

很明显,出错率降低了很多,每个请求的耗时也减少了一半。这是因为No1中在600毫秒内的直接循环重试,会产生很多意义的请求,给Redis造成了巨大的压力,无法响应请求。

3、

Thread.sleep(INTERVAL_TIMES);

请求获取锁的每次循环重试都将线程休眠200毫秒。

4、

Thread.sleep(INTERVAL_TIMES * 10);

请求获取锁的每次循环重试都将线程休眠2秒。

很明显,休眠时间过长,会使部分线程请求锁的时间变长,不能够及时获取到锁。

5、

Thread.yield();

请求获取锁的每次循环重试都将线程推迟到下一个时间片执行。

总结

总的来说,No2与No3表现得都还可以。但是No2使用了Thread.yield();也会给Redis造成压力,我们对比下两者的 Percentage of the requests served within a certain time (ms) 数据。可以看到No3的90%以下请求的用户平均时间要明显低于No2的。所以最终我们选择No3策略。

当然你也可以根据你Redis的QPS自行调整策略。

作者:刘春龙,曾就职国美、优信,现就职于金山,担任消息队列服务开发。

声明:本文为作者投稿,版权归其个人所有。

推荐阅读:

  • 这届 Windows 不行,是因为微软不卖“软件”改卖“服务”?

  • JavaScript 详解:为什么写好的代码非常重要

  • 10 张有关程序员的趣图,图图扎心

  • BCH硬分叉完毕,澳本聪放话:一切尚未结束,游戏继续!

  • Python告诉你:这类程序员最赚钱!

  • 清华夺ASC、ISC、SC三项超算比赛大满贯

  • 刚写完排序算法,就被开除了…

程序员如何 Get 分布式锁的正确姿势?| 技术头条相关推荐

  1. 【分布式缓存系列】Redis实现分布式锁的正确姿势

    一.前言 在我们日常工作中,除了Spring和Mybatis外,用到最多无外乎分布式缓存框架--Redis.但是很多工作很多年的朋友对Redis还处于一个最基础的使用和认识.所以我就像把自己对分布式缓 ...

  2. 作为一名Python程序员,论听歌的正确姿势?

    程序员听歌的正确姿势. 这有啥,无非就是跪.趴.躺- 啊呸,说错了,正确姿势可能是? 打开网易云–>找到榜单–>选歌 But!!! 这也太普通太随意了嘛,来看一个Python程序员的打开方 ...

  3. 掌握Redis分布式锁的正确姿势

    本文中案例都会在上传到git上,请放心浏览 git地址:https://github.com/muxiaonong/Spring-Cloud/tree/master/order-lock 本文会使用到 ...

  4. 这才是实现分布式锁的正确姿势!

    都9102年了,你还在手写分布式锁吗? 经常被问到"如何实现分布式锁",看来这是大家的一个痛点. 其实Java世界的"半壁江山"--Spring早就提供了分布式 ...

  5. 【最全】Spring Boot 实现分布式锁——这才是实现分布式锁的正确姿势!

    ava世界的"半壁江山"--Spring早就提供了分布式锁的实现.早期,分布式锁的相关代码存在于Spring Cloud的子项目Spring Cloud Cluster中,后来被迁 ...

  6. Redis实现分布式锁的正确姿势 | Spring Cloud 36

    一.分布式锁 1.1 什么是分布式锁 分布式锁,即分布式系统中的锁.在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题.与单体应用不同的是, ...

  7. 程序员如何掌握 React 开发的黄金法则? | 技术头条

    本文有关 React 的"黄金法则"只是以一个新的角度,对展示组件和容器组件的现有概念的重新阐述. 作者 | Rico Kahler 译者 | 苏本如 责编 | 屠敏 出品 | C ...

  8. Python程序员Debug利器,和Print说再见 | 技术头条

    整理 | Rachel 责编 | Jane 出品 | Python大本营(id:pythonnews) [导语]程序员每日都在和 debug 相伴.新手程序员需要学习的 debug 手段复杂多样,设置 ...

  9. 金山银四来了!论程序员和老板谈加薪的正确姿势,针不戳!

    提加薪之前要做的事: 1.好好打探你的"市场行情",考察一下你的工作在市场上同等工作中薪水情况如何 2.好好对自己的工作表现进行一个评估,这是决定你能否提加薪的重要前提和资本 3. ...

最新文章

  1. LeetCode实战:逆波兰表达式求值
  2. html获取文本框的值,如何获取输入框的内容
  3. automapper
  4. svn update 报错,必须先cleanup,然后cleanup失败解决方法
  5. python多行注释符号_涨知识Python 为什么用 # 号作注释符?
  6. .NET开源工作流CCFlow-快速入门
  7. 为什么要使用getter/setter
  8. 犹太教、基督教和伊斯兰教的简单关系
  9. 新手进阶:巧用 macOS 帮助菜单?
  10. 工艺仿真Process Simulate新亮点
  11. 什么是 503 服务不可用错误?
  12. 手机如何注册163邮箱?注册邮箱的方法步骤
  13. 将论文奇数页与偶数页页眉添加不同的下划线
  14. windows cmd打开新窗口关闭窗口
  15. idea2018下载-补丁破解激活
  16. 三菱PLC步进伺服控制程序 用三菱plc和威纶触摸屏编写
  17. 华为上机英文数字翻译
  18. c语言delay函数的作用,delay用法(delay函数使用)
  19. win11修改wifi mac地址
  20. 判断输入是否为回车键

热门文章

  1. leetcode python3 简单题204. Count Primes
  2. aspnet还有人用吗_微信公众号软件安装管家会员真的那么好吗
  3. 不同路径(I和II)--动态规划
  4. 第一章 密码学和加密交易的介绍
  5. 内燃机附件和部件行业调研报告 - 市场现状分析与发展前景预测
  6. 中国水稻种子行业市场供需与战略研究报告
  7. 中国数码电影摄影机行业市场供需与战略研究报告
  8. linux nginx反向代理配置
  9. 为什么开发人员应该学习 Kubernetes?
  10. 如何设计一个能够扩展到百万用户的系统?