Program entry

  • Hadoop branch: Hadoop-2.6.0
  • Example usage: hadoop dfs -checksum /tmp/README.txt
  • Result

/tmp/README.txt
MD5-of-0MD5-of-512CRC32C
00000200000000000000000017970719be16d1071635fa381b95f957

Algorithm name format:
"MD5-of-" + crcPerBlock + "MD5-of-" + bytesPerCRC +
getCrcType().name()

crcPerBlock: I have not fully figured this one out; it only gets a value when the file is split into more than one block (the client assigns it only when locatedblocks.size() > 1), otherwise it stays 0
bytesPerCRC: a CRC is computed over every 512 bytes of data
CRC32C: the Castagnoli variant of the CRC32 checksum algorithm

Result breakdown:
00000200 00000000 00000000 17970719 be16d107 1635fa38 1b95f957
The result is 28 bytes:
bytesPerCRC is an int and takes the first 4 bytes, value 512;
crcPerBlock is a long and takes the middle 8 bytes, value 0;
the file-level MD5 digest takes the last 16 bytes.
// TODO CRC32 checksum algorithm
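
To make the byte layout concrete, here is a small standalone sketch (a hypothetical helper, not part of Hadoop) that decodes the hex string printed by the shell back into the three fields:

import java.nio.ByteBuffer;

public class ChecksumHexDecoder {
  // Hypothetical helper: decode the hex string printed by `hadoop dfs -checksum`
  static byte[] hexToBytes(String hex) {
    byte[] out = new byte[hex.length() / 2];
    for (int i = 0; i < out.length; i++) {
      out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
    }
    return out;
  }

  public static void main(String[] args) {
    String hex = "00000200000000000000000017970719be16d1071635fa381b95f957";
    ByteBuffer buf = ByteBuffer.wrap(hexToBytes(hex)); // 28 bytes
    int bytesPerCRC = buf.getInt();   // first 4 bytes  -> 512
    long crcPerBlock = buf.getLong(); // middle 8 bytes -> 0
    byte[] md5 = new byte[16];        // last 16 bytes  -> file-level MD5
    buf.get(md5);
    System.out.println("bytesPerCRC=" + bytesPerCRC + ", crcPerBlock=" + crcPerBlock);
  }
}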

HDFS physical storage layout
-rw-r--r-- 1 wankun wheel 1366 8 3 11:52 blk_1073742101
-rw-r--r-- 1 wankun wheel 19 8 3 11:52 blk_1073742101_1277.meta

The meta file's header occupies 7 bytes; the remaining 12 bytes hold 3 chunk checksums of 4 bytes each, one checksum for every 512 bytes of the block file (the block is 1366 bytes, so 3 chunks).
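
To check this layout, here is a small local sketch (the meta file path is an assumption for illustration; the field sizes follow BlockMetadataHeader) that parses the 7-byte header and derives the chunk-checksum count:

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

public class BlockMetaInspector {
  public static void main(String[] args) throws IOException {
    // Assumed location of a block meta file on a DataNode disk
    File meta = new File("blk_1073742101_1277.meta");
    try (DataInputStream in = new DataInputStream(new FileInputStream(meta))) {
      short version = in.readShort();      // 2 bytes: metadata version
      byte type = in.readByte();           // 1 byte : checksum type (1 = CRC32, 2 = CRC32C)
      int bytesPerChecksum = in.readInt(); // 4 bytes: chunk size, typically 512
      long headerSize = 2 + 1 + 4;         // 7-byte header
      long chunkChecksums = (meta.length() - headerSize) / 4; // 4 bytes per CRC
      System.out.println("version=" + version + ", type=" + type
          + ", bytesPerChecksum=" + bytesPerChecksum
          + ", chunk checksums=" + chunkChecksums); // 19-byte meta file -> 3
    }
  }
}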

  • Corresponding program entry: org.apache.hadoop.fs.FsShell

FsShell.main() -> run(): cmd is the command to execute; the matching command instance is looked up in commandFactory by cmd and its instance.run() method is invoked. commandFactory is populated in FsShell.init(), which registers the commands declared in FsCommand via reflection.

FsCommand

public static void registerCommands(CommandFactory factory) {
  factory.registerCommands(AclCommands.class);
  factory.registerCommands(CopyCommands.class);
  factory.registerCommands(Count.class);
  factory.registerCommands(Delete.class);
  factory.registerCommands(Display.class);
  factory.registerCommands(Find.class);
  factory.registerCommands(FsShellPermissions.class);
  factory.registerCommands(FsUsage.class);
  factory.registerCommands(Ls.class);
  factory.registerCommands(Mkdir.class);
  factory.registerCommands(MoveCommands.class);
  factory.registerCommands(SetReplication.class);
  factory.registerCommands(Stat.class);
  factory.registerCommands(Tail.class);
  factory.registerCommands(Test.class);
  factory.registerCommands(Touch.class);
  factory.registerCommands(SnapshotCommands.class);
  factory.registerCommands(XAttrCommands.class);
}
class Display extends FsCommand {
  public static void registerCommands(CommandFactory factory) {
    factory.addClass(Cat.class, "-cat");
    factory.addClass(Text.class, "-text");
    factory.addClass(Checksum.class, "-checksum");
  }
}

Checksum -> DistributedFileSystem.getFileChecksum() -> DFSClient.getFileChecksum()
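
The same result can be obtained through the public FileSystem API instead of the shell; a minimal sketch, assuming the cluster configuration is on the classpath and the file exists:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetChecksum {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);
    // On HDFS this goes DistributedFileSystem.getFileChecksum() -> DFSClient.getFileChecksum()
    FileChecksum checksum = fs.getFileChecksum(new Path("/tmp/README.txt"));
    System.out.println(checksum.getAlgorithmName()); // e.g. MD5-of-0MD5-of-512CRC32C
    System.out.println(checksum);                    // algorithm name plus the digest bytes
  }
}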

Main checksum logic

The client fetches all block locations for the file, connects to a DataNode holding each block, and asks it for that block's MD5 (the MD5 of the block's chunk checksums). The per-block MD5 bytes are accumulated, and a second MD5 over the concatenation yields the file checksum. The code below walks the file's locatedblocks list and computes each block's MD5 in turn.

public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
    throws IOException {
  checkOpen();
  Preconditions.checkArgument(length >= 0);
  //get block locations for the file range
  LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, length);
  if (null == blockLocations) {
    throw new FileNotFoundException("File does not exist: " + src);
  }
  List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
  final DataOutputBuffer md5out = new DataOutputBuffer();
  int bytesPerCRC = -1;
  DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
  long crcPerBlock = 0;
  boolean refetchBlocks = false;
  int lastRetriedIndex = -1;

  // get block checksum for each block
  long remaining = length;
  if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
    remaining = Math.min(length, blockLocations.getFileLength());
  }
  // walk the file's locatedblocks list and compute each block's MD5 in turn
  for (int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
    if (refetchBlocks) {  // refetch to get fresh tokens
      blockLocations = callGetBlockLocations(namenode, src, 0, length);
      if (null == blockLocations) {
        throw new FileNotFoundException("File does not exist: " + src);
      }
      locatedblocks = blockLocations.getLocatedBlocks();
      refetchBlocks = false;
    }
    LocatedBlock lb = locatedblocks.get(i);
    final ExtendedBlock block = lb.getBlock();
    if (remaining < block.getNumBytes()) {
      block.setNumBytes(remaining);
    }
    remaining -= block.getNumBytes();
    final DatanodeInfo[] datanodes = lb.getLocations();

    //try each datanode location of the block
    final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
    boolean done = false;
    // connectToDN and fetch the block MD5 already computed by the DataNode
    for (int j = 0; !done && j < datanodes.length; j++) {
      DataOutputStream out = null;
      DataInputStream in = null;

      try {
        //connect to a datanode
        IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
        out = new DataOutputStream(new BufferedOutputStream(pair.out,
            HdfsConstants.SMALL_BUFFER_SIZE));
        in = new DataInputStream(pair.in);

        if (LOG.isDebugEnabled()) {
          LOG.debug("write to " + datanodes[j] + ": "
              + Op.BLOCK_CHECKSUM + ", block=" + block);
        }
        // get block MD5
        // Sender writes the op type and proto parameters (Op.BLOCK_CHECKSUM, proto) to the out stream
        // Receiver reads the op on the DataNode side and dispatches it to DataXceiver
        new Sender(out).blockChecksum(block, lb.getBlockToken());

        final BlockOpResponseProto reply =
            BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));

        if (reply.getStatus() != Status.SUCCESS) {
          if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
            throw new InvalidBlockTokenException();
          } else {
            throw new IOException("Bad response " + reply + " for block "
                + block + " from datanode " + datanodes[j]);
          }
        }

        OpBlockChecksumResponseProto checksumData =
            reply.getChecksumResponse();

        //read byte-per-checksum
        final int bpc = checksumData.getBytesPerCrc();
        if (i == 0) { //first block
          bytesPerCRC = bpc;
        } else if (bpc != bytesPerCRC) {
          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
              + " but bytesPerCRC=" + bytesPerCRC);
        }

        //read crc-per-block
        final long cpb = checksumData.getCrcPerBlock();
        if (locatedblocks.size() > 1 && i == 0) {
          crcPerBlock = cpb;
        }

        //read md5
        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
        md5.write(md5out);

        // read crc-type
        final DataChecksum.Type ct;
        if (checksumData.hasCrcType()) {
          ct = PBHelper.convert(checksumData.getCrcType());
        } else {
          LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
              "inferring checksum by reading first byte");
          ct = inferChecksumTypeByReading(lb, datanodes[j]);
        }

        if (i == 0) { // first block
          crcType = ct;
        } else if (crcType != DataChecksum.Type.MIXED
            && crcType != ct) {
          // if crc types are mixed in a file
          crcType = DataChecksum.Type.MIXED;
        }

        done = true;

        if (LOG.isDebugEnabled()) {
          if (i == 0) {
            LOG.debug("set bytesPerCRC=" + bytesPerCRC
                + ", crcPerBlock=" + crcPerBlock);
          }
          LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
        }
      } catch (InvalidBlockTokenException ibte) {
        if (i > lastRetriedIndex) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
                + "for file " + src + " for block " + block
                + " from datanode " + datanodes[j]
                + ". Will retry the block once.");
          }
          lastRetriedIndex = i;
          done = true; // actually it's not done; but we'll retry
          i--; // repeat at i-th block
          refetchBlocks = true;
          break;
        }
      } catch (IOException ie) {
        LOG.warn("src=" + src + ", datanodes[" + j + "]=" + datanodes[j], ie);
      } finally {
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
      }
    }

    if (!done) {
      throw new IOException("Fail to get block MD5 for " + block);
    }
  }

  //compute file MD5
  // second-level MD5: digest the accumulated per-block MD5 bytes collected in md5out
  final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
  switch (crcType) {
  case CRC32:
    return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
  case CRC32C:
    // CRC32C: a CRC32 variant that can be hardware-accelerated with an Intel instruction
    // https://issues.apache.org/jira/browse/HADOOP-7443
    return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
  default:
    // If there is no block allocated for the file,
    // return one with the magic entry that matches what previous
    // hdfs versions return.
    if (locatedblocks.size() == 0) {
      return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
    }
    // we should never get here since the validity was checked
    // when getCrcType() was called above.
    return null;
  }
}
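
Conceptually, the file-level value is an MD5 over the accumulated per-block MD5 bytes. Below is a minimal sketch of that aggregation with the JDK's MessageDigest; note that the real code digests md5out.getData() (the DataOutputBuffer's backing array), so treat this as an illustration of the idea rather than a byte-exact reproduction:

import java.security.MessageDigest;
import java.util.List;

public class FileLevelMd5 {
  // Aggregate per-block MD5 digests into one file-level MD5, mirroring the md5out usage above
  static byte[] md5OfBlockMd5s(List<byte[]> blockMd5s) throws Exception {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    for (byte[] blockDigest : blockMd5s) {
      md5.update(blockDigest); // each DataNode reply contributes a 16-byte MD5
    }
    return md5.digest();       // 16 bytes: the trailing part of the -checksum output
  }
}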

DataXceiver computes the block checksum

Read the block's meta file contents and compute their MD5:

  • For a whole-block checksum, the MD5 is computed directly over the entire meta file contents (after the header)
  • For a partial-block checksum, calcPartialBlockChecksum() produces the MD5 over only the checksums covering the requested length
@Override
public void blockChecksum(final ExtendedBlock block,
    final Token<BlockTokenIdentifier> blockToken) throws IOException {
  final DataOutputStream out = new DataOutputStream(getOutputStream());
  checkAccess(out, true, block, blockToken,
      Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
  // client side now can specify a range of the block for checksum
  long requestLength = block.getNumBytes();
  Preconditions.checkArgument(requestLength >= 0);
  long visibleLength = datanode.data.getReplicaVisibleLength(block);
  boolean partialBlk = requestLength < visibleLength;

  updateCurrentThreadName("Reading metadata for block " + block);
  // every block has a corresponding meta file; metadataIn is that file's contents
  // eg. blk_1073742101  blk_1073742101_1277.meta
  final LengthInputStream metadataIn = datanode.data.getMetaDataInputStream(block);

  final DataInputStream checksumIn = new DataInputStream(
      new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
  updateCurrentThreadName("Getting checksum for block " + block);
  try {
    //read metadata file
    // header content:
    //    short version;
    //    byte type;  // CRC32 or CRC32C; both types have a 4-byte checksum size
    //    int bpc;    // bytesPerCRC, 512 by default: every 512 bytes gets a 4-byte checksum
    final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
    final DataChecksum checksum = header.getChecksum();
    final int csize = checksum.getChecksumSize();
    final int bytesPerCRC = checksum.getBytesPerChecksum();
    // (meta file length minus the 2+1+4 byte header) / checksum size (4) = number of chunk checksums
    final long crcPerBlock = csize <= 0 ? 0 :
        (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;

    // partial-block request: compute the MD5 over the checksums of the requested range only
    // whole-block request: digest the remaining meta file contents directly
    final MD5Hash md5 = partialBlk && crcPerBlock > 0
        ? calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
        : MD5Hash.digest(checksumIn);
    if (LOG.isDebugEnabled()) {
      LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
          + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
    }

    //write reply
    BlockOpResponseProto.newBuilder()
        .setStatus(SUCCESS)
        .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
            .setBytesPerCrc(bytesPerCRC)
            .setCrcPerBlock(crcPerBlock)
            .setMd5(ByteString.copyFrom(md5.getDigest()))
            .setCrcType(PBHelper.convert(checksum.getChecksumType())))
        .build()
        .writeDelimitedTo(out);
    out.flush();
  } catch (IOException ioe) {
    LOG.info("blockChecksum " + block + " received exception " + ioe);
    incrDatanodeNetworkErrors();
    throw ioe;
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(checksumIn);
    IOUtils.closeStream(metadataIn);
  }

  //update metrics
  datanode.metrics.addBlockChecksumOp(elapsed());
}
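
For the whole-block case the returned MD5 is simply a digest of everything in the meta file after the 7-byte header. Here is a local sketch of that (assumed meta file name; JDK MessageDigest in place of Hadoop's MD5Hash):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.security.MessageDigest;

public class BlockMetaMd5 {
  public static void main(String[] args) throws Exception {
    // Assumed location of a block meta file on a DataNode disk
    String metaPath = "blk_1073742101_1277.meta";
    try (DataInputStream in = new DataInputStream(new FileInputStream(metaPath))) {
      in.skipBytes(7); // skip the header: short version + byte type + int bytesPerChecksum
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) > 0) {
        md5.update(buf, 0, n); // digest only the chunk checksums, like MD5Hash.digest(checksumIn)
      }
      StringBuilder hex = new StringBuilder();
      for (byte b : md5.digest()) {
        hex.append(String.format("%02x", b));
      }
      System.out.println(hex); // this block's MD5 as returned in OpBlockChecksumResponseProto
    }
  }
}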

Writing the block meta file

Reference: http://shiyanjun.cn/archives/942.html

