文章目录

  • 04 分布式文件系统以及MapReduce入门程序
  • 分布式文件系统HDFS
    • 1.分布式文件系统hdfs详细介绍
    • 2.HDFS分布式文件系统设计目标
    • 3.HDFS的来源
    • 4.hdfs的架构图
    • 5.hdfs的架构之文件的文件副本机制以及block块存储
    • 6.HDFS的元数据信息FSimage以及edits和secondaryNN的作用
      • 6.1、FSImage与edits详解
      • 6.2、FSimage文件当中的文件信息查看
      • 6.3、edits当中的文件信息查看
      • 6.4、secondarynameNode如何辅助管理FSImage与Edits文件
    • 7、HDFS的文件写入过程
    • 8、HDFS的文件读取过程
    • 9、HDFS的API操作
      • 9.1、创建maven工程并导入jar包
      • 9.2、使用url的方式访问数据(了解)
      • 9.3、使用文件系统方式访问数据(掌握)
      • 9.4、获取FileSystem的4种方式
      • 9.5、递归遍历文件系统当中的所有文件
      • 9.6、下载文件到本地
      • 9.7、hdfs上创建文件夹
      • 9.8、hdfs文件上传
      • 9.9、HDFS权限问题以及伪造用户
      • 9.10、HDFS的小文件合并
  • 分布式计算框架MapReduce入门
    • 1.1、理解MapReduce思想
    • 1.2、Hadoop MapReduce设计构思
    • 1.3、MapReduce框架结构
    • 1.4、MapReduce编程规范及示例编写
    • 1.5、MapReduce程序运行模式

04 分布式文件系统以及MapReduce入门程序

分布式文件系统HDFS

1.分布式文件系统hdfs详细介绍

在hadoop当中,分布式文件系统(HDFS),对文件系统有一个抽象,HDFS属于当中的一个实现类,也就是说分布式文件系统类
似于一个接口,定义了标准,下面有很多的实现类,其中HDFS是一个子实现类而已,但是现在很多人都只知道一种就是HDFS的
实现,并没有了解过其他的实现类,其实分布式文件系统的实现有很多种。
hadoop的文件系统:文件系统:是一个顶层的抽象,具体的实现,需要取决于你自己的获取的实例,我们可以通过文件系统获取本地文件系统,操作linux磁盘上面的文件,也可以获取分布式文件系统,操作hdfs上面的文件ftp:// ftp文件系统  可以做文件的上传下载webHdfs:浏览器操作文件系统,可以允许我们通过浏览器上传,下载,修改,hdfs上面的文件hdfs:分布式文件系统,最重要的一个local:本地文件系统

2.HDFS分布式文件系统设计目标

1、硬件错误  由于集群很多时候由数量众多的廉价机组成,使得硬件错误成为常态 ---(副本机制)
2、数据流访问  所有的访问都是访问大量的数据,使用IO流一直操作,追求的是稳定,不是效率
3、大数据集   假设所有存储到hdfs的数据都是海量的数据,不擅长处理小文件,一个小文件占用一个元数据,元数据都存储在内存当中,占用namenode的大量内存
4、简单的相关模型  假定文件是一次写入,多次读取的操作,比较擅长存出一些历史数据
5、移动计算比移动数据便宜   一个应用请求的计算,离它操作的数据越近,就越高效
6、多种软硬件的可移植性

3.HDFS的来源

HDFS起源于Google的GFS论文(GFS,Mapreduce,BigTable为google的旧的三驾马车)发表于2003年10月HDFS是GFS的克隆版
Hadoop  Distributed  File  system易于扩展的分布式文件系统运行在大量普通廉价机器上,提供容错机制为大量用户提供性能不错的文件存取服务

4.hdfs的架构图


1、NameNode负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
2、文件操作,namenode是负责文件元数据的操作,datanode负责处理文件内容的读写,跟文件内容相关的数据流不经过
Namenode,只询问它跟哪个dataNode联系,否则NameNode会成为系统的瓶颈
3、副本存放在哪些Datanode上由NameNode来控制,根据全局情况作出块放置决定,读取文件时NameNode尽量让用户先读取
最近的副本,降低读取网络开销和读取延时
4、NameNode全权管理数据的复制,它周期性的从集群中的每个DataNode接收心跳信息和状态报告,接收到心跳信号意味着
DataNode节点工作正常,块状态报告包含了一个该DataNode上所有的数据列表
namenode负责数据block块的复制,定期的检测block的副本数,如果不够3个,继续复制出来保证足够三个
NameNode与Datanode的总结概述:

