HDFS HA支持多Standby节点机制
前言
在现有的HDFS中,为了保证其高可用性,社区在早些年就已经完成HDFS的HA机制,也就是One Active,One Standby。在此种情况下,HDFS能够容忍其中一个节点出现失败的情况。这套HA机制的实现的确给用户带来了很大的帮助,基于此特性,我们可以做很多集群上的热操作,比如热迁移NameNode,或者滚动升级HDFS等等。可能唯一让人感觉还不是最好的一点是,它不能容忍更多失败的情况,比如2个NameNode都发生失败的情况。在其他的一些分布式系统中,例如Zookeeper,它的内部就可以容忍其中2节点出现崩溃的情况,当它启动了5个节点的时候。在HDFS内部的block副本设计上,也是保证了3副本的设计理念,同样可以容忍2个副本损坏的情况。所以我们不禁开始联想,在HDFS中什么时候也能容忍更多的出错情况?更具体地说,就是在只有一个Active NameNode情况下,同时有多个Standby NameNode。这样的话,HDFS的HA特性看上去就非常的强大了。本文我们就来好好聊聊这个话题。
HDFS多Standby节点机制概述
前面的铺垫内容说了这么多,那么到底目前是否已经有多Standby节点的实现机制呢?答案是有的,但是它还没有发布,目标发布版本Hadoop 3.0.又是在3.0版本,之前本人介绍了许多很棒的特性都是在这个版本发布的(比如HDFS EC),大家敬请期待这个版本吧。社区JIRA HDFS-6440(Support more than 2 NameNodes)最终实现了HA中支持多Standby的特性。本文是我阅读完此JIRA上的设计文档以及代码实现后所写的总结性文章,更多设计细节可以查看原文档。
在之前HDFS的HA的设计实现中,其实已经帮我们实现了许多在未来可能有多Standby节点出现的情况。所以在这里,我们只需要在原来One Active,One Standby完善的机制下,做局部的修改,来满足多Standby的情况即可。以下为几个需要修改的点:
- Zkfc的Active选举,此时不是只有另外一个可选节点,而是很多个Standby节点。
- Checkpoint过程以及Active NameNode上的Fsimage同步问题,之前都是一个Standby NameNode定期发给Active NameNode,这个时候有多个Standby,怎么办。
- Bootstrap过程。之前都是向另外一个Active NameNode进行bootstrap,而现在有多个节点。
- Block token id的生成。
主要为以上4点,其中第2点最为重要,因为涉及到元数据的更新同步,逻辑也作为复杂。在下一小节中,我们将会针对这4点做详细的分析。
多Standby节点细节实现
此小节将会针对以上提出的4点展开分析,下面首先是zkfc相关的改造。
Zkfc的选举
与原先的HA机制相比,多Standby的情况会造成锁竞争的加剧,因为每个Standby节点上的zkfc进程都要尝试获取锁,然后才会将自己的状态切到Active。所以在此建议的Standby数量不宜过多,3~5个足够了。还有当进行手动切换的时候,这个时候要保证其他节点此时不发生切换动作。
Checkpoint元数据同步过程
先来回顾一下原先HA机制的元数据同步过程:
Standby节点周期性的读取JournalNode上的editlog,等到了一次checkpoint周期,然后做一次checkpoint,然后将新的fsimage同步到Active节点。
在这个如果是多个Standby节点的情况,这个处理过程就没有那么简单了,下面几个是主要要解决的问题:
- 这么多个Standby节点,每个节点上都有自己的fsimage,该选哪个作为最终上传镜像文件的节点呢?
答:选择元数据最新的Standby,评判标准是看当前最新的txid。 - 如果Active节点当前已经同步了最新fsimage,而Standby节点又将稍老的fsimage同步过去,怎么办?
答:Active节点会进行比较,如果的确是老的fsimage,会给出失败的回复应答。
以上两点在后面代码实现的部分会有具体的体现。
Bootstrap过程
我们知道bootstrap的用处一般是在集群开始搭建时,将Active上的fsimage等元数据同步到当前的节点上,然后启动当前节点。而在当前多Standby节点的变化是,由向原来另外一个Active获取元数据变为同时向多个其他节点抓取元数据,直到有一个节点能抓取到元数据为止。
Block token id的构造
在block token id的生成中,会根据当前NameNode index下标来生成serialNo序列号数字,然后将此数字应用到token id的生成。生成代码如下:
public synchronized void setSerialNo(int serialNo) {this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31); }
但是原先的处理逻辑,只适用于2个NameNode的情况,也就是下标0和1的情况。在多个Standby出现的情况,NameNode的下标就有可能出现2,3,4等情况。因此此逻辑也需要进行修改。具体改动可见HDFS-6440上的设计文档。
多Standby情况下的Checkpoint同步
因为在多Standby情况下的checkpoint,fsmage同步过程最为复杂,此节我们从源代码实现层面来学习一下其中的过程,主要涉及以下2个类的改造:
- StandbyCheckpointer:Standby NameNode上专门控制做checkpoint以及上传fsimage到Active NameNode的服务。
- ImageServlet:NameNode服务请求处理类,里面包含了fsimage上传请求的处理逻辑。
我们首先进入StandbyCheckpointer类,
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)throws IOException {this.namesystem = ns;this.conf = conf;// checkpoint配置类初始化this.checkpointConf = new CheckpointConf(conf);// 定期checkpoint线程初始化this.thread = new CheckpointerThread();this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TransferFsImageUpload-%d").build();// active结点地址初始化setNameNodeAddresses(conf);}
这里会有active节点地址的初始化,
private void setNameNodeAddresses(Configuration conf) throws IOException {// Look up our own address.myNNAddress = getHttpAddress(conf);// 获取其他NameNode节点配置,作为可能的Active NameNodeList<Configuration> confForActive = HAUtil.getConfForOtherNodes(conf);activeNNAddresses = new ArrayList<URL>(confForActive.size());for (Configuration activeConf : confForActive) {URL activeNNAddress = getHttpAddress(activeConf);// sanity check each possible active NNPreconditions.checkArgument(checkAddress(activeNNAddress),"Bad address for active NN: %s", activeNNAddress);// 将此地址作为active的地址activeNNAddresses.add(activeNNAddress);}...}
其实从这里可以看出一点:Standby节点其实并不知道哪个是当前真正的Active NameNode。
接下来进入checkpoint的线程服务内的doWork工作方法,
private void doWork() {// 获取checkpoint动作的执行周期时间,默认1小时final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();// 重置checkpoint时间,以及最近上传时间lastCheckpointTime = monotonicNow();lastUploadTime = monotonicNow();while (shouldRun) {boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();if (!needRollbackCheckpoint) {try {// 进行checkpoint周期时间睡眠Thread.sleep(checkPeriod);} catch (InterruptedException ie) {}if (!shouldRun) {break;}}// 这里开始准备checkpoint...
我们继续来看后面执行的方法,
final long now = monotonicNow();// 获取未checkpoint的tx事务数final long uncheckpointed = countUncheckpointedTxns();// 计算距离上次未更新checkpoint的时间final long secsSinceLast = (now - lastCheckpointTime) / 1000;// if we need a rollback checkpoint, always attempt to checkpointboolean needCheckpoint = needRollbackCheckpoint;if (needCheckpoint) {LOG.info("Triggering a rollback fsimage for rolling upgrade.");} else if (uncheckpointed >= checkpointConf.getTxnCount()) {// 如果当前未checkpoint的事务数已经超过默认值,就是100w,则也需要进行一次checkpointLOG.info("Triggering checkpoint because there have been " + uncheckpointed + " txns since the last checkpoint, which " +"exceeds the configured threshold " +checkpointConf.getTxnCount());needCheckpoint = true;} else if (secsSinceLast >= checkpointConf.getPeriod()) {// 如果未更新时间已超出了周期时间,就是1小时,则需要进行一次checkpoint操作LOG.info("Triggering checkpoint because it has been " +secsSinceLast + " seconds since the last checkpoint, which " +"exceeds the configured interval " + checkpointConf.getPeriod());needCheckpoint = true;}
以上代码表明了一个Standby节点做一次checkpoint需要达到的2个条件(满足一个条件即可):
- 未做checkpoint的tx事务数超过100w
- 超过1小时的checkpoint周期
我们继续看下面的执行过程,
...// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a// rollback request, are the checkpointer, are outside the quiet period.final long secsSinceLastUpload = (now - lastUploadTime) / 1000;// sendRequest表示是否要发送新的fsimage给Active NameNode,需满足以下2个条件:// 1.isPrimaryCheckPointer标记为true,也就是上次已经发送过fsimage给Active NameNode// 2.距离上次发送fsimage给Active NameNode的时间已经超过了Standby专门发送Active NameNode的时间,// 默认1.5倍的checkpoint的周期时间boolean sendRequest = isPrimaryCheckPointer|| secsSinceLastUpload >= checkpointConf.getQuietPeriod();// 执行checkpoint动作doCheckpoint(sendRequest);...
在上述过程中,会判断是否要将checkpoint过后的fsimage传到Active NameNode上。然后我们继续进入doCheckpoint方法内部,
private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {assert canceler != null;final long txid;final NameNodeFile imageType;// 这里开始准备进行checkpoint操作namesystem.cpLockInterruptibly();...//如果不需要进发送fsimage动作,则在这里会直接结束if(!sendCheckpoint){return;}// 新建线程池用来执行上传fsimage的动作ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),uploadThreadFactory);List<Future<TransferFsImage.TransferResult>> uploads =new ArrayList<Future<TransferFsImage.TransferResult>>();// 遍历潜在的Active NameNode地址,添加上传fsimage的请求、for (final URL activeNNAddress : activeNNAddresses) {Future<TransferFsImage.TransferResult> upload =executor.submit(new Callable<TransferFsImage.TransferResult>() {@Overridepublic TransferFsImage.TransferResult call() throws IOException {return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid, canceler);}});uploads.add(upload);}...for (; i < uploads.size(); i++) {Future<TransferFsImage.TransferResult> upload = uploads.get(i);try {// 获取上传请求结果,如果成功了,则直接退出,无须获取下个请求的处理结果if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {success = true;break;}} catch (ExecutionException e) {...}// 重新设置上次上传成功时间lastUploadTime = monotonicNow();// 记录此次上传结果,表明此Standby是当前领先的checkpointer节点this.isPrimaryCheckPointer = success;...}
Ok,以上就是StandbyCheckpointer内部的相关执行逻辑。在这里upload请求倒是发出去了,那么后面是怎么被处理的呢?接下来我们就来 ImageServlet内部的请求处理逻辑。
我们进入ImafeServlet的doPut处理方法,因为我们是上传文件的请求,不是Get,
protected void doPut(final HttpServletRequest request,final HttpServletResponse response) throws ServletException, IOException {try {ServletContext context = getServletContext();final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);final Configuration conf = (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);final PutImageParams parsedParams = new PutImageParams(request, response,conf);final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();// 验证请求信息validateRequest(context, conf, request, response, nnImage,parsedParams.getStorageInfoString());UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws Exception {// 获取当前节点的服务状态HAServiceProtocol.HAServiceState state = NameNodeHttpServer.getNameNodeStateFromContext(getServletContext());// 如果当前处理的节点不是Active节点,说明请求发送到错误的目标节点上了if (state != HAServiceProtocol.HAServiceState.ACTIVE) {// 在此给出错误的回复码以及出错信息response.sendError(HttpServletResponse.SC_EXPECTATION_FAILED,"Nameode "+request.getLocalAddr()+" is currently not in a state which can "+ "accept uploads of new fsimages. State: "+state);return null;}...
因为之前Standby节点并不知道当前具体的Active NameNode,所以采用的是一种轮询遍历的方式,这样的话同为Standby的其他节点也会处理到。如果当前处理节点的确是Active NameNode,还会进行如下2个判断逻辑,
...// 通过tailSet方法,来比较是否此请求是来自于更旧的事务SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);// 如果是,则给出错误回复,表明当前已经在处理更新的fsimage文件if (larger.size() > 0) {response.sendError(HttpServletResponse.SC_CONFLICT,"Another checkpointer is already in the process of uploading a" +" checkpoint made up to transaction ID " + larger.last());return null;}// 保证当前只处理一份请求,不处理重复的请求if (!currentlyDownloadingCheckpoints.add(imageRequest)) {response.sendError(HttpServletResponse.SC_CONFLICT,"Either current namenode is checkpointing or another"+ " checkpointer is already in the process of "+ "uploading a checkpoint made at transaction ID "+ txid);return null;}
...
前面判断如果都没问题,最后会进行fsimage的下载动作,Active NameNode会从目标Standby NameNode上download文件,
InputStream stream = request.getInputStream();try {long start = monotonicNow();// 此处进行镜像文件下载操作MD5Hash downloadImageDigest = TransferFsImage.handleUploadImageRequest(request, txid,nnImage.getStorage(), stream,parsedParams.getFileSize(), getThrottler(conf));nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,downloadImageDigest);...} finally {// 下载完成后移除镜像文件的请求currentlyDownloadingCheckpoints.remove(imageRequest);stream.close();}
具体上传的细节处理大家可以查阅TransferFsImage类里的代码。
总结
所以总的来看,HDFS-6440支持多Standby特性更多的是一些适配的改造,而不是对原先HA机制的大改。但是依然不可否认,这个特性要做的周边工作还是很多的,比如对应的unit test的构造,这些工作量也是很大的。最后一个小小的建议,配置多Standby的时候,建议数量不宜过多,3~5个足够了,2点原因:第一,zkfc切换选取Active时锁竞争的问题;第二,这些Standby节点同时tail editlog时造成的JournalNode带宽使用上升问题。
参考资料
[1].https://issues.apache.org/jira/browse/HDFS-6440
[2].https://issues.apache.org/jira/secure/attachment/12677453/Multiple-Standby-NameNodes_V1.pdf
HDFS HA支持多Standby节点机制相关推荐
- HDFS HA模式下支持只连接其中Active的NameNode
HDFS HA模式下支持只连接其中Active的NameNode:
- Hadoop详解(五):HDFS HA 和 Federation
1. Hadoop 2.0 产生背景 Hadoop 1.0中HDFS和MapReduce在高可用.扩展性等方面存在问题 HDFS存在的问题 NameNode单点故障,难以应用于在线场景 HA Name ...
- HDFS概述(5)————HDFS HA
HA With QJM 目标 本指南概述了HDFS高可用性(HA)功能以及如何使用Quorum Journal Manager(QJM)功能配置和管理HA HDFS集群. 本文档假设读者对HDFS集群 ...
- HDFS HA与QJM(Quorum Journal Manager)介绍及官网内容整理
问题导读 1.HDFS HA与QJM解决了什么问题? 2.HDFS HA与QJM区别是什么? 3.在HA(两个namenode)架构下,如何访问hdfs文件? [使用QJM构建HDFS HA架构(2. ...
- Hadoop组件之-HDFS(HA实现细节)
NameNode 高可用整体架构概述 在 Hadoop 1.0 时代,Hadoop 的两大核心组件 HDFS NameNode 和 JobTracker 都存在着单点问题,这其中以 NameNode ...
- 大数据(3) - 高可用 HDFS HA
HDFS HA高可用 1 HA概述 1)所谓HA(high available),即高可用(7*24小时不中断服务). 2)实现高可用最关键的策略是消除单点故障.HA严格来说应该分成各个组件的HA机制 ...
- Hadoop3.2.0 HDFS HA ( Quorum Journal Manager )
目的 注意:使用Quorum Journal Manager或常规共享存储 背景 架构 硬件资源 部署 配置概述 配置细节 部署细节 管理命令 负载均衡器设置 自动故障转移 介绍 组件 部署ZooKe ...
- HDFS HA介绍及配置理解
1.HDFS HA介绍 相比于Hadoop1.0,Hadoop 2.0中的HDFS增加了两个重大特性,HA和Federaion.HA即为High Availability,用于解决NameNode单点 ...
- HA:HADOOP高可用机制
课程大纲(HADOOP高可用机制) HA运作机制 什么是HA HADOOP如何实现HA HDFS-HA详解 HA集群搭建 目标: 掌握分布式系统中HA机制的思想 掌握HADOOP内置HA的运作机制 掌 ...
最新文章
- 计蒜客 神奇的二叉树 ( 已知先序和中序遍历构建二叉树 )
- zabbix 监控端口,其他端口和此端口类似
- 1分钟解决VS每次运行都显示“正在还原nuget程序包”问题
- 点击率预估的几个经典模型简介
- OpenCV导出模板参数文件
- mysql.createpool_Node.js MySQL模块中mysql.createConnection和mysql.createPool有什么区别?
- 机器人J中WPR_北方工业大学服务机器人研究项目介绍
- json最大长度限制_api接口返回动态的json格式?我太难了,尝试一下 linq to json
- Linux Note
- 馒头何瑫写作训练营的学习笔记总结
- service $anchorScroll
- C#数据库类(zz)
- MySQL 刷脏页问题
- [Android]-图片JNI(C++\Java)高斯模糊的实现与比较
- 2020数据分析人才及CDA持证人行业报告
- catia二次开发:人机交互select,start command
- Sonarqube基础篇:property设定
- java 裁剪图片_java实现的图片裁剪功能示例
- Nginx的alias/root/try_files实战
- iPhone的备忘录如何进行撤销?