上周已经把Hadoop的HDFS的架构和设计大概说了下,也有部署过程。在这周讲的HDFS的数据流及操作例子

HDFS数据流

HDFS系统依赖于以下服务
1.NameNode
2.DataNode
3.JournalNode
4.zkfc

其中JournalNode和zkfc是用来做高可用的。
那么数据流将在客户端、NameNode和DataNode之间进行流转。
HDFS可通过以下接口进行操作:
1.HTTP
2.C
3.NFS
4.FUSE
5.Java接口
本篇文章着重讲的是Java接口——FileSystem类。

读数据流

FileSystem类是Hadoop提供操作HDFS的Java类,通过这类,我们就可以作为客户端进行操作HDFS(除了自己写的服务是客户端,在节点上运行的MR程序(MapReduce)也是客户端,而且还是主要且常用的)。
以下是客户端、NameNode和DataNode之间的(读)数据流图:

1.客户端通过FileSystem对象的open()方法
2.open()方法通过DistributeFileSystem对象通过RPC调用NameNode,获取文件起始块的位置及其副本的DataNode地址,并返回FSDataInputStream对象。

DataNode是根据DataNode与客户端的距离进行排序,如果客户端本身就是一个DataNode,那么客户端将会从保存有相应数据块副本的本地DataNode读取数据。

3.调用FSDataInputStream对象的read()方法时,它会调用其自身的read方法将数据从DataNode传输到客户端。
4.到达块的末端时,DFSInputStream关闭与该DataNode的链接,然后寻找下一块数据的最佳DataNode。这些对于客户端都是透明的,在客户端看来它是一直在读取一个连续的流。

如果DFSInputStream在与DataNode通信时遇到错误,会尝试从这个块的另一个最邻近的DataNode读取数据。DFSDFSInputStream会记住故障的DataNode,以保证不会反复读取该节点上后续的块。DFSInputStream也会通过 校验和 确认从DataNode读取的数据是否完整。如果发现有损坏的块,DFSInputStream会试图从其他DataNode读取其副本,也会将损坏的块通知给NameNode。
5.客户端读取完后,会调用close()方法

写数据流

以下是客户端、NameNode和DataNode之间的(写)数据流图:

1.客户端通过FileSystem对象的create()方法
2.create()方法通过DistributeFileSystem对象通过RPC调用NameNode,在文件系统的命名空间新建一个文件,此时该文件中还没有相应的数据块,,并返回FSDataOutputStream对象。

NameNode执行各种不同的检查以确保这个文件不存在以及客户端有新建该文件的权限,如果通过检查,NameNode会创建新文件,否则 ,文件创建失败并向客户端抛出IOException异常。

3.调用FSDataOutputStream对象的write()方法时,它会使用DFSOutputStream对象进行写入数据(DFSOutputStream是封装在FSDataOutputStream)。

在客户端写入数据时,DFSOutputStream将它分成一个个的数据包(DFSPacket),并写入内部队列,称为“数据队列(dataQueue,是其内部成员变量LinkedList)”。DataStreamer处理数据队列,它的职责时挑选出适合存储数据副本的一组DataNode,并根据此要求NameNode分配新的数据块。这一组DataNode构成一个 管线,如果副本数是3个,则管线中有3个DataNode节点。DataStreamer将数据包流式传输到管线中第1个DataNode,第1个DataNode保存数据包并将数据包继续发送到管线的第2个DataNode,如此类推到第3个DataNode节点。
DFSOutputStream其成员变量ackQueue“确认队列”,维护者一个内部数据包队列来等待DataNode的确认回执。,收到管线中所有DataNode节点的确认信息后,该数据包才会从ackQueue删除。
异常情况,如果任意DataNode在数据包写入期间失败,则执行以下操作:首先关闭管线,会从ackQueue把所有数据包都添加回dataQueue的最前端,以保证故障节点下游的DataNode不会漏掉任何一个数据包。并将标识传给NameNode,以便故障DataNode在恢复后可以删除存储部分的数据块。从管线删除故障DataNode后,基于正常DataNode构建一条新的管线,继续写数据。

4.客户端完成数据的写入后,对数据流调用close()方法,该操作等待NameNode返回确认写入完成。

HDFS操作例子

命令行

管理命令参考:http://hadoop.apache.org/docs...
文件操作命令参考:http://hadoop.apache.org/docs... 不过这文档里的hadoop fs 要改为hdfs dfs

  • 查看版本hdfs version
[jevoncode@s1 ~]$ hdfs version
Hadoop 2.7.3
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r baa91f7c6bc9cb92be5982de4719c1c8af91ccff
Compiled by root on 2016-08-18T01:41Z
Compiled with protoc 2.5.0
From source with checksum 2e4ce5f957ea4db193bce3734ff29ff4
This command was run using /mydata1/hadoop-2.7.3/share/hadoop/common/hadoop-common-2.7.3.jar
  • 查看文件系统的统计信息hdfs dfsadmin -report