namenode的元数据保存在两个地方,一个是内存,一个是磁盘,存的是元数据的快照,如果快照非常大,停机再启动代价非常大

5.hdfs的架构之文件的文件副本机制以及block块存储

所有的文件都是以block块的方式存放在HDFS文件系统当中,在hadoop1当中,文件的block块默认大小是64M,hadoop2当中
,文件的block块大小默认是128M,block块的大小可以通过hdfs-site.xml当中的配置文件进行指定
<property><name>dfs.block.size</name><value>块大小 以字节为单位</value>//只写数值就可以
</property>抽象成数据块的好处:
1.一个文件有可能大于集群中任意一个磁盘
2.使用块抽象而不是文件可以简化存储子系统
3.块非常适合用于数据备份进而提供数据容错能力和可用性
block块的大小,可以根据实际工作当中的文件特性来调整,如果都是一些大文件,可以稍微调大block块的大小
128M的block块    300M 的文件 产生3个block块   3个block块的元数据信息会存储在namenode当中
256M的block块    300M 的文件 产生2个block块   2个block块的元数据信息会存储在namenode当中块缓存:
块缓存:distributedCache   可以用来实现我们的文件的缓存等等hdfs的文件权限验证
hdfs的文件权限机制与linux系统的文件权限机制类似
r:read   w:write  x:execute  权限x对于文件表示忽略,对于文件夹表示是否有权限访问其内容
如果linux系统用户zhangsan使用hadoop命令创建一个文件,那么这个文件在HDFS当中的owner就是zhangsan
HDFS文件权限的目的,防止好人做错事,而不是阻止坏人做坏事。HDFS相信你告诉我你是谁,你就是谁

6.HDFS的元数据信息FSimage以及edits和secondaryNN的作用

6.1、FSImage与edits详解

hdfs当中的元数据信息管理
fsimage:存储的是一份比较完整的元数据信息
edits:最近一段时间的操作日志
控制edits文件的大小:时间长短,文件大小 通过secondarynamenode将我们的edtis文件合并到fsimage当中去

客户端对hdfs进行写文件时会首先被记录在edits文件中。
edits修改时元数据也会更新。
每次hdfs更新时edits先更新后客户端才会看到最新信息。
fsimage:是namenode中关于元数据的镜像,一般称为检查点。
一般开始时对namenode的操作都放在edits中,为什么不放在fsimage中呢?
因为fsimage是namenode的完整的镜像,内容很大,如果每次都加载到内存的话生成树状拓扑结构,这是非常耗内存和CPU。
fsimage内容包含了namenode管理下的所有datanode中文件及文件block及block所在的datanode的元数据信息。
随着edits内容增大,就需要在一定时间点和fsimage合并。
合并过程见SecondaryNameNode如何辅助管理FSImage与edits

6.2、FSimage文件当中的文件信息查看

官方查看文档
http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.14.0/hadoop-project-dist/hadoop-hdfs/HdfsImageViewer.html使用命令 hdfs  oiv
cd  /export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas/current
hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml

6.3、edits当中的文件信息查看

官方查看文档
http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.14.0/hadoop-project-dist/hadoop-hdfs/HdfsEditsViewer.html
查看命令 hdfs  oev
cd  /export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/edits
hdfs oev -i  edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML

6.4、secondarynameNode如何辅助管理FSImage与Edits文件

①:secnonaryNN通知NameNode切换editlog
②:secondaryNN从NameNode中获得FSImage和editlog(通过http方式)
③:secondaryNN将FSImage载入内存,然后开始合并editlog,合并之后成为新的fsimage
④:secondaryNN将新的fsimage发回给NameNode
⑤:NameNode用新的fsimage替换旧的fsimage


