前言

分布式ID生成策略基本要求就是全局不重复,最好还能递增,长度较短,性能高,可用性强。关于相关的实现方案有很多,本文着重使用美团开源的分布式ID生成解决方案——Leaf。

关于Leaf,美团官方的介绍文档主要如下,强烈建议阅读文章大致了解Leaf的工作流程与原理,这对本文后续的源码解析有很大的帮助。

  1. Leaf:美团分布式ID生成服务开源
  2. Leaf——美团点评分布式ID生成系统

本系列Leaf源码解析部分按照使用的方式也分为号段模式和snowflake模式两篇文章,本文就来着重研究号段模式的源码实现。

本文的Leaf源码注释地址:https://github.com/MrSorrow/Leaf

I. 导入项目

Leaf由Maven构建,源码地址:https://github.com/Meituan-Dianping/Leaf
首先先Fork官方仓库到自己的仓库,我的源码注释版:https://github.com/MrSorrow/Leaf
下载源码,导入IDEA,导入成功依赖下载完成后项目结构大致如下:

II. 测试号段模式

「创建数据库表」

DROP TABLE IF EXISTS `leaf_alloc`;CREATE TABLE `leaf_alloc` (`biz_tag` varchar(128)  NOT NULL DEFAULT '' COMMENT '业务key',`max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前已经分配了的最大id',`step` int(11) NOT NULL COMMENT '初始步长,也是动态调整的最小步长',`description` varchar(256)  DEFAULT NULL COMMENT '业务key的描述',`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据库维护的更新时间',PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB;

「开启号段模式」

leaf.name=com.sankuai.leaf.opensource.test
leaf.segment.enable=true
leaf.jdbc.url=jdbc:mysql://localhost:3306/leaf_test?useUnicode=true&characterEncoding=utf8&characterSetResults=utf8
leaf.jdbc.username=root
leaf.jdbc.password=1234leaf.snowflake.enable=false
#leaf.snowflake.zk.address=
#leaf.snowflake.port=

「测试号段模式」

开启号段模式并配置好数据库连接后,点击启动 leaf-server 模块的 LeafServerApplication,将服务跑起来。

浏览器输入http://localhost:8080/api/segment/get/leaf-segment-test来获取分布式递增id;

监控号段模式:http://localhost:8080/cache

数据库表:

III. 号段模式源码解析

正式进入源码前,再强烈建议阅读官方的两篇博客,对Leaf的号段模式工作模式有个大致的理解。

我们从http://localhost:8080/api/segment/get/leaf-segment-test入口来分析。该请求会交由 com.sankuai.inf.leaf.server.LeafController 处理:

@Autowired
SegmentService segmentService;/*** 号段模式获取id* @param key 对应数据库表的biz_tag* @return*/
@RequestMapping(value = "/api/segment/get/{key}")
public String getSegmentID(@PathVariable("key") String key) {// 核心是segmentService的getId方法return get(key, segmentService.getId(key));
}private String get(@PathVariable("key") String key, Result id) {Result result;if (key == null || key.isEmpty()) {throw new NoKeyException();}result = id;if (result.getStatus().equals(Status.EXCEPTION)) {throw new LeafServerException(result.toString());}return String.valueOf(result.getId());
}

可以看到主要是调用 SegmentServicegetId(key) 方法。key 参数其实就是路径上对应的 leaf-segment-test,也就是数据库对应的 biz_taggetId(key) 方法返回的是 com.sankuai.inf.leaf.common.Result 对象,封装了 id 和 状态 status

public class Result {private long id;private Status status;// getter and setter....
}public enum  Status {SUCCESS,EXCEPTION
}

创建SegmentService

我们进入 SegmentService 类中,再调用 getId(key) 方法之前,我们先看一下 SegmentService 类的实例化构造函数逻辑。可以看到:

package com.sankuai.inf.leaf.server;@Service("SegmentService")
public class SegmentService {private Logger logger = LoggerFactory.getLogger(SegmentService.class);IDGen idGen;DruidDataSource dataSource;/*** 构造函数,注入单例SegmentService时,完成以下几件事:* 1. 加载leaf.properties配置文件解析配置* 2. 创建Druid dataSource* 3. 创建IDAllocDao* 4. 创建ID生成器实例SegmentIDGenImpl并初始化* @throws SQLException* @throws InitException*/public SegmentService() throws SQLException, InitException {// 1. 加载leaf.properties配置文件Properties properties = PropertyFactory.getProperties();// 是否开启号段模式boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SEGMENT_ENABLE, "true"));if (flag) {// 2. 创建Druid dataSourcedataSource = new DruidDataSource();dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));dataSource.init();// 3. 创建DaoIDAllocDao dao = new IDAllocDaoImpl(dataSource);// 4. 创建ID生成器实例SegmentIDGenImplidGen = new SegmentIDGenImpl();((SegmentIDGenImpl) idGen).setDao(dao);// 初始化SegmentIDGenImpl(加载db的tags至内存cache中,并开启定时同步更新任务)if (idGen.init()) {logger.info("Segment Service Init Successfully");} else {throw new InitException("Segment Service Init Fail");}} else {// ZeroIDGen一直返回id=0idGen = new ZeroIDGen();logger.info("Zero ID Gen Service Init Successfully");}}/*** 根据key获取id* @param key* @return*/public Result getId(String key) {return idGen.get(key);}/*** 获取号段模式id生成器SegmentIDGenImpl* @return*/public SegmentIDGenImpl getIdGen() {if (idGen instanceof SegmentIDGenImpl) {return (SegmentIDGenImpl) idGen;}return null;}
}

SegmentService 类的构造函数,主要完成以下几件事:

  1. 加载 leaf.properties 配置文件,并解析配置
  2. 创建 Druid 数据源对象 dataSource
  3. 创建 IDAllocDao 接口实例 IDAllocDaoImpl
  4. 创建ID生成器实例 SegmentIDGenImpl 并初始化

① 解析leaf.properties配置文件

通过 PropertyFactory 读取了 leaf.properties 配置文件并进行解析。其中所以的key-value配置信息最终封装为 Properties 中。

/*** 加载leaf.properties配置文件中配置信息*/
public class PropertyFactory {private static final Logger logger = LoggerFactory.getLogger(PropertyFactory.class);private static final Properties prop = new Properties();static {try {prop.load(PropertyFactory.class.getClassLoader().getResourceAsStream("leaf.properties"));logger.debug("Load leaf.properties successfully!");} catch (IOException e) {logger.warn("Load Properties Ex", e);}}public static Properties getProperties() {return prop;}
}

② 手动创建数据源

解析完配置文件后需要判断是否开启号段模式:

// 是否开启号段模式
boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SEGMENT_ENABLE, "true"));
if (flag) {// 2. 创建Druid dataSourcedataSource = new DruidDataSource();dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));dataSource.init();······} else {// ZeroIDGen一直返回id=0idGen = new ZeroIDGen();logger.info("Zero ID Gen Service Init Successfully");
}

