04 分布式文件系统以及MapReduce入门程序
文章目录
- 04 分布式文件系统以及MapReduce入门程序
- 分布式文件系统HDFS
- 1.分布式文件系统hdfs详细介绍
- 2.HDFS分布式文件系统设计目标
- 3.HDFS的来源
- 4.hdfs的架构图
- 5.hdfs的架构之文件的文件副本机制以及block块存储
- 6.HDFS的元数据信息FSimage以及edits和secondaryNN的作用
- 6.1、FSImage与edits详解
- 6.2、FSimage文件当中的文件信息查看
- 6.3、edits当中的文件信息查看
- 6.4、secondarynameNode如何辅助管理FSImage与Edits文件
- 7、HDFS的文件写入过程
- 8、HDFS的文件读取过程
- 9、HDFS的API操作
- 9.1、创建maven工程并导入jar包
- 9.2、使用url的方式访问数据(了解)
- 9.3、使用文件系统方式访问数据(掌握)
- 9.4、获取FileSystem的4种方式
- 9.5、递归遍历文件系统当中的所有文件
- 9.6、下载文件到本地
- 9.7、hdfs上创建文件夹
- 9.8、hdfs文件上传
- 9.9、HDFS权限问题以及伪造用户
- 9.10、HDFS的小文件合并
- 分布式计算框架MapReduce入门
- 1.1、理解MapReduce思想
- 1.2、Hadoop MapReduce设计构思
- 1.3、MapReduce框架结构
- 1.4、MapReduce编程规范及示例编写
- 1.5、MapReduce程序运行模式
04 分布式文件系统以及MapReduce入门程序
分布式文件系统HDFS
1.分布式文件系统hdfs详细介绍
在hadoop当中,分布式文件系统(HDFS),对文件系统有一个抽象,HDFS属于当中的一个实现类,也就是说分布式文件系统类
似于一个接口,定义了标准,下面有很多的实现类,其中HDFS是一个子实现类而已,但是现在很多人都只知道一种就是HDFS的
实现,并没有了解过其他的实现类,其实分布式文件系统的实现有很多种。
hadoop的文件系统:文件系统:是一个顶层的抽象,具体的实现,需要取决于你自己的获取的实例,我们可以通过文件系统获取本地文件系统,操作linux磁盘上面的文件,也可以获取分布式文件系统,操作hdfs上面的文件ftp:// ftp文件系统 可以做文件的上传下载webHdfs:浏览器操作文件系统,可以允许我们通过浏览器上传,下载,修改,hdfs上面的文件hdfs:分布式文件系统,最重要的一个local:本地文件系统
2.HDFS分布式文件系统设计目标
1、硬件错误 由于集群很多时候由数量众多的廉价机组成,使得硬件错误成为常态 ---(副本机制)
2、数据流访问 所有的访问都是访问大量的数据,使用IO流一直操作,追求的是稳定,不是效率
3、大数据集 假设所有存储到hdfs的数据都是海量的数据,不擅长处理小文件,一个小文件占用一个元数据,元数据都存储在内存当中,占用namenode的大量内存
4、简单的相关模型 假定文件是一次写入,多次读取的操作,比较擅长存出一些历史数据
5、移动计算比移动数据便宜 一个应用请求的计算,离它操作的数据越近,就越高效
6、多种软硬件的可移植性
3.HDFS的来源
HDFS起源于Google的GFS论文(GFS,Mapreduce,BigTable为google的旧的三驾马车)发表于2003年10月HDFS是GFS的克隆版
Hadoop Distributed File system易于扩展的分布式文件系统运行在大量普通廉价机器上,提供容错机制为大量用户提供性能不错的文件存取服务
4.hdfs的架构图
1、NameNode负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
2、文件操作,namenode是负责文件元数据的操作,datanode负责处理文件内容的读写,跟文件内容相关的数据流不经过
Namenode,只询问它跟哪个dataNode联系,否则NameNode会成为系统的瓶颈
3、副本存放在哪些Datanode上由NameNode来控制,根据全局情况作出块放置决定,读取文件时NameNode尽量让用户先读取
最近的副本,降低读取网络开销和读取延时
4、NameNode全权管理数据的复制,它周期性的从集群中的每个DataNode接收心跳信息和状态报告,接收到心跳信号意味着
DataNode节点工作正常,块状态报告包含了一个该DataNode上所有的数据列表
namenode负责数据block块的复制,定期的检测block的副本数,如果不够3个,继续复制出来保证足够三个
NameNode与Datanode的总结概述:
namenode的元数据保存在两个地方,一个是内存,一个是磁盘,存的是元数据的快照,如果快照非常大,停机再启动代价非常大
5.hdfs的架构之文件的文件副本机制以及block块存储
所有的文件都是以block块的方式存放在HDFS文件系统当中,在hadoop1当中,文件的block块默认大小是64M,hadoop2当中
,文件的block块大小默认是128M,block块的大小可以通过hdfs-site.xml当中的配置文件进行指定
<property><name>dfs.block.size</name><value>块大小 以字节为单位</value>//只写数值就可以
</property>抽象成数据块的好处:
1.一个文件有可能大于集群中任意一个磁盘
2.使用块抽象而不是文件可以简化存储子系统
3.块非常适合用于数据备份进而提供数据容错能力和可用性
block块的大小,可以根据实际工作当中的文件特性来调整,如果都是一些大文件,可以稍微调大block块的大小
128M的block块 300M 的文件 产生3个block块 3个block块的元数据信息会存储在namenode当中
256M的block块 300M 的文件 产生2个block块 2个block块的元数据信息会存储在namenode当中块缓存:
块缓存:distributedCache 可以用来实现我们的文件的缓存等等hdfs的文件权限验证
hdfs的文件权限机制与linux系统的文件权限机制类似
r:read w:write x:execute 权限x对于文件表示忽略,对于文件夹表示是否有权限访问其内容
如果linux系统用户zhangsan使用hadoop命令创建一个文件,那么这个文件在HDFS当中的owner就是zhangsan
HDFS文件权限的目的,防止好人做错事,而不是阻止坏人做坏事。HDFS相信你告诉我你是谁,你就是谁
6.HDFS的元数据信息FSimage以及edits和secondaryNN的作用
6.1、FSImage与edits详解
hdfs当中的元数据信息管理
fsimage:存储的是一份比较完整的元数据信息
edits:最近一段时间的操作日志
控制edits文件的大小:时间长短,文件大小 通过secondarynamenode将我们的edtis文件合并到fsimage当中去
客户端对hdfs进行写文件时会首先被记录在edits文件中。
edits修改时元数据也会更新。
每次hdfs更新时edits先更新后客户端才会看到最新信息。
fsimage:是namenode中关于元数据的镜像,一般称为检查点。
一般开始时对namenode的操作都放在edits中,为什么不放在fsimage中呢?
因为fsimage是namenode的完整的镜像,内容很大,如果每次都加载到内存的话生成树状拓扑结构,这是非常耗内存和CPU。
fsimage内容包含了namenode管理下的所有datanode中文件及文件block及block所在的datanode的元数据信息。
随着edits内容增大,就需要在一定时间点和fsimage合并。
合并过程见SecondaryNameNode如何辅助管理FSImage与edits
6.2、FSimage文件当中的文件信息查看
官方查看文档
http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.14.0/hadoop-project-dist/hadoop-hdfs/HdfsImageViewer.html使用命令 hdfs oiv
cd /export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas/current
hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml
6.3、edits当中的文件信息查看
官方查看文档
http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.14.0/hadoop-project-dist/hadoop-hdfs/HdfsEditsViewer.html
查看命令 hdfs oev
cd /export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/edits
hdfs oev -i edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML
6.4、secondarynameNode如何辅助管理FSImage与Edits文件
①:secnonaryNN通知NameNode切换editlog
②:secondaryNN从NameNode中获得FSImage和editlog(通过http方式)
③:secondaryNN将FSImage载入内存,然后开始合并editlog,合并之后成为新的fsimage
④:secondaryNN将新的fsimage发回给NameNode
⑤:NameNode用新的fsimage替换旧的fsimage
完成合并的是secondarynamenode,会请求namenode停止使用edits,暂时将新写操作放入一个新的文件中(edits.new)。
secondarynamenode从namenode中通过http get获得edits,因为要和fsimage合并,所以也是通过http get 的方式
把fsimage加载到内存,然后逐一执行具体对文件系统的操作,与fsimage合并,生成新的fsimage,然后把fsimage发送给
namenode,通过http post的方式。namenode从secondarynamenode获得了fsimage后会把原有的fsimage替换为新的
fsimage,把edits.new变成edits。同时会更新fstime。
hadoop进入安全模式时需要管理员使用dfsadmin的save namespace来创建新的检查点。
secondarynamenode在合并edits和fsimage时需要消耗的内存和namenode差不多,所以一般把namenode和
secondarynamenode放在不同的机器上。
fs.checkpoint.period: 默认是一个小时(3600s)
fs.checkpoint.size: edits达到一定大小时也会触发合并(默认64MB)
7、HDFS的文件写入过程
详细步骤解析:
1、 client发起文件上传请求,通过RPC与NameNode建立通讯,NameNode检查目标文件是否已存在,父目录是否存在,
返回是否可以上传;
2、 client请求第一个block该传输到哪些DataNode服务器上;
3、 NameNode根据配置文件中指定的备份数量及机架感知原理进行文件分配,返回可用的DataNode的地址如:A,B,C;注:Hadoop在设计时考虑到数据的安全与高效,数据文件默认在HDFS上存放三份,存储策略为本地一份,同机架内其它某一
节点上一份,不同机架的某一节点上一份。
4、 client请求3台DataNode中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,
然后B调用C,将整个pipeline建立完成,后逐级返回client;
5、 client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位(默认64K),A收到
一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答。
6、 数据被分割成一个个packet数据包在pipeline上依次传输,在pipeline反方向上,逐个发送ack(命令正确应答),
最终由pipeline中第一个DataNode节点A将pipelineack发送给client;
7、 当一个block传输完成之后,client再次请求NameNode上传第二个block到服务器。
8、HDFS的文件读取过程
详细步骤解析
1、 Client向NameNode发起RPC请求,来确定请求文件block所在的位置;
2、 NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的
DataNode 地址; 这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个
规则:网络拓扑结构中距离 Client 近的排靠前;心跳机制中超时汇报的 DN 状态为 STALE,这样的排靠后;
3、 Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据(短路
读取特性);
4、 底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,
直到这个块上的数据读取完毕;
5、 当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表;
6、 读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再
从下一个拥有该 block 副本的DataNode 继续读。
7、 read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回Client请求包含块的DataNode地址,
并不是返回请求块的数据;
8、 最终读取来所有的 block 会合并成一个完整的最终文件。
9、HDFS的API操作
9.1、创建maven工程并导入jar包
由于cdh版本的所有的软件涉及版权的问题,所以并没有将所有的jar包托管到maven仓库当中去,而是托管在了CDH自己的服务
器上面,所以我们默认去maven的仓库下载不到,需要自己手动的添加repository去CDH仓库进行下载,以下两个地址是官方
文档说明,请仔细查阅
https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh5_maven_repo.html
https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh5_maven_repo_514x.html
cdh5.114.0相关jar包
<!-- 通过哪个网址下载我们的jar包 -->
<repositories><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository>
</repositories>
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0-mr1-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.0-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.6.0-cdh5.14.0</version></dependency><!-- https://mvnrepository.com/artifact/junit/junit --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.testng</groupId><artifactId>testng</artifactId><version>RELEASE</version></dependency>
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding><!-- <verbal>true</verbal>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><minimizeJar>true</minimizeJar></configuration></execution></executions></plugin><!-- <plugin><artifactId>maven-assembly-plugin </artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.itcast.hadoop.db.DBToHdfs2</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin>--></plugins>
</build>
9.2、使用url的方式访问数据(了解)
public class HdfsOperate {@Testpublic void downHdfsFile() throws Exception {// 通信三要素:ip 端口 协议// 第一步:注册hdfs的驱动文件,表示我们使用hdfs://这种协议访问我们hdfs文件系统URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());//URL需要接受一个文件的访问地址String url = "hdfs://node1:8020/install2.log";//获取一个inputStreamInputStream inputStream = new URL(url).openStream();OutputStream outputStream = new FileOutputStream(new File("c:\\hello.txt"));//通过工具类将我们的输入流读取到输出流里IOUtils.copy(inputStream,outputStream);//关闭io流IOUtils.closeQuietly(inputStream);IOUtils.closeQuietly(outputStream);}
}
如何解决winutils的问题
第一步:把这个文件夹 3、大数据离线第三天\解决winutils的问题\hadoop-2.6.0-cdh5.14.0 拷贝到一个没有中文没有空格的路径下
第二步:配置windows的hadoop_home环境变量
第三步:将这个路径下面的 hadoop-2.6.0-cdh5.14.0\bin\hadoop.dll 这个文件,丢到C:\Windows\System32 这个路径下
第四步:关闭windows重启
9.3、使用文件系统方式访问数据(掌握)
在 java 中操作 HDFS,主要涉及以下 Class:
Configuration:该类的对象封转了客户端或者服务器的配置; FileSystem:该类的对象是一个文件系统对象,
可以用该对象的一些方法来对文件进行操作,通过 FileSystem 的静态方法 get 获得该对象。
FileSystem fs = FileSystem.get(conf)
get 方法从 conf 中的一个参数 fs.defaultFS 的配置值判断具体是什么类型的文件系统。如果我们的代码中没有指定fs.defaultFS,并且工程 classpath下也没有给定相应的配置,conf中的默认值就来自于hadoop的jar包中的core-default.xml , 默 认 值 为 : file:/// , 则 获 取 的 将 不 是 一 个DistributedFileSystem的实例,而是一个本地文件系统的客户端对象
9.4、获取FileSystem的4种方式
/*** 获取文件系统的第一种方式*/@Testpublic void getFileSystem1() throws IOException {//FileSystem是一个抽象类,获取抽象类的实例有两种方式// 第一种:看看这个抽象类有没有提供什么方法,返回它本身// 第二种:找子类Configuration configuration = new Configuration();//如果不加任何配置,这里获取到的就是本地文件系统configuration.set("fs.defaultFS","hdfs://node1:8020");FileSystem fileSystem = FileSystem.get(configuration);System.out.println(fileSystem.toString());fileSystem.close();}/*** 获取文件系统的第二种方式*/@Testpublic void getFileSystem2() throws Exception {Configuration configuration = new Configuration();//通过指定URL来获取分布式文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), configuration);System.out.println(fileSystem.toString());fileSystem.close();}/*** 获取文件系统的第三种方式*/@Testpublic void getFileSystem3() throws IOException {Configuration configuration = new Configuration();configuration.set("fs.defaultFS","hdfs://node1:8020");FileSystem fileSystem = FileSystem.newInstance(configuration);System.out.println(fileSystem.toString());fileSystem.close();}/*** 获取文件系统的第四种方式*/@Testpublic void getFileSystem4() throws Exception {Configuration configuration = new Configuration();configuration.set("fs.defaultFS","hdfs://node1:8020");FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://node1:8020"),configuration);System.out.println(fileSystem.toString());fileSystem.close();}
}
9.5、递归遍历文件系统当中的所有文件
/**
* 递归遍历hdfs上面的所有文件出来
* 第一种方法:通过递归
*/
@Test
public void getAllFiles() throws Exception {//获取文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());//获取到我们的文件的状态,可以通过fileStatuses来判断是文件还是文件夹FileStatus[] fileStatuses = fileSystem.listStatus(new Path("hdfs://node1:8020/"));// 循环遍历fileStatus 判断是文件夹还有文件,如果是文件直接输出路径,如果是文件夹继续进去遍历for (FileStatus fileStatus:fileStatuses){if (fileStatus.isDirectory()){//文件夹getDirFiles(fileStatus.getPath(),fileSystem);}else {//文件Path path = fileStatus.getPath();System.out.println(path.toString());}}
}
public void getDirFiles(Path path, FileSystem fileSystem) throws IOException {FileStatus[] fileStatuses = fileSystem.listStatus(path);for (FileStatus fileStatus : fileStatuses) {if (fileStatus.isDirectory()){getDirFiles(fileStatus.getPath(),fileSystem);}else {System.out.println(fileStatus.getPath().toString());}}}
/**
* 递归遍历hdfs上面的所有文件出来
* 第二种方法:通过hdfs最直接提供的api进行遍历
*/
@Test
public void getAllFiles2() throws Exception{//获取文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("hdfs://node1:8020/"), true);while (locatedFileStatusRemoteIterator.hasNext()){LocatedFileStatus fileStatus = locatedFileStatusRemoteIterator.next();System.out.println(fileStatus.getPath().toString());}fileSystem.close();
}
9.6、下载文件到本地
方法1
/**
* 下载hdfs的文件到本地目录
*/
@Test
public void downFileToLocal() throws Exception {//获取文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());FSDataInputStream inputStream = fileSystem.open(new Path("/install2.log"));FileOutputStream fileOutputStream = new FileOutputStream(new File("c:\\hello2.txt"));IOUtils.copy(inputStream,fileOutputStream);IOUtils.closeQuietly(inputStream);IOUtils.closeQuietly(fileOutputStream);fileSystem.close();
}
方法2
/**
* 下载hdfs的文件到本地目录
*/
@Test
public void downFileToLocal2() throws Exception {//获取文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());fileSystem.copyToLocalFile(new Path("hdfs://node1:8020/install2.log"),new Path("file:///c:\\hello3.txt"));fileSystem.close();
}
9.7、hdfs上创建文件夹
/**
* hdfs上面创建文件夹
*/
@Test
public void createDir() throws Exception {FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://node1:8020"), new Configuration());fileSystem.mkdirs(new Path("/dir1/dir2"));fileSystem.close();
}
9.8、hdfs文件上传
方法1
/*** hdfs文件上传方法1*/
@Test
public void uploadFile() throws Exception{FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());fileSystem.copyFromLocalFile(new Path("file:///d:\\mn.jpg"),new Path("/dir1/dir2/mk.jpg"));fileSystem.close();
}
方法2
/**** hdfs文件上传方法2*/
@Test
public void uploadFile2() throws Exception{FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/dir1/dir2/story.txt"));FileInputStream fileInputStream = new FileInputStream("d:\\窗里窗外.txt");IOUtils.copy(fileInputStream,fsDataOutputStream);IOUtils.closeQuietly(fsDataOutputStream);IOUtils.closeQuietly(fileInputStream);
}
9.9、HDFS权限问题以及伪造用户
首先停止hdfs集群,在node01机器上执行以下命令
cd /export/servers/hadoop-2.6.0-cdh5.14.0
sbin/stop-dfs.sh
修改node01机器上的hdfs-site.xml当中的配置文件
cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
vim hdfs-site.xml
<property><name>dfs.permissions</name><value>true</value>
</property>
修改完成之后配置文件发送到其他机器上面去
scp hdfs-site.xml node2:$PWD
scp hdfs-site.xml node3:$PWD
重启hdfs集群
cd /export/servers/hadoop-2.6.0-cdh5.14.0
sbin/start-dfs.sh随意上传一些文件到我们hadoop集群当中准备测试使用
cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
hdfs dfs -mkdir /config
hdfs dfs -put *.xml /config
hdfs dfs -chmod 600 /config/core-site.xml 使用代码准备下载文件
@Test //hdfs伪造用户,你告诉hdfs你是谁,你就是谁
public void getConfig()throws Exception{FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(),"root");fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///d:/core-site.xml"));fileSystem.close();
}
9.10、HDFS的小文件合并
由于hadoop擅长存储大文件,因为大文件的元数据信息比较少,如果hadoop集群当中有大量的
小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力,
所以在实际工作当中,如果有必要一定要将小文件合并成大文件进行一起处理
在我们的hdfs 的shell命令模式下,可以通过命令行将很多的hdfs文件合并成一个大文件下载
到本地,命令如下cd /export/servers
hdfs dfs -getmerge /config/*.xml ./hello.xml1.在上传之前做IO流合并//第一步获取本地文件系统//第二步找到所有本地文件系统的小文件//第三步输出成IO流//第四步将我们的IO流输出到hdfs上面去
2.上传之后,可以通过MR程序实现小文件合并成SequenceFile
3.har文件 归档文件
/*** 小文件合并*/
@Test
public void mergeSmallFile() throws Exception{// 获取分布式文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(), "root");// 获取hdfs文件上的输出流FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/bigFile.xml"));// 获取本地文件系统,遍历我们本地的小文件,将每一个小文件读成一个输入流LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("file:///E:\\大数据资料\\大数据离线资料\\3、大数据离线第三天\\上传小文件合并"));for (FileStatus fileStatus : fileStatuses) {//获取每一个文件的路径Path path = fileStatus.getPath();// 循环遍历读取我们每一个文件的输入流FSDataInputStream fsDataInputStream = localFileSystem.open(path);IOUtils.copy(fsDataInputStream,fsDataOutputStream);IOUtils.closeQuietly(fsDataInputStream);}IOUtils.closeQuietly(fsDataOutputStream);
}
分布式计算框架MapReduce入门
1.1、理解MapReduce思想
分布式文件计算系统:主要用于计算一些数据
mapreduce的核心思想:分治 分而治之
最主要的特点:把一个大问题分成很多小问题,并且每个小的问题的解决思路与大问题的解决思路是一样的
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是
这些小任务可以并行计算,彼此间几乎没有依赖关系。Reduce负责“合”,即对map阶段的结果进行全局汇总。
这两个阶段合起来正是MapReduce思想的体现。
一个block块数据对应一个mapTask:
在mapreduce中没有block块的概念,每个mapTask处理的一部分数据都叫做一个切片,
默认每个切片对应一个block块的大小
1.2、Hadoop MapReduce设计构思
Hadoop MapReduce构思体现在如下的三个方面:
(1)如何对付大数据处理:分而治之
(2)构建抽象模型:Map和ReduceMapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
Map: 对一组数据元素进行某种重复式的处理;
Reduce: 对Map的中间结果进行某种进一步的结果整理。
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
map: (k1; v1) → [(k2; v2)]
reduce: (k2; [v2]) → [(k3; v3)]
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是<key,value>键值对。
(3)统一构架,隐藏系统层细节如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据
存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的
计算框架,为程序员隐藏了绝大多数系统层面的处理细节。
MapReduce最大的亮点在于通过抽象模型和计算框架把
需要做什么(what need to do)与具体怎么做(how to do)分开了,
为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,
仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的
诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点
集群的自动调度使用。编写mapreduce的天龙八部:
map阶段两个步骤:
第一步:读取文件,解析成key,value对,这里是我们的K1 V1
第二步:接受我们的K1 V1,自定义我们的map逻辑,然后转换成key2 value2进行输出
往下发送 这里发送K2 V2shuffle阶段四个步骤:
第三步:分区 相同key的value发送到同一个reduce里面去,key合并,value形成一个集合
第四步:排序 默认按照字典顺序进行排序
第五步:规约
第六步:分组reduce阶段两个步骤:
第七步:接收我们的K2 V2 自定义我们reduce的逻辑,转换成新的K3 V3进行输出
第八步:将我们的K3 V3进行输出
例子:
单词统计,现在要统计,每一个单词在文本当中出现了多少次
hello world
hadoop hive
sqoop hive
hadoop hive第一步:读取文件,解析成key,value对 key是我们的行偏移量 value是我们行文本内容
下一行是上一行的行偏移量
key1 value1
0 hello world
11 hadoop hive
22 sqoop hive第二步:自定义map逻辑,接收我们的key1,value1 转换成新的key2 value2进行输出
获取我们的value1,
hello world
hadoop hive
sqoop hive
按照空格进行切割 [hello,world] [hadoop ,hive] [sqoop,hive] 转换成新的key2 value2 往下发送
key2 value2hello 1
world 1
hadoop 1
hive 1
sqoop 1
hive 1第三步:分区 相同key的value发送到同一个reduce当中去,
key进行合并,value形成一个集合
hive [1,1]
第四步:排序
第五步:规约
第六步:分组 第七步:reduce阶段,接收我们的key2 value2 转换成新的key3 value3进行输出
接收 key2 value2hive 1hive 1key2 value2已经变成了一个集合hive [1,1]转换成新的key3 value3hive 1+1 = 2第八步:输出我们的key3 value3
hive 2
sqoop 1
hello 1
world 1
hadoop 1
1.3、MapReduce框架结构
1.4、MapReduce编程规范及示例编写
需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
数据格式准备如下:
cd /export/servers
vim wordcount.txt
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/如何开发mr的代码
上面八个步骤,每一个步骤都是一个单独的java类
八个步骤写完了之后,通过job任务组装我们的mr的程序,进行任务的提交
定义一个主类,用来描述job并提交job
/*** 除了Javaweb的程序打成一个war包,其他都是打jar包* 运行jar包需要一个main方法,作为程序的入口类*/
public class MainCount extends Configured implements Tool {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//这里执行完毕之后,得到一个int类型的返回值,表示我们程序的退出状态码// 状态码为0 程序执行成功// 通过这里设置configuration,就相当于把父类的configuration设置值了int run = ToolRunner.run(configuration, new MainCount(), args);System.exit(run);}/*** 这个run方法很重要* 这里面就是通过job对象来组装我们的程序,8个类*/@Overridepublic int run(String[] args) throws Exception {//第一步:读取文件,解析成key,value对//从父类里面获取configuration配置文件Job job = Job.getInstance(super.getConf(), "xxx");//jobname 随便写//如果打包到集群上运行,需要这句话,制定main方法所在的Java类job.setJarByClass(MainCount.class);TextInputFormat.addInputPath(job,new Path("hdfs://node1:8020/wordcount")); 可以把本地文件系统目录作为输入"file:///f:\\input"job.setInputFormatClass(TextInputFormat.class);//第二步:自定义map逻辑job.setMapperClass(WordCountMapper.class);//设置我们key2 value2的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);/*** 第三步到第六步,全部省略* 分区 相同key的value,发送到同一个reduce,key合并,value形成一个集合* 排序* 规约* 分组**///第七步:自定义reduce逻辑 默认只有一个reducetaskjob.setReducerClass(WordCountReducer.class);//通过这样设置reducetask个数job.setNumReduceTasks(3);//设置k3 v3的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//第八步:输出文件//注意输出路径一定要不存在,存在就报错TextOutputFormat.setOutputPath(job,new Path("hdfs://node1:8020/wordcountoutput"));job.setOutputFormatClass(TextOutputFormat.class);//提交任务到集群上面boolean b = job.waitForCompletion(true);//三元运算符确认,程序推出状态码return b?0:1;}
}
定义一个mapper类
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {/*** 重写map方法,实现我们自己的逻辑,接受我们key1,value1 转换成k2 v2* key 是k1* value 是v1* context 上写文对象,承上启下,衔接上面的组件与下面的组件*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//第一步:切开一行数据 //hive,sqoop,flume,helloString line = value.toString();String[] split = line.split(",");//[hive,sqoop,flume,hello]// key2 value2// hive 1for (String word : split) {//通过write方法,将数据往下发送context.write(new Text(word),new IntWritable(1));}}
}
定义一个reducer类
public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {/**** @param key key 是 k2* @param values 注意这个value是一个集合,集合类型是 v2类型* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int j=0;for (IntWritable value : values) {//IntWritable类,没有普通的 + 方法,不能累加;需要转换成int进行累加int num = value.get();j += num;}//输出key3 value3 类型context.write(key,new IntWritable(j));}
}
1.5、MapReduce程序运行模式
本地运行模式
(1)mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行
(2)而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上
(4)本地模式非常便于进行业务逻辑的debug,只要在idea中打断点即可
本地模式运行代码设置
TextInputFormat.addInputPath(job,new Path("file:///F:\\input"));
TextOutputFormat.setOutputPath(job,new Path("file:///F:\\output"));集群运行模式
将内容打成jar包在hdfs上执行:
hadoop jar original-day02_hdfs-1.0-SNAPSHOT.jar cn.itcast.wordcount.MainCount
04 分布式文件系统以及MapReduce入门程序相关推荐
- MapReduce入门程序
四.MapReduce 4.1 概述 MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算.概念"Map(映射)"和"Reduce(归约)" ...
- 高可用高性能分布式文件系统FastDFS实践Java程序
在前篇 高可用高性能分布式文件系统FastDFS进阶keepalived+nginx对多tracker进行高可用热备 中已介绍搭建高可用的分布式文件系统架构. 那怎么在程序中调用,其实网上有很多栗子, ...
- Hadoop MapReduce入门程序wordcount代码示例及打包部署运行结果演示
- Hadoop系列之六:分布式文件系统HDFS
1.MapReduce与分布式文件系统 前面的讨论中,我们已经得知,Hadoop中实现的MapReduce是一个编程模型和运行框架,它能够通过JobTracker接收客户提交的作业而后将其分割为多个任 ...
- 大数据技术之Hadoop分布式文件系统HDFS系统知识整理(从入门到熟练操作)
系列博客 1.大数据技术之Hadoop完全分布式集群搭建+Centos7配置连通外网和主机 2.大数据技术之Hadoop编译源码 3.大数据技术之Hadoop分布式文件系统HDFS系统知识整理(从入门 ...
- Hadoop教程(三):HDFS、MapReduce、程序入门实践
Hadoop 附带了一个名为 HDFS(Hadoop分布式文件系统)的分布式文件系统,基于 Hadoop 的应用程序使用 HDFS .HDFS 是专为存储超大数据文件,运行在集群的商品硬件上.它是容错 ...
- 大数据开发基础入门与项目实战(三)Hadoop核心及生态圈技术栈之2.HDFS分布式文件系统
文章目录 前言 1.HDFS特点 2.命令行和API操作HDFS (1)Shell命令行客户端 (2)API客户端连接HDFS的两种方式 (3)API客户端上传下载文件 (4)API客户端文件详情及文 ...
- JAVA大数据(二) Hadoop 分布式文件系统HDFS 架构,MapReduce介绍,Yarn资源调度
文章目录 1.分布式文件系统HDFS 1.HDFS的来源 2.HDFS的架构图之基础架构 2.1 master/slave 架构 2.2 名字空间(NameSpace) 2.3 文件操作 2.4副本机 ...
- Hadoop、分布式文件系统HDFS、YARN、MAPREDUCE
日萌社 人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新) 1.1 什么是Hadoop Hadoop名字的由来 作者:Do ...
最新文章
- linux虚拟机文件挂载
- 01_Mybatis入门
- 浏览器模式用户代理字符串(IE)
- mysql search yum_mysql安装-yum方式
- ASP.NET CORE 项目实战 ---图形验证码的实现
- 机器人J中WPR_优傲:协作机器人的未来在哪里?
- linux取设备分辨率,linux 获取系统屏幕分辨率
- 如何将对象拼接成get传值的形式
- 如何快速开发一个支持高效、高并发的分布式ID生成器(二)
- markdown引入代码_markdown简单使用之插入代码段
- Linux添加环境变量与GCC编译器添加INCLUDE与LIB环境变量
- 用户微信好友关系属于个人隐私吗?深圳南山法院说不属于...
- android uri parcel,Android ParcelFileDescriptor实现进程间通信
- linux HA工作模型详解
- 知名IT公司的年度大会合集
- Python数学建模算法与应用(一、数学建模概论)
- 成功 Root ------ 红米note3
- JWplayer入门及使用
- Ansible 配置Windows Update
- 美国主要经济数据解注释
热门文章
- 《算法谜题》-第二章 谜题
- Android新闻公告切换效果(上下滚动左右滚动)
- 即使是庸才我也要成为庸才中的人才
- 黑马点评--附近商铺
- VSCode配置JavaScript基于Node.js环境
- 做自媒体,不用露脸拍视频,方法都在这篇文章
- 我慌了!我妈从床底掏出了我珍藏多年的小本本-----JAVA_Lambda表达式(笔记)
- 【JJ斗地主官网下载】在线斗地主比赛赢大奖,中文棋牌游戏
- 阻滞增长函数matlab拟合,matlab指数增长和阻滞增长拟合代码讲课稿
- 启动VMware时遇到“vmx86版本不匹配问题”处理方法