CC00055.hadoop——|HadoopMapReduce.V27|——|Hadoop.v27|源码剖析|DataNode启动流程|
### --- datanode的Main Class是DataNode,先找到DataNode.main(): public class DataNode extends ReconfigurableBase implements InterDatanodeProtocol, ClientDatanodeProtocol,TraceAdminProtocol, DataNodeMXBean, ReconfigurationProtocol {public static final Logger LOG = LoggerFactory.getLogger(DataNode.class);static{HdfsConfiguration.init();}public static void main(String args[]) {if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {System.exit(0);}secureMain(args, null);}
...public static void secureMain(String args[], SecureResources resources) {int errorCode = 0;try {// 打印启动信息StringUtils.startupShutdownMessage(DataNode.class, args, LOG);// 完成创建datanode的主要工作DataNode datanode = createDataNode(args, null, resources);if (datanode != null) {datanode.join();} else {errorCode = 1;}} catch (Throwable e) {LOG.fatal("Exception in secureMain", e);terminate(1, e);} finally {LOG.warn("Exiting Datanode");terminate(errorCode);}}
---------------------------------------------------public static DataNode createDataNode(String args[], Configuration conf,SecureResources resources) throws IOException {// 完成大部分初始化的工作,并启动部分工作线程DataNode dn = instantiateDataNode(args, conf, resources);if (dn != null) {// 启动剩余工作线程dn.runDatanodeDaemon();}return dn;}
--------------------------------------------------/** Start a single datanode daemon and wait for it to finish.* If this thread is specifically interrupted, it will stop waiting.*/public void runDatanodeDaemon() throws IOException {// blockPoolManager.startAll()在DataNode.instantiateDataNode()执行过程中已经被调用过一次(见后)blockPoolManager.startAll();dataXceiverServer.start();if (localDataXceiverServer != null) {localDataXceiverServer.start();}ipcServer.start();startPlugins(conf);}
--------------------------------------------------------public static DataNode instantiateDataNode(String args [], Configuration conf,SecureResources resources) throws IOException {if (conf == null)conf = new HdfsConfiguration();
... // 参数检查等Collection<StorageLocation> dataLocations = getStorageLocations(conf);UserGroupInformation.setConfiguration(conf);SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);return makeInstance(dataLocations, conf, resources);}
--------------------------------------------------------------------//DataNode.makeInstance()开始创建DataNode: static DataNode makeInstance(Collection<StorageLocation> dataDirs,Configuration conf, SecureResources resources) throws IOException {
... // 检查数据目录的权限assert locations.size() > 0 : "number of data directories should be > 0";return new DataNode(conf, locations, resources);}
...DataNode(final Configuration conf,final List<StorageLocation> dataDirs,final SecureResources resources) throws IOException {super(conf);
... // 参数设置try {hostName = getHostName(conf);LOG.info("Configured hostname is " + hostName);startDataNode(conf, dataDirs, resources);} catch (IOException ie) {shutdown();throw ie;}}
...void startDataNode(Configuration conf,List<StorageLocation> dataDirs,SecureResources resources) throws IOException {
...// 参数设置// 初始化DataStoragestorage = new DataStorage();// global DN settings// 注册JMXregisterMXBean();// 初始化DataXceiver(流式通信),在DataNode.runDatanodeDaemon()中启动initDataXceiver(conf);// 启动InfoServer(Web UI)startInfoServer(conf);// 启动JVMPauseMonitor(反向监控JVM情况,可通过JMX查询)pauseMonitor = new JvmPauseMonitor(conf);pauseMonitor.start();
... // 略// 初始化IpcServer(RPC通信),在DataNode.runDatanodeDaemon()中启动initIpcServer(conf);metrics = DataNodeMetrics.create(conf, getDisplayName());metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);// 按照namespace(nameservice)、namenode的结构进行初始化blockPoolManager = new BlockPoolManager(this);blockPoolManager.refreshNamenodes(conf);
... // 略}//BlockPoolManager抽象了datanode提供的数据块存储服务。BlockPoolManager按照namespace(nameservice)、namenode结构组织。//BlockPoolManager-refreshNamenodes()//除了初始化过程主动调用,还可以由namenode通过datanode心跳过程下达刷新命令void refreshNamenodes(Configuration conf)throws IOException {LOG.info("Refresh request received for nameservices: " + conf.get(DFSConfigKeys.DFS_NAMESERVICES));Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil.getNNServiceRpcAddressesForCluster(conf);synchronized (refreshNamenodesLock) {doRefreshNamenodes(newAddressMap);}}
-------------------------------------------------------private void doRefreshNamenodes(Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {assert Thread.holdsLock(refreshNamenodesLock);Set<String> toRefresh = Sets.newLinkedHashSet();Set<String> toAdd = Sets.newLinkedHashSet();Set<String> toRemove;synchronized (this) {// Step 1. For each of the new nameservices, figure out whether// it's an update of the set of NNs for an existing NS,// or an entirely new nameservice.for (String nameserviceId : addrMap.keySet()) {if (bpByNameserviceId.containsKey(nameserviceId)) {toRefresh.add(nameserviceId);} else {toAdd.add(nameserviceId);}}
... // 略// Step 2. Start new nameservicesif (!toAdd.isEmpty()) {LOG.info("Starting BPOfferServices for nameservices: " +Joiner.on(",").useForNull("<default>").join(toAdd));for (String nsToAdd : toAdd) {ArrayList<InetSocketAddress> addrs =Lists.newArrayList(addrMap.get(nsToAdd).values());// 为每个namespace创建对应的BPOfferServiceBPOfferService bpos = createBPOS(addrs);bpByNameserviceId.put(nsToAdd, bpos);offerServices.add(bpos);}}// 然后通过startAll启动所有BPOfferServicestartAll();}
... // 略}
------------------------------------------------protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {return new BPOfferService(nnAddrs, dn);}BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {Preconditions.checkArgument(!nnAddrs.isEmpty(),"Must pass at least one NN.");this.dn = dn;for (InetSocketAddress addr : nnAddrs) {this.bpServices.add(new BPServiceActor(addr, this));}}
--------------------------------------------//BlockPoolManager#startAll()启动所有BPOfferService(实际是启动所有BPServiceActor)。synchronized void startAll() throws IOException {try {UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Object>() {@Overridepublic Object run() throws Exception {for (BPOfferService bpos : offerServices) {bpos.start();}return null;}});} catch (InterruptedException ex) {IOException ioe = new IOException();ioe.initCause(ex.getCause());throw ioe;}}
-------------------------------------------------------//在datanode启动的主流程中,启动了多种工作线程,包括InfoServer、JVMPauseMonitor、BPServiceActor等。其中,最重要的是BPServiceActor线程,真正代表datanode与namenode通信的正是BPServiceActor线程。//DataNode--initBlockPool():/*** One of the Block Pools has successfully connected to its NN.* This initializes the local storage for that block pool,* checks consistency of the NN's cluster ID, etc.** If this is the first block pool to register, this also initializes* the datanode-scoped storage.** @param bpos Block pool offer service* @throws IOException if the NN is inconsistent with the local storage.*/void initBlockPool(BPOfferService bpos) throws IOException {
...// 略// 将blockpool注册到BlockPoolManagerblockPoolManager.addBlockPool(bpos);// 初步初始化存储结构initStorage(nsInfo);
... // 检查磁盘损坏// 启动扫描器initPeriodicScanners(conf);// 将blockpool添加到FsDatasetImpl,并继续初始化存储结构data.addBlockPool(nsInfo.getBlockPoolID(), conf);}
CC00055.hadoop——|HadoopMapReduce.V27|——|Hadoop.v27|源码剖析|DataNode启动流程|相关推荐
- bluetoothd源码剖析(一)启动流程
蓝牙系列: bluez调试笔记_weixin_41069709的博客-CSDN博客_bluezbluez移植https://blog.csdn.net/weixin_41069709/article/ ...
- Nginx源码分析:启动流程
nginx源码分析 nginx-1.11.1 参考书籍<深入理解nginx模块开发与架构解析> nginx简介 Nginx作为服务端软件,表现的主要特点是更快.高扩展.高可靠性.低内存消 ...
- Kubelet源码分析(一):启动流程分析
源码版本 kubernetes version: v1.3.0 简介 在Kubernetes集群中,在每个Node节点上都会启动一个kubelet服务进程.该进程用于处理Master节点下发到本节点的 ...
- 带你从源码了解SpringBoot启动流程
从哪入手? 相信很多人尝试读过Spring Boot的源码,但是始终没有找到合适的方法.那是因为你对Spring Boot的各个组件.机制不是很了解,研究起来就像大海捞针. 至于从哪入手不是很简单的问 ...
- zygoteinit.java_源码跟踪之启动流程:从ZygoteInit到onCreate
Instrumentation SDK版本名称: Pie API Level: 28 一.源码调用时序图 1. Activity的启动流程 说明:其中ActivityThread中执行的schedul ...
- 以太坊Go-ethereum源码分析之启动流程
以太坊源码编译需要gov1.7以上,及C编译器,执行make geth 即可编译项目,编译后可执行的geth文件. Makefile文件: geth:build/env.sh go run build ...
- Android进阶——Small源码分析之启动流程详解
前言 插件化现在已经是Android工程师必备的技能之一,只是学会怎么使用是不行的,所以趁有时间研究一下Small的源码.对于插件化主要解决的问题是四大组件的加载和资源的加载,读懂所有Small源码需 ...
- 【Flink源码】JobManager启动流程
写在前面 在 [Flink源码]再谈 Flink 程序提交流程(中) 一文中,笔者后来发现谬误颇多,且随着 Flink 版本的更迭,部分方法实现方式已发生较大改变.因此,思虑再三决定针对 JobMan ...
- Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程
基于QuickStart中的一个demo来分析 from flask import Flaskapp = Flask(__name__)@app.route("/") def he ...
最新文章
- coverage代码覆盖率测试工具:基本原理分析与使用
- 无人驾驶 | 为什么双目自动驾驶系统难以普及?
- linux shell中小数的运算
- java编程思想第四版第十章总结
- 时时监控的rtsp流视频显示在前端与一些css;
- 洛谷P1006 传纸条(多维DP)
- 地图与输出之间的基准面发生冲突_【地图技巧】荒野乱斗: 全球锦标赛全地图英雄推荐...
- 诺德尔-2011-2003-V1新版 ghost安装版
- 如何使用jQuery打开Bootstrap模式窗口?
- Ubuntu系统各个版本的镜像下载地址
- OCP考点实战演练02-日常维护篇
- Typora 上传图片的自定义命令实现
- FBWF和EWF的对比
- 天涯明月刀如何修改登录服务器,天涯明月刀手游体验服和正式服怎么切换方法介绍...
- Java面向对象基础练习题(含答案超详细)
- python读取raw数据文件_pythonrawkit如何从原始文件读取元数据值?
- LocalDate 获取英文月份
- 什么是5G SAR测试,FCC/CE中5G Sub-6GHz与5G 毫米波测试,5G毫米波测试
- 保险业首季度保费收入猛增五成
- MMaction2中自定义AVA数据集(在window上实现)
热门文章
- 如何搭建自己的博客网站(手把手教你搭建免费个人博客网站)
- MFRC53101TOFE小知识
- ViKey加密狗对Word文档进行加密
- 问世到现在电子计算机的性能,一级计算机练习.doc
- 算法学习过程入门篇(2)-算法初步
- H5网页链接APP浏览器跳转小程序-邪少外链
- PS For Mac 内含破解文件下载地址
- 调用ins api获取个人照片信息
- (亲测解决)Tomcat启动时卡在“ Deploying web application directory ”很久的解决方法
- CAD命令栏窗口跑到屏幕外面怎么找回来