如果没有开启号段模式,则创建默认返回id为0的id生成器 ZeroIDGen

public class ZeroIDGen implements IDGen {@Overridepublic Result get(String key) {return new Result(0, Status.SUCCESS);}@Overridepublic boolean init() {return true;}
}

第二步主要通过配置文件配置的数据库连接信息,手动创建出数据源 DruidDataSource

③ 创建IDAllocDaoImpl

我们先来查看 IDAllocDao 接口中的方法。

public interface IDAllocDao {List<LeafAlloc> getAllLeafAllocs();LeafAlloc updateMaxIdAndGetLeafAlloc(String tag);LeafAlloc updateMaxIdByCustomStepAndGetLeafAlloc(LeafAlloc leafAlloc);List<String> getAllTags();
}

再查看 IDAllocDaoImpl 实现类对应的方法实现。

public class IDAllocDaoImpl implements IDAllocDao {SqlSessionFactory sqlSessionFactory;public IDAllocDaoImpl(DataSource dataSource) {// 手动初始化sqlSessionFactoryTransactionFactory transactionFactory = new JdbcTransactionFactory();Environment environment = new Environment("development", transactionFactory, dataSource);Configuration configuration = new Configuration(environment);configuration.addMapper(IDAllocMapper.class);sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);}/*** 获取所有的业务key对应的发号配置* @return*/@Overridepublic List<LeafAlloc> getAllLeafAllocs() {SqlSession sqlSession = sqlSessionFactory.openSession(false);try {return sqlSession.selectList("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getAllLeafAllocs");} finally {sqlSession.close();}}/*** 更新数据库的最大id值,并返回LeafAlloc* @param tag* @return*/@Overridepublic LeafAlloc updateMaxIdAndGetLeafAlloc(String tag) {SqlSession sqlSession = sqlSessionFactory.openSession();try {// 更新tag对应记录中的max_id,max_id = max_id + step,step为数据库中设置的stepsqlSession.update("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.updateMaxId", tag);// 获取更新完的记录,封装成LeafAlloc对象返回LeafAlloc result = sqlSession.selectOne("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getLeafAlloc", tag);// 提交事务sqlSession.commit();return result;} finally {sqlSession.close();}}/*** 依据动态调整的step值,更新DB的最大id值,并返回更新后的记录* @param leafAlloc* @return*/@Overridepublic LeafAlloc updateMaxIdByCustomStepAndGetLeafAlloc(LeafAlloc leafAlloc) {SqlSession sqlSession = sqlSessionFactory.openSession();try {sqlSession.update("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.updateMaxIdByCustomStep", leafAlloc);LeafAlloc result = sqlSession.selectOne("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getLeafAlloc", leafAlloc.getKey());sqlSession.commit();return result;} finally {sqlSession.close();}}/*** 从数据库查询出所有的biz_tag* @return*/@Overridepublic List<String> getAllTags() {// 设置false,表示手动事务SqlSession sqlSession = sqlSessionFactory.openSession(false);try {return sqlSession.selectList("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getAllTags");} finally {sqlSession.close();}}
}

对于接口的四个方法的作用都有详细的注释,读者大致有个印象,我们后面解析获取id流程时会继续详细查看。比较陌生的应该是方法的返回实体类 LeafAlloc,其他它就是对应着数据库表。

/*** 分配bean,和数据库表记录基本对应*/
public class LeafAlloc {private String key;  // 对应biz_tagprivate long maxId;  // 对应最大idprivate int step;    // 对应步长private String updateTime;  // 对应更新时间// getter and setter
}

我们先来看一下这一步创建 IDAllocDaoImpl 中构造函数的逻辑,可以看到主要是按照使用MyBatis的流程创建出 SqlSessionFactory 对象。

④ 创建并初始化ID生成器

先来查看ID生成器接口:

public interface IDGen {/*** 获取指定key下一个id* @param key* @return*/Result get(String key);/*** 初始化* @return*/boolean init();
}

接口主要包含两个方法,分别是获取指定key的下一个id值,和初始化生成器的方法。

该接口的实现类有三个,分别是号段模式、snowflake以及默认一直返回0的生成器。

创建号段模式ID生成器
com.sankuai.inf.leaf.segment.SegmentIDGenImpl 是我们分析整个流程的重点,我们先来简单的查看其内部几个重要的成员变量:

/*** 号段模式ID生成器*/
public class SegmentIDGenImpl implements IDGen {·······/*** 最大步长不超过100,0000*/private static final int MAX_STEP = 1000000;/*** 一个Segment维持时间为15分钟*/private static final long SEGMENT_DURATION = 15 * 60 * 1000L;/*** 线程池,用于执行异步任务,比如异步准备双buffer中的另一个buffer*/private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());/*** 标记自己是否初始化完毕*/private volatile boolean initOK = false;/*** cache,存储所有业务key对应双buffer号段,所以是基于内存的发号方式*/private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();/*** 查询数据库的dao*/private IDAllocDao dao;········
}

