版权声明:本文为博主原创,转载请注明出处!


简介

本文主要解决以下问题:

1、ES启动过程中的Node对象都初始化了那些服务?

构造流程

Step 1、创建一个List暂存初始化失败时需要释放的资源,并使用临时的Logger对象输出开始初始化的日志。

这里首先创建了一个List<Closeable>然后输出日志initializing ...。代码比较简单:

final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;{// use temp logger just to say we are starting. we can't use it later on because the node name might not be setLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));logger.info("initializing ...");}
Step 2、强制设置settingsclient.type的配置为node,设置node.name并检查索引data目录的设置。

这部分首先设置client.typenode,接下来调用TribeServiceprocessSettings方法来处理了“部落”的配置,然后创建NodeEnvironment,检查并设置node.name属性,最后按需检查索引数据的Path的配置并打印一些JVM的信息。代码如下:

 Settings tmpSettings = Settings.builder().put(environment.settings()).put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();tmpSettings = TribeService.processSettings(tmpSettings);// create the node environment as soon as possible, to recover the node id and enable loggingtry {nodeEnvironment = new NodeEnvironment(tmpSettings, environment);resourcesToClose.add(nodeEnvironment);} catch (IOException ex) {throw new IllegalStateException("Failed to create node environment", ex);}final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);Logger logger = Loggers.getLogger(Node.class, tmpSettings);final String nodeId = nodeEnvironment.nodeId();tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);if (DiscoveryNode.nodeRequiresLocalStorage(tmpSettings)) {checkForIndexDataInDefaultPathData(tmpSettings, nodeEnvironment, logger);}// this must be captured after the node name is possibly added to the settingsfinal String nodeName = NODE_NAME_SETTING.get(tmpSettings);if (hadPredefinedNodeName == false) {logger.info("node name [{}] derived from node ID [{}]; set [{}] to override", nodeName, nodeId, NODE_NAME_SETTING.getKey());} else {logger.info("node name [{}], node ID [{}]", nodeName, nodeId);}
Step 3、创建PluginsServiceEnvironment实例。

PluginsService的构造方法中会加载pluginsmodules目录下的jar包,并创建相应的pluginmodule实例。创建完以后,Node的构造方法中会调用pluginsServiceupdatedSettings方法来获取pluginmodule中定义的配置项。接下来Node或使用新的settingsnodeId来创建LocalNodeFactory,并使用最新的settings重新创建Environment对象。代码如下:

 this.pluginsService = new PluginsService(tmpSettings, environment.modulesFile(), environment.pluginsFile(), classpathPlugins);this.settings = pluginsService.updatedSettings();localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());// create the environment based on the finalized (processed) view of the settings// this is just to makes sure that people get the same settings, no matter where they ask them fromthis.environment = new Environment(this.settings);Environment.assertEquivalent(environment, this.environment);
Step 4、创建ThreadPoolThreadContext实例。

首先,通过pluginsService获取pluginmodule中提供的ExecutorBuilder对象列表。接下来基于settings及获取的ExecutorBuilder对象列表创建ThreadPoolThreadContext实例。代码如下:

 final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));// adds the context to the DeprecationLogger so that it does not need to be injected everywhereDeprecationLogger.setThreadContext(threadPool.getThreadContext());resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
Step 5、依次创建NodeClientResourceWatcherServiceScriptModuleAnalysisModuleSettingsModuleNetworkServiceClusterServiceIngestServiceClusterInfoService等主要模块。

ScriptModule中持有ScriptService通过该服务可以获取到ES中配置的各类脚本引擎的实例。AnalysisModule中持有AnalysisRegistry对象,通过该对象可以获取到ES中配置的各类查询分析器的实例。SettingModule中按类型保存了ES中可以解析的配置对象。NetworkService主要用来解析网络地址,ClusterService用例维护集群的信息。代码如下:

 final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());for (final ExecutorBuilder<?> builder : threadPool.builders()) {additionalSettings.addAll(builder.getRegisteredSettings());}client = new NodeClient(settings, threadPool);final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);final ScriptModule scriptModule = ScriptModule.create(settings, this.environment, resourceWatcherService,pluginsService.filterPlugins(ScriptPlugin.class));AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));additionalSettings.addAll(scriptModule.getSettings());// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool// so we might be late here alreadyfinal SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());resourcesToClose.add(resourceWatcherService);final NetworkService networkService = new NetworkService(settings,getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,localNodeFactory::getNode);clusterService.addStateApplier(scriptModule.getScriptService());resourcesToClose.add(clusterService);final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
Step 6、创建ModulesBuilder并加入各种Module

