前言


在现有的HDFS中,为了保证其高可用性,社区在早些年就已经完成HDFS的HA机制,也就是One Active,One Standby。在此种情况下,HDFS能够容忍其中一个节点出现失败的情况。这套HA机制的实现的确给用户带来了很大的帮助,基于此特性,我们可以做很多集群上的热操作,比如热迁移NameNode,或者滚动升级HDFS等等。可能唯一让人感觉还不是最好的一点是,它不能容忍更多失败的情况,比如2个NameNode都发生失败的情况。在其他的一些分布式系统中,例如Zookeeper,它的内部就可以容忍其中2节点出现崩溃的情况,当它启动了5个节点的时候。在HDFS内部的block副本设计上,也是保证了3副本的设计理念,同样可以容忍2个副本损坏的情况。所以我们不禁开始联想,在HDFS中什么时候也能容忍更多的出错情况?更具体地说,就是在只有一个Active NameNode情况下,同时有多个Standby NameNode。这样的话,HDFS的HA特性看上去就非常的强大了。本文我们就来好好聊聊这个话题。

HDFS多Standby节点机制概述


前面的铺垫内容说了这么多,那么到底目前是否已经有多Standby节点的实现机制呢?答案是有的,但是它还没有发布,目标发布版本Hadoop 3.0.又是在3.0版本,之前本人介绍了许多很棒的特性都是在这个版本发布的(比如HDFS EC),大家敬请期待这个版本吧。社区JIRA HDFS-6440(Support more than 2 NameNodes)最终实现了HA中支持多Standby的特性。本文是我阅读完此JIRA上的设计文档以及代码实现后所写的总结性文章,更多设计细节可以查看原文档。

在之前HDFS的HA的设计实现中,其实已经帮我们实现了许多在未来可能有多Standby节点出现的情况。所以在这里,我们只需要在原来One Active,One Standby完善的机制下,做局部的修改,来满足多Standby的情况即可。以下为几个需要修改的点:

  • Zkfc的Active选举,此时不是只有另外一个可选节点,而是很多个Standby节点。
  • Checkpoint过程以及Active NameNode上的Fsimage同步问题,之前都是一个Standby NameNode定期发给Active NameNode,这个时候有多个Standby,怎么办。
  • Bootstrap过程。之前都是向另外一个Active NameNode进行bootstrap,而现在有多个节点。
  • Block token id的生成。

主要为以上4点,其中第2点最为重要,因为涉及到元数据的更新同步,逻辑也作为复杂。在下一小节中,我们将会针对这4点做详细的分析。

多Standby节点细节实现


此小节将会针对以上提出的4点展开分析,下面首先是zkfc相关的改造。

Zkfc的选举


与原先的HA机制相比,多Standby的情况会造成锁竞争的加剧,因为每个Standby节点上的zkfc进程都要尝试获取锁,然后才会将自己的状态切到Active。所以在此建议的Standby数量不宜过多,3~5个足够了。还有当进行手动切换的时候,这个时候要保证其他节点此时不发生切换动作。

Checkpoint元数据同步过程


先来回顾一下原先HA机制的元数据同步过程:

Standby节点周期性的读取JournalNode上的editlog,等到了一次checkpoint周期,然后做一次checkpoint,然后将新的fsimage同步到Active节点。

在这个如果是多个Standby节点的情况,这个处理过程就没有那么简单了,下面几个是主要要解决的问题:

  • 这么多个Standby节点,每个节点上都有自己的fsimage,该选哪个作为最终上传镜像文件的节点呢?
    答:选择元数据最新的Standby,评判标准是看当前最新的txid。
  • 如果Active节点当前已经同步了最新fsimage,而Standby节点又将稍老的fsimage同步过去,怎么办?
    答:Active节点会进行比较,如果的确是老的fsimage,会给出失败的回复应答。

以上两点在后面代码实现的部分会有具体的体现。

Bootstrap过程


我们知道bootstrap的用处一般是在集群开始搭建时,将Active上的fsimage等元数据同步到当前的节点上,然后启动当前节点。而在当前多Standby节点的变化是,由向原来另外一个Active获取元数据变为同时向多个其他节点抓取元数据,直到有一个节点能抓取到元数据为止。

Block token id的构造


在block token id的生成中,会根据当前NameNode index下标来生成serialNo序列号数字,然后将此数字应用到token id的生成。生成代码如下:

   public synchronized void setSerialNo(int serialNo) {this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31); }

但是原先的处理逻辑,只适用于2个NameNode的情况,也就是下标0和1的情况。在多个Standby出现的情况,NameNode的下标就有可能出现2,3,4等情况。因此此逻辑也需要进行修改。具体改动可见HDFS-6440上的设计文档。

