Table of Contents

1. 构造方法

2.常量

3.RedundancyMonitor

3.1.前言

3.2.computeDatanodeWork

3.3.processPendingReconstructions

3.4.rescanPostponedMisreplicatedBlocks


BlockManager类保存并且管理了HDFS集群中所有数据块的元数据。

1. 构造方法

FSNamesystem在调用构造方法初始化的时候会构建BlockManager对象,

this.blockManager = new BlockManager(this, haEnabled, conf);

我们看一下构建BlockManager的时候都干了啥 [真的是很长很长, 不想看 ]

看几个比较重要的默认值:

defaultReplication                = 3      默认副本 3
    maxReplication                     = 512    最大副本 512
    minReplication                       = 1      最小副本 1
    maxReplicationStreams        = 2      最大复制流限制(可能会影响到datanode)
    redundancyRecheckInterval  = 3000ms 检查频率
    encryptDataTransfer              = false  是否颁发块加密密钥。
    maxNumBlocksToLog            = 1000   块报告期间要记录信息的最大块数。

public BlockManager(final Namesystem namesystem, boolean haEnabled,final Configuration conf) throws IOException {//设置Namesystem对象this.namesystem = namesystem;// 构建 DatanodeManagerdatanodeManager = new DatanodeManager(this, namesystem, conf);// 获取心跳管理器 HeartbeatManagerheartbeatManager = datanodeManager.getHeartbeatManager();// 构建BlockIdManager 对象this.blockIdManager = new BlockIdManager(this);// dfs.namenode.blocks.per.postponedblocks.rescan : 10000blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());// 0rescannedMisreplicatedBlocks =new ArrayList<Block>(blocksPerPostpondedRescan);startupDelayBlockDeletionInMs = conf.getLong(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;invalidateBlocks = new InvalidateBlocks(datanodeManager.getBlockInvalidateLimit(),startupDelayBlockDeletionInMs,blockIdManager);// Compute the map capacity by allocating 2% of total memoryblocksMap = new BlocksMap(  LightWeightGSet.computeCapacity(2.0, "BlocksMap"));placementPolicies = new BlockPlacementPolicies(conf, datanodeManager.getFSClusterStats(),datanodeManager.getNetworkTopology(),datanodeManager.getHost2DatanodeMap());storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();// dfs.namenode.reconstruction.pending.timeout-sec : 300pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)* 1000L);createSPSManager(conf);// 构建createBlockTokenSecretManagerblockTokenSecretManager = createBlockTokenSecretManager(conf);providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);// dfs.corruptfilesreturned.max  500this.maxCorruptFilesReturned = conf.getInt(DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);// 副本数:  dfs.replication  3this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,DFSConfigKeys.DFS_REPLICATION_DEFAULT);// 副本最大值: dfs.replication.max  512final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);// 副本最小值 : dfs.namenode.replication.min 1final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);if (minR <= 0)throw new IOException("Unexpected configuration parameters: "+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY+ " = " + minR + " <= 0");if (maxR > Short.MAX_VALUE)throw new IOException("Unexpected configuration parameters: "+ DFSConfigKeys.DFS_REPLICATION_MAX_KEY+ " = " + maxR + " > " + Short.MAX_VALUE);if (minR > maxR)throw new IOException("Unexpected configuration parameters: "+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY+ " = " + minR + " > "+ DFSConfigKeys.DFS_REPLICATION_MAX_KEY+ " = " + maxR);//设置副本的最大值 512 和 最小值  1      short 最值: 3万多...this.minReplication = (short)minR;this.maxReplication = (short)maxR;// dfs.namenode.replication.max-streams : 2this.maxReplicationStreams =conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);// dfs.namenode.replication.max-streams-hard-limit  4this.replicationStreamsHardLimit =conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);// dfs.namenode.replication.work.multiplier.per.iteration : 2this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);// dfs.namenode.redundancy.interval.seconds  3this.redundancyRecheckIntervalMs = conf.getTimeDuration(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,TimeUnit.SECONDS, TimeUnit.MILLISECONDS);// dfs.namenode.storageinfo.defragment.interval.ms : 10 * 60 * 1000this.storageInfoDefragmentInterval =conf.getLong(DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);// dfs.namenode.storageinfo.defragment.timeout.ms : 4this.storageInfoDefragmentTimeout =conf.getLong(DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);// dfs.namenode.storageinfo.defragment.ratio : 0.75fthis.storageInfoDefragmentRatio =conf.getDouble(DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);this.encryptDataTransfer =conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);// dfs.namenode.max-num-blocks-to-log : 1000 [对块报告后由NAMENODE打印到日志块的数量进行限制。]this.maxNumBlocksToLog =conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);this.numBlocksPerIteration = conf.getInt(DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);final int minMaintenanceR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);if (minMaintenanceR < 0) {throw new IOException("Unexpected configuration parameters: "+ DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY+ " = " + minMaintenanceR + " < 0");}if (minMaintenanceR > defaultReplication) {throw new IOException("Unexpected configuration parameters: "+ DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY+ " = " + minMaintenanceR + " > "+ DFSConfigKeys.DFS_REPLICATION_KEY+ " = " + defaultReplication);}this.minReplicationToBeInMaintenance = (short)minMaintenanceR;long heartbeatIntervalSecs = conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);this.blockReportLeaseManager = new BlockReportLeaseManager(conf);bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);int queueSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY,DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);blockReportThread = new BlockReportProcessingThread(queueSize);//    defaultReplication         = 3      默认副本 3
//    maxReplication             = 512    最大副本 512
//    minReplication             = 1      最小副本 1
//    maxReplicationStreams      = 2      最大复制流限制(可能会影响到datanode)
//    redundancyRecheckInterval  = 3000ms 检查频率
//    encryptDataTransfer        = false  是否颁发块加密密钥。
//    maxNumBlocksToLog          = 1000   块报告期间要记录信息的最大块数。LOG.info("defaultReplication         = {}", defaultReplication);LOG.info("maxReplication             = {}", maxReplication);LOG.info("minReplication             = {}", minReplication);LOG.info("maxReplicationStreams      = {}", maxReplicationStreams);LOG.info("redundancyRecheckInterval  = {}ms", redundancyRecheckIntervalMs);LOG.info("encryptDataTransfer        = {}", encryptDataTransfer);LOG.info("maxNumBlocksToLog          = {}", maxNumBlocksToLog);}

2.常量

太多了,挑几个列出来.

private final Namesystem namesystem;/*** Mapping: Block -> { BlockCollection, datanodes, self ref }* Updated only in response to client-sent information.*/final BlocksMap blocksMap;/*** 损坏的数据块副本集合** Store blocks -> datanodedescriptor(s) map of corrupt replicas** corruptReplicas:* CorruptReplicasMap类的实例, 保存损坏的数据块副本(corruptReplica) ,* Datanode的数据块扫描器发现的错误的数据块副本会放入这个集合中。** CorruptReplicasMap保存的是损坏的数据块副本与保存这个副本的Datanode的对应关系(Block ->Datanode的映射关系) ,* 注意这里同时还保存了这个副本损坏的原因。* */final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();/*** 等待删除的数据块副本集合* Blocks to be invalidated.* For a striped block to invalidate, we should track its individual internal* blocks.** invalidateBlocks:* InvalidateBlocks类的实例, 保存等待删除的数据块副本。* 加入这个队列中的数据块副本会由Namenode通过名字节点指令向对应的Datanode下发删除指令。**/private final InvalidateBlocks invalidateBlocks;/**** 推迟操作的数据块副本结合** postponedMisreplicatedBlocks:** 当Namenode发生异常, 进行了Active与Standby切换时,多余的副本不能立即删除,* 需要先放入postponedMisreplicatedBlocks队列中,* 直到这个数据块的所有副本所在的Datanode都进行了块汇报。*** After a failover, over-replicated blocks may not be handled* until all of the replicas have done a block report to the* new active.** This is to make sure that this NameNode has been* notified of all block deletions that might have been pending* when the failover happened.**/private final Set<Block> postponedMisreplicatedBlocks = new LinkedHashSet<Block>();/**** 多余的数据块副本集合* Maps a StorageID to the set of blocks that are "extra" for this* DataNode. We'll eventually remove these extras.*/private final ExcessRedundancyMap excessRedundancyMap = new ExcessRedundancyMap();/*** 等待复制的数据块副本集合* Store set of Blocks that need to be replicated 1 or more times.* We also store pending reconstruction-orders.*/public final LowRedundancyBlocks neededReconstruction =  new LowRedundancyBlocks();//   要知道的几个常量默认值 :
//    defaultReplication         = 3      默认副本 3
//    maxReplication             = 512    最大副本 512
//    minReplication             = 1      最小副本 1
//    maxReplicationStreams      = 2      最大复制流限制(可能会影响到datanode)
//    redundancyRecheckInterval  = 3000ms 检查频率
//    encryptDataTransfer        = false  是否颁发块加密密钥。
//    maxNumBlocksToLog          = 1000   块报告期间要记录信息的最大块数。// 下面的就是各种manager ....private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;private final BlockTokenSecretManager blockTokenSecretManager;// Block pool ID used by this namenode
private String blockPoolId;
private final BlockReportLeaseManager blockReportLeaseManager;private final BlockReportLeaseManager blockReportLeaseManager;private final BlockIdManager blockIdManager;// 其他的略.....

3.RedundancyMonitor

3.1.前言

FSNamesystem在启动的时候会调用startCommonServices启动BlockManager中已经定义好的线程,调用的是BlockManager中的activate方法.

  public void activate(Configuration conf, long blockTotal) {pendingReconstruction.start();datanodeManager.activate(conf);this.redundancyThread.setName("RedundancyMonitor");this.redundancyThread.start();storageInfoDefragmenterThread.setName("StorageInfoMonitor");storageInfoDefragmenterThread.start();this.blockReportThread.start();mxBeanName = MBeans.register("NameNode", "BlockStats", this);bmSafeMode.activate(blockTotal);}

从activate方法中我们可以看到,启动了多个线程来处理任务.

会调用PendingReconstructionMonitor#run()  ==> pendingReconstructionCheck()方法

这个线程会遍历pendingReplications集合中保存的所有数据块复制任务, 将超过指定时间(5分钟) 没有确认的复制请求加入超时队列PendingReplicationBlocks.timedOutItems中

它执行以下操作:

1) 此时可获得更强冗余的记录块。