完成合并的是secondarynamenode,会请求namenode停止使用edits,暂时将新写操作放入一个新的文件中(edits.new)。
secondarynamenode从namenode中通过http get获得edits,因为要和fsimage合并,所以也是通过http get 的方式
把fsimage加载到内存,然后逐一执行具体对文件系统的操作,与fsimage合并,生成新的fsimage,然后把fsimage发送给
namenode,通过http post的方式。namenode从secondarynamenode获得了fsimage后会把原有的fsimage替换为新的
fsimage,把edits.new变成edits。同时会更新fstime。
hadoop进入安全模式时需要管理员使用dfsadmin的save namespace来创建新的检查点。
secondarynamenode在合并edits和fsimage时需要消耗的内存和namenode差不多,所以一般把namenode和
secondarynamenode放在不同的机器上。
fs.checkpoint.period: 默认是一个小时(3600s)
fs.checkpoint.size:  edits达到一定大小时也会触发合并(默认64MB)

7、HDFS的文件写入过程

详细步骤解析:
1、 client发起文件上传请求,通过RPC与NameNode建立通讯,NameNode检查目标文件是否已存在,父目录是否存在,
返回是否可以上传;
2、 client请求第一个block该传输到哪些DataNode服务器上;
3、 NameNode根据配置文件中指定的备份数量及机架感知原理进行文件分配,返回可用的DataNode的地址如:A,B,C;注:Hadoop在设计时考虑到数据的安全与高效,数据文件默认在HDFS上存放三份,存储策略为本地一份,同机架内其它某一
节点上一份,不同机架的某一节点上一份。
4、 client请求3台DataNode中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,
然后B调用C,将整个pipeline建立完成,后逐级返回client;
5、 client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位(默认64K),A收到
一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答。
6、 数据被分割成一个个packet数据包在pipeline上依次传输,在pipeline反方向上,逐个发送ack(命令正确应答),
最终由pipeline中第一个DataNode节点A将pipelineack发送给client;
7、 当一个block传输完成之后,client再次请求NameNode上传第二个block到服务器。

8、HDFS的文件读取过程

详细步骤解析
1、 Client向NameNode发起RPC请求,来确定请求文件block所在的位置;
2、 NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的
DataNode 地址;  这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个
规则:网络拓扑结构中距离 Client 近的排靠前;心跳机制中超时汇报的 DN 状态为 STALE,这样的排靠后;
3、 Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据(短路
读取特性);
4、 底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,
直到这个块上的数据读取完毕;
5、 当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表;
6、 读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再
从下一个拥有该 block 副本的DataNode 继续读。
7、 read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回Client请求包含块的DataNode地址,
并不是返回请求块的数据;
8、 最终读取来所有的 block 会合并成一个完整的最终文件。

9、HDFS的API操作

9.1、创建maven工程并导入jar包

由于cdh版本的所有的软件涉及版权的问题,所以并没有将所有的jar包托管到maven仓库当中去,而是托管在了CDH自己的服务
器上面,所以我们默认去maven的仓库下载不到,需要自己手动的添加repository去CDH仓库进行下载,以下两个地址是官方
文档说明,请仔细查阅
https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh5_maven_repo.html
https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh5_maven_repo_514x.html

cdh5.114.0相关jar包

<!-- 通过哪个网址下载我们的jar包 -->
<repositories><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository>
</repositories>
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0-mr1-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.0-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.6.0-cdh5.14.0</version></dependency><!-- https://mvnrepository.com/artifact/junit/junit --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.testng</groupId><artifactId>testng</artifactId><version>RELEASE</version></dependency>
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding><!--    <verbal>true</verbal>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><minimizeJar>true</minimizeJar></configuration></execution></executions></plugin><!--  <plugin><artifactId>maven-assembly-plugin </artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.itcast.hadoop.db.DBToHdfs2</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin>--></plugins>
</build>

9.2、使用url的方式访问数据(了解)

