第五章-分布式并行编程框架MapReduce

文章目录

  • 第五章-分布式并行编程框架MapReduce
    • MapReduce概述
      • 分布式并行编程
      • MapReduce模型和函数
    • MapReduce体系结构
    • MapReduce工作流程
      • 工作流程概述
      • 各个执行阶段
      • shuffle过程
    • 实例分析:WordCount
    • MapReduce的具体应用
    • MapReduce编程实践

MapReduce概述

分布式并行编程

过去很长一段时间,CPU的性能都遵循“摩尔定律”:【当价格不变时,集成电路上可容纳的元器件的数目,约每隔18个月便会增加一倍,性能也将提升一倍】。从2005年开始摩尔定律逐渐失效,需要处理的数据量快速增加,人们开始借助于分布式并行编程来提高程序性能。

分布式并行程序运行在大规模计算机集群上,可以并行执行大规模数据处理任务,从而获得海量的计算能力。同时通过向集群中增加新的计算节点,就能很容易地实现集群计算能力的扩充。

谷歌公司最先提出了分布式并行编程模型 MapReduce,Hadoop MapReduce是它的开源实现 。谷歌的 MapReduce运行在分布式文件系统 GFS上,Hadoop MapReduce运行在分布式文件系统 HDFS上。相对而言,Hadoop MapReduce要比谷歌 MapReduce的使用门槛低很多,程序员即使没有任何分布式编程开发经验,也可以很轻松地开发出分布式程序部署到计算机集群上。

集群的架构 容错性 硬件价格及扩展性 编程和学习难度 适用场景
传统并行编程框架 通常采用共享式架构(共享内存、共享存储),底层通常采用统一的存储区域网络SAN 容错性差,其中一个硬件发生故障容易导致整个集群不可工作 通常采用刀片服务器,高速网络以及共享存储区域网络 SAN,价格高,扩展性差 编程难度大,需要解决做什么和怎么做的问题,编程原理和多线程编程逻辑类似,需要借助互斥量、信号量、锁等机制,实现不同任务之间的同步和通信 适用于实时、细粒度计算,尤其适用于计算密集型的应用
MapReduce 采用典型的非共享式架构 容错性好,在整个集群中每个节点都有自己的内存和存储,任何一个节点出现问题不会影响其他节点正常运行,同时系统中设计了冗余和容错机制 整个集群可以随意增加或减少相关的计算节点,普通PC机就可以实现,价格低廉,扩展性好 编程简单,只需要告诉系统要解决什么问题,系统自动实现分布式部署,屏蔽分布式同步、通信、负载均衡、失败恢复等底层细节 一般适用于非实时的批处理及数据密集型应用

MapReduce模型和函数

MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce。

MapReduce采用“分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个 Map任务并行处理。MapReduce框架会为每个 Map任务输入一个数据子集,Map任务生成的结果会继续作为 Reduce任务的输入,最终由 Reduce任务输出最后结果,并写入分布式文件系统。

这里要特别强调一下,适合用 MapReduce来处理的数据集需要满足一个前提条件:待处理的数据集可以分解成许多个小的数据集,而且每一个小数据集都可以完全并行地进行处理。

MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销,在大规模数据环境下开销更为惊人。所以,移动计算要比移动数据更加经济。

MapReduce框架采用了Master/Slave架构,包括一个 Master和若干个Slave。Master上运行JobTracker, Slave上运行TaskTracker。

Map函数和 Reduce函数都是以<key, value>作为输入,按一定的映射规则转换成另一个或一批<key, value>进行输出。

函数 输入 输出 说明
Map <k1,v1> List(<k2,v2>) 将小数据集(split)进一步解析成一批<key,value>对,输入 Map函数中进行处理。每一个输入的<k1,v1>会输出一批<k2,v2>,<k2,v2>是计算的中间结果
Reduce <k2,List(v2)> <k3,v3> 输入的中间结果<k2,List(v2)>中的 List(v2)表示是一批属于同一个 k2的 value
  • Map函数将输入的元素转换成<key,value>形式的键值对,键和值的类型也是任意的。
  • Reduce函数将输入的一系列具有相同键的键值对以某种方式组合起来,输出处理后的键值对,输出结果合并为一个文件。

MapReduce体系结构

MapReduce体系结构主要由四个部分组成,分别是: Client、JobTracker、TaskTracker以及 Task。

Client:

  • 用户编写的 MapReduce程序通过 Client提交到 JobTracker端
  • 用户可通过 Client提供的一些接口查看作业运行状态

JobTracker:

  • 负责资源监控和作业调度
  • 监控所有 TaskTracker与 Job的健康状况,一旦发现失败,就将 相应的任务转移到其他节点
  • 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源
  • 调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器

TaskTracker:

  • 接收 JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)
  • TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况 和任务的运行进度汇报给 JobTracker
  • TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内 存等)。一个 Task 获取到一个 slot 后才有机会运行,而 Hadoop调度器的作用就是将各个 TaskTracker上的空闲 slot分配给 Task使用。 slot 分为 Map slot 和 Reduce slot 两种,分别供 MapTask 和 Reduce Task 使用

