1、概述

Coordator作为Discovery的实现类,在集群选举中负责master的选举。

2、创建

Coordinator是通过DiscoveryModule创建的

discovery = new Coordinator(NODE_NAME_SETTING.get(settings),settings, clusterSettings,transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState,seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService,electionStrategy, nodeHealthService);

其中seedHostProvider为

final SeedHostsProvider seedHostsProvider = hostsResolver -> {final List<TransportAddress> addresses = new ArrayList<>();for (SeedHostsProvider provider : filteredSeedProviders) {addresses.addAll(provider.getSeedAddresses(hostsResolver));}return Collections.unmodifiableList(addresses);};

clusterApplier为ClusterService的ClusterApplierService

persistedStateSupplier为GatewayMetaState#getPersistedState

3、启动

调用doStart方法

创建CoordinationState,同时peerFinder设置currentTerm。

在单结点发现模式下,检查votingConfiguration是否满足quorm。

创建ClusterState,设置applierState,设置clusterApplier的初始状态。

protected void doStart() {synchronized (mutex) {CoordinationState.PersistedState persistedState = persistedStateSupplier.get();coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));peerFinder.setCurrentTerm(getCurrentTerm());configuredHostsResolver.start();final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();if (lastAcceptedState.metadata().clusterUUIDCommitted()) {logger.info("cluster UUID [{}]", lastAcceptedState.metadata().clusterUUID());}final VotingConfiguration votingConfiguration = lastAcceptedState.getLastCommittedConfiguration();if (singleNodeDiscovery &&votingConfiguration.isEmpty() == false &&votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +" does not have quorum in voting configuration " + votingConfiguration);}ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).addGlobalBlock(noMasterBlockService.getNoMasterBlock())).nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId())).build();applierState = initialState;clusterApplier.setInitialState(initialState);}}

4 、选举

调用startInitialJoin开始选举

4.1 becomeCandidate

主要执行操作包含

  • 设置当前mode为CANDIDATE。InitialJoinAccumulator关闭。
  • 设置当前的joinAccumulator为CandidateJoinAccumulator
  • 激活peerFinder(CoordinatorPeerFinder),获取参与选举的所有节点。
  • PreVoteCollector更新PreVoteResponse(currentTerm当前任期,lastAcceptedTerm上次集群状态接受的任期, lastAcceptedVersion上次集群状态接受的版本)

通过SeedHostsResolver解析得到集群中参与选举的节点,使用HandshakingTransportAddressConnector异步连接到远端主节点,同时将TransportAddress与远端节点Peer对应关系放入peersByAddress的map中。

4.1.1 获取参与选举的所有节点

PeerFinder来查找所有可以参与选举的节点。

在找到选举节点后,首先会与对端节点建立连接,然后发送REQUEST_PEERS_ACTION_NAME请求。

CoordinatorPeerFinder#onFoundPeersUpdated在满足条件时开始发起选举。

protected void onFoundPeersUpdated() {synchronized (mutex) {final Iterable<DiscoveryNode> foundPeers = getFoundPeers();if (mode == Mode.CANDIDATE) {final VoteCollection expectedVotes = new VoteCollection();foundPeers.forEach(expectedVotes::addVote);expectedVotes.addVote(Coordinator.this.getLocalNode());final boolean foundQuorum = coordinationState.get().isElectionQuorum(expectedVotes);if (foundQuorum) {if (electionScheduler == null) {startElectionScheduler();}} else {closePrevotingAndElectionScheduler();}}}clusterBootstrapService.onFoundPeersUpdated();}private void startElectionScheduler() {assert electionScheduler == null : electionScheduler;if (getLocalNode().isMasterNode() == false) {return;}final TimeValue gracePeriod = TimeValue.ZERO;electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {@Overridepublic void run() {synchronized (mutex) {if (mode == Mode.CANDIDATE) {final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();if (localNodeMayWinElection(lastAcceptedState) == false) {logger.trace("skip prevoting as local node may not win election: {}",lastAcceptedState.coordinationMetadata());return;}final StatusInfo statusInfo = nodeHealthService.getHealth();if (statusInfo.getStatus() == UNHEALTHY) {logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());return;}if (prevotingRound != null) {prevotingRound.close();}prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());}}}@Overridepublic String toString() {return "scheduling of new prevoting round";}});}

4.2 PreVoteCollector

4.2.1 start

创建PreVotingRound, 开始预投票

public Releasable start(final ClusterState clusterState, final Iterable<DiscoveryNode> broadcastNodes) {PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm());preVotingRound.start(broadcastNodes);return preVotingRound;}

发送PreVoteRequest到其他节点,收到请求后,更新节点见到的最大任期,返回当前节点的PreVoteResponse。接收到响应后,更新见到的最大任期,并且开始正式选举。时序图为

4.3 startElection