cache 是号段模式基于内存发号的关键,它是一个key为数据库表中不同业务的tag,value是一个 SegmentBuffer 对象,如果阅读过官方的博客可以知道双 buffer 优化的事情,这里的SegmentBuffer 对象就是封装了两个 Segment 号段的数据结构。

回到 SegmentService 构造函数的第四步中来,创建 SegmentIDGenImpl 实例时使用的是默认构造函数,紧接着将第三步创建数据库 dao 注入进 SegmentIDGenImpl 。然后调用生成器的初始化方法。

// 4. 创建ID生成器实例SegmentIDGenImpl
idGen = new SegmentIDGenImpl();
((SegmentIDGenImpl) idGen).setDao(dao);
// 初始化SegmentIDGenImpl(加载db的tags至内存cache中,并开启定时同步更新任务)
if (idGen.init()) {logger.info("Segment Service Init Successfully");
} else {throw new InitException("Segment Service Init Fail");
}

初始化号段模式ID生成器

我们查看 SegmentIDGenImpl 的初始化方法逻辑,可以看到主要调用了两个方法,并且设置了自己的初始化标记为OK状态。如果没有初始化成功,会抛出异常,这在上面代码可以看出。

@Override
public boolean init() {logger.info("Init ...");// 确保加载到kv后才初始化成功updateCacheFromDb();initOK = true;// 定时1min同步一次db和cacheupdateCacheFromDbAtEveryMinute();return initOK;
}

我们具体来查看 updateCacheFromDb()updateCacheFromDbAtEveryMinute() 方法逻辑。通过方法名其实我们可以推测方法含义是从数据库中取出数据更新 cache,第二个方法则是一个定时任务,每分钟都执行一遍第一个方法。我们具体查看一下。