public class HdfsOperate {@Testpublic  void downHdfsFile() throws Exception {// 通信三要素:ip 端口 协议// 第一步:注册hdfs的驱动文件,表示我们使用hdfs://这种协议访问我们hdfs文件系统URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());//URL需要接受一个文件的访问地址String url = "hdfs://node1:8020/install2.log";//获取一个inputStreamInputStream inputStream = new URL(url).openStream();OutputStream outputStream = new FileOutputStream(new File("c:\\hello.txt"));//通过工具类将我们的输入流读取到输出流里IOUtils.copy(inputStream,outputStream);//关闭io流IOUtils.closeQuietly(inputStream);IOUtils.closeQuietly(outputStream);}
}
如何解决winutils的问题
第一步:把这个文件夹  3、大数据离线第三天\解决winutils的问题\hadoop-2.6.0-cdh5.14.0 拷贝到一个没有中文没有空格的路径下
第二步:配置windows的hadoop_home环境变量
第三步:将这个路径下面的 hadoop-2.6.0-cdh5.14.0\bin\hadoop.dll  这个文件,丢到C:\Windows\System32  这个路径下
第四步:关闭windows重启

9.3、使用文件系统方式访问数据(掌握)

在 java 中操作 HDFS,主要涉及以下 Class:
Configuration:该类的对象封转了客户端或者服务器的配置; FileSystem:该类的对象是一个文件系统对象,
可以用该对象的一些方法来对文件进行操作,通过 FileSystem 的静态方法 get 获得该对象。
FileSystem fs = FileSystem.get(conf)
get 方法从 conf 中的一个参数 fs.defaultFS 的配置值判断具体是什么类型的文件系统。如果我们的代码中没有指定fs.defaultFS,并且工程 classpath下也没有给定相应的配置,conf中的默认值就来自于hadoop的jar包中的core-default.xml , 默 认 值 为 : file:/// , 则 获 取 的 将 不 是 一 个DistributedFileSystem的实例,而是一个本地文件系统的客户端对象

9.4、获取FileSystem的4种方式

    /*** 获取文件系统的第一种方式*/@Testpublic void  getFileSystem1() throws IOException {//FileSystem是一个抽象类,获取抽象类的实例有两种方式// 第一种:看看这个抽象类有没有提供什么方法,返回它本身// 第二种:找子类Configuration configuration = new Configuration();//如果不加任何配置,这里获取到的就是本地文件系统configuration.set("fs.defaultFS","hdfs://node1:8020");FileSystem fileSystem = FileSystem.get(configuration);System.out.println(fileSystem.toString());fileSystem.close();}/*** 获取文件系统的第二种方式*/@Testpublic void getFileSystem2() throws Exception {Configuration configuration = new Configuration();//通过指定URL来获取分布式文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), configuration);System.out.println(fileSystem.toString());fileSystem.close();}/*** 获取文件系统的第三种方式*/@Testpublic void getFileSystem3() throws IOException {Configuration configuration = new Configuration();configuration.set("fs.defaultFS","hdfs://node1:8020");FileSystem fileSystem = FileSystem.newInstance(configuration);System.out.println(fileSystem.toString());fileSystem.close();}/*** 获取文件系统的第四种方式*/@Testpublic void getFileSystem4() throws Exception {Configuration configuration = new Configuration();configuration.set("fs.defaultFS","hdfs://node1:8020");FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://node1:8020"),configuration);System.out.println(fileSystem.toString());fileSystem.close();}
}

9.5、递归遍历文件系统当中的所有文件

 /**
* 递归遍历hdfs上面的所有文件出来
* 第一种方法:通过递归
*/
@Test
public void getAllFiles() throws Exception {//获取文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());//获取到我们的文件的状态,可以通过fileStatuses来判断是文件还是文件夹FileStatus[] fileStatuses = fileSystem.listStatus(new Path("hdfs://node1:8020/"));// 循环遍历fileStatus 判断是文件夹还有文件,如果是文件直接输出路径,如果是文件夹继续进去遍历for (FileStatus fileStatus:fileStatuses){if (fileStatus.isDirectory()){//文件夹getDirFiles(fileStatus.getPath(),fileSystem);}else {//文件Path path = fileStatus.getPath();System.out.println(path.toString());}}
}
public void getDirFiles(Path path, FileSystem fileSystem) throws IOException {FileStatus[] fileStatuses = fileSystem.listStatus(path);for (FileStatus fileStatus : fileStatuses) {if (fileStatus.isDirectory()){getDirFiles(fileStatus.getPath(),fileSystem);}else {System.out.println(fileStatus.getPath().toString());}}}
 /**
* 递归遍历hdfs上面的所有文件出来
* 第二种方法:通过hdfs最直接提供的api进行遍历
*/
@Test
public void getAllFiles2() throws Exception{//获取文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("hdfs://node1:8020/"), true);while (locatedFileStatusRemoteIterator.hasNext()){LocatedFileStatus fileStatus = locatedFileStatusRemoteIterator.next();System.out.println(fileStatus.getPath().toString());}fileSystem.close();
}

9.6、下载文件到本地

方法1

