elasticsearch的节点Node在启动的时候(也就是在start()方法中)开始加入集群,并准备参与选举。

在Node的start()方法中,会调用ZenDiscovery的startInitialJoin()方法开始加入集群并准备进行参与选举。

@Override
public void startInitialJoin() {// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.clusterService.submitStateUpdateTask("initial_join", new LocalClusterUpdateTask() {@Overridepublic ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discoveredjoinThreadControl.startNewThreadIfNotRunning();return unchanged();}@Overridepublic void onFailure(String source, @org.elasticsearch.common.Nullable Exception e) {logger.warn("failed to start initial join process", e);}});
}

这里会向clusterService提交一个任务Task准备放入线程池中执行,这里的实现是一个LocalClusterUpdateTask,重写了execute(),其中实则是调用了joinThreadControl的startNewThreadIfNotRunning()。joinThreadControl作为ZenDiscovery的一个内部类,主要用来保证执行加入集群线程的唯一性。

private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

joinThreadControl通过一个AtomicBoolean类型的running来表示Node加入集群与选举的开始与结束,而currentJoinThread则通过AtomicReference来保证工作线程的可见性与唯一性。

在startNewThreadIfNotRunning()方法中先通过joinThreadActive()方法确保当前并没有工作线程在运行。

public boolean joinThreadActive() {Thread currentThread = currentJoinThread.get();return running.get() && currentThread != null && currentThread.isAlive();
}

如果没有,那么就新建一个工作线程准备开始加入集群。

public void startNewThreadIfNotRunning() {ClusterService.assertClusterStateThread();if (joinThreadActive()) {return;}threadPool.generic().execute(new Runnable() {@Overridepublic void run() {Thread currentThread = Thread.currentThread();if (!currentJoinThread.compareAndSet(null, currentThread)) {return;}while (running.get() && joinThreadActive(currentThread)) {try {innerJoinCluster();return;} catch (Exception e) {logger.error("unexpected error while joining cluster, trying again", e);// Because we catch any exception here, we want to know in// tests if an uncaught exception got to this point and the test infra uncaught exception// leak detection can catch this. In practise no uncaught exception should leakassert ExceptionsHelper.reThrowIfNotNull(e);}}// cleaning the current thread from currentJoinThread is done by explicit calls.}});
}

在其run()方法中,通过cas更新curentJoinThread,并在服务结束之前,并且当前线程是ZenDiscovery的工作线程之时,不断再循环中执行innerJoinCluster()。

InnerJoinCluster()分为两步,在本节点还未选择出自己所认定master节点之前,会一直不断循环调用findMaster()去得到自己认定的master节点。

while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {masterNode = findMaster();
}

在findMaster()中,会根据pingAndWait()方法去获取集群内其他节点关于选举的ping请求的回复,具体的获取在之前的文章已经详细解释。

final DiscoveryNode localNode = clusterService.localNode();// add our selves
assert fullPingResponses.stream().map(ZenPing.PingResponse::node).filter(n -> n.equals(localNode)).findAny().isPresent() == false;fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state()));

接下来会过滤掉本地节点的数据,重新加入当前本地节点的选举数据,由于刚加入选举的缘故,所以其还并没有master节点的选择。

final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

在默认情况下,masterElectionIgnoreNonMasters为false,因此data节点的选举数据也会被考虑到选举的过程中。

List<DiscoveryNode> activeMasters = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {// We can't include the local node in pingMasters list, otherwise we may up electing ourselves without// any check / verifications from other nodes in ZenDiscover#innerJoinCluster()if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {activeMasters.add(pingResponse.master());}
}// nodes discovered during pinging
List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {if (pingResponse.node().isMasterNode()) {masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));}
}

之后,遍历所有收到的ping请求的节点结果,取出所有已经选出的不为自己的master节点,加入到activeMasters中。再遍历所有ping请求的节点结果,将所有属性master为true的节点加入到候选人数组masterCandidates当中。

之后选择如果activeMasters不为空,说明该集群中已经存在master节点,那么就在activeMasterss中选择id最小的节点作为自己投票选择的master节点,并返回。

public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {return activeMasters.stream().min(ElectMasterService::compareNodes).get();
}
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {if (o1.isMasterNode() && !o2.isMasterNode()) {return -1;}if (!o1.isMasterNode() && o2.isMasterNode()) {return 1;}return o1.getId().compareTo(o2.getId());
}

如果activeMasters为空,说明此时集群还并没有选举出master节点。

if (electMaster.hasEnoughCandidates(masterCandidates)) {final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);logger.trace("candidate {} won election", winner);return winner.getNode();
}