/*** 将数据库表中的tags同步到cache中*/
private void updateCacheFromDb() {logger.info("update cache from db");StopWatch sw = new Slf4JStopWatch();try {// 获取数据库表中所有的biz_tagList<String> dbTags = dao.getAllTags();if (dbTags == null || dbTags.isEmpty()) {return;}// 获取当前的cache中所有的tagList<String> cacheTags = new ArrayList<String>(cache.keySet());// 数据库中的tagList<String> insertTags = new ArrayList<String>(dbTags);List<String> removeTags = new ArrayList<String>(cacheTags);// 下面两步操作:保证cache和数据库tags同步// 1. cache新增上数据库表后添加的tags// 2. cache删除掉数据库表后删除的tags// 1. db中新加的tags灌进cache,并实例化初始对应的SegmentBufferinsertTags.removeAll(cacheTags);for (String tag : insertTags) {SegmentBuffer buffer = new SegmentBuffer();buffer.setKey(tag);// 零值初始化当前正在使用的Segment号段Segment segment = buffer.getCurrent();segment.setValue(new AtomicLong(0));segment.setMax(0);segment.setStep(0);cache.put(tag, buffer);logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);}// 2. cache中已失效的tags从cache删除removeTags.removeAll(dbTags);for (String tag : removeTags) {cache.remove(tag);logger.info("Remove tag {} from IdCache", tag);}} catch (Exception e) {logger.warn("update cache from db exception", e);} finally {sw.stop("updateCacheFromDb");}
}

首先通过dao层查询出数据库表中最新的所有的 biz_tag,紧接着就是同步数据库中的 tags 和内存中的 cache。同步的方式包含两步操作:

  1. 插入 cache 中不存在但是数据库新增的 biz_tag ;
  2. 删除 cache 中仍然存在但是数据库表中已经删除的biz_tag

上面这段代码主要完成的就是这两步操作,代码逻辑仔细阅读还是比较清晰的,配合注释读者可以相应理解,不再赘述。
需要额外提及的是 cache 的key我们已经知道是 biz_tag,但value我们仅仅知道是封装了两个 Segment 号段的 SegmentBuffer。我们具体来看看 SegmentBuffer 的定义。

/*** 双buffer——双号段* 双Buffer的方式,保证无论何时DB出现问题,都能有一个Buffer的号段可以正常对外提供服务* 只要DB在一个Buffer的下发的周期内恢复,就不会影响整个Leaf的可用性*/
public class SegmentBuffer {private String key; // 数据库的业务tagprivate Segment[] segments; //双buffer,双号段private volatile int currentPos; //当前的使用的segment的indexprivate volatile boolean nextReady; //下一个segment是否处于可切换状态private volatile boolean initOk; //是否DB数据初始化完成private final AtomicBoolean threadRunning; //线程是否在运行中private final ReadWriteLock lock; // 读写锁private volatile int step;  // 动态调整的stepprivate volatile int minStep;  // 最小stepprivate volatile long updateTimestamp;  // 更新时间戳public SegmentBuffer() {// 创建双号段,能够异步准备,并切换segments = new Segment[]{new Segment(this), new Segment(this)};currentPos = 0;nextReady = false;initOk = false;threadRunning = new AtomicBoolean(false);lock = new ReentrantReadWriteLock();}public int nextPos() {return (currentPos + 1) % 2;}public void switchPos() {currentPos = nextPos();}public Lock rLock() {return lock.readLock();}public Lock wLock() {return lock.writeLock();}
}

可以看见 SegmentBuffer 中包含了一个号段数组,包含两个 Segment,每一次只用一个,另一个异步的准备好,等到当前号段用完,就可以切换另一个,像Young GC的两个Survivor区倒来倒去的思想。我们再来看一下号段 Segment 的定义。

/*** 号段类*/
public class Segment {/*** 内存生成的每一个id号*/private AtomicLong value = new AtomicLong(0);/*** 当前号段允许的最大id值*/private volatile long max;/*** 步长,会根据数据库的step动态调整*/private volatile int step;/*** 当前号段所属的SegmentBuffer*/private SegmentBuffer buffer;public Segment(SegmentBuffer buffer) {this.buffer = buffer;}/*** 获取号段的剩余量* @return*/public long getIdle() {return this.getMax() - getValue().get();}
}

value 就是用来产生id值的,它是一个 AtomicLong 类型,多线程下可以利用它的一些原子API操作。max 则代表自己(号段对象)能产生的最大的id值,也就是value的上限,用完了就需要切换号段,自己重新从数据库获取下一个号段区间。step 是动态调整的步长,关于动态调整,官方博客也有所解释,这里先不赘述。当自己用完了,就需要从数据库请求新的号段区间,区间大小就是由这个 step 决定的。

介绍完Leaf的号段,双Buffer数据结构后,我们回过头查看同步DB到 cache 的逻辑中插入新的 SegmentBuffer 是如何创建的。

for (String tag : insertTags) {SegmentBuffer buffer = new SegmentBuffer();buffer.setKey(tag);// 零值初始化当前正在使用的Segment号段Segment segment = buffer.getCurrent();segment.setValue(new AtomicLong(0));segment.setMax(0);segment.setStep(0);cache.put(tag, buffer);
}

可以看到对于 SegmentBuffer 我们仅仅设置了key,然后就是依靠 SegmentBuffer 自身的构造函数对其内部成员进行了默认初始化,也可以说是零值初始化。特别注意,此时 SegmentBufferinitOk 标记还是 false,这也说明这个标记其实并不是标记零值初始化是否完成。然后程序接着对0号 Segment 的所有成员进行了零值初始化。

同步完成后,即将数据库中的所有 tags 记录加载到内存后,便将ID生成器的初始化标记设置为 true

我们再来查看 updateCacheFromDbAtEveryMinute() 方法逻辑。

/*** 每分钟同步db到cache*/
private void updateCacheFromDbAtEveryMinute() {ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("check-idCache-thread");t.setDaemon(true);return t;}});service.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {updateCacheFromDb();}}, 60, 60, TimeUnit.SECONDS);
}