 /**
* 下载hdfs的文件到本地目录
*/
@Test
public void downFileToLocal() throws Exception {//获取文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());FSDataInputStream inputStream = fileSystem.open(new Path("/install2.log"));FileOutputStream fileOutputStream = new FileOutputStream(new File("c:\\hello2.txt"));IOUtils.copy(inputStream,fileOutputStream);IOUtils.closeQuietly(inputStream);IOUtils.closeQuietly(fileOutputStream);fileSystem.close();
}

方法2

/**
* 下载hdfs的文件到本地目录
*/
@Test
public void downFileToLocal2() throws Exception {//获取文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());fileSystem.copyToLocalFile(new Path("hdfs://node1:8020/install2.log"),new Path("file:///c:\\hello3.txt"));fileSystem.close();
}

9.7、hdfs上创建文件夹

/**
* hdfs上面创建文件夹
*/
@Test
public void createDir() throws Exception {FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://node1:8020"), new Configuration());fileSystem.mkdirs(new Path("/dir1/dir2"));fileSystem.close();
}

9.8、hdfs文件上传

方法1

/*** hdfs文件上传方法1*/
@Test
public void uploadFile() throws Exception{FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());fileSystem.copyFromLocalFile(new Path("file:///d:\\mn.jpg"),new Path("/dir1/dir2/mk.jpg"));fileSystem.close();
}

方法2

/**** hdfs文件上传方法2*/
@Test
public void uploadFile2() throws Exception{FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/dir1/dir2/story.txt"));FileInputStream fileInputStream = new FileInputStream("d:\\窗里窗外.txt");IOUtils.copy(fileInputStream,fsDataOutputStream);IOUtils.closeQuietly(fsDataOutputStream);IOUtils.closeQuietly(fileInputStream);
}

9.9、HDFS权限问题以及伪造用户

首先停止hdfs集群,在node01机器上执行以下命令
cd /export/servers/hadoop-2.6.0-cdh5.14.0
sbin/stop-dfs.sh
修改node01机器上的hdfs-site.xml当中的配置文件
cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
vim hdfs-site.xml
<property><name>dfs.permissions</name><value>true</value>
</property>
修改完成之后配置文件发送到其他机器上面去
scp hdfs-site.xml node2:$PWD
scp hdfs-site.xml node3:$PWD
重启hdfs集群
cd /export/servers/hadoop-2.6.0-cdh5.14.0
sbin/start-dfs.sh随意上传一些文件到我们hadoop集群当中准备测试使用
cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
hdfs dfs -mkdir /config
hdfs dfs -put *.xml /config
hdfs dfs -chmod 600 /config/core-site.xml   使用代码准备下载文件
@Test //hdfs伪造用户,你告诉hdfs你是谁,你就是谁
public void getConfig()throws  Exception{FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(),"root");fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///d:/core-site.xml"));fileSystem.close();
}

9.10、HDFS的小文件合并

