es中的Coordinator
1、概述
Coordator作为Discovery的实现类,在集群选举中负责master的选举。
2、创建
Coordinator是通过DiscoveryModule创建的
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),settings, clusterSettings,transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState,seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService,electionStrategy, nodeHealthService);
其中seedHostProvider为
final SeedHostsProvider seedHostsProvider = hostsResolver -> {final List<TransportAddress> addresses = new ArrayList<>();for (SeedHostsProvider provider : filteredSeedProviders) {addresses.addAll(provider.getSeedAddresses(hostsResolver));}return Collections.unmodifiableList(addresses);};
clusterApplier为ClusterService的ClusterApplierService
persistedStateSupplier为GatewayMetaState#getPersistedState
3、启动
调用doStart方法
创建CoordinationState,同时peerFinder设置currentTerm。
在单结点发现模式下,检查votingConfiguration是否满足quorm。
创建ClusterState,设置applierState,设置clusterApplier的初始状态。
protected void doStart() {synchronized (mutex) {CoordinationState.PersistedState persistedState = persistedStateSupplier.get();coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));peerFinder.setCurrentTerm(getCurrentTerm());configuredHostsResolver.start();final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();if (lastAcceptedState.metadata().clusterUUIDCommitted()) {logger.info("cluster UUID [{}]", lastAcceptedState.metadata().clusterUUID());}final VotingConfiguration votingConfiguration = lastAcceptedState.getLastCommittedConfiguration();if (singleNodeDiscovery &&votingConfiguration.isEmpty() == false &&votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +" does not have quorum in voting configuration " + votingConfiguration);}ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).addGlobalBlock(noMasterBlockService.getNoMasterBlock())).nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId())).build();applierState = initialState;clusterApplier.setInitialState(initialState);}}
4 、选举
调用startInitialJoin开始选举
4.1 becomeCandidate
主要执行操作包含
- 设置当前mode为CANDIDATE。InitialJoinAccumulator关闭。
- 设置当前的joinAccumulator为CandidateJoinAccumulator
- 激活peerFinder(CoordinatorPeerFinder),获取参与选举的所有节点。
- PreVoteCollector更新PreVoteResponse(currentTerm当前任期,lastAcceptedTerm上次集群状态接受的任期, lastAcceptedVersion上次集群状态接受的版本)
通过SeedHostsResolver解析得到集群中参与选举的节点,使用HandshakingTransportAddressConnector异步连接到远端主节点,同时将TransportAddress与远端节点Peer对应关系放入peersByAddress的map中。
4.1.1 获取参与选举的所有节点
PeerFinder来查找所有可以参与选举的节点。
在找到选举节点后,首先会与对端节点建立连接,然后发送REQUEST_PEERS_ACTION_NAME请求。
CoordinatorPeerFinder#onFoundPeersUpdated在满足条件时开始发起选举。
protected void onFoundPeersUpdated() {synchronized (mutex) {final Iterable<DiscoveryNode> foundPeers = getFoundPeers();if (mode == Mode.CANDIDATE) {final VoteCollection expectedVotes = new VoteCollection();foundPeers.forEach(expectedVotes::addVote);expectedVotes.addVote(Coordinator.this.getLocalNode());final boolean foundQuorum = coordinationState.get().isElectionQuorum(expectedVotes);if (foundQuorum) {if (electionScheduler == null) {startElectionScheduler();}} else {closePrevotingAndElectionScheduler();}}}clusterBootstrapService.onFoundPeersUpdated();}private void startElectionScheduler() {assert electionScheduler == null : electionScheduler;if (getLocalNode().isMasterNode() == false) {return;}final TimeValue gracePeriod = TimeValue.ZERO;electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {@Overridepublic void run() {synchronized (mutex) {if (mode == Mode.CANDIDATE) {final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();if (localNodeMayWinElection(lastAcceptedState) == false) {logger.trace("skip prevoting as local node may not win election: {}",lastAcceptedState.coordinationMetadata());return;}final StatusInfo statusInfo = nodeHealthService.getHealth();if (statusInfo.getStatus() == UNHEALTHY) {logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());return;}if (prevotingRound != null) {prevotingRound.close();}prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());}}}@Overridepublic String toString() {return "scheduling of new prevoting round";}});}
4.2 PreVoteCollector
4.2.1 start
创建PreVotingRound, 开始预投票
public Releasable start(final ClusterState clusterState, final Iterable<DiscoveryNode> broadcastNodes) {PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm());preVotingRound.start(broadcastNodes);return preVotingRound;}
发送PreVoteRequest到其他节点,收到请求后,更新节点见到的最大任期,返回当前节点的PreVoteResponse。接收到响应后,更新见到的最大任期,并且开始正式选举。时序图为
4.3 startElection
创建StartJoinRequest,sourceNode为本地节点 ,term为见到的任期及当前集群状态任期中的最大值+1,向集群中的其他节点发送START_JOIN_ACTION_NAME请求。对端收到StartJoinRequest处理,发送 JoinRequest,并且返回 StartJoinResponse。详细的流程图为
4.4 JoinAccumulator
不同的角色处理join请求不同,有三种角色+一种初始化角色
4.5 ClusterStateTaskExecutor
处理集群状态任务执行器
es中的Coordinator相关推荐
- es中修改某个字段值_搜索引擎之laravel中使用elasticsearch(一)
一.概述 Elasticsearch官方提供了Composer包可直接引用就好,不过要注意:如果你使用的elastcsearch不是最新的那么我建议你指定合适的版本下载,而不要采用最新的,因为不同版本 ...
- Elasticsearch和Hive整合,将hive数据同步到ES中
1 Elasticsearch整合Hive 1.1 软件环境 Hadoop软件环境 Hive软件环境 ES软件环境 1.2 ES-Hadoop介绍 1.2.1 官网 https://www.elast ...
- win下配置的ES中的数据在哪里可以看到?三种方式你看那种更加高大上!!!(win_Elasticsearch)
在上一篇博客<使用logstash将Mysql中的数据导入到ElasticSearch中(详细步骤,win_Elasticsearch)>中我们提到将数据插入到es中,那我怎么知道数据是否 ...
- ES中的RollUp概念
在最新的ES中出现了一个X特性,卷数据(Data Rollup,不知道如何翻译,姑且这么称呼吧).这个卷数据是个什么概念呢? 如果做过监控(monitoring)的都知道,监控实际上是个大数据问题,这 ...
- SpringData ES中字段名和索引中的列名字不一致导致的无法查询数据的解决方法
为什么80%的码农都做不了架构师?>>> 用SpringDataElasticsearch查询数据的时候可以将实体中的字段名与ES中mapping中field的名字起成一样的名 ...
- es 全量同步mysql_使用canal将mysql同步到es中
因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧 canal教程 根据 https://github.com/ ...
- es中的search_type简单理解
es中只要用到了两种search_type,一种是query_then_fetch(默认),另一种是dfs_query_then_fetch,我们知道计算某个文档的得分时主要使用的是tf/idf公式, ...
- es中主分片和副本分片
我们知道es中保存数据的时候是有主分片和副本分片的,那么副本分片的作用有哪些呢? 1.作为备份,防止主分片崩溃 2.分担查询请求,请求会在主分片和副本分片之间均匀分布 第一点:主副本之前角色的切换如何 ...
- logstash增量读取mysql中的数据到es中
在工作中,需要把mysql中的数据写入到es中进行分析: 官方文档:https://www.elastic.co/guide/en/logstash/6.3/plugins-inputs-jdbc.h ...
最新文章
- 浅谈C#中的异步编程
- 最早做无糖茶的统一茶里王,是怎样错过年轻人的?
- SQL server 数据库危险存储过程删除与恢复
- IOS开发学习笔记011-xcode使用技巧
- 【软工项目组】第九次会议
- c#服务器后台搭建_【环境搭建】Docker简明安装教程
- laravel框架学习之路(一)前后台用户认证分离
- 机器学习之使用sklearn构造决策树模型
- Object强转为实体类类型失败!!!!!!
- Inno Setup 操作XML
- 使用java映射ipv4,ipv6到阿里DDNS,适用于黑白群晖或其他用途
- Skyline软件二次开发初级——8如何在WEB页面中的三维地图上管理信息树
- win10系统驱动备份及还原-命令行操作
- 细谈Axios中那些不为人知的秘密!一文读懂Axios
- 禅修内观 | 一个璀璨的思想成就
- 局部敏感哈希(Locality-Sensitive Hashing, LSH)
- openGL之API学习(一二七)dFdx、dFdy偏导数
- VMWare中CentOS7增加系统盘空间
- 电力电子技术(16)——直流斩波电路
- [论文翻译]A review on image segmentation techniques
热门文章
- 【转】Hive学习路线图
- java线程池【转】
- 3142:[HNOI2013]数列 - BZOJ
- 黑马程序员-JAVA基础-IO流中的装饰设计模式
- SQL Server 表分区注意事项(转载)
- 初学者自学python要看什么书-学习Python可以看书籍学习吗?老男孩Python入门课程...
- python是什么语言开发的-少儿编程有什么好处?儿童编程课程学习Python的4大原因...
- python3.5怎么使用-Python3.5中NumPy模块的使用图文教程
- python做小程序-抖音最火的整蛊表白小程序如何做出来的?教你用python做出
- python中文版-Python中文版