简介: # 前言 在JVM中,怎么样防止因多线程并发造成的数据不一致或逻辑混乱?大家一定都能想到锁,能想到`java.util.concurrent`包下的各种各样的用法。 但在分布式环境下,不同的机器、不同的jvm,那些好用的并发锁工具,并不能满足需要。 所以我们需要有一个`分布式锁`,来解决分布式环境下的并发问题。而本文正是在这条道路上,走出的一些经验的总结。 我们按照待解决问题的

前言

在JVM中,怎么样防止因多线程并发造成的数据不一致或逻辑混乱?大家一定都能想到锁,能想到java.util.concurrent包下的各种各样的用法。
但在分布式环境下,不同的机器、不同的jvm,那些好用的并发锁工具,并不能满足需要。

所以我们需要有一个分布式锁,来解决分布式环境下的并发问题。而本文正是在这条道路上,走出的一些经验的总结。

我们按照待解决问题的场景,一步一步看下去。

问题一:如何实现一个分布式锁;

锁,大多是基于一个只能被1个线程占用、得到的资源来实现的。
JVM的锁,是基于CPU对于寄存器的修改指令cmpxchg,来保证旧值改为新值的操作,只有一个能成功。这里的旧值,就是这个被争夺的资源,大多数情况,并发时第一个线程对旧值修改成功后,其他线程就没有机会了。(当然,ABA是另外一个话题,这里就不说了。。)

所以,在分布式环境下,要实现一个锁,我们就要找个一个只能被1个线程占用的资源。
有经验的开发很快能想到,共享磁盘文件、缓存、mysql数据库,这些分布式环境下,数据表现为单份的,都应当能满足需求。

然而,基于文件、DB会遭遇各式各样的问题,性能,经常也会是瓶颈。
因此,我们这里使用的,是淘系用的最多的缓存中间件产品--Tair。

查看com.taobao.tair.TairManager接口,发现有以下几个接口或许适合使用:

  • TairManager.incr 加法计数器。首次使用时,返回值能等于默认值,而不被+n的机会,只有一个。貌似可以用。
  • TairManager.lock 看起来名字像是这个意思,姑且拿来一试。
  • TairManager.put 传入version版本进行校验,cas原则会保证只有一个能成功。貌似可以用。

(注:mdb有可能丢、且invalid时不保证跨机房一致性,所以这个锁肯定需要用ldb来实现的。)

在线上多机房情况下,做了一下测试,测试程序核心代码大约是:

void test(){           // 2个机房的jvm实例,每个实例n个线程同时执行本方法;while(true){if(tryLock()){     // tryLock\unLock 的实现对应有3套;try{logger.info("Got lock! Hostname:{} ThreadName:{}",getHostname(),Thread.currentThread().getName());Thread.sleep(1000);}finally {unLock();logger.info("Release lock! Hostname:{} ThreadName:{}",getHostname(),Thread.currentThread().getName());}}}
}

测试的结论如下:

  • TairManager.incr 初始几次锁的获取和释放没有问题,但是后来返回值很大,就谁也拿不到锁;怀疑和接口超时、或invalid机制有关;

    • 补充: 后面发现incr做锁的一种可靠方案是,使用值限定参数:int lowBoundint upperBound

      比如:lowBound=0, upperBound=1,则value一直在0与1之间切换,用过多次,还是很靠谱的。
  • TairManager.lock 完全不行,lock接口看来根本不是做这个用的;真正的使用场景是锁住一个key不容许更新,不是锁机制。

0

  • TairManager.put 非常稳定、靠谱;有个现象值得关注:A机房invalid之后,B机房会先拿到锁;因为invalid先从远程机房开始;

最后,给出ldb put实现的分布式锁的核心代码(__后面都基于ldb put来实现__):

public boolean trylock(String key) {ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, 0);if (ResultCode.SUCCESS.equals(code))return true;elsereturn false;
}
public boolean unlock(String key) {ldbTairManager.invalid(NAMESPACE, key);
}

问题二:lock之后,程序发布或者进程crash,trylock就永远false了;