那么首先判断当前masterCandidates数组中的候选节点个数是否已经大于最小开始选举接节点数量(默认为-1),如果大于,则通过electMaster的electMaster()方法获取自己所投票的master节点并返回。

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {assert hasEnoughCandidates(candidates);List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);sortedCandidates.sort(MasterCandidate::compare);return sortedCandidates.get(0);
}
public static int compare(MasterCandidate c1, MasterCandidate c2) {// we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted// list, so if c2 has a higher cluster state version, it needs to come first.int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);if (ret == 0) {ret = compareNodes(c1.getNode(), c2.getNode());}return ret;
}
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {if (o1.isMasterNode() && !o2.isMasterNode()) {return -1;}if (!o1.isMasterNode() && o2.isMasterNode()) {return 1;}return o1.getId().compareTo(o2.getId());
}

这里所要投票的master节点的选择则是从候选节点数组中选择id最小版本最新的节点。

这样,当前节点所要在选举中投票的master节点已经被选出。

此时存在两种情况,如果当前节点所选择的master节点正式自己,则会正式准备成为master节点,但是前提是他必须收到集群中别的节点的投票中有半数以上投向自己。

那么便会开始调用waitToBeElectedAsMaster()方法准备接收别的节点的投票结果等待投自己的超过半数以成为master节点。

final CountDownLatch done = new CountDownLatch(1);
final ElectionCallback wrapperCallback = new ElectionCallback() {@Overridepublic void onElectedAsMaster(ClusterState state) {done.countDown();callback.onElectedAsMaster(state);}@Overridepublic void onFailure(Throwable t) {done.countDown();callback.onFailure(t);}
};

首先会生成一个CountDoneLatch用来等待别的集群的投票和等待的timeout,并生成一个结束阻塞的callback用来在完成时结束阻塞并去完成一个节点正式成为master节点要做的流程。

synchronized (this) {assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";myElectionContext = electionContext;electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);checkPendingJoinsAndElectIfNeeded();
}try {if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {// callback handles everythingreturn;}
} catch (InterruptedException e) {}

CountDownLatch进入await在timeout的时间限制内等待别的节点的投票。

在ZenDiscovery的构造方法中就已经根据路径discovery/zen/join注册了相应的requestHandler,其中会触发MemberShipListener的onJoin()方法,并调用handleJoinRequest()方法,在这个方法里,主要会对于发送join请求(也就是选举投票)的节点进行验证,验证通过之后将会根据nodeJoinController的handleJoinRequest()方法对成为master节点的要求的投票给自己的节点数量masterJoinsCount加一,并判断是否可以成为master节点。

public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {if (electionContext != null) {electionContext.addIncomingJoin(node, callback);checkPendingJoinsAndElectIfNeeded();} else {clusterService.submitStateUpdateTask("zen-disco-node-join",node, ClusterStateTaskConfig.build(Priority.URGENT),joinTaskExecutor, new JoinTaskListener(callback, logger));}
}
private synchronized void checkPendingJoinsAndElectIfNeeded() {assert electionContext != null : "election check requested but no active context";final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {if (logger.isTraceEnabled()) {logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,electionContext.requiredMasterJoins);}} else {if (logger.isTraceEnabled()) {logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,electionContext.requiredMasterJoins);}electionContext.closeAndBecomeMaster();electionContext = null; // clear this out so future joins won't be accumulated}
}

在checkPendingJoinsAndElectIfNeed()方法中,如果已经接收到的join请求也就是投票自己的节点数量已经超过集群中节点数量的半数,那么调用closeAndBecomeMaster()方法结束本次选举正式成为master节点。

public void markThreadAsDoneAndStartNew(Thread joinThread) {ClusterService.assertClusterStateThread();if (!markThreadAsDone(joinThread)) {return;}startNewThreadIfNotRunning();
}

如果在规定的timeout里,并没有收到足够的投票,那么说明本节点的选举失败。则会回到通过markThreadAsDoneAndStartNew()关闭当前线程,并重新启动一个线程在startNewThreadIfNotRunning()方法中开始下一次循环中,继续上述选举的流程参与选举。

如果通过findMaster()得到的所要选举的节点并不是自己,则会通过joinElectedMaster()方法向所选举成为master的节点发送自己的投票。

while (true) {try {logger.trace("joining master {}", masterNode);membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);return true;} catch (Exception e) {final Throwable unwrap = ExceptionsHelper.unwrapCause(e);if (unwrap instanceof NotMasterException) {if (++joinAttempt == this.joinRetryAttempts) {logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);return false;} else {logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);}} else {if (logger.isTraceEnabled()) {logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);} else {logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e));}return false;}}try {Thread.sleep(this.joinRetryDelay.millis());} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

发送的目标节点的路径也正是前文中所提到的discovery/zen/join。如果在有限次数内成功,就说明当前节点所投票的目标节点成功成为master节点,本次选举也宣告完成。