Task:

  • Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动

MapReduce工作流程

工作流程概述

MapReduce的核心思想是“分而治之”。一个大的 MapReduce作业,首先会被拆分成许多个 Map任务在多台机器上并行处理,每个 Map任务通常运行在数据存储的节点上。当 Map任务结束后,会生成以<key,value>形式表示的中间结果,这些中间结果会被分发到多个 Reduce任务在多台机器上并行执行,具有相同 key的<key,value>会被发送到同一个 Reduce任务那里。Reduce任务会被中间结果进行汇总计算得到最后结果,并输出到分布式文件系统中。

  • 不同的 Map任务之间不会进行通信,不同的 Reduce任务之间也不会发生任何信息交换
  • 只有当 Map任务全部结束后,Reduce过程才能开始
  • 用户不能显式地从一台机器向另一台机器发送消息
  • 所有的数据交换都是通过 MapReduce框架自身去实现的
  • Map任务的输入文件、Reduce任务的处理结果都是保存在分布式文件系统中,而 Map任务处理的中间结果保存在本地磁盘中。

各个执行阶段

MapReduce算法的执行过程:

  1. 使用 InputFormat模块做 Map前的预处理,然后将输入文件切分为逻辑上的多个 InputSplit,每个 InputSplit并没有对文件进行实际切割,只是记录了要处理的位置和长度。
  2. 通过 RecordReader(RR)根据 InputSplit中的信息来处理 InputSplit中的具体记录,加载数据并转换为适合 Map任务读取的键值对,输入给 Map任务。
  3. Map任务根据用户自定义的映射规则,输出一系列<key,value>作为中间结果。
  4. 为了让 Reduce可以并行处理 Map的结果,需要对 Map的输出进行一定的分区(Partition)、排序(Sort)、合并(Combine)、归并(Merge)等操作,得到<key,value-list>形式的中间结果,再交给对应的 Reduce进行处理。这个过程将无序的<key,value>处理成了有序的<key,value-list>,成为 shuffle
  5. Reduce以一系列的<key,value-list>作为输入,执行用户定义的逻辑,输出结果给 OutputFormat模块。
  6. OutputFormat模块验证输出目录是否已经存在、输出结果类型是否符合配置文件中的配置类型。如果都满足,输出 Reduce的结果到分布式文件系统。

shuffle过程

Shuffle:是指对 Map输出的结果进行分区、排序、合并、归并等处理并交给 Reduce的过程,分为 Map端的操作和 Reduce端的操作。

Map端的 Shuffle过程 Reduce端的 Shuffle过程
  • 输入数据和执行Map任务
  • 写入缓存
  • 溢写 (Spill)
  • 文件归并 (merge)
  • “领取” (Fetch) 数据
  • 归并数据

实例分析:WordCount

WordCount程序任务

WordCount 说明
输入 一个包含大量单词的文本文件
输出 文件中每个单词及其出现次数(频数),并按照单词 字母顺序排序,每个单词和其频数占一行,单词和频 数之间有间隔

一个WordCount执行过程的实例

Map过程示意图 用户没有定义Combiner时的Reduce过程示意图 用户有定义Combiner时的Reduce过程示意图

MapReduce的具体应用

MapReduce可以很好地应用于各种计算问题:

  • 关系代数运算(选择、投影、并、交、差、连接)
  • 分组与聚合运算
  • 矩阵-向量乘法
  • 矩阵乘法

在 MapReduce环境下执行两个关系的连接操作的方法如下:

假设关系 R(A,B),S(B,C)都存储在一个文件中,为了连接这些关系,必须把来自每个关系的各个元组都和一个键关联,这个键就是属性 B的值。可以使用 Map过程把来自 R的每个元组<a,b>转换成一个键值对<b,<R,a>>,其中的键就是 b,值就是<R,a>。注意,这里把关系 R包含在值中,这样做可以使得我们在 Reduce阶段只把那些来自 R的元组和来自 S的元组进行匹配。

类似地,使用 Map过程把来自 S的每个元组<b,c>转换成一个键值对<b,<S,c>>,键是 b,值是<S,c>。Reduce进程的任务就是,把来自关系 R和 S的具有共同属性 B值的元组进行合并。这样,所有具有特定 B值的元组必须被发送到同一个 Reduce进程。

MapReduce编程实践

任务要求:用 MapReduce实现对输入文件中的单词做词频统计

实践一共分为四步:

  1. 编写 Map处理逻辑
  2. 编写 Reduce处理逻辑
  3. 编写 Main函数
  4. 编译打包代码

1.编写 Map处理逻辑

public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}
}

2.编写 Reduce处理逻辑

public static class IntSumReducerextends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}
}

3.编写 Main函数

public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);
}

编译打包代码请参考另一篇博客 简单的MapReduce实践

完整代码:

import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducerextends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