[jevoncode@s1 ~]$hdfs dfsadmin -report
Configured Capacity: 158127783936 (147.27 GB)
Present Capacity: 148158701568 (137.98 GB)
DFS Remaining: 148158615552 (137.98 GB)
DFS Used: 86016 (84 KB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0-------------------------------------------------
Live datanodes (3):Name: 192.168.31.181:50010 (s6.jevoncode.com)
Hostname: s6.jevoncode.com
Decommission Status : Normal
Configured Capacity: 52709261312 (49.09 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3323027456 (3.09 GB)
DFS Remaining: 49386205184 (45.99 GB)
DFS Used%: 0.00%
DFS Remaining%: 93.70%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Sun Jun 10 14:00:14 CST 2018...
  • 创建和目录
[jevoncode@s1 ~]$ hdfs dfs -mkdir /opt/
[jevoncode@s1 ~]$ hdfs dfs -mkdir /opt/command/
[jevoncode@s1 ~]$ hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - jevoncode supergroup          0 2018-06-10 14:05 /opt
  • 复制文本文件到HDFS
hdfs dfs -put sougouword.txt /opt/command/word.txt
  • 复制HDFS的文件到本地
hdfs dfs -get /opt/command/word.txt sougouword2.txt
  • 删除文件
hdfs dfs -rm /opt/command/word.txt

Java接口

  • 通过URL对象获取文件内容
package com.jc.demo.hadoop.hdfs;import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;import java.io.InputStream;
import java.net.URL;/*** 前期准备:* [jevoncode@s1 ~]# hdfs dfs -mkdir /opt/* [jevoncode@s1 ~]# hdfs dfs -mkdir /opt/command/* [jevoncode@s1 ~]# hdfs dfs -put sougouword.txt /opt/command/word.txt* <p>* <p>* 方法一:动态参数* 命令如下:上传至hadoop服务器* [jevoncode@s1 ~]# export HADOOP_CLASSPATH=jc-demo-hadoop-0.0.1.0-SNAPSHOT-development.jar* [jevoncode@s1 ~]# hadoop com.jc.demo.hadoop.hdfs.URLCat hdfs://ns/opt/command/word.txt* 其中ns是hdfs-site.xml配置的主机名,用于高可用** <p>* 方法二:远程访问* 直接执行main方法,使用hdfsHost做参数,可远程访问*/
public class URLCat {static {URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());}public static void main(String[] args) throws Exception {String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";InputStream in = null;try {
//            in = new URL(args[0]).openStream();  //方法一:动态参数in = new URL(hdfsHost).openStream();    //方法二:远程访问IOUtils.copyBytes(in, System.out, 4096, false);} finally {IOUtils.closeStream(in);}}
}
  • 通过FileSystem获取文件内容
package com.jc.demo.hadoop.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;import java.io.InputStream;
import java.net.URI;/*** 使用FileSystem获取文件内容* Configuration在这例子中仅仅做个参数而已,没啥用,还是需要在代码里指定url**/
public class FileSystemCat {public static void main(String[] args) throws Exception {String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";String uri = hdfsHost;Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);InputStream in = null;try {in = fs.open(new Path(uri));IOUtils.copyBytes(in, System.out, 4096, false);} finally {IOUtils.closeStream(in);}}
}
  • FSDataInputStream还支持随机读
package com.jc.demo.hadoop.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;import java.net.URI;/*** 使用FSDataInputStream随机读*/
public class FileSystemDoubleCat {public static void main(String[] args) throws Exception {String hdfsHost = "hdfs://s1.jevoncode.com:9000/opt/command/word.txt";String uri = hdfsHost;Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);FSDataInputStream in = null;try {in = fs.open(new Path(uri));IOUtils.copyBytes(in, System.out, 4096, false);in.seek(0); // go back to the start of the fileIOUtils.copyBytes(in, System.out, 4096, false);} finally {IOUtils.closeStream(in);}}
}
  • 使用FileSystem复制文件(写入),会自动创建目录
package com.jc.demo.hadoop.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;/*** 使用FileSystem复制文件(写入),会自动创建目录* FileSystem也是可以操作本地文件的,所以没有指定协议,就会操作本地文件目录/opt/java*/
public class FileCopyWithProgress {public static void main(String[] args) throws Exception {String localSrc = "/home/cherry/Downloads/斗破苍穹.txt";String dst = "hdfs://s1.jevoncode.com:9000/opt/java/斗破苍穹.txt";
//        String dst = "/opt/java/"; //FileSystem也是可以操作本地文件的,所以没有指定协议,就会操作本地文件目录/opt/javaInputStream in = new BufferedInputStream(new FileInputStream(localSrc));Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(dst), conf);OutputStream out = fs.create(new Path(dst), new Progressable() {public void progress() {System.out.print(".");}});IOUtils.copyBytes(in, out, 4096, true);}
}
  • 遍历目录(仅一层)
package com.jc.demo.hadoop.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;import java.net.URI;/*** 获取当前目录的所有文件及目录信息(仅一层)*/
public class ListStatus {public static void main(String[] args) throws Exception {String hdfsHost = "hdfs://s1.jevoncode.com:9000/";String uri = hdfsHost;args = new String[]{"/opt/", "/dir"};Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);Path[] paths = new Path[args.length];for (int i = 0; i < paths.length; i++) {paths[i] = new Path(args[i]);}FileStatus[] status = fs.listStatus(paths);Path[] listedPaths = FileUtil.stat2Paths(status);       //将文件状态FileStatus数组转为Path数组for (Path p : listedPaths) {System.out.println(p);}}
}
/*** output:* 06-10 14:29:16 [main] DEBUG o.a.h.s.a.util.KerberosName - Kerberos krb5 configuration not found, setting default realm to empty* 06-10 14:29:16 [main] DEBUG o.a.hadoop.util.PerformanceAdvisory - Falling back to shell based* 06-10 14:29:18 [main] DEBUG o.a.hadoop.util.PerformanceAdvisory - Both short-circuit local reads and UNIX domain socket are disabled.* 06-10 14:29:18 [main] DEBUG o.a.h.h.p.d.s.DataTransferSaslUtil - DataTransferProtocol not using SaslPropertiesResolver, no QOP found in configuration for dfs.data.transfer.protection* hdfs://s1.jevoncode.com:9000/opt/command* hdfs://s1.jevoncode.com:9000/opt/java*/
  • 获取文件信息(创建时间,创建者等)
package com.jc.demo.hadoop.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;/*** 获取文件状态*/
public class ShowFileStatusTest {private FileSystem fs;@Beforepublic void setUp() throws IOException {String hdfsHost = "hdfs://s1.jevoncode.com:9000/";String uri = hdfsHost;Configuration conf = new Configuration();fs = FileSystem.get(URI.create(uri), conf);OutputStream out = fs.create(new Path("/dir/file"));out.write("content".getBytes("UTF-8"));out.close();}@Afterpublic void tearDown() throws IOException {if (fs != null) {fs.close();}}@Test(expected = FileNotFoundException.class)public void throwsFileNotFoundForNonExistentFile() throws IOException {fs.getFileStatus(new Path("no-such-file"));}/*** 测试文件状态* @throws IOException* @throws InterruptedException*/@Testpublic void fileStatusForFile() throws IOException, InterruptedException {Path file = new Path("/dir/file");FileStatus stat = fs.getFileStatus(file);assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));                      //路径应为/dir/fileassertThat(stat.isDirectory(), is(false));                                          //不是目录assertThat(stat.getLen(), is(7L));                                                  //文件大小Thread.sleep(3000);                                                                 //避免创建时间大于测试时间assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));//创建时间应该小于测试时间assertThat(stat.getReplication(), is((short) 3));                                         //副本个数assertThat(stat.getBlockSize(), is(128 * 1024 * 1024L));                            //块大小assertThat(stat.getOwner(), is(System.getProperty("user.name")));                         //当前用户是其创建者assertThat(stat.getGroup(), is("supergroup"));                                      //文件的用户组校验assertThat(stat.getPermission().toString(), is("rw-r--r--"));                       //文件权限教研}@Testpublic void fileStatusForDirectory() throws IOException {Path dir = new Path("/dir");FileStatus stat = fs.getFileStatus(dir);assertThat(stat.getPath().toUri().getPath(), is("/dir"));assertThat(stat.isDirectory(), is(true));assertThat(stat.getLen(), is(0L));assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));assertThat(stat.getReplication(), is((short) 0));assertThat(stat.getBlockSize(), is(0L));assertThat(stat.getOwner(), is(System.getProperty("user.name")));assertThat(stat.getGroup(), is("supergroup"));assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));}}

2018年第23周-大数据的HDFS数据流及操作例子相关推荐

  1. 数据分析真题日刷 | 商汤科技2018校招C++/算法开发/大数据/后端/运维/测试/数据挖掘开发工程师笔试第二场

    断了大半个月没有刷题,进入「数据分析真题日刷」系列第13篇 . 今日真题 商汤科技2018校招C++/算法开发/大数据/后端/运维/测试/数据挖掘开发工程师笔试第二场 (来源:牛客网) 题型 客观题: ...

  2. 中国旅游研究院携程:2018中国在线旅游发展大数据指数报告(附下载)

    报告下载:添加199IT官方微信[i199it],回复关键词[2018中国在线旅游发展大数据指数报告]即可! 报告下载:添加199IT官方微信[i199it],回复关键词[2018中国在线旅游发展大数 ...

  3. 舒亦梵:4.24非农周大数据即将来临,黄金行情将是怎样

    舒亦梵:4.24非农周大数据即将来临,黄金行情将是怎样 ​ 对待市场,我们不妨以平善柔和目光视之,触之,入之.然后才能懂之,御之,胜之!待你心明如镜,试再看-- 你看市场如山,它自成山,登峰时天高海阔 ...

  4. 中国旅游研究院:2018中日韩旅游大数据报告(附下载)

    报告下载:添加199IT官方微信[i199it],回复关键词[2018中日韩旅游大数据报告]即可! 报告下载:添加199IT官方微信[i199it],回复关键词[2018中日韩旅游大数据报告]即可! ...

  5. 23篇大数据系列(一)java基础知识全集(2万字干货,建议收藏)

    大数据系列爽文,从技术能力.业务基础.分析思维三大板块来呈现,你将收获: ❖ 提升自信心,自如应对面试,顺利拿到实习岗位或offer: ❖ 掌握大数据的基础知识,与其他同事沟通无障碍: ❖ 具备一定的 ...

  6. 中国旅游研究院马蜂窝:2018中国省域自由行大数据系列报告之华东地区(附下载)...

    报告下载:添加199IT官方微信[i199it],回复关键词[2018中国华东省域自由行大数据]即可! 2018年8月,中国旅游研究院与马蜂窝旅游网联合成立的"自由行大数据联合实验室&quo ...

  7. 马蜂窝:2018中国省域自由行大数据系列报告之东北地区(附下载)

    报告下载:添加199IT官方微信[i199it],回复关键词[2018中国省域自由行东北地区]即可! 2018年12月12日,中国旅游研究院与马蜂窝旅游网联合成立的"自由行大数据联合实验室& ...

  8. 23篇大数据系列(二)scala基础知识全集(史上最全,建议收藏)

    作者简介: 蓝桥签约作者.大数据&Python领域优质创作者.管理多个大数据技术群,帮助大学生就业和初级程序员解决工作难题. 我的使命与愿景:持续稳定输出,赋能中国技术社区蓬勃发展! 大数据系 ...

  9. 网易大数据平台HDFS性能优化实践

    导读:本文的主题是网易大数据HDFS的优化和实践,下面会从三个方面来介绍网易在大数据存储相关的工作和努力. 网易大数据平台 HDFS在网易的实践及挑战 重点业务分享 01 网易大数据平台 网易引入Ha ...

最新文章

  1. python 内推_用Python实现内推外插法
  2. python/pandas数据分析(十五)-聚合与分组运算实例
  3. NI FlexLogger 2020 R3中文版
  4. alias怎么每次登陆都保存_alias命令使用说明
  5. 中科大 计算机网络8 协议层次和服务模型
  6. 不同曲线设置标签_带动态标签的面积曲线图
  7. oracle----删除数据
  8. js中立即执行函数会预编译吗_javascript引擎执行的过程的理解--执行阶段
  9. [网安实践II] 实验2. 密码学实验
  10. windows 编译xvidcore-1.1.3.tar
  11. Python爬虫理论 | (4) 数据存储
  12. 【python】生成随机数字/字母/指定位数的字母+数字的字符串
  13. 2021年N1叉车司机复审考试及N1叉车司机证考试
  14. 环境经济:上市公司环保支出(2000-2020)104城-城投债数据(2000-2020)
  15. 职场牛人的9个重要特点,助你在职场轻松获得主动权!
  16. titan框架的使用_如何设置和使用Google Titan密钥捆绑包
  17. 基于vmware16 和 ubuntu20.04, 搭建单节点 kubernetes 1.22.2
  18. html5+css3布局尝试
  19. [AI开发]深度学习如何选择GPU?
  20. 阿里云服务器支持IPV6和CND的详细教程

热门文章

  1. vue nuxt npm 调用百度地图
  2. EBS OPP响应时间超时处理过程
  3. pythonturtle是标准库_Python入门自学最重要、最全标准库Turtle
  4. PTA团体程序设计天梯赛(L1-061~L1-070)
  5. Vue面试题总结01
  6. 爱奇艺适配鸿蒙,鸿蒙天尊_爱奇艺鸿蒙天尊官网
  7. Linux学习-修复win7下安装centos7双系统后,没有win7启动项
  8. Nginx配置和路由
  9. 大一计算机导论问题,计算机导论习题及问题详解
  10. 转行运维工程师之后,我先把这几个Linux 命令记在了本子上,实干9场景