2) 一种用于跟踪重建请求的时间的粗粒度计时器

3) 一个线程,它周期性地标识从未执行过的重建请求。

    /*** 遍历所有项并检测超时项* Iterate through all items and detect timed-out items*/void pendingReconstructionCheck() {synchronized (pendingReconstructions) {Iterator<Map.Entry<BlockInfo, PendingBlockInfo>> iter =  pendingReconstructions.entrySet().iterator();long now = monotonicNow();LOG.debug("PendingReconstructionMonitor checking Q");while (iter.hasNext()) {Map.Entry<BlockInfo, PendingBlockInfo> entry = iter.next();PendingBlockInfo pendingBlock = entry.getValue();// timeout 默认5分钟if (now > pendingBlock.getTimeStamp() + timeout) {BlockInfo block = entry.getKey();synchronized (timedOutItems) {timedOutItems.add(block);}LOG.warn("PendingReconstructionMonitor timed out " + block);NameNode.getNameNodeMetrics().incTimeoutReReplications();iter.remove();}}}}

接下来看一下RedundancyMonitor , redundancyThread的实现类是BlockManager中的内部类ReplicationMonitor,

ReplicationMonitor是一个线程类。 RedundancyMonitor线程会周期性地调用computeDatanodeWork()方法计算数据块的复制和删除任务, 然后调用processPendingReplications()方法将PendingReplicationBlocks.timedOutItems队列中保存的超时任务重新加neededReplications队列中。

@Overridepublic void run() {while (namesystem.isRunning()) {try {// Process recovery work only when active NN is out of safe mode.if (isPopulatingReplQueues()) {//计算 DataNode 节点上 块复制& 块失效的任务computeDatanodeWork();//处理重试挂起任务processPendingReconstructions();// 重新扫描延迟/挂起的blocks 清单.rescanPostponedMisreplicatedBlocks();}TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);} catch (Throwable t) {if (!namesystem.isRunning()) {LOG.info("Stopping RedundancyMonitor.");if (!(t instanceof InterruptedException)) {LOG.info("RedundancyMonitor received an exception"+ " while shutting down.", t);}break;} else if (!checkNSRunning && t instanceof InterruptedException) {LOG.info("Stopping RedundancyMonitor for testing.");break;}LOG.error("RedundancyMonitor thread received Runtime exception. ",t);terminate(1, t);}}}

看上面代码其实就是干了三件事:

//统计任务.
computeDatanodeWork();

//处理重试挂起任务
processPendingReconstructions();

// 重新扫描延迟/挂起的blocks 清单.
rescanPostponedMisreplicatedBlocks();

我们依次看一下....

3.2.computeDatanodeWork

计算 DataNode 节点上 块复制& 块失效的任务 , DataNode 通过下次心跳的时候,被告知.

记忆几个参数:

1.最多复制datanode节点数2倍的block. 为了防止网络阻塞. 最少会删除block所在的datanode的数量.

2.只有三分之一的datanode 会进行删除操作.

3.每个进行删除操作的Datanode最多可以删除blockInvalidateLimit [1000]个副本

/*** 计算 DataNode 节点上 块复制& 块失效的任务* DataNode 通过下次心跳的时候,被告知.*** ■ 复制操作: 从等待复制的数据块中选出若干个数据块执行复制操作, 然后为这些* 数据块的复制操作选出source源节点以及target目标节点, 最后构造复制指令并在* 下次心跳时将复制指令带回给源节点以执行复制操作。** ■ 删除操作: 从等待删除的数据块副本中选出若干个副本, 然后构造删除指令, 并* 在下次心跳时将删除指令带到目标节点以执行副本的删除操作。*** 对于数据块的复制操作,* 每次复制的数据块数量为集群中Datanode数量的blocksReplWorkMultiplier倍* (由配置项 dfs.namenode.replication.work.multiplier.per.iteration配置, 默认为2) 。** 例如集群中有100个 节点,* ReplicationMonitor在一个周期中只会从neededReplications集合中取出200 (2*100)* 个数据块进行复制操作。*** HDFS之所以这样设计, 是考虑到如果一次冗余复制过多的数据块,* 则会造成HDFS集群的网络拥塞, 所以需要根据Datanode的数量来决定进行复制操作的数据块的数量。*** 而对于数据块的删除操作, 每次进行删除操作的Datanode数量占集群中 Datanode数量的百分比为* blocksInvalidateWorkPct* (由配置项dfs.namenode.invalidate.work.pct.per.iteration配置, 默认为32%)** 而每个进行删除操作的Datanode最多可以删除blockInvalidateLimit个副本* (由配置项dfs.block.invalidate.limit配置, 默认为1000) 。** 例如集群中有100个节点, ReplicationMonitor在一个周期中只会从32个(100*0.32)* Datanode上删除数据块,* 而每个Datanode上最多可删除1000个数据块,* 也就是总共可以删除32000个数据块。*** Compute block replication and block invalidation work that can be scheduled* on data-nodes. The datanode will be informed of this work at the next* heartbeat.* * @return number of blocks scheduled for replication or removal.*/int computeDatanodeWork() {// Blocks should not be replicated or removed if in safe mode.// It's OK to check safe mode here w/o holding lock, in the worst// case extra replications will be scheduled, and these will get// fixed up later.// 在安全模式下, Block 不允许复制或者移除.//if (namesystem.isInSafeMode()) {return 0;}// 存活的 datanode 的数量final int numlive = heartbeatManager.getLiveDatanodeCount();// 可以进行复制的 blocks 的复制因子 , 默认: numlive * 2 ,// 即 最多复制datanode节点数2倍的block. 为了防止网络阻塞.final int blocksToProcess = numlive * this.blocksReplWorkMultiplier;// 进行删除时的阈值  numlive * 32%  即, 只有三分之一的datanode 会进行删除操作final int nodesToProcess = (int) Math.ceil(numlive  * this.blocksInvalidateWorkPct);//调用computeReplicationWork()计算出需要进行备份的副本int workFound = this.computeBlockReconstructionWork(blocksToProcess);// Update countersnamesystem.writeLock();try {this.updateState();this.scheduledReplicationBlocksCount = workFound;} finally {namesystem.writeUnlock();}//调用computeInvalidateWork()计算出需要进行删除的副本workFound += this.computeInvalidateWork(nodesToProcess);return workFound;}

computeBlockReconstructionWork :

将要删除的块 加入到 datanode对应的 DatanodeDescriptor#invalidateBlocks 缓存中, 会通过心跳的方式发送给具体的datanode执行删除操作.

/*** 调度datanode  删除block* Schedule blocks for deletion at datanodes* @param nodesToProcess number of datanodes to schedule deletion work* @return total number of block for deletion*/int computeInvalidateWork(int nodesToProcess) {//获取要删除的block所在的datanode节点final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes();//打乱排序Collections.shuffle(nodes);//计算出需要操作的任务数量 (取 nodes 和 nodesToProcess 中的最小值 )nodesToProcess = Math.min(nodes.size(), nodesToProcess);int blockCnt = 0;for (DatanodeInfo dnInfo : nodes) {// 将要删除的块 加入到 datanode对应的 DatanodeDescriptor#invalidateBlocks 缓存中, 会通过心跳的方式发送给具体的datanode执行删除操作.int blocks = invalidateWorkForOneNode(dnInfo);if (blocks > 0) {blockCnt += blocks;if (--nodesToProcess == 0) {break;}}}return blockCnt;}

3.3.processPendingReconstructions

从pendingReconstruction中获取处理超时的BlockInfo信息,重新加入neededReconstruction队列进行处理.

 /*** If there were any reconstruction requests that timed out, reap them* and put them back into the neededReconstruction queue*/void processPendingReconstructions() {// 获取超时的BlockInfo 信息BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();if (timedOutItems != null) {namesystem.writeLock();try {for (int i = 0; i < timedOutItems.length; i++) {/** Use the blockinfo from the blocksmap to be certain we're working* with the most up-to-date block information (e.g. genstamp).*/BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]);if (bi == null) {continue;}NumberReplicas num = countNodes(timedOutItems[i]);if (isNeededReconstruction(bi, num)) {// 重新加入neededReconstructionneededReconstruction.add(bi, num.liveReplicas(),num.readOnlyReplicas(), num.outOfServiceReplicas(),getExpectedRedundancyNum(bi));}}} finally {namesystem.writeUnlock();}/* If we know the target datanodes where the replication timedout,* we could invoke decBlocksScheduled() on it. Its ok for now.*/}}

3.4.rescanPostponedMisreplicatedBlocks

重新扫描延迟/挂起的blocks 清单.

BlockManager#processMisReplicatedBlock(BlockInfo)在执行的时候,会根据任务的执行情况返回不同的状态信息.

序号 参数 描述
1 INVALID 该块应无效,因为它属于已删除的文件
2 UNDER_REPLICATED 当前block未被复制
3 OVER_REPLICATED 当前block副本过多
4 POSTPONE 无法处理当前block
5 UNDER_CONSTRUCTION block正在构建,忽略
6 OK block 已经完美复制. 即处理成功
/*** 重新扫描延迟/挂起的blocks 清单.* Rescan the list of blocks which were previously postponed.*/void rescanPostponedMisreplicatedBlocks() {//如果没有延迟/挂起的任务,退出...if (getPostponedMisreplicatedBlocksCount() == 0) {return;}namesystem.writeLock();long startTime = Time.monotonicNow();long startSize = postponedMisreplicatedBlocks.size();try {Iterator<Block> it = postponedMisreplicatedBlocks.iterator();for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {Block b = it.next();it.remove();//获取block信息BlockInfo bi = getStoredBlock(b);if (bi == null) {LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +"Postponed mis-replicated block {} no longer found " +"in block map.", b);continue;}// 处理BlockInfo, 并返回响应结果MisReplicationResult res = processMisReplicatedBlock(bi);LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +"Re-scanned block {}, result is {}", b, res);// 将无法正常处理的BlockInfo放入到rescannedMisreplicatedBlocks队列中if (res == MisReplicationResult.POSTPONE) {rescannedMisreplicatedBlocks.add(b);}}} finally {postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);rescannedMisreplicatedBlocks.clear();long endSize = postponedMisreplicatedBlocks.size();namesystem.writeUnlock();LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" +" msecs. {} blocks are left. {} blocks were removed.",(Time.monotonicNow() - startTime), endSize, (startSize - endSize));}}

processMisReplicatedBlock方法为处理block的核心方法,其实就是根据副本的情况进行操作.

/*** 处理一个异常block* 根据需要将其加入到对应的队列中,并返回结果** Process a single possibly misreplicated block. This adds it to the* appropriate queues if necessary, and returns a result code indicating* what happened with it.*/private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {//是否被删除if (block.isDeleted()) {// 删除块// block does not belong to any fileaddToInvalidates(block);return MisReplicationResult.INVALID;}//block正在构建if (!block.isComplete()) {// Incomplete blocks are never considered mis-replicated --// they'll be reached when they are completed or recovered.return MisReplicationResult.UNDER_CONSTRUCTION;}// 计算当前冗余// calculate current redundancyshort expectedRedundancy = getExpectedRedundancyNum(block);//获取block的副本数NumberReplicas num = countNodes(block);final int numCurrentReplica = num.liveReplicas();// add to low redundancy queue if need to be// 如果需要,添加到低冗余队列if (isNeededReconstruction(block, num)) {if (neededReconstruction.add(block, numCurrentReplica,num.readOnlyReplicas(), num.outOfServiceReplicas(),expectedRedundancy)) {return MisReplicationResult.UNDER_REPLICATED;}}//是否需要进行额外的副本冗余if (shouldProcessExtraRedundancy(num, expectedRedundancy)) {if (num.replicasOnStaleNodes() > 0) {// If any of the replicas of this block are on nodes that are// considered "stale", then these replicas may in fact have// already been deleted.//// So, we cannot safely act on the// over-replication until a later point in time, when// the "stale" nodes have block reported.//如果此块的任何副本位于被视为“stale”的节点上,则这些副本实际上可能已被删除。//因此,在稍后的时间点(当“过时”节点报告了块)之前,我们无法安全地对过度复制进行操作。return MisReplicationResult.POSTPONE;}// extra redundancy block//处理额外冗余块processExtraRedundancyBlock(block, expectedRedundancy, null, null);return MisReplicationResult.OVER_REPLICATED;}return MisReplicationResult.OK;}

Hadoop3.2.1 【 HDFS 】源码分析 :BlockManager解析 [二]相关推荐

  1. HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

    在<HDFS源码分析心跳汇报之数据结构初始化>一文中,我们了解到HDFS心跳相关的BlockPoolManager.BPOfferService.BPServiceActor三者之间的关系 ...

  2. HDFS源码分析心跳汇报之数据结构初始化

    在<HDFS源码分析心跳汇报之整体结构>一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager.BPOfferService和BPServiceActo ...

  3. HDFS源码分析DataXceiver之整体流程

    在<HDFS源码分析之DataXceiverServer>一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer.它被用于接收来自客户端或其他数据节 ...

  4. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(二)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  5. MyBatis 源码分析 - 配置文件解析过程

    文章目录 * 本文速览 1.简介 2.配置文件解析过程分析 2.1 配置文件解析入口 2.2 解析 properties 配置 2.3 解析 settings 配置 2.3.1 settings 节点 ...

  6. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  7. hdfs源码分析第二弹

    以写文件为例,串联整个流程的源码: FSDataOutputStream out = fs.create(outFile); 1. DistributedFileSystem 继承并实现了FileSy ...

  8. 【转】ABP源码分析四十二:ZERO的身份认证

    ABP Zero模块通过自定义实现Asp.Net Identity完成身份认证功能, 对Asp.Net Identity做了较大幅度的扩展.同时重写了ABP核心模块中的permission功能,以实现 ...

  9. 【转】ABP源码分析三十二:ABP.SignalR

    Realtime Realtime是ABP底层模块提供的功能,用于管理在线用户.它是使用SignalR实现给在线用户发送通知的功能的前提 IOnlineClient/OnlineClient: 封装在 ...

  10. PDF阅读器系列之--MuPDF源码分析过程(二)

    博客找回来了,在那更新 http://blog.csdn.net/sky_pjf 前 时间好快,又一周过了,发现自己太忙了,博客都没去管-- 序 *MuPDF开源框架现在一直都在维护,我一般都会隔一周 ...

最新文章

  1. 1.1 Java数组简介:数组是什么?
  2. android 之ViewStub
  3. Java内存管理(一)--内存分区
  4. curl 安装_实用干货——如何使用curl命令下载文件
  5. Java 字符串使用之性能优化实践
  6. 利用EEPROM实现arduino的断电存储
  7. 软件工程期末考试题库(超全)
  8. java 高淇讲的怎么样_反射机制--高淇Java视频笔记
  9. 大厂P5、P6、P7级程序员的简历长什么样?
  10. 如何完整保存离线网页
  11. 视觉SLAM十四讲slambook2/ch3/examples/plotTrajectory.cpp程序报错解决
  12. Linux下core文件介绍与使用方法
  13. 苹果蓝牙日志如何获取
  14. 一个简单的问卷调查服务端设计
  15. MindFusion教程:Charting for Java Swing中的FunctionSeries
  16. vba 全拼_EXCEL中直接把中文转换成拼音全拼
  17. Linux 学习 -- 容器技术
  18. c语言判断后缀是否为bmp,c语言_常见图片格式判断
  19. SAP中源清单自动无法生成的原因
  20. C语言12M晶振,STM32将12M晶振换8M晶振后Keil及程序的相关设置

热门文章

  1. MacOs12Macbookpro读写NTFS
  2. Bootstrap Timepicker
  3. uygurqa输入法android,uygurqa键盘输入法
  4. 计算机PS考试都考哪些,计算机专业ps考试题(考查课)(10页)-原创力文档
  5. c语言引用win api,C调用WinAPI及窗口过程
  6. mysql分页查询公式
  7. 中兴WCDMA模块 Linux拨号流程
  8. Scratch-贪吃蛇小游戏
  9. 小程序 tabBar菜单
  10. MATLAB 插值与拟合