Overview

A process is started on the machine where the launch command is entered. To avoid placing too heavy a burden on the namenode, the whole balance process is controlled by this balancer server rather than by the namenode.

The end result of the Balancer is that, in the namenode's records, one replica of a block is moved from one datanode to another.

PS: replica placement policy

  • The 2nd replica is placed on a rack different from the one holding the 1st replica

  • The 3rd replica is placed on the same rack as the 2nd replica, but on a different node

*Figure omitted: each green area marks one replica
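The placement rule above can be sketched as follows. This is only an illustration of the rack choices; the rack names and the helper are hypothetical, and HDFS's real BlockPlacementPolicyDefault weighs much more (writer locality, node load, storage types):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PlacementSketch {
    /** Returns the racks chosen for 3 replicas, per the policy above. */
    public static List<String> placeRacks(String writerRack, List<String> otherRacks) {
        List<String> racks = new ArrayList<>();
        racks.add(writerRack);          // replica 1: the writer's rack
        String remote = otherRacks.get(0);
        racks.add(remote);              // replica 2: a different rack
        racks.add(remote);              // replica 3: same rack as replica 2, different node
        return racks;
    }

    public static void main(String[] args) {
        System.out.println(placeRacks("/rack1", Arrays.asList("/rack2", "/rack3")));
        // [/rack1, /rack2, /rack2]
    }
}
```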

Balance strategy

The datanodes are divided into four categories: over-utilized nodes, under-utilized nodes, nodes above the average utilization, and nodes below the average utilization. The balancer then checks whether any node is over- or under-utilized (that is, whether the over-utilized and under-utilized node lists contain any machines); if so it continues, otherwise it exits. If it can continue, it iterates over the over-utilized and under-utilized node lists to generate the balance plan. Generating the plan involves the following steps:
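The four-way classification can be sketched as below. This is a hedged illustration: the threshold parameter plays the role of the balancer's -threshold option, and the names are stand-ins, not the actual Balancer fields:

```java
public class UtilizationClassifier {
    public enum Category { OVER_UTILIZED, ABOVE_AVERAGE, BELOW_AVERAGE, UNDER_UTILIZED }

    /** usage and average are utilization percentages; threshold is e.g. 10.0. */
    public static Category classify(double usage, double average, double threshold) {
        if (usage > average + threshold)  return Category.OVER_UTILIZED;
        if (usage > average)              return Category.ABOVE_AVERAGE;
        if (usage >= average - threshold) return Category.BELOW_AVERAGE;
        return Category.UNDER_UTILIZED;
    }

    public static void main(String[] args) {
        // cluster average 50%, threshold 10%
        System.out.println(classify(65, 50, 10)); // OVER_UTILIZED
        System.out.println(classify(55, 50, 10)); // ABOVE_AVERAGE
        System.out.println(classify(45, 50, 10)); // BELOW_AVERAGE
        System.out.println(classify(30, 50, 10)); // UNDER_UTILIZED
    }
}
```

The balancing loop described above continues only while the OVER_UTILIZED and UNDER_UTILIZED sets are non-empty.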

a. Select the source and destination nodes for data movement, based on the following criteria:

For each under-utilized node, a source is chosen at random from the following candidates, with priority decreasing from top to bottom:

  • An over-utilized node on the same rack

  • A node above average utilization on the same rack

  • An over-utilized node on another rack

  • A node above average utilization on another rack

For each over-utilized node, a target is chosen at random from the following candidates, with priority decreasing from top to bottom:

  • An under-utilized node on the same rack

  • A node below average utilization on the same rack

  • An under-utilized node on another rack

  • A node below average utilization on another rack
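The tiered, random-within-a-tier selection above can be sketched like this. The Node type and its flags are hypothetical stand-ins for the Balancer's internal structures; only the priority order is taken from the text:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SourceChooser {
    static class Node {
        final String name, rack;
        final boolean overUtilized, aboveAverage;
        Node(String name, String rack, boolean over, boolean above) {
            this.name = name; this.rack = rack;
            this.overUtilized = over; this.aboveAverage = above;
        }
    }

    /** Pick a source for an under-utilized node, honoring the four tiers. */
    static Node chooseSource(Node target, List<Node> candidates, Random rnd) {
        List<List<Node>> tiers = new ArrayList<>();
        for (int t = 0; t < 4; t++) tiers.add(new ArrayList<>());
        for (Node c : candidates) {
            boolean sameRack = c.rack.equals(target.rack);
            if (sameRack && c.overUtilized)      tiers.get(0).add(c);
            else if (sameRack && c.aboveAverage) tiers.get(1).add(c);
            else if (c.overUtilized)             tiers.get(2).add(c);
            else if (c.aboveAverage)             tiers.get(3).add(c);
        }
        for (List<Node> tier : tiers) {
            if (!tier.isEmpty()) {
                // random choice, but only within the highest non-empty tier
                return tier.get(rnd.nextInt(tier.size()));
            }
        }
        return null; // no eligible source
    }
}
```

Choosing a target for an over-utilized node is symmetric, with under-utilized and below-average candidates in place of over-utilized and above-average ones.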

b. Compute the amount of data to move from each source to each destination (note: measured in bytes, not blocks).

If the source is an over-utilized node, check whether the amount by which it exceeds the allowed deviation is greater than 1 GB; if so, take 1 GB, otherwise take that amount. If the source is merely above the average utilization without being over-utilized, check whether the difference between its actual utilization and the cluster average exceeds 2 GB; if so, take 2 GB, otherwise take the difference. The destination node is computed the same way.
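The size computation in step b can be sketched as follows, using the 1 GB / 2 GB caps stated above. The method and parameter names are illustrative, not the actual Balancer code:

```java
public class SizeToMove {
    static final long GB = 1L << 30;

    /**
     * @param overUtilized      whether the node exceeds the allowed deviation
     * @param overshootBytes    bytes above the allowed deviation (over-utilized case)
     * @param aboveAverageBytes bytes above the cluster average (above-average case)
     */
    static long bytesToMove(boolean overUtilized, long overshootBytes, long aboveAverageBytes) {
        if (overUtilized) {
            return Math.min(overshootBytes, 1 * GB); // cap at 1 GB
        }
        return Math.min(aboveAverageBytes, 2 * GB);  // cap at 2 GB
    }

    public static void main(String[] args) {
        System.out.println(bytesToMove(true, 5 * GB, 0));  // capped at 1 GB
        System.out.println(bytesToMove(false, 0, GB / 2)); // 512 MB, under the 2 GB cap
    }
}
```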

dispatchBlockMoves

private long dispatchBlockMoves() throws InterruptedException {
  final long bytesLastMoved = getBytesMoved();
  final Future<?>[] futures = new Future<?>[sources.size()];
  final Iterator<Source> i = sources.iterator();
  for (int j = 0; j < futures.length; j++) {
    final Source s = i.next();
    futures[j] = dispatchExecutor.submit(new Runnable() {
      @Override
      public void run() {
        // start dispatching blocks for this source
        s.dispatchBlocks();
      }
    });
  }
  // wait for all dispatcher threads to finish
  for (Future<?> future : futures) {
    try {
      future.get();
    } catch (ExecutionException e) {
      LOG.warn("Dispatcher thread failed", e.getCause());
    }
  }
  // wait for all block moving to be done
  waitForMoveCompletion(targets);
  return getBytesMoved() - bytesLastMoved;
}
private void dispatchBlocks() {
  this.blocksToReceive = 2 * getScheduledSize();
  int noPendingMoveIteration = 0;
  LOG.info(getScheduledSize() > 0 && !isIterationOver()
      && (!srcBlocks.isEmpty() || blocksToReceive > 0));
  while (getScheduledSize() > 0 && !isIterationOver()
      && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + " blocksToReceive=" + blocksToReceive
          + ", scheduledSize=" + getScheduledSize()
          + ", srcBlocks#=" + srcBlocks.size());
    }
    final PendingMove p = chooseNextMove();
    if (p != null) {
      // Reset no pending move counter
      noPendingMoveIteration = 0;
      executePendingMove(p);
      continue;
    }
    // Since we cannot schedule any block to move,
    // remove any moved blocks from the source block list and
    removeMovedBlocks(); // filter already moved blocks
    // check if we should fetch more blocks from the namenode
    if (shouldFetchMoreBlocks()) {
      // fetch new blocks
      try {
        final long received = getBlockList();
        if (received == 0) {
          return;
        }
        blocksToReceive -= received;
        continue;
      } catch (IOException e) {
        LOG.warn("Exception while getting block list", e);
        return;
      }
    } else {
      // source node cannot find a pending block to move, iteration +1
      noPendingMoveIteration++;
      // in case no blocks can be moved for source node's task,
      // jump out of while-loop after 5 iterations.
      if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
        LOG.info("Failed to find a pending move " + noPendingMoveIteration
            + " times.  Skipping " + this);
        resetScheduledSize();
      }
    }
    // Now we can not schedule any block to move and there are
    // no new blocks added to the source block list, so we wait.
    try {
      synchronized (Dispatcher.this) {
        Dispatcher.this.wait(1000); // wait for targets/sources to be idle
      }
    } catch (InterruptedException ignored) {
    }
  }
  if (isIterationOver()) {
    LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME / 1000
        + " seconds) has been reached. Stopping " + this);
  }
}
  • Choose a suitable block and a proxy, and create a PendingMove object

  • Add the block to the movedBlocks queue (marking it as already moved)

  • Remove it from the srcBlocks queue. When this queue is empty, a batch of blocks is fetched at random from the namenode (at most 2 GB per fetch), filtered, and saved into the src_block list. One trigger condition for fetching: fewer than 5 not-yet-moved blocks remain in the src_block list (a hard-coded value, not configurable). Each new fetch excludes blocks already in the movedBlocks queue.

  • Execute one move: the blocks in the src_block list are submitted to a thread pool (size dfs.balancer.moverThreads, default 1000) for migration.

getBlockList

  • Randomly fetch a batch of blocks from the namenode (at most 2 GB per fetch, 20 GB cumulatively), filter them, and save them into the src_block list. One trigger condition for fetching: fewer than 5 not-yet-moved blocks remain in the src_block list (a hard-coded value, not configurable).

  • MAX_BLOCKS_SIZE_TO_FETCH defaults to 2 GB, and blocksToReceive starts at 2 * scheduledSize = 20 GB, so a single dispatchBlockMoves moves at most 20 GB, as controlled by the outer while loop condition (!srcBlocks.isEmpty() || blocksToReceive > 0)

  • Filter the blocks after each fetch

private long getBlockList() throws IOException {
  final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
  final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
  if (LOG.isTraceEnabled()) {
    LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
        + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
        + ") returns " + newBlocks.getBlocks().length + " blocks.");
  }
  long bytesReceived = 0;
  for (BlockWithLocations blk : newBlocks.getBlocks()) {
    bytesReceived += blk.getBlock().getNumBytes();
    synchronized (globalBlocks) {
      final DBlock block = globalBlocks.get(blk.getBlock());
      synchronized (block) {
        block.clearLocations();
        // update locations
        final String[] datanodeUuids = blk.getDatanodeUuids();
        final StorageType[] storageTypes = blk.getStorageTypes();
        for (int i = 0; i < datanodeUuids.length; i++) {
          final StorageGroup g = storageGroupMap.get(datanodeUuids[i], storageTypes[i]);
          if (g != null) { // not unknown
            block.addLocation(g);
          }
        }
      }
      if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Add " + block + " to " + this);
        }
        srcBlocks.add(block);
      }
    }
  }
  return bytesReceived;
}

chooseNextMove

Logically, chooseNextMove picks a block, creates a PendingMove object for it, and then marks the block as moved.

  • Check whether the target is idle

  • Pick a suitable block from the srcBlocks queue and find a proxy for it

private PendingMove chooseNextMove() {
  for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
    final Task task = i.next();
    final DDatanode target = task.target.getDDatanode();
    final PendingMove pendingBlock = new PendingMove(this, task.target);
    if (target.addPendingBlock(pendingBlock)) {
      // target is not busy, so do a tentative block allocation
      if (pendingBlock.chooseBlockAndProxy()) {
        long blockSize = pendingBlock.block.getNumBytes();
        incScheduledSize(-blockSize);
        task.size -= blockSize;
        if (task.size <= 0) {
          i.remove();
        }
        return pendingBlock;
      } else {
        // cancel the tentative move
        target.removePendingBlock(pendingBlock);
      }
    }
  }
  return null;
}

Choosing a block and a proxy

If a block is chosen and has been added to the movedBlocks queue, it is removed from the srcBlocks queue.

private boolean chooseBlockAndProxy() {
  // source and target must have the same storage type
  final StorageType t = source.getStorageType();
  // iterate all source's blocks (the srcBlocks queue) until a good one is found
  for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
    if (markMovedIfGoodBlock(i.next(), t)) {
      i.remove();
      return true;
    }
  }
  return false;
}

Choosing a suitable block

Blocks must be selected without breaking the block placement invariants: a move must not cause block loss, must not reduce a block's replica count, and must not reduce the number of racks the block spans. The selection criteria are:

  • If source and target are on different racks, the target's rack must not already hold a replica of the block to be moved

  • The target must not hold a replica of the block to be moved

  • The block must not be in the process of being moved or already moved

  • The move must not reduce the number of racks the block spans

  • The target must not hold any replica of the block other than the one on the source itself

/**
 * Decide if the block is a good candidate to be moved from source to target.
 * A block is a good candidate if
 * 1. the block is not in the process of being moved/has not been moved;
 * 2. the block does not have a replica on the target;
 * 3. doing the move does not reduce the number of racks that the block has
 */
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
    StorageType targetStorageType, DBlock block) {
  if (source.equals(target)) {
    return false;
  }
  if (target.storageType != targetStorageType) {
    return false;
  }
  // check if the block is moved or not
  if (movedBlocks.contains(block.getBlock())) {
    return false;
  }
  final DatanodeInfo targetDatanode = target.getDatanodeInfo();
  if (source.getDatanodeInfo().equals(targetDatanode)) {
    // the block is moved inside same DN
    return true;
  }
  // check if block has replica in target node
  for (StorageGroup blockLocation : block.getLocations()) {
    if (blockLocation.getDatanodeInfo().equals(targetDatanode)) {
      return false;
    }
  }
  if (cluster.isNodeGroupAware()
      && isOnSameNodeGroupWithReplicas(source, target, block)) {
    LOG.info("cluster.isNodeGroupAware()\n"
        + "        && isOnSameNodeGroupWithReplicas(source, target, block)");
    return false;
  }
  if (reduceNumOfRacks(source, target, block)) {
    return false;
  }
  return true;
}

Adding the block to the movedBlocks queue

Adding a block to movedBlocks marks it as already moved.

private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
  synchronized (block) {
    synchronized (movedBlocks) {
      if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
        // assign the chosen block to this PendingMove
        this.block = block;
        // choose a proxy for the source
        if (chooseProxySource()) {
          // add the block to the moved-blocks queue
          movedBlocks.put(block);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Decided to move " + this);
          }
          return true;
        }
      }
    }
  }
  return false;
}

Choosing a proxy

private boolean chooseProxySource() {
  final DatanodeInfo targetDN = target.getDatanodeInfo();
  // if source and target are same nodes then no need of proxy
  if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
    return true;
  }
  // if node group is supported, first try add nodes in the same node group
  if (cluster.isNodeGroupAware()) {
    for (StorageGroup loc : block.getLocations()) {
      if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
          && addTo(loc)) {
        return true;
      }
    }
  }
  // check if there is replica which is on the same rack with the target
  for (StorageGroup loc : block.getLocations()) {
    if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
      return true;
    }
  }
  // find out a non-busy replica
  for (StorageGroup loc : block.getLocations()) {
    if (addTo(loc)) {
      return true;
    }
  }
  return false;
}

Executing the move

public void executePendingMove(final PendingMove p) {
  // move the block
  moveExecutor.execute(new Runnable() {
    @Override
    public void run() {
      p.dispatch();
    }
  });
}

step1:

The balancer opens a socket connection to the target and issues a replaceBlock request, asking the target to copy a block replica from the proxy to replace the replica on the source.

step2:

The target issues a copyBlock request to the proxy and copies the block replica to its local storage. When the copy completes, the target calls notifyNamenodeReceivedBlock to create a ReceivedDeletedBlockInfo object and caches it in a queue. On the next heartbeat, this object is used to notify the namenode to add the new replica on the target to the blockmap and to delete the corresponding replica on the source.

/** Dispatch the move to the proxy source & wait for the response. */
private void dispatch() {
  LOG.info("Start moving " + this);
  Socket sock = new Socket();
  DataOutputStream out = null;
  DataInputStream in = null;
  try {
    sock.connect(
        NetUtils.createSocketAddr(target.getDatanodeInfo()
            .getXferAddr(Dispatcher.this.connectToDnViaHostname)),
        HdfsServerConstants.READ_TIMEOUT);
    // Set read timeout so that it doesn't hang forever against
    // unresponsive nodes. Datanode normally sends IN_PROGRESS response
    // twice within the client read timeout period (every 30 seconds by
    // default). Here, we make it give up after 5 minutes of no response.
    sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT * 5);
    sock.setKeepAlive(true);
    OutputStream unbufOut = sock.getOutputStream();
    InputStream unbufIn = sock.getInputStream();
    ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock());
    final KeyManager km = nnc.getKeyManager();
    Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
    IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
        unbufIn, km, accessToken, target.getDatanodeInfo());
    unbufOut = saslStreams.out;
    unbufIn = saslStreams.in;
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        HdfsConstants.IO_FILE_BUFFER_SIZE));
    in = new DataInputStream(new BufferedInputStream(unbufIn,
        HdfsConstants.IO_FILE_BUFFER_SIZE));
    sendRequest(out, eb, accessToken);
    receiveResponse(in);
    nnc.getBytesMoved().addAndGet(block.getNumBytes());
    LOG.info("Successfully moved " + this);
  } catch (IOException e) {
    LOG.warn("Failed to move " + this, e);
    target.getDDatanode().setHasFailure();
    // Proxy or target may have some issues, delay before using these nodes
    // further in order to avoid a potential storm of "threads quota
    // exceeded" warnings when the dispatcher gets out of sync with work
    // going on in datanodes.
    // The move failed, probably because the proxy or target is currently too
    // busy (handling too many concurrent blockReplace operations), so delay
    // their participation in the balance by 10s.
    proxySource.activateDelay(delayAfterErrors);
    target.getDDatanode().activateDelay(delayAfterErrors);
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(sock);
    // whether the move succeeded or failed, remove this block from the pending queues
    proxySource.removePendingBlock(this);
    target.getDDatanode().removePendingBlock(this);
    synchronized (this) {
      reset();
    }
    synchronized (Dispatcher.this) {
      Dispatcher.this.notifyAll();
    }
  }
}

This is where replaceBlock is invoked, after which the block ends up transferred from datanode1 to datanode2.

/** Send a block replace request to the output stream */
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
    Token<BlockTokenIdentifier> accessToken) throws IOException {
  new Sender(out).replaceBlock(eb, target.storageType, accessToken,
      source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
}