由于hadoop擅长存储大文件,因为大文件的元数据信息比较少,如果hadoop集群当中有大量的
小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力,
所以在实际工作当中,如果有必要一定要将小文件合并成大文件进行一起处理
在我们的hdfs 的shell命令模式下,可以通过命令行将很多的hdfs文件合并成一个大文件下载
到本地,命令如下cd /export/servers
hdfs dfs -getmerge /config/*.xml  ./hello.xml1.在上传之前做IO流合并//第一步获取本地文件系统//第二步找到所有本地文件系统的小文件//第三步输出成IO流//第四步将我们的IO流输出到hdfs上面去
2.上传之后,可以通过MR程序实现小文件合并成SequenceFile
3.har文件 归档文件
/*** 小文件合并*/
@Test
public void mergeSmallFile() throws Exception{// 获取分布式文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(), "root");// 获取hdfs文件上的输出流FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/bigFile.xml"));// 获取本地文件系统,遍历我们本地的小文件,将每一个小文件读成一个输入流LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("file:///E:\\大数据资料\\大数据离线资料\\3、大数据离线第三天\\上传小文件合并"));for (FileStatus fileStatus : fileStatuses) {//获取每一个文件的路径Path path = fileStatus.getPath();// 循环遍历读取我们每一个文件的输入流FSDataInputStream fsDataInputStream = localFileSystem.open(path);IOUtils.copy(fsDataInputStream,fsDataOutputStream);IOUtils.closeQuietly(fsDataInputStream);}IOUtils.closeQuietly(fsDataOutputStream);
}

分布式计算框架MapReduce入门

1.1、理解MapReduce思想

分布式文件计算系统:主要用于计算一些数据
mapreduce的核心思想:分治 分而治之
最主要的特点:把一个大问题分成很多小问题,并且每个小的问题的解决思路与大问题的解决思路是一样的

Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是
这些小任务可以并行计算,彼此间几乎没有依赖关系。Reduce负责“合”,即对map阶段的结果进行全局汇总。
这两个阶段合起来正是MapReduce思想的体现。

一个block块数据对应一个mapTask:
在mapreduce中没有block块的概念,每个mapTask处理的一部分数据都叫做一个切片,
默认每个切片对应一个block块的大小

1.2、Hadoop MapReduce设计构思

Hadoop MapReduce构思体现在如下的三个方面:
(1)如何对付大数据处理:分而治之
(2)构建抽象模型:Map和ReduceMapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
Map: 对一组数据元素进行某种重复式的处理;
Reduce: 对Map的中间结果进行某种进一步的结果整理。
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
map: (k1; v1) → [(k2; v2)]
reduce: (k2; [v2]) → [(k3; v3)]
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是<key,value>键值对。
(3)统一构架,隐藏系统层细节如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据
存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的
计算框架,为程序员隐藏了绝大多数系统层面的处理细节。
MapReduce最大的亮点在于通过抽象模型和计算框架把
需要做什么(what need to do)与具体怎么做(how to do)分开了,
为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,
仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的
诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点
集群的自动调度使用。编写mapreduce的天龙八部:
map阶段两个步骤:
第一步:读取文件,解析成key,value对,这里是我们的K1 V1
第二步:接受我们的K1 V1,自定义我们的map逻辑,然后转换成key2 value2进行输出
往下发送 这里发送K2 V2shuffle阶段四个步骤:
第三步:分区   相同key的value发送到同一个reduce里面去,key合并,value形成一个集合
第四步:排序   默认按照字典顺序进行排序
第五步:规约
第六步:分组reduce阶段两个步骤:
第七步:接收我们的K2 V2 自定义我们reduce的逻辑,转换成新的K3 V3进行输出
第八步:将我们的K3 V3进行输出

例子:

单词统计,现在要统计,每一个单词在文本当中出现了多少次
hello world
hadoop hive
sqoop hive
hadoop hive第一步:读取文件,解析成key,value对  key是我们的行偏移量  value是我们行文本内容
下一行是上一行的行偏移量
key1  value1
0  hello world
11  hadoop  hive
22 sqoop  hive第二步:自定义map逻辑,接收我们的key1,value1 转换成新的key2  value2进行输出
获取我们的value1,
hello   world
hadoop  hive
sqoop  hive
按照空格进行切割  [hello,world] [hadoop ,hive]  [sqoop,hive]  转换成新的key2   value2 往下发送
key2    value2hello   1
world   1
hadoop  1
hive    1
sqoop   1
hive    1第三步:分区  相同key的value发送到同一个reduce当中去,
key进行合并,value形成一个集合
hive  [1,1]
第四步:排序
第五步:规约
第六步:分组  第七步:reduce阶段,接收我们的key2   value2   转换成新的key3   value3进行输出
接收  key2     value2hive    1hive    1key2     value2已经变成了一个集合hive   [1,1]转换成新的key3   value3hive    1+1 = 2第八步:输出我们的key3  value3
hive  2
sqoop  1
hello   1
world   1
hadoop  1