每次发布,总发现有些异常数据,拿不到锁,不能继续向前走;
仔细分析,原来tair的lock,一直没能释放;

要解决这个问题,可以先不管原因,无脑的给tair put加上超时时间就行,这样业务至少可以自行恢复。
但是,这个超时时间需要仔细考虑把握,需要在业务承受范围之内。

注: 像程序发布、进程crash,这种情况,是无可避免的让锁没机会释放。还有其他可能性,大多是bug了。。

public boolean trylock(String key, int timeout) {ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, timeout);if (ResultCode.SUCCESS.equals(code))return true;elsereturn false;
}

问题二:tair存在让人苦恼的超时问题,即使千分之1,本业务有时也不能容忍

tair的大神给的回复很简单:超时请重试

仔细想一下,put超时分两种情况:

  1. 上一次put其实已经成功了;那么重试肯定会失败;
  2. 上一次put其实失败了;那么若这一次顺利,就会成功;

那如何解决上面的第1点呢?

其实,锁应该是能够经得起复查的(类似偏向锁):A拿到的锁,没有unlock之前,无论A重试检查多少次,都是A的!

怎么实现?

既然用的是ldb缓存,它是key-value结构的,前面version控制等,都只用到了key。
这里,我们可以从tair value里做文章:让value包含机器ip+线程name,trylock内先get value做检查

于是,实现变为:

public boolean trylock(String key, int timeout) {Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, lockKey);if (result == null) return null;if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // means lock is freeResultCode code = ldbTairManager.put(NAMESPACE, key, getLockValue(), 2, timeout);if (ResultCode.SUCCESS.equals(code))return true;}else if(result.getValue() != null && getLockValue().equals(result.getValue().getValue())){return true;}return false;
}private String getLockValue(){  return Utils.getHostname() + ":" + Thread.currentThread().getName();
}

注意: 其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,需复用锁时传入uuid。

public boolean trylock(String key, int timeout, String uuid){ ... }

问题三:新方案其实多了一次get操作,若是get也超时怎么办?

超时无法避免,还是要靠重试!(前提是逻辑可以重试)

public boolean trylock(String key, int timeout) {Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, key);if (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())) // get timeout retry caseresult = locker.ldbTairManager.get(NAMESPACE, key);if (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())){ // 还是超时,则留下日志痕迹logger.error("ldb tair get timeout. key:{}.",key);return false;}if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // means lock is freeResultCode code = ldbTairManager.put(NAMESPACE, key, getLockValue(), 2, timeout);if (ResultCode.SUCCESS.equals(code))return true;else if(code==null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())){  // put timeout retry casereturn trylock(key, timeout); // 递归尝试}}else if(result.getValue() != null && getLockValue().equals(result.getValue().getValue())){return true;}return false;
}// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。
private String getLockValue(){return Utils.getHostname() + ":" + Thread.currentThread().getName();
}

进一步的,我们还可以对get/put的retry做次数控制;
真实线上的情况,一般一次retry就能解决问题,次数多了,反而可能导致雪崩,需要慎重;

多一次Get的性能影响

有代码洁癖、性能洁癖的人可能会想:普通tair锁,一次put就能搞定,这里却要先get再put,浪费啊。。。

这里梳理一下:

  • 若是锁已经被持有,那么get之后,麻烦发现被持有,直接返回失败;这时,并不会再次put,开销是一样的;(甚至get的开销,比put要小,至少不会占用put的限流阈值)
  • 若是没人持有锁,确实这时get有些浪费的,但是为了锁可以复查这个特性(可重试)、为了能解决超时这个问题,我认为还是值得的。在实际场景中,开发者自己可以评估是否需要。比如:拿前面的uuid的样例API讲,若不需要这个特性时,就不传入uuid,那么实现代码里,可以自动降级为只有一个put的锁实现;

问题四:批量锁

批量锁,主要注意拿锁的顺序和释放锁相反,伪代码如下:

if(trylock("A") && trylock("B") && trylock("C")){try{// do something}finally{  // 注意这里的顺序要反过来unlock("C");unlock("B");unlock("A");}
}

最后总结下,给一个完整代码 :smile:

import com.taobao.tair.DataEntry;
import com.taobao.tair.Result;
import com.taobao.tair.ResultCode;
import com.taobao.tair.TairManager;
import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;public class CommonLocker {private static final Logger logger = LoggerFactory.getLogger(CommonLocker.class);@Resourceprivate TairManager ldbTairManager;private static final short NAMESPACE = 1310;private static CommonLocker locker;public void init() {if (locker != null) return;synchronized (CommonLocker.class) {if (locker == null)locker = this;}}public static Lock newLock(String format, Object... argArray) {FormattingTuple ft = MessageFormatter.arrayFormat(format, argArray);return newLock(ft.getMessage());}public static Lock newLock(String strKey) {String key = "_tl_" + strKey;return new TairLock(key, CommonConfig.lock_default_timeout);}public static Lock newLock(String strKey, int timeout) {String key = "_tl_" + strKey;return new TairLock(key, timeout);}private static class TairLock implements Lock {private String lockKey;private boolean gotLock = false;private int retryGet = 0;private int retryPut = 0;private int timeout;public TairLock(String key, int timeout) {this.lockKey = tokey(key);this.timeout = timeout;}public boolean tryLock() {return tryLock(timeout);}/*** need finally do unlock** @return*/public boolean tryLock(int timeout) {Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, lockKey);while (retryGet++ < CommonConfig.lock_get_max_retry &&(result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) // 重试一次result = locker.ldbTairManager.get(NAMESPACE, lockKey);if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // lock is free// 已验证version 2表示为空,若不是为空,则返回version errorResultCode code = locker.ldbTairManager.put(NAMESPACE, lockKey, locker.getValue(), 2, timeout);if (ResultCode.SUCCESS.equals(code)) {gotLock = true;return true;} else if (retryPut++ < CommonConfig.lock_put_max_retry &&(code == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) {return tryLock(timeout);}} else if (result.getValue() != null && locker.getValue().equals(result.getValue().getValue())) {
// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。// 若是自己的锁,自己继续用gotLock = true;return true;}// 到这里表示没有拿到锁return false;}public void unlock() {if (gotLock) {ResultCode invalidCode = locker.ldbTairManager.invalid(NAMESPACE, lockKey);gotLock = false;}}public void lock() {throw new NotImplementedException();}public void lockInterruptibly() throws InterruptedException {throw new NotImplementedException();}public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {throw new NotImplementedException();}public Condition newCondition() {throw new NotImplementedException();}}// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。private String getValue() {return getHostname() + ":" + Thread.currentThread().getName();}/*** 获得机器名** @return*/public static String getHostname() {try {return InetAddress.getLocalHost().getHostName();} catch (UnknownHostException e) {return "[unknown]";}}public void setLdbTairManager(TairManager ldbTairManager) {this.ldbTairManager = ldbTairManager;}
}

使用样例

Lock lockA = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getMaster().getUid());
Lock lockB = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getPartnerList().get(0).getUid());
try {if (lockA.tryLock() && lockB.tryLock()) {// 分布式锁定本任务// do something....}
} finally {lockB.unlock();lockA.unlock();
}

Tair分布式锁 实践经验相关推荐

  1. Tair分布式锁 实践经验(160805更新)

    前言 在JVM中,怎么样防止因多线程并发造成的数据不一致或逻辑混乱?大家一定都能想到锁,能想到java.util.concurrent包下的各种各样的用法. 但在分布式环境下,不同的机器.不同的jvm ...

  2. redis分布式锁实践 并实现看门狗锁续期机制

    redis分布式锁最佳实践(并实现锁续期机制) 文章目录 redis分布式锁最佳实践(并实现锁续期机制) 1. 分布式锁是什么? 2. setnx 和 AQS state 3. jedis完成分布式锁 ...

  3. spring-boot 中实现标准 redis 分布式锁

    一.前言 redis 现在已经成为系统缓存的必备组件,针对缓存读取更新操作,通常我们希望当缓存过期之后能够只有一个请求去更新缓存,其它请求依然使用旧的数据.这就需要用到锁,因为应用服务多数以集群方式部 ...

  4. 分布式锁在存储系统中的技术实践

    简介: 阿里云存储提供了完整的分布式锁解决方案,经过了阿里云众多云产品宝贵的业务场景中长期锤炼,稳定高可靠,且提供了多种语言的SDK选择,甚至是RESTful集成方案. 1 背景 针对共享资源的互斥访 ...

  5. 每秒上千订单场景下的分布式锁高并发优化实践!

    本文授权转自石杉的架构笔记 背景引入 首先,我们一起来看看这个问题的背景? 前段时间有个朋友在外面面试,然后有一天找我聊说:有一个国内不错的电商公司,面试官给他出了一个场景题: 假如下单时,用分布式锁 ...

  6. js 拉勾网效果_Node.js 中实践基于 Redis 的分布式锁实现

    在一些分布式环境下.多线程并发编程中,如果对同一资源进行读写操作,避免不了的一个就是资源竞争问题,通过引入分布式锁这一概念,可以解决数据一致性问题. 作者简介:五月君,Nodejs Developer ...

  7. Zookeeper实践与应用--分布式锁实现

    分布式锁 分布式锁是控制分布式系统之间同步访问资源的一种方式,如果不同系统是同一个系统的不同主机之间共享一个或一组资源,那么访问这些资源的时候,往往需要通过一些呼哧手段来防止彼此之间的干扰保证统一性, ...

  8. Java架构-每秒上千订单场景下的分布式锁高并发优化实践!

    "上一篇文章我们聊了聊Redisson这个开源框架对Redis分布式锁的实现原理,如果有不了解的兄弟可以看一下:<拜托,面试请不要再问我Redis分布式锁实现原理>. 今天就给大 ...

  9. 基于tair的分布式锁实现原理

    分布式锁概述 分布式锁的实现主要分为三种方式: 1.基于Mysql的行锁实现 优点: 实现简单,不需要额外的中间件来协助实现 缺点: 增大了数据库的读写压力 可能增大数据库的死锁的产生.例如:如果琐是 ...

最新文章

  1. 【干货】大数据和人工智能.pdf
  2. php$上传_如何实现PHP上传视频的功能?(图文+视频)
  3. linux socket 套接字状态 EAGAIN EWOULDBLOCK EINTR 与非阻塞 简介
  4. 台湾国立大学郭彦甫Matlab教程笔记(17)numerical integration
  5. 土木计算机2级,请教各位,我是学土木的,考计算机二级的哪个比较好?
  6. java的setbounds_java Swing组件setBounds()简单用法实例分析
  7. java中的多线程使用方式
  8. 阿里 AI 研究成果入选国际顶会 ICML 2020,AI 推理速度提升 3 倍
  9. 蓝桥杯 算法训练 矩阵乘法
  10. 算法导论学习--学习笔记0527
  11. 计算机硬盘格式化三个步骤,电脑格式化五种操作方法教程
  12. ACM144-小柯的烦恼zy
  13. 成长小记-老天为我又设了一道大坎
  14. 如何维持手机电池寿命_手机电池寿命是多久? 如何延长手机电池寿命?
  15. chrome浏览器无法访问此网站并且响应时间长的方法
  16. 类似于萝卜书摘的书摘app推荐
  17. java中什么是空指针异常
  18. python调用笔记本摄像头
  19. ajax contentType 设置
  20. 麦田厦门小区信息数据爬取

热门文章

  1. html中h3字体不加粗取消,css如何取消加粗
  2. 程序员怎么提高英语阅读水平?
  3. Linux ar命令说明
  4. mysql的gis_MySQL的GIS功能
  5. LMK04828时钟芯片配置——配置理解
  6. Linux——linux脚本命令集合
  7. 自制合成孔径雷达(7) 声卡实现测速声呐
  8. [数据库]数据库临时表
  9. 移动互联网时代电商如何突围?
  10. ArangoDB——图遍历 Graph