而若是在有限次数内都没有成功(选举的节点没有收到超过半数master选票或者因为种种原因关闭)则会返回false,会和之前试图成为master节点失败一样,重新开启一个线程去参与下一轮选举。如果成功,直接退出即可,有关跟master同步的master fault detection已经在clusterService中被开启。

elasticsearch集群选举源码解析相关推荐

  1. Tomcat集群实现源码级别剖析

    随着互联网快速发展,各种各样供外部访问的系统越来越多且访问量越来越大,以前Web容器可以包揽接收-逻辑处理-响应整个请求生命周期的工作,现在为了构建让更多用户访问更强大的系统,人们通过不断地业务解耦. ...

  2. Redis集群模式源码分析

    目录 1 主从复制模式 2 Sentinel(哨兵)模式 3 Cluster模式 4.参考文档 1 主从复制模式 主库负责读写操作,从库负责数据同步,接受来自主库的同步命令.通过分析Redis的客户端 ...

  3. Dubbo 实现原理与源码解析系列 —— 精品合集

    摘要: 原创出处 http://www.iocoder.cn/Dubbo/good-collection/ 「芋道源码」欢迎转载,保留摘要,谢谢! 1.[芋艿]精尽 Dubbo 原理与源码专栏 2.[ ...

  4. Zookeeper源码之集群选举

    前言 zookeeper算是一个流行的分布式协调框架,在大量java分布式中间件中广泛使用.在学习zookeeper的源码前建议先了解一下分布式一致性协议的概念,zookeeper自己实现了一套满足c ...

  5. dubbo源码解析(三十五)集群——cluster

    集群--cluster 目标:介绍dubbo中集群容错的几种模式,介绍dubbo-cluster下support包的源码. 前言 集群容错还是很好理解的,就是当你调用失败的时候所作出的措施.先来看看有 ...

  6. dubbo源码解析-集群容错架构设计

    前言 本来是想把整个dubbo源码解析一次性弄完,再做成一个系列来发布的,但是正巧最近有位好朋友要去杭州面试,就和我交流了一下.本着对dubbo源码略有心得的心态,在交流过程中也发表了个人的一些粗劣的 ...

  7. dubbo源码解析(四十一)集群——Mock

    集群--Mock 目标:介绍dubbo中集群的Mock,介绍dubbo-cluster下关于服务降级和本地伪装的源码. 前言 本文讲解两块内容,分别是本地伪装和服务降级,本地伪装通常用于服务降级,比如 ...

  8. Seatunnel提交任务到Flink集群源码解析

    一:首先查看seatunnel提交任务到flink集群的时候的shell脚本start-seatunnel-flink-13-connector-v2.sh,查看最后会调用一个类FlinkStarte ...

  9. 渣渣菜鸡的 ElasticSearch 源码解析 —— 启动流程(上)

    关注我 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/08/11/es-code02/ 前提 上篇文章写了 ElasticSearch 源码解析 -- ...

最新文章

  1. 三十六亿的《哪吒》历时五年,如何用AI解决动画创作难题?
  2. inv(a) matlab,设A为矩阵,b为列向量,则Matlab中运算A\b 和运算inv(A)*b
  3. ArrayBlockingQueue原理分析-remove方法
  4. Python爬虫入门六Cookie的使用
  5. Django中--自定义模型管理器类
  6. android studio安装教程完整,Android Studio 安装配置方法完整教程【小白秒懂】
  7. Winform开发框架中工作流模块之申请单草稿处理
  8. 95-150-045-源码-Sink-Streaming Parquet File
  9. WINDOWS SERVER 2008 R2 GHO 纯净版
  10. WindowsServers2019上手体验
  11. Windows核心编程_判断是否管理员权限运行
  12. 为什么我们更宠爱“随机”梯度下降?(SGD)
  13. MVC5 + EF6 + Bootstrap3 (11) 排序、搜索、分页
  14. 知识图谱实现公安情报分析(人工智能系列)
  15. python安装不了whl文件_python安装.whl文件失败
  16. java导出excel锁定状态_Java设置excel单元格锁定状态 | 学步园
  17. 191018 pwn-HITB_dubai polyfill
  18. Echarts绘制各种数据可视化图表案例(效果+代码)
  19. 版本管理·玩转git(团队合作)
  20. muduo源码分析2——Singleton分析

热门文章

  1. 在安卓模拟器中,adb安装apk常见错误
  2. Android之使用AlertDialog类和AlertDialog.Builder类创建带取消,确定,中立的对话框
  3. 哈希表(HashTable),哈希冲突的避免、解决
  4. C盘文件内容及清理思路
  5. ubuntu安装 rust nightly_一起学Rust编程「1」:开发环境
  6. 企业架构规划及服务器优化参数
  7. Ibatis 生成工具ibator的使用 适用于ibaits2
  8. 算法踩坑4-冒泡排序
  9. 美国纽约拟将电话亭变WiFi热点
  10. spring boot几个初始配置文件