第五章-分布式并行编程框架MapReduce相关推荐

  1. 用 Hadoop 进行分布式并行编程, 第 1 部分 基本概念与安装部署

    Hadoop 简介 Hadoop 是一个开源的可运行于大规模集群上的分布式并行编程框架,由于分布式存储对于分布式编程来说是必不可少的,这个框架中还包含了一个分布式文件系统 HDFS( Hadoop D ...

  2. 用Hadoop进行分布式并行编程

    程序实例与分析 Hadoop 是一个实现了MapReduce 计算模型的开源分布式并行编程框架,借助于Hadoop, 程序员可以轻松地编写分布式并行程序,将其运行于计算机集群上,完成海量数据的计算.在 ...

  3. Java7并发编程指南——第五章:Fork Join框架

    Java7并发编程指南--第五章:Fork Join框架 @(并发和IO流) Java7并发编程指南第五章Fork Join框架 思维导图 项目代码 思维导图 项目代码 GitHub:Java7Con ...

  4. 第五章 - 分布式定时任务框架ElasticJob之SpringBoot整合SimpleJob作业(实战一)

    系列文章目录 第一章 - 分布式定时任务框架ElasticJob之JavaApi整合Simple作业 第二章 - 分布式定时任务框架ElasticJob之JavaApi整合DataflowJob作业 ...

  5. 用 Hadoop 进行分布式并行编程, 第 3 部分 部署到分布式环境

    一 前言 在本系列文章的第一篇:用 Hadoop 进行分布式并行编程,第 1 部分: 基本概念与安装部署中,介绍了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, ...

  6. 用 Hadoop 进行分布式并行编程, 第 2 部分 程序实例与分析

    前言 在上一篇文章:"用 Hadoop 进行分布式并行编程 第一部分 基本概念与安装部署"中,介绍了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本 ...

  7. 《Kotlin 程序设计》第五章 Kotlin 面向对象编程(OOP)

    第五章 Kotlin 面向对象编程(OOP) 正式上架:<Kotlin极简教程>Official on shelves: Kotlin Programming minimalist tut ...

  8. Kotlin第五章: android网络编程

    1. Android网络编程 OkHttp OkHttp是一个高效的HTTP客户端,它的横空出世,让其他的网络请求框架都变得黯然失色. Retrofit Retrofit是一个基于OkHttp的RES ...

  9. 第五章 面向方面编程___AOP入门

    上一篇讲了 AOP 和 OOP 的区别,这一次我们开始入门 AOP .实现面向方面编程的技术,主要分为两大类: 一是 采用动态代理技术,利用截取消息的方式,对该消息进行装饰,以取代原有对象行为的执行: ...

最新文章

  1. 怎么把本地的文件传给服务器,怎么把本地文件传给云服务器
  2. 应用虚拟化的五大理由
  3. hdu1466(dp)
  4. Kali Linux 网络扫描秘籍 第三章 端口扫描(三)
  5. 目标检测——SSD的学习笔记
  6. python机器学习库sklearn——多类、多标签、多输出
  7. 学设计要学python吗_北京学习Python设计大概需要多长时间能学会
  8. linux设置系统环境变量的天坑
  9. 计算机在护理专业中的论文题目,护理专业论文格式(通用模板)
  10. 2021年全球电力线通信(PLC)系统收入大约7385.8百万美元,预计2028年达到14530百万美元,2022至2028期间,年复合增长率CAGR为11.0%
  11. HTML 内容保存到word文档(angular4调用第三方js插件实现)
  12. notes服务器标识文件,怎样重新验证将要过期的服务器标识符文件_lotus notes
  13. 2021秋软工实践第二次结对编程作业
  14. 小象学院——面向对象的特点
  15. dat image 微信_微信Dat文件解码,PC微信加密图片解密工具
  16. Navicat Premium 15 for Mac(数据库管理)
  17. 惠普LaserJet M1005 MFP报错b2
  18. 解决swap file .swp already exists 问题
  19. vb.net 窗体接收键盘事件_不用100就能够买到全键盘的无线键鼠套装,双飞燕FG1010魅力依旧...
  20. 详解EBS接口开发之销售订单挑库发放(转载)

热门文章

  1. java高级应用:线程池全面解析
  2. Python 2.7版本与3.6的不同
  3. 框架:AspectJ
  4. Redis:相关知识点纵观
  5. java 鼠标停留时,[Java教程]鼠标悬浮停留三秒 显示大图_星空网
  6. python flask框架发布问答平台注册页面_Python|Flask框架实现QQ账号登录
  7. android+p+华为手机,给1.9亿用户32款老机型进行安卓P升级 华为值吗?
  8. eviews如何处理缺失数据填补_python数据预处理之异常值、缺失值处理方法
  9. ML之分类预测:机器学习中多分类预测数据集可视化(不同类别赋予不同颜色)设计思路及代码实现
  10. Python编程语言学习:python中与数字相关的函数(取整等)、案例应用之详细攻略