1.3、MapReduce框架结构

1.4、MapReduce编程规范及示例编写

需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
数据格式准备如下:
cd /export/servers
vim wordcount.txt
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/如何开发mr的代码
上面八个步骤,每一个步骤都是一个单独的java类
八个步骤写完了之后,通过job任务组装我们的mr的程序,进行任务的提交

定义一个主类,用来描述job并提交job

/*** 除了Javaweb的程序打成一个war包,其他都是打jar包* 运行jar包需要一个main方法,作为程序的入口类*/
public class MainCount extends Configured implements Tool {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//这里执行完毕之后,得到一个int类型的返回值,表示我们程序的退出状态码// 状态码为0 程序执行成功// 通过这里设置configuration,就相当于把父类的configuration设置值了int run = ToolRunner.run(configuration, new MainCount(), args);System.exit(run);}/***  这个run方法很重要*  这里面就是通过job对象来组装我们的程序,8个类*/@Overridepublic int run(String[] args) throws Exception {//第一步:读取文件,解析成key,value对//从父类里面获取configuration配置文件Job job = Job.getInstance(super.getConf(), "xxx");//jobname 随便写//如果打包到集群上运行,需要这句话,制定main方法所在的Java类job.setJarByClass(MainCount.class);TextInputFormat.addInputPath(job,new Path("hdfs://node1:8020/wordcount"));  可以把本地文件系统目录作为输入"file:///f:\\input"job.setInputFormatClass(TextInputFormat.class);//第二步:自定义map逻辑job.setMapperClass(WordCountMapper.class);//设置我们key2 value2的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);/*** 第三步到第六步,全部省略* 分区 相同key的value,发送到同一个reduce,key合并,value形成一个集合* 排序* 规约* 分组**///第七步:自定义reduce逻辑 默认只有一个reducetaskjob.setReducerClass(WordCountReducer.class);//通过这样设置reducetask个数job.setNumReduceTasks(3);//设置k3 v3的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//第八步:输出文件//注意输出路径一定要不存在,存在就报错TextOutputFormat.setOutputPath(job,new Path("hdfs://node1:8020/wordcountoutput"));job.setOutputFormatClass(TextOutputFormat.class);//提交任务到集群上面boolean b = job.waitForCompletion(true);//三元运算符确认,程序推出状态码return b?0:1;}
}