ES使用google开源的Guice管理程序中的依赖。加入ModulesBuilder中的Module有:通过PluginsService获取的插件提供的ModuleNodeModule内部持有MonitorServiceClusterModule内部持有ClusterService及相关的ClusterPluginIndicesModule内部持有MapperPluginSearchModule内部持有相关的SearchPluginActionModule内部持有ThreadPoolActionPluginNodeClientCircuitBreakerServiceGatewayModuleRepositoriesModule内部持有RepositoryPluginSttingsModule内部ES可用的各类配置对象等;最好调用modulescreateInjector方法创建应用的“依赖注入器”。

Step 7、收集各pluginLifecycleComponent对象,并出初始化NodeClient

代码如下:

 List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream().filter(p -> p instanceof LifecycleComponent).map(p -> (LifecycleComponent) p).collect(Collectors.toList());pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream().map(injector::getInstance).collect(Collectors.toList()));resourcesToClose.addAll(pluginLifecycleComponents);this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),() -> clusterService.localNode().getId());if (NetworkModule.HTTP_ENABLED.get(settings)) {logger.debug("initializing HTTP handlers ...");actionModule.initRestHandlers(() -> clusterService.state().nodes());}logger.info("initialized");
Step 8、调用NodeStart方法,在该方法内依次调用各重要模块的start方法。

依次启动各个关键服务。代码如下:

 // hack around dependency injection problem (for now...)injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));pluginLifecycleComponents.forEach(LifecycleComponent::start);injector.getInstance(MappingUpdatedAction.class).setClient(client);injector.getInstance(IndicesService.class).start();injector.getInstance(IndicesClusterStateService.class).start();injector.getInstance(IndicesTTLService.class).start();injector.getInstance(SnapshotsService.class).start();injector.getInstance(SnapshotShardsService.class).start();injector.getInstance(RoutingService.class).start();injector.getInstance(SearchService.class).start();injector.getInstance(MonitorService.class).start();final ClusterService clusterService = injector.getInstance(ClusterService.class);final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);nodeConnectionsService.start();clusterService.setNodeConnectionsService(nodeConnectionsService);// TODO hack around circular dependencies problemsinjector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));injector.getInstance(ResourceWatcherService.class).start();injector.getInstance(GatewayService.class).start();Discovery discovery = injector.getInstance(Discovery.class);clusterService.setDiscoverySettings(discovery.getDiscoverySettings());clusterService.addInitialStateBlock(discovery.getDiscoverySettings().getNoMasterBlock());clusterService.setClusterStatePublisher(discovery::publish);// start before the cluster service since it adds/removes initial Cluster state blocksfinal TribeService tribeService = injector.getInstance(TribeService.class);tribeService.start();// Start the transport service now so the publish address will be added to the local disco node in ClusterServiceTransportService transportService = injector.getInstance(TransportService.class);transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));transportService.start();validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress(), pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));clusterService.addStateApplier(transportService.getTaskManager());clusterService.start();assert localNodeFactory.getNode() != null;assert transportService.getLocalNode().equals(localNodeFactory.getNode()): "transportService has a different local node than the factory provided";assert clusterService.localNode().equals(localNodeFactory.getNode()): "clusterService has a different local node than the factory provided";// start after cluster service so the local disco is knowndiscovery.start();transportService.acceptIncomingRequests();discovery.startInitialJoin();// tribe nodes don't have a master so we shouldn't register an observer  sfinal TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);if (initialStateTimeout.millis() > 0) {final ThreadPool thread = injector.getInstance(ThreadPool.class);ClusterState clusterState = clusterService.state();ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());if (clusterState.nodes().getMasterNodeId() == null) {logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);final CountDownLatch latch = new CountDownLatch(1);observer.waitForNextChange(new ClusterStateObserver.Listener() {@Overridepublic void onNewClusterState(ClusterState state) { latch.countDown(); }@Overridepublic void onClusterServiceClose() {latch.countDown();}@Overridepublic void onTimeout(TimeValue timeout) {logger.warn("timed out while waiting for initial discovery state - timeout: {}",initialStateTimeout);latch.countDown();}}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);try {latch.await();} catch (InterruptedException e) {throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");}}}if (NetworkModule.HTTP_ENABLED.get(settings)) {injector.getInstance(HttpServerTransport.class).start();}if (WRITE_PORTS_FILE_SETTING.get(settings)) {if (NetworkModule.HTTP_ENABLED.get(settings)) {HttpServerTransport http = injector.getInstance(HttpServerTransport.class);writePortsFile("http", http.boundAddress());}TransportService transport = injector.getInstance(TransportService.class);writePortsFile("transport", transport.boundAddress());}// start nodes now, after the http server, because it may take some timetribeService.startNodes();logger.info("started");

