Checkpointer Mechanism

  • Overview
  • Source code walkthrough
    • Related configuration
    • Source analysis
      • Creating the StandbyCheckpointer
      • StandbyCheckpointer analysis
      • What CheckpointerThread does
      • doCheckpoint
      • saveNamespace (core)
      • endCurrentLogSegment
      • Back to the fsImage upload
      • ImageServlet.doPut receives the fsimage file

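Overview

The NameNode is the master node of an HDFS cluster. It manages the metadata of the entire file system, and every read or write request goes through it.

The NameNode keeps metadata in three forms:

  • In-memory metadata: the metadata held in memory; this is the most complete copy
  • fsimage file: an on-disk image of the metadata (for example fsimage_0000000000000102359) stored in the NameNode working directory; it does not record which DataNodes hold each block
  • edits file: the operation log that bridges the gap between the in-memory metadata and the fsimage; replaying it reproduces the metadata

To serve clients quickly, the NameNode keeps the full file system metadata in memory. To keep the cluster reliable (no metadata loss after a power failure), it periodically persists that metadata to disk as an fsimage file (fsimage_0000000000000102359). Metadata operations (create, modify, delete, and so on) made after that persistence point are recorded in edits_xxx-xxx files, and operations in the segment currently being written go to an edits_inprogress_xxxx file.

The image file and the edit log files work together. For example, fsimage_0000000000000102359 captures the effect of every transaction up to and including txId 102359, while the edits_xxx-xxx and edits_inprogress_xxxx files record every operation after 102359 up to the latest transaction. In other words, the fsimage is a full snapshot and the edit log is incremental. Loading the fsimage into memory and replaying the later edits restores all of the NameNode's in-memory metadata.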
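
As a rough illustration of how a full image plus incremental edits rebuilds the namespace, the sketch below applies only the edits whose txid is greater than the image's checkpoint txid. The `Edit` class, the CREATE/DELETE op names and the set-of-paths namespace are hypothetical simplifications for this example, not HDFS classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of "fsimage + edits replay"; not HDFS code.
class EditReplaySketch {
  /** One edit log record: transaction id, operation name, target path. */
  static final class Edit {
    final long txId;
    final String op;
    final String path;

    Edit(long txId, String op, String path) {
      this.txId = txId;
      this.op = op;
      this.path = path;
    }
  }

  /**
   * Starts from the paths stored in the image (valid up to imageTxId)
   * and applies every later edit in the order given.
   */
  static Set<String> replay(Set<String> imagePaths, long imageTxId,
      List<Edit> edits) {
    Set<String> namespace = new TreeSet<>(imagePaths);
    for (Edit e : edits) {
      if (e.txId <= imageTxId) {
        continue; // already reflected in the image
      }
      if ("CREATE".equals(e.op)) {
        namespace.add(e.path);
      } else if ("DELETE".equals(e.op)) {
        namespace.remove(e.path);
      } else {
        throw new IllegalArgumentException("unknown op " + e.op);
      }
    }
    return namespace;
  }

  public static void main(String[] args) {
    Set<String> image = new TreeSet<>(Arrays.asList("/a", "/b"));
    List<Edit> edits = Arrays.asList(
        new Edit(102359, "CREATE", "/b"),  // covered by the image, skipped
        new Edit(102360, "CREATE", "/c"),
        new Edit(102361, "DELETE", "/a"));
    System.out.println(replay(image, 102359, edits)); // prints [/b, /c]
  }
}
```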

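The official Hadoop documentation briefly describes the Checkpoint Node and the Backup Node; refer to it for those roles.

This article records a code-level analysis of the StandbyCheckpointer mechanism on the standby NameNode in HA mode.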

Source code walkthrough

Related configuration

  <property>
    <name>dfs.namenode.checkpoint.period</name>
    <value>3600</value>
    <description>Fixed interval, in seconds, between two checkpoints. Default 3600, i.e. 1 hour.</description>
  </property>
  <!-- checkpoint transaction threshold -->
  <property>
    <name>dfs.namenode.checkpoint.txns</name>
    <value>1000</value>
    <description>Number of uncheckpointed transactions. Once this many transactions have accumulated, a checkpoint is triggered even if the period has not elapsed. The default is 1,000,000.</description>
  </property>
  <property>
    <name>dfs.namenode.checkpoint.check.period</name>
    <value>60</value>
    <description>How often, in seconds, to check whether the checkpoint conditions are met. Default 60, i.e. once per minute.</description>
  </property>
  <property>
    <name>dfs.namenode.num.checkpoints.retained</name>
    <value>2</value>
    <description>Number of fsimage files kept on the NameNode; older ones are deleted. Default 2.</description>
  </property>
  <property>
    <name>dfs.namenode.num.extra.edits.retained</name>
    <value>1000</value>
  </property>
  <property>
    <name>dfs.namenode.max.extra.edits.segments.retained</name>
    <value>100</value>
  </property>
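
To make the effect of dfs.namenode.num.checkpoints.retained concrete, here is a minimal sketch of "keep the newest n images, purge the rest". `ImageRetentionSketch` and `imagesToPurge` are hypothetical names for this example; the txids are taken from the standby log shown later in this article, and the real retention logic in Hadoop handles more cases than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of image retention; not the Hadoop implementation.
class ImageRetentionSketch {
  /** Returns the txids of the images to purge, keeping the newest numRetained. */
  static List<Long> imagesToPurge(List<Long> imageTxIds, int numRetained) {
    List<Long> sorted = new ArrayList<>(imageTxIds);
    Collections.sort(sorted); // oldest first
    int purgeCount = Math.max(0, sorted.size() - numRetained);
    return new ArrayList<>(sorted.subList(0, purgeCount));
  }

  public static void main(String[] args) {
    // fsimage txids on disk right after a new checkpoint at 102477
    List<Long> images = Arrays.asList(102471L, 102475L, 102477L);
    System.out.println(imagesToPurge(images, 2)); // prints [102471]
  }
}
```

With a retention count of 2, only fsimage_0000000000000102471 is purged, which matches the "Purging old image" line in the standby log.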

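Source analysis

When the NameNode is started from the command line, its constructor performs a series of initialization steps (initialize) and then decides, based on the node's current state, whether to start the StandbyCheckpointer.
When the NameNode starts in standby state, the standbyCheckpointer is started only if the node is not an observer and dfs.ha.standby.checkpoints=true.

Creating the StandbyCheckpointer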

  public void enterState(HAContext context) throws ServiceFailedException {
    try {
      context.startStandbyServices();
    } catch (IOException e) {
      throw new ServiceFailedException("Failed to start standby services", e);
    }
  }

  void startStandbyServices(final Configuration conf, boolean isObserver)
      throws IOException {
    LOG.info("Starting services required for " +
        (isObserver ? "observer" : "standby") + " state");
    if (!getFSImage().editLog.isOpenForRead()) {
      // During startup, we're already open for read.
      getFSImage().editLog.initSharedJournalsForRead();
    }
    blockManager.setPostponeBlocksFromFuture(true);

    // Disable quota checks while in standby.
    dir.disableQuotaChecks();
    // Create and start the edit log tailer
    editLogTailer = new EditLogTailer(this, conf);
    editLogTailer.start();
    // Only for non-observer nodes with dfs.ha.standby.checkpoints=true
    if (!isObserver && standbyShouldCheckpoint) {
      standbyCheckpointer = new StandbyCheckpointer(conf, this);
      standbyCheckpointer.start();
    }
  }

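StandbyCheckpointer analysis

The constructor loads the checkpoint configuration, creates the CheckpointerThread, and builds the thread factory used for fsimage uploads. It also records the targets the fsimage is pushed to, i.e. the address of the active NameNode.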

  public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
      throws IOException {
    // The namesystem (metadata)
    this.namesystem = ns;
    this.conf = conf;
    // Checkpoint-related configuration
    this.checkpointConf = new CheckpointConf(conf);
    // Create the CheckpointerThread
    this.thread = new CheckpointerThread();
    // Thread factory for the fsimage upload threads
    this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("TransferFsImageUpload-%d").build();
    setNameNodeAddresses(conf);
    this.checkpointReceivers = new HashMap<>();
    // Targets the fsimage is pushed to, i.e. the active NameNode address
    for (URL address : activeNNAddresses) {
      this.checkpointReceivers.put(address.toString(),
          new CheckpointReceiverEntry());
    }
  }

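What CheckpointerThread does

Once the Checkpointer is started, the thread does the following:

  • Runs doWork, the main loop of the thread
  • Records the time of the last checkpoint
  • Checks three trigger conditions: a checkpoint is explicitly required (a rollback image for a rolling upgrade), the uncheckpointed transaction count has reached its threshold, or the checkpoint period has elapsed
  • Calls doCheckpoint when one of the conditions is met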

The doWork method is the heart of the thread:

  private void doWork() {
    // Check interval from dfs.namenode.checkpoint.check.period (default 60 s),
    // capped by dfs.namenode.checkpoint.period
    final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();
    // Reset checkpoint time so that we don't always checkpoint
    // on startup.
    // Record the time of the last checkpoint
    lastCheckpointTime = monotonicNow();
    // shouldRun defaults to true, so the loop body is entered after startup
    while (shouldRun) {
      boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
      if (!needRollbackCheckpoint) {
        try {
          Thread.sleep(checkPeriod);
        } catch (InterruptedException ie) {
        }
        if (!shouldRun) {
          break;
        }
      }
      try {
        // We may have lost our ticket since last checkpoint, log in again, just in case
        if (UserGroupInformation.isSecurityEnabled()) {
          UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
        }

        // Current time
        final long now = monotonicNow();
        // Number of transactions not yet checkpointed
        final long uncheckpointed = countUncheckpointedTxns();
        // Seconds since the last checkpoint
        final long secsSinceLast = (now - lastCheckpointTime) / 1000;

        // if we need a rollback checkpoint, always attempt to checkpoint
        boolean needCheckpoint = needRollbackCheckpoint;

        // Three triggers: a rollback image is required, the transaction
        // threshold is reached, or the checkpoint period has elapsed
        if (needCheckpoint) {
          LOG.info("Triggering a rollback fsimage for rolling upgrade.");
        } else if (uncheckpointed >= checkpointConf.getTxnCount()) {
          LOG.info("Triggering checkpoint because there have been {} txns " +
              "since the last checkpoint, " +
              "which exceeds the configured threshold {}",
              uncheckpointed, checkpointConf.getTxnCount());
          needCheckpoint = true;
        } else if (secsSinceLast >= checkpointConf.getPeriod()) {
          LOG.info("Triggering checkpoint because it has been {} seconds " +
              "since the last checkpoint, which exceeds the configured " +
              "interval {}", secsSinceLast, checkpointConf.getPeriod());
          needCheckpoint = true;
        }

        if (needCheckpoint) {
          synchronized (cancelLock) {
            if (now < preventCheckpointsUntil) {
              LOG.info("But skipping this checkpoint since we are about to failover!");
              canceledCount++;
              continue;
            }
            assert canceler == null;
            canceler = new Canceler();
          }

          // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
          // rollback request, are the checkpointer, are outside the quiet period.
          doCheckpoint();

          // reset needRollbackCheckpoint to false only when we finish a ckpt
          // for rollback image
          if (needRollbackCheckpoint
              && namesystem.getFSImage().hasRollbackFSImage()) {
            namesystem.setCreatedRollbackImages(true);
            namesystem.setNeedRollbackFsImage(false);
          }
          lastCheckpointTime = now;
          LOG.info("Checkpoint finished successfully.");
        }
      } catch (SaveNamespaceCancelledException ce) {
        LOG.info("Checkpoint was cancelled: {}", ce.getMessage());
        canceledCount++;
      } catch (InterruptedException ie) {
        LOG.info("Interrupted during checkpointing", ie);
        // Probably requested shutdown.
        continue;
      } catch (Throwable t) {
        LOG.error("Exception in doCheckpoint", t);
      } finally {
        synchronized (cancelLock) {
          canceler = null;
        }
      }
    }
  }
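
The trigger logic inside doWork boils down to a single predicate. The following is a minimal sketch of that decision only; `CheckpointTriggerSketch` and its parameter names are hypothetical, and the thresholds in `main` (1000 transactions, 3600 seconds) are the values from the configuration listed in this article.

```java
// Hypothetical condensation of the checkpoint trigger check; not Hadoop code.
class CheckpointTriggerSketch {
  /**
   * A checkpoint is attempted when a rollback image is required, when enough
   * uncheckpointed transactions have accumulated, or when enough time has
   * passed since the last checkpoint.
   */
  static boolean needCheckpoint(boolean needRollback, long uncheckpointedTxns,
      long txnThreshold, long secsSinceLast, long periodSecs) {
    return needRollback
        || uncheckpointedTxns >= txnThreshold
        || secsSinceLast >= periodSecs;
  }

  public static void main(String[] args) {
    System.out.println(needCheckpoint(false, 1200, 1000, 30, 3600));  // true: txn threshold
    System.out.println(needCheckpoint(false, 10, 1000, 3600, 3600));  // true: period elapsed
    System.out.println(needCheckpoint(false, 10, 1000, 30, 3600));    // false
    System.out.println(needCheckpoint(true, 0, 1000, 0, 3600));       // true: rollback image
  }
}
```

Note that the real loop evaluates this only once per check interval, and may still skip the checkpoint when a failover is imminent (the preventCheckpointsUntil test).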

doCheckpoint

This step does the following:

  • Persists the fsImage file
  • Uploads the fsImage to activeNNAddresses on asynchronous threads
  • Records the upload results

  private void doCheckpoint() throws InterruptedException, IOException {
    assert canceler != null;
    final long txid;
    final NameNodeFile imageType;
    // Acquire cpLock to make sure no one is modifying the name system.
    // It does not need the full namesystem write lock, since the only thing
    // that modifies namesystem on standby node is edit log replaying.
    namesystem.cpLockInterruptibly();
    try {
      assert namesystem.getEditLog().isOpenForRead() :
        "Standby Checkpointer should only attempt a checkpoint when " +
        "NN is in standby mode, but the edit logs are in an unexpected state";

      FSImage img = namesystem.getFSImage();

      long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
      long thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId();
      assert thisCheckpointTxId >= prevCheckpointTxId;
      // Skip if no transactions arrived since the previous checkpoint
      if (thisCheckpointTxId == prevCheckpointTxId) {
        LOG.info("A checkpoint was triggered but the Standby Node has not " +
            "received any transactions since the last checkpoint at txid {}. " +
            "Skipping...", thisCheckpointTxId);
        return;
      }

      if (namesystem.isRollingUpgrade()
          && !namesystem.getFSImage().hasRollbackFSImage()) {
        // if we will do rolling upgrade but have not created the rollback image
        // yet, name this checkpoint as fsimage_rollback
        imageType = NameNodeFile.IMAGE_ROLLBACK;
      } else {
        imageType = NameNodeFile.IMAGE;
      }
      // Save the namespace
      img.saveNamespace(namesystem, imageType, canceler);
      txid = img.getStorage().getMostRecentCheckpointTxId();
      assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
          thisCheckpointTxId + " but instead saved at txid=" + txid;

      // Save the legacy OIV image, if the output dir is defined.
      String outputDir = checkpointConf.getLegacyOivImageDir();
      if (outputDir != null && !outputDir.isEmpty()) {
        try {
          img.saveLegacyOIVImage(namesystem, outputDir, canceler);
        } catch (IOException ioe) {
          LOG.warn("Exception encountered while saving legacy OIV image; "
              + "continuing with other checkpointing steps", ioe);
        }
      }
    } finally {
      namesystem.cpUnlock();
    }

    // Upload the saved checkpoint back to the active
    // Do this in a separate thread to avoid blocking transition to active, but don't allow more
    // than the expected number of tasks to run or queue up
    // See HDFS-4816
    ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
        uploadThreadFactory);
    // for right now, just match the upload to the nn address by convention. There is no need to
    // directly tie them together by adding a pair class.
    HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
        new HashMap<>();
    for (final URL activeNNAddress : activeNNAddresses) {
      // Upload image if at least 1 of 2 following conditions met:
      // 1. has been quiet for long enough, try to contact the node.
      // 2. this standby IS the primary checkpointer of target NN.
      String addressString = activeNNAddress.toString();
      assert checkpointReceivers.containsKey(addressString);
      CheckpointReceiverEntry receiverEntry =
          checkpointReceivers.get(addressString);
      long secsSinceLastUpload =
          TimeUnit.MILLISECONDS.toSeconds(
              monotonicNow() - receiverEntry.getLastUploadTime());
      boolean shouldUpload = receiverEntry.isPrimary() ||
          secsSinceLastUpload >= checkpointConf.getQuietPeriod();
      if (shouldUpload) {
        Future<TransferFsImage.TransferResult> upload =
            executor.submit(new Callable<TransferFsImage.TransferResult>() {
              @Override
              public TransferFsImage.TransferResult call()
                  throws IOException, InterruptedException {
                CheckpointFaultInjector.getInstance().duringUploadInProgess();
                return TransferFsImage.uploadImageFromStorage(activeNNAddress,
                    conf, namesystem.getFSImage().getStorage(), imageType, txid,
                    canceler);
              }
            });
        uploads.put(addressString, upload);
      }
    }
    InterruptedException ie = null;
    List<IOException> ioes = Lists.newArrayList();
    for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
        uploads.entrySet()) {
      String url = entry.getKey();
      Future<TransferFsImage.TransferResult> upload = entry.getValue();
      try {
        // TODO should there be some smarts here about retries nodes that
        //  are not the active NN?
        CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
        TransferFsImage.TransferResult uploadResult = upload.get();
        if (uploadResult == TransferFsImage.TransferResult.SUCCESS) {
          receiverEntry.setLastUploadTime(monotonicNow());
          receiverEntry.setIsPrimary(true);
        } else {
          // Getting here means image upload is explicitly rejected
          // by the other node. This could happen if:
          // 1. the other is also a standby, or
          // 2. the other is active, but already accepted another
          // newer image, or
          // 3. the other is active but has a recent enough image.
          // All these are valid cases, just log for information.
          LOG.info("Image upload rejected by the other NameNode: {}",
              uploadResult);
          receiverEntry.setIsPrimary(false);
        }
      } catch (ExecutionException e) {
        // Even if exception happens, still proceeds to next NN url.
        // so that fail to upload to previous NN does not cause the
        // remaining NN not getting the fsImage.
        ioes.add(new IOException("Exception during image upload", e));
      } catch (InterruptedException e) {
        ie = e;
        break;
      }
    }
    // cleaner than copying code for multiple catch statements and better than catching all
    // exceptions, so we just handle the ones we expect.
    if (ie != null) {

      // cancel the rest of the tasks, and close the pool
      for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
          uploads.entrySet()) {
        Future<TransferFsImage.TransferResult> upload = entry.getValue();
        // The background thread may be blocked waiting in the throttler, so
        // interrupt it.
        upload.cancel(true);
      }

      // shutdown so we interrupt anything running and don't start anything new
      executor.shutdownNow();
      // this is a good bit longer than the thread timeout, just to make sure all the threads
      // that are not doing any work also stop
      executor.awaitTermination(500, TimeUnit.MILLISECONDS);

      // re-throw the exception we got, since one of these two must be non-null
      throw ie;
    }

    if (!ioes.isEmpty()) {
      throw MultipleIOException.createIOException(ioes);
    }
  }
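
The per-target upload decision in doCheckpoint (upload if this standby is the primary checkpointer for the target, or if the target has been quiet long enough) can be modeled in isolation. This is a minimal sketch: the `Receiver` class, its initial state (primary, last upload at time 0) and the 5400-second quiet period are assumptions made for the example, not values taken from the Hadoop source.

```java
// Hypothetical model of the per-NameNode upload bookkeeping; not Hadoop code.
class UploadDecisionSketch {
  static final class Receiver {
    long lastUploadSecs = 0;  // assumed initial value for this sketch
    boolean primary = true;   // assumed initial value for this sketch

    /** Upload when we are the primary checkpointer or the quiet period has passed. */
    boolean shouldUpload(long nowSecs, long quietPeriodSecs) {
      return primary || (nowSecs - lastUploadSecs) >= quietPeriodSecs;
    }

    /** Success makes us primary and records the time; a rejection demotes us. */
    void onResult(boolean success, long nowSecs) {
      if (success) {
        lastUploadSecs = nowSecs;
        primary = true;
      } else {
        primary = false;
      }
    }
  }

  public static void main(String[] args) {
    Receiver r = new Receiver();
    System.out.println(r.shouldUpload(100, 5400));   // true: still primary
    r.onResult(false, 100);                          // target rejected the image
    System.out.println(r.shouldUpload(150, 5400));   // false: not primary, too soon
    System.out.println(r.shouldUpload(5400, 5400));  // true: quiet period elapsed
  }
}
```

The design keeps a rejected standby from uploading on every checkpoint, while still letting it retry occasionally in case the other checkpointer has gone away.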

saveNamespace (core)

This step does the following:

  • If an edit log segment is open, calls endCurrentLogSegment to close it, i.e. renames the edits_inprogress_xxx file to edits_xxx-xxx
  • Saves the FSImage file: objects such as FSImageSaver and FSImageFormatProtobuf serialize the metadata to a file.

For the serialized file format, see the article on how the NameNode loads the FsImage at startup.

  public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
      Canceler canceler) throws IOException {
    assert editLog != null : "editLog must be initialized";
    LOG.info("Save namespace ...");
    storage.attemptRestoreRemovedStorage();

    // Is the edit log currently in the IN_SEGMENT state?
    boolean editLogWasOpen = editLog.isSegmentOpen();

    if (editLogWasOpen) {
      // Close the current log segment, i.e. rename edits_inprogress_xxx
      // to edits_xxx-xxx
      editLog.endCurrentLogSegment(true);
    }
    long imageTxId = getCorrectLastAppliedOrWrittenTxId();
    if (!addToCheckpointing(imageTxId)) {
      throw new IOException(
          "FS image is being downloaded from another NN at txid " + imageTxId);
    }
    try {
      try {
        // Save the FSImage: FSImageSaver/FSImageFormatProtobuf serialize the
        // metadata to a file. It is written as fsimage.ckpt first, then renamed
        // to fsimage_xxxxx, together with an fsimage_xxxx.md5 file
        saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
        if (!source.isRollingUpgrade()) {
          updateStorageVersion();
        }
      } finally {
        if (editLogWasOpen) {
          editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1,
              source.getEffectiveLayoutVersion());
          // Take this opportunity to note the current transaction.
          // Even if the namespace save was cancelled, this marker
          // is only used to determine what transaction ID is required
          // for startup. So, it doesn't hurt to update it unnecessarily.
          storage.writeTransactionIdFileToStorage(imageTxId + 1);
        }
      }
    } finally {
      removeFromCheckpointing(imageTxId);
    }
    //Update NameDirSize Metric
    // Update the name dir size, i.e. the space used by the fsimage directory
    getStorage().updateNameDirSize();

    if (exitAfterSave.get()) {
      LOG.error("NameNode process will exit now... The saved FsImage " +
          nnf + " is potentially corrupted.");
      ExitUtil.terminate(-1);
    }
  }
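
The "write fsimage.ckpt first, then rename" pattern mentioned in the code comments is a common way to avoid exposing a half-written file. Below is a minimal sketch of that pattern using java.nio; `AtomicImageSaveSketch` is a hypothetical class, the payload is arbitrary bytes rather than a real image, and the "<hex> *<filename>" layout of the .md5 file is an assumption made for illustration.

```java
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of "write to a .ckpt file, then rename"; not Hadoop code.
class AtomicImageSaveSketch {
  static Path saveImage(Path dir, long txId, byte[] data)
      throws IOException, NoSuchAlgorithmException {
    String suffix = String.format("%019d", txId);
    Path tmp = dir.resolve("fsimage.ckpt_" + suffix);
    Path fin = dir.resolve("fsimage_" + suffix);

    // 1. Write the full content under the temporary checkpoint name.
    Files.write(tmp, data);

    // 2. Write the digest next to the final name (file layout is assumed).
    byte[] digest = MessageDigest.getInstance("MD5").digest(data);
    String hex = String.format("%032x", new BigInteger(1, digest));
    String md5Line = hex + " *" + fin.getFileName() + "\n";
    Files.write(dir.resolve(fin.getFileName() + ".md5"),
        md5Line.getBytes(StandardCharsets.UTF_8));

    // 3. Rename, so readers only ever see a complete fsimage_<txid> file.
    Files.move(tmp, fin, StandardCopyOption.ATOMIC_MOVE);
    return fin;
  }

  public static void main(String[] args) throws Exception {
    Path dir = Files.createTempDirectory("namenode-current");
    Path image = saveImage(dir, 102477,
        "demo".getBytes(StandardCharsets.UTF_8));
    System.out.println(image.getFileName()); // prints fsimage_0000000000000102477
  }
}
```

The standby log in this article shows the same two names: the image is first reported as fsimage.ckpt_0000000000000102477 and is later sent as fsimage_0000000000000102477.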

Logs like the following appear regularly.
Standby node

2023-03-26 09:25:11,838 INFO org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer: Triggering checkpoint because it has been 180 seconds since the last checkpoint, which exceeds the configured interval 180
2023-03-26 09:25:11,838 INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Save namespace ...
2023-03-26 09:25:11,859 INFO org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf: Saving image file /data/apps/hadoop-3.3.1/data/namenode/current/fsimage.ckpt_0000000000000102477 using no compression
2023-03-26 09:25:11,893 INFO org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf: Image file /data/apps/hadoop-3.3.1/data/namenode/current/fsimage.ckpt_0000000000000102477 of size 825045 bytes saved in 0 seconds .
2023-03-26 09:25:11,904 INFO org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager: Going to retain 2 images with txid >= 102475
2023-03-26 09:25:11,904 INFO org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager: Purging old image FSImageFile(file=/data/apps/hadoop-3.3.1/data/namenode/current/fsimage_0000000000000102471, cpktTxId=0000000000000102471)
2023-03-26 09:25:11,922 INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Sending fileName: /data/apps/hadoop-3.3.1/data/namenode/current/fsimage_0000000000000102477, fileSize: 825045. Sent total: 825045 bytes. Size of last segment intended to send: -1 bytes.
2023-03-26 09:25:11,975 INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Uploaded image with txid 102477 to namenode at http://10.253.128.31:9870 in 0.058 seconds
2023-03-26 09:25:11,975 INFO org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer: Checkpoint finished successfully.

Active NameNode

2023-03-26 09:19:23,726 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Starting log segment at 102474
2023-03-26 09:21:23,781 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Roll Edit Log from 10.253.128.33
2023-03-26 09:21:23,781 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Rolling edit logs
2023-03-26 09:21:23,781 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Ending log segment 102474, 102474
2023-03-26 09:21:23,782 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Number of transactions: 2 Total time for transactions(ms): 2 Number of transactions batched in Syncs: 0 Number of syncs: 1 SyncTimes(ms): 4 7
2023-03-26 09:21:23,793 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Number of transactions: 2 Total time for transactions(ms): 2 Number of transactions batched in Syncs: 0 Number of syncs: 2 SyncTimes(ms): 15 8
2023-03-26 09:21:23,800 INFO org.apache.hadoop.hdfs.server.namenode.FileJournalManager: Finalizing edits file /data/apps/hadoop-3.3.1/data/namenode/current/edits_inprogress_0000000000000102474 -> /data/apps/hadoop-3.3.1/data/namenode/current/edits_0000000000000102474-0000000000000102475
2023-03-26 09:21:23,800 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Starting log segment at 102476

The logs show that saving and uploading the fsimage happen on the standby node, while rolling the edit log and ending the log segment happen on the active node.

endCurrentLogSegment

This step does the following:

  • Writes the OP_END_LOG_SEGMENT operation itself into the edit log.
  • Calls logSyncAll() to flush and persist everything up to LastWrittenTxId to the QJM.
  • Calls finalizeLogSegment, which completes the finalization over RPC, so the QJM records the segment as finalized.
  • The active node, on picking up this operation, also executes finalizeLogSegment.

  public synchronized void endCurrentLogSegment(boolean writeEndTxn) {
    LOG.info("Ending log segment " + curSegmentTxId +
        ", " + getLastWrittenTxId());
    Preconditions.checkState(isSegmentOpen(),
        "Bad state: %s", state);

    if (writeEndTxn) {
      logEdit(LogSegmentOp.getInstance(cache.get(),
          FSEditLogOpCodes.OP_END_LOG_SEGMENT));
    }
    // always sync to ensure all edits are flushed.
    logSyncAll();

    printStatistics(true);

    final long lastTxId = getLastWrittenTxId();
    final long lastSyncedTxId = getSyncTxId();
    Preconditions.checkArgument(lastTxId == lastSyncedTxId,
        "LastWrittenTxId %s is expected to be the same as lastSyncedTxId %s",
        lastTxId, lastSyncedTxId);
    try {
      // journalSet fans out to every JournalManager, including
      // QuorumJournalManager and FileJournalManager
      journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
      editLogStream = null;
    } catch (IOException e) {
      //All journals have failed, it will be handled in logSync.
    }

    state = State.BETWEEN_LOG_SEGMENTS;
  }
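
Finalizing a segment shows up on disk as a rename from the in-progress name to a name carrying both the first and last txid. The sketch below only reproduces that naming scheme; `SegmentNameSketch` is a hypothetical helper, and the 19-digit zero padding is inferred from the file names in the active NameNode log above.

```java
// Hypothetical helper that mirrors the edit segment file names seen in the logs.
class SegmentNameSketch {
  /** Name of a segment that is still being written. */
  static String inProgressName(long startTxId) {
    return String.format("edits_inprogress_%019d", startTxId);
  }

  /** Name of a finalized segment covering startTxId..endTxId inclusive. */
  static String finalizedName(long startTxId, long endTxId) {
    return String.format("edits_%019d-%019d", startTxId, endTxId);
  }

  public static void main(String[] args) {
    // Same txids as the "Finalizing edits file" log line
    System.out.println(inProgressName(102474) + " -> "
        + finalizedName(102474, 102475));
    // prints edits_inprogress_0000000000000102474 -> edits_0000000000000102474-0000000000000102475
  }
}
```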

Back to the fsImage upload

Returning to the main doCheckpoint flow: once the fsimage has been saved, it still has to be uploaded to the active NameNode. The upload goes to a URL such as:
http://192.168.128.32:9876/imagetransfer

  public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf,
      NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler)
      throws IOException {
    URL url = new URL(fsName, ImageServlet.PATH_SPEC);
    long startTime = Time.monotonicNow();
    try {
      uploadImage(url, conf, storage, nnf, txid, canceler);
    } catch (HttpPutFailedException e) {
      // translate the error code to a result, which is a bit more obvious in usage
      TransferResult result = TransferResult.getResultForCode(e.getResponseCode());
      if (result.shouldReThrowException) {
        throw e;
      }
      return result;
    }
    double xferSec = Math.max(
        ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
    LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
        + " in " + xferSec + " seconds");
    return TransferResult.SUCCESS;
  }
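
The catch block above turns an HTTP status code into a result that either gets returned (an expected rejection) or rethrown (a real failure). Here is a minimal sketch of that idea. The enum below is hypothetical: its constant names and the code-to-result table are assumptions for illustration, chosen to line up with the status codes that doPut sends (417 Expectation Failed when the receiver is in the wrong state, 409 Conflict for a stale or duplicate checkpoint).

```java
// Hypothetical mapping from HTTP status to upload result; not the Hadoop enum.
class TransferResultSketch {
  enum Result {
    SUCCESS(200, false),
    NOT_ACTIVE_NAMENODE(417, false),  // receiver cannot accept images right now
    OLD_TRANSACTION_ID(409, false),   // receiver already has this or a newer image
    UNEXPECTED_FAILURE(-1, true);     // anything else is rethrown to the caller

    final int httpCode;
    final boolean shouldRethrow;

    Result(int httpCode, boolean shouldRethrow) {
      this.httpCode = httpCode;
      this.shouldRethrow = shouldRethrow;
    }

    /** Falls back to UNEXPECTED_FAILURE for codes not listed above. */
    static Result forCode(int code) {
      for (Result r : values()) {
        if (r.httpCode == code) {
          return r;
        }
      }
      return UNEXPECTED_FAILURE;
    }
  }

  public static void main(String[] args) {
    System.out.println(Result.forCode(409));               // OLD_TRANSACTION_ID
    System.out.println(Result.forCode(417).shouldRethrow); // false
    System.out.println(Result.forCode(500).shouldRethrow); // true
  }
}
```

Returning a value for expected rejections lets the caller log them and adjust its primary flag instead of treating them as errors.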

ImageServlet.doPut receives the fsimage file

When the standby NameNode uploads an image, the active NameNode has to receive the uploaded file.

The method is long; in summary it:

  • Validates the request
  • Checks that the current node is in active (or observer) state; otherwise the upload is refused
  • Checks whether a checkpoint for a newer transaction is already being uploaded; an upload for an older transaction is ignored
  • If the recent-image check is enabled, rejects a regular image when the previous checkpoint is still recent in both time and txid
  • Checks whether an fsimage file for this txid already exists, i.e. whether that checkpoint has already been taken
  • Calls TransferFsImage.handleUploadImageRequest to save the file
  • Saves the md5 file of the fsimage
  • Deletes old fsimage files, keeping the most recent n as configured. The process then ends

  protected void doPut(final HttpServletRequest request,
      final HttpServletResponse response) throws ServletException, IOException {
    try {
      ServletContext context = getServletContext();
      final FSImage nnImage = getAndValidateFSImage(context, response);
      final Configuration conf = (Configuration) getServletContext()
          .getAttribute(JspHelper.CURRENT_CONF);
      final PutImageParams parsedParams = new PutImageParams(request, response,
          conf);
      final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
      final boolean checkRecentImageEnable;
      Object checkRecentImageEnableObj =
          context.getAttribute(RECENT_IMAGE_CHECK_ENABLED);
      if (checkRecentImageEnableObj != null) {
        if (checkRecentImageEnableObj instanceof Boolean) {
          checkRecentImageEnable = (boolean) checkRecentImageEnableObj;
        } else {
          // This is an error case, but crashing NN due to this
          // seems more undesirable. Only log the error and set to default.
          LOG.error("Expecting boolean obj for setting checking recent image, "
              + "but got " + checkRecentImageEnableObj.getClass() + ". This is "
              + "unexpected! Setting to default.");
          checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
        }
      } else {
        checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
      }

      validateRequest(context, conf, request, response, nnImage,
          parsedParams.getStorageInfoString());

      UserGroupInformation.getCurrentUser().doAs(
          new PrivilegedExceptionAction<Void>() {

            @Override
            public Void run() throws Exception {
              // if its not the active NN, then we need to notify the caller it was was the wrong
              // target (regardless of the fact that we got the image)
              HAServiceProtocol.HAServiceState state = NameNodeHttpServer
                  .getNameNodeStateFromContext(getServletContext());
              if (state != HAServiceProtocol.HAServiceState.ACTIVE &&
                  state != HAServiceProtocol.HAServiceState.OBSERVER) {
                // we need a different response type here so the client can differentiate this
                // from the failure to upload due to (1) security, or (2) other checkpoints already
                // present
                sendError(response, HttpServletResponse.SC_EXPECTATION_FAILED,
                    "Nameode "+request.getLocalAddr()+" is currently not in a state which can "
                        + "accept uploads of new fsimages. State: "+state);
                return null;
              }

              final long txid = parsedParams.getTxId();
              String remoteAddr = request.getRemoteAddr();
              ImageUploadRequest imageRequest = new ImageUploadRequest(txid, remoteAddr);

              final NameNodeFile nnf = parsedParams.getNameNodeFile();

              // if the node is attempting to upload an older transaction, we ignore it
              SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);
              if (larger.size() > 0) {
                sendError(response, HttpServletResponse.SC_CONFLICT,
                    "Another checkpointer is already in the process of uploading a" +
                        " checkpoint made up to transaction ID " + larger.last());
                return null;
              }

              //make sure no one else has started uploading one
              if (!currentlyDownloadingCheckpoints.add(imageRequest)) {
                sendError(response, HttpServletResponse.SC_CONFLICT,
                    "Either current namenode is checkpointing or another"
                        + " checkpointer is already in the process of "
                        + "uploading a checkpoint made at transaction ID "
                        + txid);
                return null;
              }

              long now = System.currentTimeMillis();
              long lastCheckpointTime =
                  nnImage.getStorage().getMostRecentCheckpointTime();
              long lastCheckpointTxid =
                  nnImage.getStorage().getMostRecentCheckpointTxId();

              long checkpointPeriod =
                  conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
                      DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
              checkpointPeriod = Math.round(
                  checkpointPeriod * recentImageCheckTimePrecision);

              long checkpointTxnCount =
                  conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
                      DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);

              long timeDelta = TimeUnit.MILLISECONDS.toSeconds(
                  now - lastCheckpointTime);

              // Since the goal of the check below is to prevent overly
              // frequent upload from Standby, the check should only be done
              // for the periodical upload from Standby. For the other
              // scenarios such as rollback image and ckpt file, they skip
              // this check, see HDFS-15036 for more info.
              if (checkRecentImageEnable &&
                  NameNodeFile.IMAGE.equals(parsedParams.getNameNodeFile()) &&
                  timeDelta < checkpointPeriod &&
                  txid - lastCheckpointTxid < checkpointTxnCount) {
                // only when at least one of two conditions are met we accept
                // a new fsImage
                // 1. most recent image's txid is too far behind
                // 2. last checkpoint time was too old
                String message = "Rejecting a fsimage due to small time delta "
                    + "and txnid delta. Time since previous checkpoint is "
                    + timeDelta + " expecting at least " + checkpointPeriod
                    + " txnid delta since previous checkpoint is " +
                    (txid - lastCheckpointTxid) + " expecting at least "
                    + checkpointTxnCount;
                LOG.info(message);
                sendError(response, HttpServletResponse.SC_CONFLICT, message);
                return null;
              }

              try {
                if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
                  String message = "Either current namenode has checkpointed or "
                      + "another checkpointer already uploaded an "
                      + "checkpoint for txid " + txid;
                  LOG.info(message);
                  sendError(response, HttpServletResponse.SC_CONFLICT, message);
                  return null;
                }

                InputStream stream = request.getInputStream();
                try {
                  long start = monotonicNow();
                  MD5Hash downloadImageDigest = TransferFsImage
                      .handleUploadImageRequest(request, txid,
                          nnImage.getStorage(), stream,
                          parsedParams.getFileSize(), getThrottler(conf));
                  nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
                      downloadImageDigest);
                  // Metrics non-null only when used inside name node
                  if (metrics != null) {
                    long elapsed = monotonicNow() - start;
                    metrics.addPutImage(elapsed);
                  }
                  // Now that we have a new checkpoint, we might be able to
                  // remove some old ones.
                  nnImage.purgeOldStorage(nnf);
                } finally {
                  // remove the request once we've processed it, or it threw an error, so we
                  // aren't using it either
                  currentlyDownloadingCheckpoints.remove(imageRequest);

                  stream.close();
                }
              } finally {
                nnImage.removeFromCheckpointing(txid);
              }
              return null;
            }

          });
    } catch (Throwable t) {
      String errMsg = "PutImage failed. " + StringUtils.stringifyException(t);
      sendError(response, HttpServletResponse.SC_GONE, errMsg);
      throw new IOException(errMsg);
    }
  }
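
The recent-image check in doPut rejects an upload only when the previous checkpoint is recent in both dimensions: too little time has passed and too few transactions have accumulated. A minimal sketch of that condition follows; `RecentImageCheckSketch` is a hypothetical class, and the precision factor 0.75 used in `main` is an illustrative value, not one confirmed by this article.

```java
// Hypothetical condensation of the recent-image rejection test; not Hadoop code.
class RecentImageCheckSketch {
  /**
   * Rejects when the time since the last checkpoint is below the scaled
   * period AND the txid gap is below the transaction threshold.
   */
  static boolean rejectAsTooRecent(long secsSinceLastCheckpoint, long periodSecs,
      double timePrecision, long txIdDelta, long txnThreshold) {
    long scaledPeriod = Math.round(periodSecs * timePrecision);
    return secsSinceLastCheckpoint < scaledPeriod && txIdDelta < txnThreshold;
  }

  public static void main(String[] args) {
    // period 3600 s scaled by 0.75 gives 2700 s; threshold 1000 txns
    System.out.println(rejectAsTooRecent(180, 3600, 0.75, 2, 1000));    // true: recent on both counts
    System.out.println(rejectAsTooRecent(3000, 3600, 0.75, 2, 1000));   // false: enough time passed
    System.out.println(rejectAsTooRecent(180, 3600, 0.75, 5000, 1000)); // false: enough new txns
  }
}
```

This mirrors the sender-side triggers: either condition alone (time or transaction count) is enough for an image to be worth accepting.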