创建StartJoinRequest,sourceNode为本地节点 ,term为见到的任期及当前集群状态任期中的最大值+1,向集群中的其他节点发送START_JOIN_ACTION_NAME请求。对端收到StartJoinRequest处理,发送 JoinRequest,并且返回 StartJoinResponse。详细的流程图为

4.4 JoinAccumulator

不同的角色处理join请求不同,有三种角色+一种初始化角色

4.5 ClusterStateTaskExecutor

处理集群状态任务执行器

​​​​​​​

es中的Coordinator相关推荐

  1. es中修改某个字段值_搜索引擎之laravel中使用elasticsearch(一)

    一.概述 Elasticsearch官方提供了Composer包可直接引用就好,不过要注意:如果你使用的elastcsearch不是最新的那么我建议你指定合适的版本下载,而不要采用最新的,因为不同版本 ...

  2. Elasticsearch和Hive整合,将hive数据同步到ES中

    1 Elasticsearch整合Hive 1.1 软件环境 Hadoop软件环境 Hive软件环境 ES软件环境 1.2 ES-Hadoop介绍 1.2.1 官网 https://www.elast ...

  3. win下配置的ES中的数据在哪里可以看到?三种方式你看那种更加高大上!!!(win_Elasticsearch)

    在上一篇博客<使用logstash将Mysql中的数据导入到ElasticSearch中(详细步骤,win_Elasticsearch)>中我们提到将数据插入到es中,那我怎么知道数据是否 ...

  4. ES中的RollUp概念

    在最新的ES中出现了一个X特性,卷数据(Data Rollup,不知道如何翻译,姑且这么称呼吧).这个卷数据是个什么概念呢? 如果做过监控(monitoring)的都知道,监控实际上是个大数据问题,这 ...

  5. SpringData ES中字段名和索引中的列名字不一致导致的无法查询数据的解决方法

    为什么80%的码农都做不了架构师?>>>    用SpringDataElasticsearch查询数据的时候可以将实体中的字段名与ES中mapping中field的名字起成一样的名 ...

  6. es 全量同步mysql_使用canal将mysql同步到es中

    因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧 canal教程 根据 https://github.com/ ...

  7. es中的search_type简单理解

    es中只要用到了两种search_type,一种是query_then_fetch(默认),另一种是dfs_query_then_fetch,我们知道计算某个文档的得分时主要使用的是tf/idf公式, ...

  8. es中主分片和副本分片

    我们知道es中保存数据的时候是有主分片和副本分片的,那么副本分片的作用有哪些呢? 1.作为备份,防止主分片崩溃 2.分担查询请求,请求会在主分片和副本分片之间均匀分布 第一点:主副本之前角色的切换如何 ...

  9. logstash增量读取mysql中的数据到es中

    在工作中,需要把mysql中的数据写入到es中进行分析: 官方文档:https://www.elastic.co/guide/en/logstash/6.3/plugins-inputs-jdbc.h ...

最新文章

  1. 浅谈C#中的异步编程
  2. 最早做无糖茶的统一茶里王,是怎样错过年轻人的?
  3. SQL server 数据库危险存储过程删除与恢复
  4. IOS开发学习笔记011-xcode使用技巧
  5. 【软工项目组】第九次会议
  6. c#服务器后台搭建_【环境搭建】Docker简明安装教程
  7. laravel框架学习之路(一)前后台用户认证分离
  8. 机器学习之使用sklearn构造决策树模型
  9. Object强转为实体类类型失败!!!!!!
  10. Inno Setup 操作XML
  11. 使用java映射ipv4,ipv6到阿里DDNS,适用于黑白群晖或其他用途
  12. Skyline软件二次开发初级——8如何在WEB页面中的三维地图上管理信息树
  13. win10系统驱动备份及还原-命令行操作
  14. 细谈Axios中那些不为人知的秘密!一文读懂Axios
  15. 禅修内观 | 一个璀璨的思想成就
  16. 局部敏感哈希(Locality-Sensitive Hashing, LSH)
  17. openGL之API学习(一二七)dFdx、dFdy偏导数
  18. VMWare中CentOS7增加系统盘空间
  19. 电力电子技术(16)——直流斩波电路
  20. [论文翻译]A review on image segmentation techniques

热门文章

  1. 【转】Hive学习路线图
  2. java线程池【转】
  3. 3142:[HNOI2013]数列 - BZOJ
  4. 黑马程序员-JAVA基础-IO流中的装饰设计模式
  5. SQL Server 表分区注意事项(转载)
  6. 初学者自学python要看什么书-学习Python可以看书籍学习吗?老男孩Python入门课程...
  7. python是什么语言开发的-少儿编程有什么好处?儿童编程课程学习Python的4大原因...
  8. python3.5怎么使用-Python3.5中NumPy模块的使用图文教程
  9. python做小程序-抖音最火的整蛊表白小程序如何做出来的?教你用python做出
  10. python中文版-Python中文版