可以看到方法中创建了一个定时执行任务线程池,任务就是 updateCacheFromDb(),也就是上面那个方法,定时时间为60s,也就是1min。

获取ID

上一小节我们主要是在分析创建 SegmentService 过程中做了哪些事情,总结下来最重要的就是从数据库表中准备好 cachecache 中包含每个key对应的双号段,经过第一部分已经零值初始化好双号段的当前使用号段。接下来我们继续分析 SegmentServicegetId() 方法,我们的控制层就是通过该方法获取id的。

/*** 根据key获取id* @param key* @return*/
public Result getId(String key) {return idGen.get(key);
}

再次分析号段生成器 SegmentIDGenImplget() 方法。

/*** 获取对应key的下一个id值* @param key* @return*/
@Override
public Result get(final String key) {// 必须在 SegmentIDGenImpl 初始化后执行init()方法// 也就是必须将数据库中的tags加载到内存cache中,并开启定时同步任务if (!initOK) {return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);}if (cache.containsKey(key)) {// 获取cache中对应的SegmentBuffer,SegmentBuffer中包含双buffer,两个号段SegmentBuffer buffer = cache.get(key);// 双重判断,避免多线程重复执行SegmentBuffer的初始化值操作// 在get id前检查是否完成DB数据初始化cache中key对应的的SegmentBuffer(之前只是零值初始化),需要保证线程安全if (!buffer.isInitOk()) {synchronized (buffer) {if (!buffer.isInitOk()) {// DB数据初始化SegmentBuffertry {// 根据数据库表中key对应的记录 来初始化SegmentBuffer当前正在使用的SegmentupdateSegmentFromDb(key, buffer.getCurrent());logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());buffer.setInitOk(true);} catch (Exception e) {logger.warn("Init buffer {} exception", buffer.getCurrent(), e);}}}}// SegmentBuffer准备好之后正常就直接从cache中生成id即可return getIdFromSegmentBuffer(cache.get(key));}// cache中不存在对应的key,则返回异常错误return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
}

首先先从 cache 中获取 key 对应的 SegmentBuffer,然后判断 SegmentBuffer 是否是初始化完成,也就是 SegmentBufferinitOk 标记。这里用了双重判断+synchronized 方式确保 SegmentBuffer 只被初始化一次。那么这里初始化究竟是指什么,才算初始化完成呢?

① 初始化SegmentBuffer

初始化 SegmentBuffer 的核心逻辑就是调用下面这个方法。

// 根据数据库表中key对应的记录 来初始化SegmentBuffer当前正在使用的Segment
updateSegmentFromDb(key, buffer.getCurrent());

查看方法名,也可以知道是从数据库表查询数据更新号段 Segment,对于号段初始状态来说,该方法含义可以理解为初始化 Segment 的值,对于用完的号段来讲,可以理解为从数据库获取下一号段值。

所以这里初始化是指DB数据初始化当前号段,初始化完成就标记 SegmentBufferinitOktrue,也就表明 SegmentBuffer 中有一个号段已经准备完成了。

我们具体查看 updateSegmentFromDb(key, buffer.getCurrent()) 方法:

/*** 从数据库表中读取数据更新SegmentBuffer中的Segment* @param key* @param segment*/
public void updateSegmentFromDb(String key, Segment segment) {StopWatch sw = new Slf4JStopWatch();/*** 1. 先设置SegmentBuffer*/// 获取Segment号段所属的SegmentBufferSegmentBuffer buffer = segment.getBuffer();LeafAlloc leafAlloc;// 如果buffer没有DB数据初始化(也就是第一次进行DB数据初始化)if (!buffer.isInitOk()) {// 更新数据库中key对应记录的maxId(maxId表示当前分配到的最大id,maxId=maxId+step),并查询更新后的记录返回leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);// 数据库初始设置的step赋值给当前buffer的初始step,后面后动态调整buffer.setStep(leafAlloc.getStep());// leafAlloc中的step为DB中设置的step,buffer这里是未进行DB数据初始化的,所以DB中step代表动态调整的最小下限buffer.setMinStep(leafAlloc.getStep());}// 如果buffer的更新时间是0(初始是0,也就是第二次调用updateSegmentFromDb())else if (buffer.getUpdateTimestamp() == 0) {// 更新数据库中key对应记录的maxId(maxId表示当前分配到的最大id,maxId=maxId+step),并查询更新后的记录返回leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);// 记录buffer的更新时间buffer.setUpdateTimestamp(System.currentTimeMillis());// leafAlloc中的step为DB中的stepbuffer.setMinStep(leafAlloc.getStep());}// 第三次以及之后的进来 动态设置nextStepelse {// 计算当前更新操作和上一次更新时间差long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();int nextStep = buffer.getStep();/***  动态调整step*  1) duration < 15 分钟 : step 变为原来的2倍, 最大为 MAX_STEP*  2) 15分钟 <= duration < 30分钟 : nothing*  3) duration >= 30 分钟 : 缩小step, 最小为DB中配置的step**  这样做的原因是认为15min一个号段大致满足需求*  如果updateSegmentFromDb()速度频繁(15min多次),也就是*  如果15min这个时间就把step号段用完,为了降低数据库访问频率,我们可以扩大step大小*  相反如果将近30min才把号段内的id用完,则可以缩小step*/// duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEPif (duration < SEGMENT_DURATION) {if (nextStep * 2 > MAX_STEP) {//do nothing} else {// 步数 * 2nextStep = nextStep * 2;}}// 15分钟 < duration < 30分钟 : nothingelse if (duration < SEGMENT_DURATION * 2) {//do nothing with nextStep}// duration > 30 分钟 : 缩小step ,最小为DB中配置的步数else {nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;}logger.info("leafKey[{}], dbStep[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);/*** 根据动态调整的nextStep更新数据库相应的maxId*/// 为了高效更新记录,创建一个LeafAlloc,仅设置必要的字段的信息LeafAlloc temp = new LeafAlloc();temp.setKey(key);temp.setStep(nextStep);// 根据动态调整的step更新数据库的maxIdleafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);// 记录更新时间buffer.setUpdateTimestamp(System.currentTimeMillis());// 记录当前buffer的动态调整的step值buffer.setStep(nextStep);// leafAlloc的step为DB中的step,所以DB中的step值代表着下限buffer.setMinStep(leafAlloc.getStep());}/*** 2. 准备当前Segment号段*/// 设置Segment号段id的起始值,value就是id(start=max_id-step)long value = leafAlloc.getMaxId() - buffer.getStep();// must set value before set max(https://github.com/Meituan-Dianping/Leaf/issues/16)segment.getValue().set(value);segment.setMax(leafAlloc.getMaxId());segment.setStep(buffer.getStep());sw.stop("updateSegmentFromDb", key + " " + segment);
}

这个函数的逻辑非常重要,还包含了动态调整步长的逻辑。首先,该方法被调用的时机我们需要明确,每当我们需要从数据库获取一个号段才会被调用。方法的第一部分主要先通过数据库并设置 SegmentBuffer 相关值,第二部分再准备 Segment

第一部分的逻辑按照调用该方法的次数分为第一次准备号段、第二次准备号段和第三次及之后的准备号段。

  1. 第一次准备号段,也就是 SegmentBuffer 还没有DB初始化,我们要从数据库获取一个号段,记录 SegmentBuffer 的当前步长、最小步长都是数据库设置的步长;
  2. 第二次准备号段,也就是双buffer的异步准备另一个号段 Segment 时,会进入这一逻辑分支。仍然从数据库获取一个号段,此时记录这次获取下一个号段的时间戳,设置最小步长是数据库设置的步长;
  3. 之后再次准备号段,首先要动态调整这次申请号段的区间大小,也就是代码中的 nextStep,调整规则主要跟号段申请频率有关,具体可以查看注释以及代码。计算出动态调整的步长,需要根据新的步长去数据库申请号段,同时记录这次获取号段的时间戳,保存动态调整的步长到 SegmentBuffer,设置最小步长是数据库设置的步长。

第二部分逻辑主要是准备 Segment 号段,将 Segment 号段的四个成员变量进行新一轮赋值,value 就是 idstart=max_id-step)。

② 从号段中获取id

