一、[源码剖析之DataNode启动流程] :DataNode 启动流程
### --- datanode的Main Class是DataNode,先找到DataNode.main()public class DataNode extends ReconfigurableBaseimplements InterDatanodeProtocol, ClientDatanodeProtocol,TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);static{HdfsConfiguration.init();}public static void main(String args[]) {if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {System.exit(0);}secureMain(args, null);}
...public static void secureMain(String args[], SecureResources resources) {int errorCode = 0;try {// 打印启动信息StringUtils.startupShutdownMessage(DataNode.class, args, LOG);// 完成创建datanode的主要工作DataNode datanode = createDataNode(args, null, resources);if (datanode != null) {datanode.join();} else {errorCode = 1;}} catch (Throwable e) {LOG.fatal("Exception in secureMain", e);terminate(1, e);} finally {LOG.warn("Exiting Datanode");terminate(errorCode);}}
---------------------------------------------------public static DataNode createDataNode(String args[], Configuration conf,SecureResources resources) throws IOException {// 完成大部分初始化的工作,并启动部分工作线程DataNode dn = instantiateDataNode(args, conf, resources);if (dn != null) {// 启动剩余工作线程dn.runDatanodeDaemon();}return dn;}
--------------------------------------------------/** Start a single datanode daemon and wait for it to finish.* If this thread is specifically interrupted, it will stop waiting.*/public void runDatanodeDaemon() throws IOException {// 在DataNode.instantiateDataNode()执行过程中会调用该方法(见后)blockPoolManager.startAll();dataXceiverServer.start();if (localDataXceiverServer != null) {localDataXceiverServer.start();}ipcServer.start();startPlugins(conf);}
--------------------------------------------------------public static DataNode instantiateDataNode(String args [], Configurationconf,SecureResources resources) throws IOException {if (conf == null)conf = new HdfsConfiguration();
...     // 参数检查等Collection<StorageLocation> dataLocations = getStorageLocations(conf);UserGroupInformation.setConfiguration(conf);SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);return makeInstance(dataLocations, conf, resources);}
--------------------------------------------------------------------//DataNode.makeInstance()开始创建DataNodestatic DataNode makeInstance(Collection<StorageLocation> dataDirs,Configuration conf, SecureResources resources) throws IOException {
...     // 检查数据目录的权限assert locations.size() > 0 : "number of data directories should be > 0";return new DataNode(conf, locations, resources);}
...DataNode(final Configuration conf,final List<StorageLocation> dataDirs,final SecureResources resources) throws IOException {super(conf);
...     // 参数设置try {hostName = getHostName(conf);LOG.info("Configured hostname is " + hostName);startDataNode(conf, dataDirs, resources);} catch (IOException ie) {shutdown();throw ie;}}
...void startDataNode(Configuration conf,List<StorageLocation> dataDirs,SecureResources resources) throws IOException {
...// 参数设置// 初始化DataStoragestorage = new DataStorage();// global DN settings// 注册JMXregisterMXBean();// 初始化DataXceiver(流式通信),DataNode runDatanodeDaemon()中启动initDataXceiver(conf);// 启动InfoServer(Web UI)startInfoServer(conf);// 启动JVMPauseMonitor(反向监控JVM情况,可通过JMX查询)pauseMonitor = new JvmPauseMonitor(conf);pauseMonitor.start();
...     // 略// 初始化IpcServer(RPC通信),DataNode-runDatanodeDaemon()中启动initIpcServer(conf);metrics = DataNodeMetrics.create(conf, getDisplayName());metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);// 按照namespace(nameservice)、namenode的结构进行初始化blockPoolManager = new BlockPoolManager(this);blockPoolManager.refreshNamenodes(conf);
...     // 略}//BlockPoolManager抽象了datanode提供的数据块存储服务。BlockPoolManager按照namespace(nameservice)、namenode结构组织。//BlockPoolManager-refreshNamenodes()//除了初始化过程主动调用,还可以由namespace通过datanode心跳过程下达刷新命令void refreshNamenodes(Configuration conf)throws IOException {LOG.info("Refresh request received for nameservices: " + conf.get(DFSConfigKeys.DFS_NAMESERVICES));Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil.getNNServiceRpcAddressesForCluster(conf);synchronized (refreshNamenodesLock) {doRefreshNamenodes(newAddressMap);}}
-------------------------------------------------------private void doRefreshNamenodes(Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {assert Thread.holdsLock(refreshNamenodesLock);Set<String> toRefresh = Sets.newLinkedHashSet();Set<String> toAdd = Sets.newLinkedHashSet();Set<String> toRemove;synchronized (this) {// Step 1. For each of the new nameservices, figure out whether// it's an update of the set of NNs for an existing NS,// or an entirely new nameservice.for (String nameserviceId : addrMap.keySet()) {if (bpByNameserviceId.containsKey(nameserviceId)) {toRefresh.add(nameserviceId);} else {toAdd.add(nameserviceId);}}
...     // 略// Step 2. Start new nameservicesif (!toAdd.isEmpty()) {LOG.info("Starting BPOfferServices for nameservices: " +Joiner.on(",").useForNull("<default>").join(toAdd));for (String nsToAdd : toAdd) {ArrayList<InetSocketAddress> addrs =Lists.newArrayList(addrMap.get(nsToAdd).values());// 为每个namespace创建对应的BPOfferServiceBPOfferService bpos = createBPOS(addrs);bpByNameserviceId.put(nsToAdd, bpos);offerServices.add(bpos);}}// 然后通过startAll启动所有BPOfferServicestartAll();}
...         // 略}
------------------------------------------------protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {return new BPOfferService(nnAddrs, dn);}BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {Preconditions.checkArgument(!nnAddrs.isEmpty(),"Must pass at least one NN.");this.dn = dn;for (InetSocketAddress addr : nnAddrs) {this.bpServices.add(new BPServiceActor(addr, this));}}
--------------------------------------------//BlockPoolManager#startAll()启动所有BPOfferService(实际是启动所有BPServiceActor)。synchronized void startAll() throws IOException {try {UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Object>() {@Overridepublic Object run() throws Exception {for (BPOfferService bpos : offerServices) {bpos.start();}return null;}});} catch (InterruptedException ex) {IOException ioe = new IOException();ioe.initCause(ex.getCause());throw ioe;}}
-------------------------------------------------------//在datanode启动的主流程中,启动了多种工作线程,包括InfoServer、JVMPauseMonitor、BPServiceActor等。其中,最重要的是BPServiceActor线程,真正代表datanode与namenode通信的正是BPServiceActor线程。//DataNode--initBlockPool():/*** One of the Block Pools has successfully connected to its NN.* This initializes the local storage for that block pool,* checks consistency of the NN's cluster ID, etc.** If this is the first block pool to register, this also initializes* the datanode-scoped storage.** @param bpos Block pool offer service* @throws IOException if the NN is inconsistent with the local storage.*/void initBlockPool(BPOfferService bpos) throws IOException {
...// 略// 将blockpool注册到BlockPoolManagerblockPoolManager.addBlockPool(bpos);// 初步初始化存储结构initStorage(nsInfo);
...     // 检查磁盘损坏// 启动扫描器initPeriodicScanners(conf);// 将blockpool添加到FsDatasetIpml,并继续初始化存储结构data.addBlockPool(nsInfo.getBlockPoolID(), conf);}

CC00055.hadoop——|HadoopMapReduce.V27|——|Hadoop.v27|源码剖析|DataNode启动流程|相关推荐

  1. bluetoothd源码剖析(一)启动流程

    蓝牙系列: bluez调试笔记_weixin_41069709的博客-CSDN博客_bluezbluez移植https://blog.csdn.net/weixin_41069709/article/ ...

  2. Nginx源码分析:启动流程

    nginx源码分析 nginx-1.11.1 参考书籍<深入理解nginx模块开发与架构解析> nginx简介 Nginx的作为服务端软件,表现的主要特点是更快.高扩展.高可靠性.低内存消 ...

  3. Kubelet源码分析(一):启动流程分析

    源码版本 kubernetes version: v1.3.0 简介 在Kubernetes急群众,在每个Node节点上都会启动一个kubelet服务进程.该进程用于处理Master节点下发到本节点的 ...

  4. 带你从源码了解SpringBoot启动流程

    从哪入手? 相信很多人尝试读过Spring Boot的源码,但是始终没有找到合适的方法.那是因为你对Spring Boot的各个组件.机制不是很了解,研究起来就像大海捞针. 至于从哪入手不是很简单的问 ...

  5. zygoteinit.java_源码跟踪之启动流程:从ZygoteInit到onCreate

    Instrumentation SDK版本名称: Pie API Level: 28 一.源码调用时序图 1. Activity的启动流程 说明:其中ActivityThread中执行的schedul ...

  6. 以太坊Go-ethereum源码分析之启动流程

    以太坊源码编译需要gov1.7以上,及C编译器,执行make geth 即可编译项目,编译后可执行的geth文件. Makefile文件: geth:build/env.sh go run build ...

  7. Android进阶——Small源码分析之启动流程详解

    前言 插件化现在已经是Android工程师必备的技能之一,只是学会怎么使用是不行的,所以蹭有时间研究一下Small的源码.对于插件化主要解决的问题是四大组件的加载和资源的加载,读懂所有Small源码需 ...

  8. 【Flink源码】JobManager启动流程

    写在前面 在 [Flink源码]再谈 Flink 程序提交流程(中) 一文中,笔者后来发现谬误颇多,且随着 Flink 版本的更迭,部分方法实现方式已发生较大改变.因此,思虑再三决定针对 JobMan ...

  9. Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程

    基于QuickStart中的一个demo来分析 from flask import Flaskapp = Flask(__name__)@app.route("/") def he ...

最新文章

  1. coverage代码覆盖率测试工具:基本原理分析与使用
  2. 无人驾驶 | 为什么双目自动驾驶系统难以普及?
  3. linux shell中小数的运算
  4. java编程思想第四版第十章总结
  5. 时时监控的rtsp流视频显示在前端与一些css;
  6. 洛谷P1006 传纸条(多维DP)
  7. 地图与输出之间的基准面发生冲突_【地图技巧】荒野乱斗: 全球锦标赛全地图英雄推荐...
  8. 诺德尔-2011-2003-V1新版 ghost安装版
  9. 如何使用jQuery打开Bootstrap模式窗口?
  10. Ubuntu系统各个版本的镜像下载地址
  11. OCP考点实战演练02-日常维护篇
  12. Typora 上传图片的自定义命令实现
  13. FBWF和EWF的对比
  14. 天涯明月刀如何修改登录服务器,天涯明月刀手游体验服和正式服怎么切换方法介绍...
  15. Java面向对象基础练习题(含答案超详细)
  16. python读取raw数据文件_pythonrawkit如何从原始文件读取元数据值?
  17. LocalDate 获取英文月份
  18. 什么是5G SAR测试,FCC/CE中5G Sub-6GHz与5G 毫米波测试,5G毫米波测试
  19. 保险业首季度保费收入猛增五成
  20. MMaction2中自定义AVA数据集(在window上实现)

热门文章

  1. 如何搭建自己的博客网站(手把手教你搭建免费个人博客网站)
  2. MFRC53101TOFE小知识
  3. ViKey加密狗对Word文档进行加密
  4. 问世到现在电子计算机的性能,一级计算机练习.doc
  5. 算法学习过程入门篇(2)-算法初步
  6. H5网页链接APP浏览器跳转小程序-邪少外链
  7. PS For Mac 内含破解文件下载地址
  8. 调用ins api获取个人照片信息
  9. (亲测解决)Tomcat启动时卡在“ Deploying web application directory ”很久的解决方法
  10. CAD命令栏窗口跑到屏幕外面怎么找回来