Hadoop三大框架之MapReduce工作流程
一、MapReduce基础
MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。
Map负责“分”,把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
MapReduce运行在yarn集群。ResourceManager+NodeManager这两个阶段合起来就是MapReduce思想的体现。
1.1 MapReduce设计构思
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
MapReduce设计并提供了一个同意的计算框架,为程序员隐藏了绝大多数系统层面的处理细节,为程序员提供了一个抽象和高层的编程接口和框架。程序员仅需要关心应用层的具体计算问题,仅需要编写少量的处理应用本身计算问题的程序代码。
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。
一个完整的MapReduce程序在分布式运行时有三类实例进程:
MRAppMaster 负责整个程序的过程调度及状态协调
MapTask 负责map阶段的整个数据处理流程
ReduceTask 负责reduce阶段的整个数据处理流程
1.2 MapReduce工作原理
1、分片操作:
FileInputstream,首先要计算切片大小,FileInputstream是一个抽象类,继承InputFormat接口,真正完成工作的是它的实现类,默认为是TextInputFormat,TextInputFormat是读取文件的,默认为一行一行读取,将输入文件切分为逻辑上的多个input split,input split是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念。
在进行Map计算之前,MapReduce会根据输入文件计算的切片数开启map任务,一个输入切片对应一个maptask,输入分片存储的并非数据本身,而是一个分片长度和一个记录数据位置的集合,每个input spilt中存储着该分片的数据信息如:文件块信息、起始位置、数据长度、所在节点列表等,并不是对文件实际分割成多个小文件,输入切片大小往往与hdfs的block关系密切,默认一个切片对应一个block,大小为128M;注意:尽管我们可以使用默认块大小或自定义的方式来定义分片的大小,但一个文件的大小,如果是在切片大小的1.1倍以内,仍作为一个片存储,而不会将那多出来的0.1单独分片。
2、数据格式化操作:
TextInputFormat 会创建RecordReader去读取数据,通过getCurrentkey,getCurrentvalue,nextkey,value等方法来读取,读取结果会形成key,value形式返回给maptask,key为偏移量,value为每一行的内容,此操作的作用为在分片中每读取一条记录就调用一次map方法,反复这一过程直到将整个分片读取完毕。
3、map阶段操作:
此阶段就是程序员通过需求偏写了map函数,将数据格式化的<K,V>键值对通过Mapper的map()方法逻辑处理,形成新的<k,v>键值对,通过Context.write输出到OutPutCollector收集器
map端的shuffle(数据混洗)过程:溢写(分区,排序,合并,归并)
溢写:
由map处理的结果并不会直接写入磁盘,而是会在内存中开启一个环形内存缓冲区,先将map结果写入缓冲区,这个缓冲区默认大小为100M,并且在配置文件里为这个缓冲区设了一个阀值,默认为0.8,同时map还会为输出操作启动一个守护线程,如果缓冲区内存达到了阀值0.8,这个线程会将内容写入到磁盘上,这个过程叫作spill(溢写)。
分区Partition
当数据写入内存时,决定数据由哪个Reduce处理,从而需要分区,默认分区方式采用hash函数对key进行哈布后再用Reduce任务数量进行取模,表示为hash(key)modR,这样就可以把map输出结果均匀分配给Reduce任务处理,Partition与Reduce是一一对应关系,类似于一个分片对应一个map task,最终形成的形式为(分区号,key,value)
排序Sort:
在溢出的数据写入磁盘前,会对数据按照key进行排序,默认采用快速排序,第一关键字为分区号,第二关键字为key。
合并combiner:
程序员可选是否合并,数据合并,在Reduce计算前对相同的key数据、value值合并,减少输出量,如(“a”,1)(“a”,1)合并之后(“a”,2)
归并menge
每块溢写会成一个溢写文件,这些溢写文件最终需要被归并为一个大文件,生成key对应的value-list,会进行归并排序<"a",1><"a",1>归并后<"a",<1,1>>。
Reduce 端的shffle:
数据copy:map端的shffle结束后,所有map的输出结果都会保存在map节点的本地磁盘上,文件都经过分区,不同的分区会被copy到不同的Recuce任务并进行并行处理,每个Reduce任务会不断通过RPC向JobTracker询问map任务是否完成,JobTracker检测到map位务完成后,就会通过相关Reduce任务去aopy拉取数据,Recluce收到通知就会从Map任务节点Copy自己分区的数据此过程一般是Reduce任务采用写个线程从不同map节点拉取
归并数据:
Map端接取的数据会被存放到 Reduce端的缓存中,如果缓存被占满,就会溢写到磁盘上,缓存数据来自不同的Map节点,会存在很多合并的键值对,当溢写启动时,相同的keg会被归并,最终各个溢写文件会被归并为一个大类件归并时会进行排序,磁盘中多个溢写文许归并为一个大文许可能需要多次归并,一次归并溢写文件默认为10个
4、Reduce阶段:
Reduce任务会执行Reduce函数中定义的各种映射,输出结果存在分布式文件系统中。
二、 MapReduce编程模型
2.1 编程模型概述
2.1.1 Map阶段2个步骤
设置InputFormat类,将数据切分为Key-Value(K1和V1)对,输入到第二步
自定义Map逻辑,将第一步的结果转换为另外的Key-Value(K2和V2)对,输出结果
2.1.2 Shuffle阶段4个步骤
对输出的Key-Value进行分区
对不同分区的数据按照相同的key排序
(可选)对分组过的数据初步规约,降低数据的网络拷贝
对数据进行分组,相同的Key和Value放入一个集合中
2.1.3 Reduce阶段2个步骤
对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的Key-Value进行处理,转为新的Key-Value(K3和V3)输出
设置OutputFromat处理并保存Reduce输出的Key-Value数据
2.2 编程模型三部曲
(1)Input:一系列(K1,V1)。
(2)Map和Reduce:
Map:(K1,V1) -> list(K2,V2) (其中K2/V2是中间结果对)
Reduce:(K2,list(v2)) -> list(K3,V3)
(3)Output:一系列(K3,V3)。
三、词频统计WordCount
3.1 添加Maven依赖
# 将maven版本改为1.8
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target># 添加如下依赖
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop-version}</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop-version}</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${hadoop-version}</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-common</artifactId><version>${hadoop-version}</version>
</dependency>
3.2 代码实现
《wordcount.txt》hello java
hello hadoop
hello java
hello java hadoop
java hadoop
hadoop java
Mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Text text = new Text();IntWritable intWritable = new IntWritable();System.out.println("WordCountMap stage Key:"+key+" Value:"+value);String[] words = value.toString().split(" "); // "hello world"->[hello,world]for (String word : words) {text.set(word);intWritable.set(1);context.write(text,intWritable); // <hello,1> <word,1>}}
}
Reduce类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReduce extends Reducer<Text, IntWritable,Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {System.out.println("Reduce stage Key:"+key+" Values"+values.toString());int count=0;for (IntWritable intWritable : values) {count += intWritable.get();}LongWritable longWritable = new LongWritable(count);System.out.println("Key:"+key+" ResultValue:"+longWritable.get());context.write(key,longWritable);}
}
Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);job.setJarByClass(WordCountDriver.class);// 设置mapper类job.setMapperClass(WordCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 设置Reduce类job.setReducerClass(WordCountReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 指定map输入的文件路径FileInputFormat.setInputPaths(job,new Path("D:\\Servers\\hadoopstu\\in\\wordcount.txt"));// 指定reduce结果的输出路径Path path = new Path("D:\\Servers\\hadoopstu\\out1");FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);if(fileSystem.exists(path)){fileSystem.delete(path,true);}FileOutputFormat.setOutputPath(job,path);job.waitForCompletion(true);}
}
3.3 代码说明
3.3.1 对于map函数的方法
protected void map(LongWritable key, Text value, Context context)
继承Mapper类,实现map方法,重写的map方法中包含三个参数,key、value就是输入的key、value键值对(<K1,V1>),context记录的是整个上下文,可以通过context将数据写出去。
3.3.2 对于reduce函数的方法
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
继承Reduce类,实现reduce方法,reduce函数输入的是一个<K,V>形式,但是这里的value是以迭代器的形式Iterable<IntWritable> value。即,reduce的输入是一个key对应一组value。
reduce中context参数与map中的reduce参数作用一致。
3.3.3 对于main函数的调用
创建configuration()类,作用是读取MapReduce系统配置信息
创建job类
设置map函数、map函数输出的key、value类型
设置reduce函数、reduce函数输出的key、value类型,即最终存储再HDFS结果文件中的key、value类型
Hadoop三大框架之MapReduce工作流程相关推荐
- Hadoop之MapReduce工作流程
Hadoop之MapReduce工作流程 目录 流程示意图 流程详解 注意 1. 流程示意图 MapReduce工作流程 流程示意图,如下图 2. 流程详解 上面的流程是整个mapreduce最全工作 ...
- SSM三大框架+SpringMVC的工作原理及其流程
SSM三大框架+SpringMVC的工作原理及其流程 一.SSM中各层作用及关系 1.持久层:DAO层(mapper层)(属于mybatis模块) DAO层(Mapper层):主要负责与数据库 ...
- MapReduce 分布式计算框架 简介 特点 工作流程
MapReduce 计算框架 一种分布式计算框架,解决海量数据的计算问题 MapReduce将整个并行计算过程抽象到两个函数 Map(映射):对一些独立元素组成的列表的每一个元素进行指定的操作,可以高 ...
- Mapreduce工作流程与简介
最近几天一直在学习关于大数据方面的相关技术,今天学习了MapReduce的工作流程,让我对数据地处理有了新的认识,接下来我分享一下关于MapReduce2.0的工作流程 Mapreduce简介 Had ...
- mapreduce工作流程_我要进大厂之大数据MapReduce知识点(2)
01 我们一起学大数据 今天老刘分享的是MapReduce知识点的第二部分,在第一部分中基本把MapReduce的工作流程讲述清楚了,现在就是对MapReduce零零散散的知识点进行总结,这次的内容大 ...
- mapreduce工作流程
mapre的工作流程 1.文件要存储在HDFS中,每个文件被切分成多个一定大小的块也就是Block,(Hadoop1.0默认为64M,Hadoop2.0默认为128M),并且默认3个备份存储在多个的节 ...
- 17-爬虫之scrapy框架五大核心组件工作流程及下载中间件介绍04
scrapy的五大核心组件 引擎(Scrapy) 对整个系统的数据流进行处理, 触发事务(框架核心). 调度器(Scheduler) 用来接受引擎发过来的请求. 由过滤器过滤重复的url并将其压入队列 ...
- Django框架MVT模型工作流程
Django 一.Django介绍 Django是一个开源的Web应用框架,由Python写成.采用了MTV的框架模式,它最初是被用来做CMS(内容管理系统)软件. 使用Django,程序员可以方便. ...
- 二十一、MapReduce工作流程介绍
一.流程示意图 如下图流程mr工作详细运行流程 步骤详细说明 1.获取待处理文件信息,得到文件大小,文件存储位置 2.根据切片参数,准备文件切片信息,如上假设按照默认块大小切片 0-128M 129- ...
最新文章
- 单应性Homograph估计:从传统算法到深度学习
- 第1节:C语言发展及基本格式步骤
- hibernate3.2多表关联查询常见问题
- python怎么接外活_Python三大活器
- dell t40 固态系统盘_笔记本怎么安装固态硬盘 笔记本安装固态硬盘教程【详解】...
- 信息学奥赛一本通C++语言——1037:计算2的幂
- JSON有关的一道题
- Blender 快捷键总结,一些子问题
- 清华大学和ubc计算机哪个好,2021世界大学排名出炉!多大进前20!清华UBC并列!...
- 【Pandas】Pandas求某列字符串的长度,总结经验教训
- 不是“饭饭之交”! 李彦宏丁磊CP乌镇神同步
- Windows 7笔记本创建wifi热点供手机上网教程
- c语言程序设计精髓 第13周练兵题
- Jetson Nano开发实录
- Houdini 求中点,点连成线
- 店盈通:如何打造赚钱的拼多多店铺?
- Transmitting Network Data Using Volley(使用Volley框架传输网络数据)
- 8月30日--全天课程--马哥培训
- Hibernate学习—— 一级缓存快照
- Html5酷播云视频播放器同层播放(代码实例)