SegmentBuffer 和 其中一个号段 Segment 准备好,就可以进行从号段中获取id。我们具体查看号段ID生成器 SegmentIDGenImplgetIdFromSegmentBuffer() 方法。

/*** 从SegmentBuffer生成id返回* @param buffer* @return*/
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {// 自旋获取idwhile (true) {try {// 获取buffer的共享读锁,在平时不操作Segment的情况下益于并发buffer.rLock().lock();// 获取当前正在使用的Segmentfinal Segment segment = buffer.getCurrent();// ===============异步准备双buffer的另一个Segment==============// 1. 另一个Segment没有准备好// 2. 当前Segment已经使用超过10%则开始异步准备另一个Segment// 3. buffer中的threadRunning字段. 代表是否已经提交线程池运行,是否有其他线程已经开始进行另外号段的初始化工作.使用CAS进行更新保证buffer在任意时刻,只会有一个线程进行异步更新另外一个号段.if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {// 线程池异步执行【准备Segment】任务service.execute(new Runnable() {@Overridepublic void run() {// 获得另一个Segment对象Segment next = buffer.getSegments()[buffer.nextPos()];boolean updateOk = false;try {// 从数据库表中准备SegmentupdateSegmentFromDb(buffer.getKey(), next);updateOk = true;logger.info("update segment {} from db {}", buffer.getKey(), next);} catch (Exception e) {logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);} finally {// 如果准备成功,则通过独占写锁设置另一个Segment准备标记OK,threadRunning为false表示准备完毕if (updateOk) {// 读写锁是不允许线程先获得读锁继续获得写锁,这里可以是因为这一段代码其实是线程池线程去完成的,不是获取到读锁的线程buffer.wLock().lock();buffer.setNextReady(true);buffer.getThreadRunning().set(false);buffer.wLock().unlock();} else {// 失败了,则还是没有准备好,threadRunning恢复false,以便于下次获取id时重新再异步准备Segmentbuffer.getThreadRunning().set(false);}}}});}// 原子value++(返回旧值),也就是下一个id,这一步是多线程操作的,每一个线程加1都是原子的,但不一定保证顺序性long value = segment.getValue().getAndIncrement();// 如果获取到的id小于maxIdif (value < segment.getMax()) {return new Result(value, Status.SUCCESS);}} finally {// 释放读锁buffer.rLock().unlock();}// 等待线程池异步准备号段完毕waitAndSleep(buffer);// 执行到这里,说明当前号段已经用完,应该切换另一个Segment号段使用try {// 获取独占式写锁buffer.wLock().lock();// 获取当前使用的Segment号段final Segment segment = buffer.getCurrent();// 重复获取value, 多线程执行时,Segment可能已经被其他线程切换。再次判断, 防止重复切换Segmentlong value = segment.getValue().getAndIncrement();if (value < segment.getMax()) {return new Result(value, Status.SUCCESS);}// 执行到这里, 说明其他的线程没有进行Segment切换,并且当前号段所有号码用完,需要进行切换Segment// 如果准备好另一个Segment,直接切换if (buffer.isNextReady()) {buffer.switchPos();buffer.setNextReady(false);}// 如果另一个Segment没有准备好,则返回异常双buffer全部用完else {logger.error("Both two segments in {} are not ready!", buffer);return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);}} finally {// 释放写锁buffer.wLock().unlock();}}
}

首先该方法最外层套了一个循环,不断地尝试获取id。整个方法的逻辑大致包含:

  1. 首先获取共享读锁,多个线程能够同时进来获取id。如果能够不需要异步准备双buffer的另一个 Segment 且分发的id号没有超出maxId,那么可以直接返回id号。多个线程并发获取id号,靠 AtomicLonggetAndIncrement() 原子操作保证不出问题。
  2. 如果需要异步准备另一个 Segment,则将准备任务提交到线程池中进行完成。多线程执行下,要保证只有一个线程去提交任务。这一点是靠 SegmentBuffer 中的 threadRunning 字段实现的。threadRunning 字段用 volatile 修饰保证多线程可见性,其含义代表了异步准备号段任务是否已经提交线程池运行,是否有其他线程已经开始进行另外号段的初始化工作。使用CAS操作进行更新,保证 SegmentBuffer 在任意时刻只会有一个线程进行异步更新另外一个号段。
  3. 如果号段分配的 id 号超出了maxId,则需要进行切换双buffer的操作。在进行直接切换之前,需要再次判断是否 id 还大于 maxId,因为多线程下,号段已经被其他线程切换成功,自己还不知道,所以为了避免重复切换出错,需要再次判断。切换操作为了保证同一时间只能有一个线程切换,这里利用了独占式的写锁。