定义一个mapper类

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {/*** 重写map方法,实现我们自己的逻辑,接受我们key1,value1 转换成k2 v2* key 是k1* value 是v1* context 上写文对象,承上启下,衔接上面的组件与下面的组件*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//第一步:切开一行数据 //hive,sqoop,flume,helloString line = value.toString();String[] split = line.split(",");//[hive,sqoop,flume,hello]// key2 value2// hive 1for (String word : split) {//通过write方法,将数据往下发送context.write(new Text(word),new IntWritable(1));}}
}

定义一个reducer类

public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {/**** @param key key 是 k2* @param values 注意这个value是一个集合,集合类型是 v2类型* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int j=0;for (IntWritable value : values) {//IntWritable类,没有普通的 + 方法,不能累加;需要转换成int进行累加int num = value.get();j += num;}//输出key3 value3 类型context.write(key,new IntWritable(j));}
}

1.5、MapReduce程序运行模式

本地运行模式
(1)mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行
(2)而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上
(4)本地模式非常便于进行业务逻辑的debug,只要在idea中打断点即可
本地模式运行代码设置
TextInputFormat.addInputPath(job,new Path("file:///F:\\input"));
TextOutputFormat.setOutputPath(job,new Path("file:///F:\\output"));集群运行模式
将内容打成jar包在hdfs上执行:
hadoop jar original-day02_hdfs-1.0-SNAPSHOT.jar cn.itcast.wordcount.MainCount

04 分布式文件系统以及MapReduce入门程序相关推荐

  1. MapReduce入门程序

    四.MapReduce 4.1 概述 MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算.概念"Map(映射)"和"Reduce(归约)" ...

  2. 高可用高性能分布式文件系统FastDFS实践Java程序

    在前篇 高可用高性能分布式文件系统FastDFS进阶keepalived+nginx对多tracker进行高可用热备 中已介绍搭建高可用的分布式文件系统架构. 那怎么在程序中调用,其实网上有很多栗子, ...

  3. Hadoop MapReduce入门程序wordcount代码示例及打包部署运行结果演示

  4. Hadoop系列之六:分布式文件系统HDFS

    1.MapReduce与分布式文件系统 前面的讨论中,我们已经得知,Hadoop中实现的MapReduce是一个编程模型和运行框架,它能够通过JobTracker接收客户提交的作业而后将其分割为多个任 ...

  5. 大数据技术之Hadoop分布式文件系统HDFS系统知识整理(从入门到熟练操作)

    系列博客 1.大数据技术之Hadoop完全分布式集群搭建+Centos7配置连通外网和主机 2.大数据技术之Hadoop编译源码 3.大数据技术之Hadoop分布式文件系统HDFS系统知识整理(从入门 ...

  6. Hadoop教程(三):HDFS、MapReduce、程序入门实践

    Hadoop 附带了一个名为 HDFS(Hadoop分布式文件系统)的分布式文件系统,基于 Hadoop 的应用程序使用 HDFS .HDFS 是专为存储超大数据文件,运行在集群的商品硬件上.它是容错 ...

  7. 大数据开发基础入门与项目实战(三)Hadoop核心及生态圈技术栈之2.HDFS分布式文件系统

    文章目录 前言 1.HDFS特点 2.命令行和API操作HDFS (1)Shell命令行客户端 (2)API客户端连接HDFS的两种方式 (3)API客户端上传下载文件 (4)API客户端文件详情及文 ...

  8. JAVA大数据(二) Hadoop 分布式文件系统HDFS 架构,MapReduce介绍,Yarn资源调度

    文章目录 1.分布式文件系统HDFS 1.HDFS的来源 2.HDFS的架构图之基础架构 2.1 master/slave 架构 2.2 名字空间(NameSpace) 2.3 文件操作 2.4副本机 ...

  9. Hadoop、分布式文件系统HDFS、YARN、MAPREDUCE

    日萌社 人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新) 1.1 什么是Hadoop Hadoop名字的由来 作者:Do ...

最新文章

  1. linux虚拟机文件挂载
  2. 01_Mybatis入门
  3. 浏览器模式用户代理字符串(IE)
  4. mysql search yum_mysql安装-yum方式
  5. ASP.NET CORE 项目实战 ---图形验证码的实现
  6. 机器人J中WPR_优傲:协作机器人的未来在哪里?
  7. linux取设备分辨率,linux 获取系统屏幕分辨率
  8. 如何将对象拼接成get传值的形式
  9. 如何快速开发一个支持高效、高并发的分布式ID生成器(二)
  10. markdown引入代码_markdown简单使用之插入代码段
  11. Linux添加环境变量与GCC编译器添加INCLUDE与LIB环境变量
  12. 用户微信好友关系属于个人隐私吗?深圳南山法院说不属于...
  13. android uri parcel,Android ParcelFileDescriptor实现进程间通信
  14. linux HA工作模型详解
  15. 知名IT公司的年度大会合集
  16. Python数学建模算法与应用(一、数学建模概论)
  17. 成功 Root ------ 红米note3
  18. JWplayer入门及使用
  19. Ansible 配置Windows Update
  20. 美国主要经济数据解注释

热门文章

  1. 《算法谜题》-第二章 谜题
  2. Android新闻公告切换效果(上下滚动左右滚动)
  3. 即使是庸才我也要成为庸才中的人才
  4. 黑马点评--附近商铺
  5. VSCode配置JavaScript基于Node.js环境
  6. 做自媒体,不用露脸拍视频,方法都在这篇文章
  7. 我慌了!我妈从床底掏出了我珍藏多年的小本本-----JAVA_Lambda表达式(笔记)
  8. 【JJ斗地主官网下载】在线斗地主比赛赢大奖,中文棋牌游戏
  9. 阻滞增长函数matlab拟合,matlab指数增长和阻滞增长拟合代码讲课稿
  10. 启动VMware时遇到“vmx86版本不匹配问题”处理方法