多Standby情况下的Checkpoint同步


因为在多Standby情况下的checkpoint,fsmage同步过程最为复杂,此节我们从源代码实现层面来学习一下其中的过程,主要涉及以下2个类的改造:

  • StandbyCheckpointer:Standby NameNode上专门控制做checkpoint以及上传fsimage到Active NameNode的服务。
  • ImageServlet:NameNode服务请求处理类,里面包含了fsimage上传请求的处理逻辑。

我们首先进入StandbyCheckpointer类,

  public StandbyCheckpointer(Configuration conf, FSNamesystem ns)throws IOException {this.namesystem = ns;this.conf = conf;// checkpoint配置类初始化this.checkpointConf = new CheckpointConf(conf);// 定期checkpoint线程初始化this.thread = new CheckpointerThread();this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TransferFsImageUpload-%d").build();// active结点地址初始化setNameNodeAddresses(conf);}

这里会有active节点地址的初始化,

  private void setNameNodeAddresses(Configuration conf) throws IOException {// Look up our own address.myNNAddress = getHttpAddress(conf);// 获取其他NameNode节点配置,作为可能的Active NameNodeList<Configuration> confForActive = HAUtil.getConfForOtherNodes(conf);activeNNAddresses = new ArrayList<URL>(confForActive.size());for (Configuration activeConf : confForActive) {URL activeNNAddress = getHttpAddress(activeConf);// sanity check each possible active NNPreconditions.checkArgument(checkAddress(activeNNAddress),"Bad address for active NN: %s", activeNNAddress);// 将此地址作为active的地址activeNNAddresses.add(activeNNAddress);}...}

其实从这里可以看出一点:Standby节点其实并不知道哪个是当前真正的Active NameNode

接下来进入checkpoint的线程服务内的doWork工作方法,

    private void doWork() {// 获取checkpoint动作的执行周期时间,默认1小时final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();// 重置checkpoint时间,以及最近上传时间lastCheckpointTime = monotonicNow();lastUploadTime = monotonicNow();while (shouldRun) {boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();if (!needRollbackCheckpoint) {try {// 进行checkpoint周期时间睡眠Thread.sleep(checkPeriod);} catch (InterruptedException ie) {}if (!shouldRun) {break;}}// 这里开始准备checkpoint...

我们继续来看后面执行的方法,

          final long now = monotonicNow();// 获取未checkpoint的tx事务数final long uncheckpointed = countUncheckpointedTxns();// 计算距离上次未更新checkpoint的时间final long secsSinceLast = (now - lastCheckpointTime) / 1000;// if we need a rollback checkpoint, always attempt to checkpointboolean needCheckpoint = needRollbackCheckpoint;if (needCheckpoint) {LOG.info("Triggering a rollback fsimage for rolling upgrade.");} else if (uncheckpointed >= checkpointConf.getTxnCount()) {// 如果当前未checkpoint的事务数已经超过默认值,就是100w,则也需要进行一次checkpointLOG.info("Triggering checkpoint because there have been " + uncheckpointed + " txns since the last checkpoint, which " +"exceeds the configured threshold " +checkpointConf.getTxnCount());needCheckpoint = true;} else if (secsSinceLast >= checkpointConf.getPeriod()) {// 如果未更新时间已超出了周期时间,就是1小时,则需要进行一次checkpoint操作LOG.info("Triggering checkpoint because it has been " +secsSinceLast + " seconds since the last checkpoint, which " +"exceeds the configured interval " + checkpointConf.getPeriod());needCheckpoint = true;}

以上代码表明了一个Standby节点做一次checkpoint需要达到的2个条件(满足一个条件即可):

  • 未做checkpoint的tx事务数超过100w
  • 超过1小时的checkpoint周期

我们继续看下面的执行过程,

            ...// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a// rollback request, are the checkpointer, are outside the quiet period.final long secsSinceLastUpload = (now - lastUploadTime) / 1000;// sendRequest表示是否要发送新的fsimage给Active NameNode,需满足以下2个条件:// 1.isPrimaryCheckPointer标记为true,也就是上次已经发送过fsimage给Active NameNode// 2.距离上次发送fsimage给Active NameNode的时间已经超过了Standby专门发送Active NameNode的时间,//   默认1.5倍的checkpoint的周期时间boolean sendRequest = isPrimaryCheckPointer|| secsSinceLastUpload >= checkpointConf.getQuietPeriod();// 执行checkpoint动作doCheckpoint(sendRequest);...

在上述过程中,会判断是否要将checkpoint过后的fsimage传到Active NameNode上。然后我们继续进入doCheckpoint方法内部,

  private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {assert canceler != null;final long txid;final NameNodeFile imageType;// 这里开始准备进行checkpoint操作namesystem.cpLockInterruptibly();...//如果不需要进发送fsimage动作,则在这里会直接结束if(!sendCheckpoint){return;}// 新建线程池用来执行上传fsimage的动作ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),uploadThreadFactory);List<Future<TransferFsImage.TransferResult>> uploads =new ArrayList<Future<TransferFsImage.TransferResult>>();// 遍历潜在的Active NameNode地址,添加上传fsimage的请求、for (final URL activeNNAddress : activeNNAddresses) {Future<TransferFsImage.TransferResult> upload =executor.submit(new Callable<TransferFsImage.TransferResult>() {@Overridepublic TransferFsImage.TransferResult call() throws IOException {return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid, canceler);}});uploads.add(upload);}...for (; i < uploads.size(); i++) {Future<TransferFsImage.TransferResult> upload = uploads.get(i);try {// 获取上传请求结果,如果成功了,则直接退出,无须获取下个请求的处理结果if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {success = true;break;}} catch (ExecutionException e) {...}// 重新设置上次上传成功时间lastUploadTime = monotonicNow();// 记录此次上传结果,表明此Standby是当前领先的checkpointer节点this.isPrimaryCheckPointer = success;...}

Ok,以上就是StandbyCheckpointer内部的相关执行逻辑。在这里upload请求倒是发出去了,那么后面是怎么被处理的呢?接下来我们就来 ImageServlet内部的请求处理逻辑。

我们进入ImafeServlet的doPut处理方法,因为我们是上传文件的请求,不是Get,

  protected void doPut(final HttpServletRequest request,final HttpServletResponse response) throws ServletException, IOException {try {ServletContext context = getServletContext();final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);final Configuration conf = (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);final PutImageParams parsedParams = new PutImageParams(request, response,conf);final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();// 验证请求信息validateRequest(context, conf, request, response, nnImage,parsedParams.getStorageInfoString());UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws Exception {// 获取当前节点的服务状态HAServiceProtocol.HAServiceState state = NameNodeHttpServer.getNameNodeStateFromContext(getServletContext());// 如果当前处理的节点不是Active节点,说明请求发送到错误的目标节点上了if (state != HAServiceProtocol.HAServiceState.ACTIVE) {// 在此给出错误的回复码以及出错信息response.sendError(HttpServletResponse.SC_EXPECTATION_FAILED,"Nameode "+request.getLocalAddr()+" is currently not in a state which can "+ "accept uploads of new fsimages. State: "+state);return null;}...

因为之前Standby节点并不知道当前具体的Active NameNode,所以采用的是一种轮询遍历的方式,这样的话同为Standby的其他节点也会处理到。如果当前处理节点的确是Active NameNode,还会进行如下2个判断逻辑,

...// 通过tailSet方法,来比较是否此请求是来自于更旧的事务SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);// 如果是,则给出错误回复,表明当前已经在处理更新的fsimage文件if (larger.size() > 0) {response.sendError(HttpServletResponse.SC_CONFLICT,"Another checkpointer is already in the process of uploading a" +" checkpoint made up to transaction ID " + larger.last());return null;}// 保证当前只处理一份请求,不处理重复的请求if (!currentlyDownloadingCheckpoints.add(imageRequest)) {response.sendError(HttpServletResponse.SC_CONFLICT,"Either current namenode is checkpointing or another"+ " checkpointer is already in the process of "+ "uploading a checkpoint made at transaction ID "+ txid);return null;}
...

前面判断如果都没问题,最后会进行fsimage的下载动作,Active NameNode会从目标Standby NameNode上download文件,

InputStream stream = request.getInputStream();try {long start = monotonicNow();// 此处进行镜像文件下载操作MD5Hash downloadImageDigest = TransferFsImage.handleUploadImageRequest(request, txid,nnImage.getStorage(), stream,parsedParams.getFileSize(), getThrottler(conf));nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,downloadImageDigest);...} finally {// 下载完成后移除镜像文件的请求currentlyDownloadingCheckpoints.remove(imageRequest);stream.close();}

具体上传的细节处理大家可以查阅TransferFsImage类里的代码。

总结


所以总的来看,HDFS-6440支持多Standby特性更多的是一些适配的改造,而不是对原先HA机制的大改。但是依然不可否认,这个特性要做的周边工作还是很多的,比如对应的unit test的构造,这些工作量也是很大的。最后一个小小的建议,配置多Standby的时候,建议数量不宜过多,3~5个足够了,2点原因:第一,zkfc切换选取Active时锁竞争的问题;第二,这些Standby节点同时tail editlog时造成的JournalNode带宽使用上升问题。

参考资料


[1].https://issues.apache.org/jira/browse/HDFS-6440
[2].https://issues.apache.org/jira/secure/attachment/12677453/Multiple-Standby-NameNodes_V1.pdf

HDFS HA支持多Standby节点机制相关推荐

  1. HDFS HA模式下支持只连接其中Active的NameNode

    HDFS HA模式下支持只连接其中Active的NameNode:

  2. Hadoop详解(五):HDFS HA 和 Federation

    1. Hadoop 2.0 产生背景 Hadoop 1.0中HDFS和MapReduce在高可用.扩展性等方面存在问题 HDFS存在的问题 NameNode单点故障,难以应用于在线场景 HA Name ...

  3. HDFS概述(5)————HDFS HA

    HA With QJM 目标 本指南概述了HDFS高可用性(HA)功能以及如何使用Quorum Journal Manager(QJM)功能配置和管理HA HDFS集群. 本文档假设读者对HDFS集群 ...

  4. HDFS HA与QJM(Quorum Journal Manager)介绍及官网内容整理

    问题导读 1.HDFS HA与QJM解决了什么问题? 2.HDFS HA与QJM区别是什么? 3.在HA(两个namenode)架构下,如何访问hdfs文件? [使用QJM构建HDFS HA架构(2. ...

  5. Hadoop组件之-HDFS(HA实现细节)

    NameNode 高可用整体架构概述 在 Hadoop 1.0 时代,Hadoop 的两大核心组件 HDFS NameNode 和 JobTracker 都存在着单点问题,这其中以 NameNode ...

  6. 大数据(3) - 高可用 HDFS HA

    HDFS HA高可用 1 HA概述 1)所谓HA(high available),即高可用(7*24小时不中断服务). 2)实现高可用最关键的策略是消除单点故障.HA严格来说应该分成各个组件的HA机制 ...

  7. Hadoop3.2.0 HDFS HA ( Quorum Journal Manager )

    目的 注意:使用Quorum Journal Manager或常规共享存储 背景 架构 硬件资源 部署 配置概述 配置细节 部署细节 管理命令 负载均衡器设置 自动故障转移 介绍 组件 部署ZooKe ...

  8. HDFS HA介绍及配置理解

    1.HDFS HA介绍 相比于Hadoop1.0,Hadoop 2.0中的HDFS增加了两个重大特性,HA和Federaion.HA即为High Availability,用于解决NameNode单点 ...

  9. HA:HADOOP高可用机制

    课程大纲(HADOOP高可用机制) HA运作机制 什么是HA HADOOP如何实现HA HDFS-HA详解 HA集群搭建 目标: 掌握分布式系统中HA机制的思想 掌握HADOOP内置HA的运作机制 掌 ...

最新文章

  1. 计蒜客 神奇的二叉树 ( 已知先序和中序遍历构建二叉树 )
  2. zabbix 监控端口,其他端口和此端口类似
  3. 1分钟解决VS每次运行都显示“正在还原nuget程序包”问题
  4. 点击率预估的几个经典模型简介
  5. OpenCV导出模板参数文件
  6. mysql.createpool_Node.js MySQL模块中mysql.createConnection和mysql.createPool有什么区别?
  7. 机器人J中WPR_北方工业大学服务机器人研究项目介绍
  8. json最大长度限制_api接口返回动态的json格式?我太难了,尝试一下 linq to json
  9. Linux Note
  10. 馒头何瑫写作训练营的学习笔记总结
  11. service $anchorScroll
  12. C#数据库类(zz)
  13. MySQL 刷脏页问题
  14. [Android]-图片JNI(C++\Java)高斯模糊的实现与比较
  15. 2020数据分析人才及CDA持证人行业报告
  16. catia二次开发:人机交互select,start command
  17. Sonarqube基础篇:property设定
  18. java 裁剪图片_java实现的图片裁剪功能示例
  19. Nginx的alias/root/try_files实战
  20. iPhone的备忘录如何进行撤销?

热门文章

  1. 华为弹性云服务器进修复模式,华为云弹性云控制面板简要解析
  2. 51单片机+LCD12864的万年历Proteus仿真
  3. 客户端与服务器之间的通信过程
  4. 倒置的方法//c++
  5. oracle实时异地同步,异地Oracle数据库数据同步
  6. 【Python可视化展示】-多维数据可视化分析
  7. 查看ip地址 通过域名
  8. 海康IVMS-8300平台国标对接
  9. HCL实验:用VRRP实现路由备份及负载分担
  10. 微信小程序登录 获取头像不显示