美团Leaf源码——号段模式源码解析相关推荐

  1. 美团开源分布式ID生成系统——Leaf源码阅读笔记(Leaf的号段模式)

    Leaf 最早期需求是各个业务线的订单ID生成需求.在美团早期,有的业务直接通过DB自增的方式生成ID,有的业务通过redis缓存来生成ID,也有的业务直接用UUID这种方式来生成ID.以上的方式各自 ...

  2. 深度解析leaf分布式id生成服务源码(号段模式)

    原创不易,转载请注明出处 文章目录 前言 1.实现原理推演 1.1 基于mysql最简单分布式ID实现 1.2 flickr分布式id解决方案 1.3 号段+mysql 2.源码剖析 2.1初始化 2 ...

  3. 分布式系统概念 | 分布式ID:数据库、号段模式、雪花算法(Snowflake)、Redis实现方案

    文章目录 分布式ID 数据库 自增ID 多主模式 号段模式 雪花算法 Redis 总结 分布式ID ID是数据的唯一标识,传统的做法是使用数据库的自增ID,但是随着业务规模的不断发展,数据量将越来越大 ...

  4. ID生成方案之号段模式

    号段模式ID生成器组件地址 https://github.com/15928587230/os-component-idworker 开箱即用, github上面有简洁的使用说明. 一 Leaf号段模 ...

  5. 基于号段模式、百度UID实现的分布式ID生成器kylin-id

    1.简介 1.1.开源项目 kylin-id:麒麟分布式id生成器,支持号段模式.雪花算法 并未发布jar到中央仓库,需要自己本地构建 1.2.介绍 参考滴滴[tinyid] 整合百度[UID] 麒麟 ...

  6. 序列号及序列号生成器(号段模式,数据库模式)详细介绍(建议收藏)

    前言 现如今,数据的唯一性和可追溯性变得越来越重要.从简单的数据库主键到复杂的分布式系统,唯一标识符在各种场景中都发挥着关键作用.序列号,作为一种广泛应用的唯一标识符,为我们提供了确保数据完整性和一致 ...

  7. 美团leaf生成分布式唯一id

    1. 介绍 https://github.com/Meituan-Dianping/Leaf.git 源码 改为下载Leaf-feature-spring-boot-starter.zip包 本地安装 ...

  8. 美团Leaf实战(分布式Id算法)

    文章目录 1. Leaf-segment号段模式 1.1 数据库配置 1.2 导入并修改leaf项目 1.3 Leaf-segment双buffer模式 1.4 Leaf segment监控 1.5 ...

  9. 虚拟码号应用行业介绍!

    虚拟码号基本上什么行业都能使用,看具体的使用场景. 虚拟码号最基本的服务模式一般称为AX和AXB. AX模式,说白了就是一个真实码号捆死一个虚拟码号的方式,这种方式,一般用来管"人" ...

最新文章

  1. 李飞飞团队最新论文:基于anchor关键点的类别级物体6D位姿跟踪
  2. 315道Python面试题,欢迎挑战
  3. Paths on a Grid
  4. matlba 正交基
  5. linux下的socket通信小程序分享——第三圣子
  6. 021_jdbc-mysql入门
  7. mysql控制台教程视频教程_mysql 控制台操作
  8. 颠覆与重构——戴尔助力徐工集团等行业客户实现业务转型
  9. poj 2439 ArcticNetwork 最小生成树Kruskal、(Prim方法还没做
  10. Leetcode每日一题:242.有效的字母异位词
  11. css设置按钮竖直方向居中_button内flex垂直居中竟然不居中的解决
  12. MySQL-第N篇一些经验
  13. 二进制、十进制、十六进制数值对照表
  14. Http的会话跟踪和跨站攻击(xss)
  15. 并发编程 CAS算法
  16. YAML语法详细总结
  17. freeotp使用教程_PPT模板怎么用
  18. 酒店管理系统——界面设计
  19. nginx: [emerg] still could not bind()
  20. 微博有哪些营销价值?

热门文章

  1. windows下打开命令提示符的方式
  2. 谷粒商城篇章5 ---- P173-P192 ---- 检索服务【分布式高级篇二】
  3. 删 卡尔 波普尔_卡尔波普尔与进化论的可证伪性
  4. Herader 头常用项注解
  5. 单片机c语言99秒的定时器,通过51单片机定时器1实现99秒倒计时
  6. 怎么查看PDF的纸张大小以及如何调整
  7. 谁说0基础参加北京培训不能成为UI设计师
  8. 公务员c语言试题,公务员-计算机类C语言复习参考答案.doc
  9. Who Am I? Personality Detection based on Deep Learning for Texts 阅读笔记
  10. 微信公众号无感登录总结