【Elasticsearch 5.6.12 源码】——【3】启动过程分析(下)...相关推荐

  1. 渣渣菜鸡的 ElasticSearch 源码解析 —— 启动流程(上)

    关注我 转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/08/11/es-code02/ 前提 上篇文章写了 ElasticSearch 源码解析 -- ...

  2. springboot集成mybatis源码分析-启动加载mybatis过程(二)

    springboot集成mybatis源码分析-启动加载mybatis过程(二) 1.springboot项目最核心的就是自动加载配置,该功能则依赖的是一个注解@SpringBootApplicati ...

  3. 嵌入式之uboot源码分析-启动第二阶段学习笔记(下篇)

    接上部分---->嵌入式之uboot源码分析-启动第二阶段学习笔记(上篇) 注:如下内容来自朱老师物联网大讲堂uboot课件 3.2.14 CFG_NO_FLASH (1)虽然NandFlash ...

  4. android 12 源码编译与虚拟机调试

    android 12 源码编译与虚拟机调试 android 12编译环境搭建 安装android 12环境依赖 android 12源码下载 下载repo工具 repo工具下载 国内repo工具修改 ...

  5. APISIX源码解析-启动篇【ops.lua - start】

    APISIX源码解析-启动篇[ops.lua - start] 命令开始入口 ops.execute(env, arg) 命令参数校验 目前仅支持:help.version.init.init_etc ...

  6. golang源码分析-启动过程概述

    golang源码分析-启动过程概述 golang语言作为根据CSP模型实现的一种强类型的语言,本文主要就是通过简单的实例来分析一下golang语言的启动流程,为深入了解与学习做铺垫. golang代码 ...

  7. sofa源码学习----启动获取ServerConfig流程

    蚂蚁金服sofa rpc框架.公司想使用它作为架构的一部分,所以记录学习笔记. 1.从github下载源代码,版本为 5.6.0-SNAPSHOT,整个项目结构如下: 2.为了尽可能地只关注sofa ...

  8. Android 12源码编译报错:FAILED: out/soong/build.ninja

    Android 12源码编译报错:FAILED: out/soong/build.ninja android12源码编译中报如下错误: 网上查了比较多资料发现是swap分区不够导致的,报错时使用fre ...

  9. 1对1直播源码刚启动时出现空白的解决方案

    做1对1直播源码启动的时候发现,点击icon之后会出现一个短暂的白屏的界面,再进入第一个activity,后来查了资料才发现,是因为Them的原因,默认背景是白色的所以解决方案就是给activiy加个 ...

最新文章

  1. 如何给iOS 分类添加 属性
  2. android 周报,MAndroid 周报第八期
  3. Could not close the output stream for file hdfs://192.168.190.129:9000/BJ_4.c
  4. python3.5 安装PyCrypto
  5. 【人工智能】全网首发!2020年AI、CV、NLP等最全国际会议、顶会时间汇总!!
  6. linux异常级别,linux性能异常定位之进程级别
  7. [解决方案]ios用fd抓包进app无网络
  8. 维护和维修涉密计算机网络 必须严格采取,安全保密管理员主要负责涉密网络的日常安全保密管理工作,包括()。A.涉密网络的日常运行维护工 - 普法考试题库问答...
  9. 算法刻意练习-LeetCode实战29-加油站(C++)
  10. 【C#】基于System.Speech库实现语音合成与语音识别
  11. python批量下载txt图片批量导入到ppt
  12. java集合框架的练习之斗地主洗牌发牌的模拟(升级版)
  13. Cloudflare发布全球最快的DNS
  14. Quantopian 入门系列一
  15. unexpected EOF while looking for matching ``'
  16. 职高学生计算机学情分析,高职学生学情分析
  17. NUCLEO-F767ZI以太网功能实现笔记本电脑不开盖开机
  18. ESP8266 Nodemcu 开发板 + Blinker 电脑远程开机支持小爱同学和手机APP
  19. 鸿蒙系统操作界面跟苹果很像,鸿蒙界面提前“泄密”,安卓和苹果的结合体,但内核已经换了!...
  20. 抓紧时间,先考张CCIE-EI证书

热门文章

  1. FFmpeg中可执行文件ffmpeg用法汇总
  2. FFmpeg中RTSP客户端拉流测试代码
  3. 激活函数之softmax介绍及C++实现
  4. RANSAC鲁棒参数估计
  5. 【FFmpeg】ffmpeg工具源码分析(二):转码核心函数 transcode
  6. c++ssh连接_一步步使SSH连接您的github仓库
  7. c语言10000以内的质数,for语句计算输出10000以内最大素数怎么搞最简单??各位大神们...
  8. 怎么用python画个电脑_python语言还是java如何用python画爱心
  9. bpython ipython_安装ipython后命令找不到ipython bpython -bash: *python: command not found
  10. wxpython有没有可视化设计_wxPython - GUI Builder工具( GUI Builder Tools)