简介:

MapReduce是一个基于集群的高性能并行计算平台,MapReduce是一个并行计算与运行的软件框架,MapReduce是一个并行程序设计模型与方法.
特点:

分布可靠,对数据集的操作分发给集群中的多个节点实现可靠性,每个节点周期性返回它完成的任务和最新的状态

②封装了实现细节,基于框架API编程,面向业务展开分布式编码

③提供跨语言编程的能力

MapReduce的主要功能:

1.1数据划分和计算任务调度

1.2数据/代码互相定位

1.3系统优化

1.4出错检测和恢复

MapReduce的运行流程:

由上图可以看到MapReduce执行下来主要包含这样几个步骤:

1) 首先正式提交作业代码,并对输入数据源进行切片

2) master调度worker执行map任务

3) worker当中的map任务读取输入源切片

4) worker执行map任务,将任务输出保存在本地

5) master调度worker执行reduce任务,reduce worker读取map任务的输出文件

6) 执行reduce任务,将任务输出保存到HDFS

运行流程详解

MAP:HDFS目录的数据输入进来,然后切块,将切好的块分给不同的计算机,各计算机将块按照本地规约分区,将不同的区按照key排序,然后将任务输出保存在本地.

Reduce:将数据从远程拷贝下来,然后按照key将数据合并并处理最后输出

Map阶段由一定数量的Map Task组成,流程如下:
■ 输入数据格式解析:InputFormat
■ 输入数据处理:Mapper
■ 数据分区:Partitioner
■ 数据按照key排序
■ 本地规约:Combiner(相当于local reducer,可选)
■ 将任务输出保存在本地
Reduce阶段由一定数量的Reduce Task组成,流程如下:
■ 数据远程拷贝
■ 数据按照key排序和文件合并merge
■ 数据处理:Reducer
■ 数据输出格式:OutputFormat
通常我们把从Mapper阶段输出数据到Reduce阶段的reduce计算之间的过程称之为shuffle

MapReduce Java API应用

1、MapReduce开发流程

➢ 搭建开发环境,参考HDFS环境搭建,基本一致
➢ 基于MapReduce框架编写代码,Map、Reduce、Driver三部分组成。
➢ 编译打包,将源代码打成的包和依赖jar包打成一个包
➢ 上传至运行环境
➢ 运行hadoop jar命令,现已由yarn jar替代,建议使用新命令提交执行
具体提交命令为:
yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3
➢ 通过yarn web ui查看执行过程
➢ 查看执行结果

2、WordCount代码实现

Mapper:是MapReduce计算框架中Map过程的封装Text:Hadoop对Java String类的封装,适用于Hadoop对文本字符串的处理IntWritable:Hadoop对Java Integer类的封装,适用于Hadoop整型的处理Context:Hadoop环境基于上下文的操作对象,如Map中key/value的输出、分布式缓存数据、分布式参数传递等StringTokenizer:对String对象字符串的操作类,做基于空白字符的切分操作工具类
  • 2.1 Map类编写

  • package com.tianliangedu.mapper;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    public class MyTokenizerMapper extendsMapper<Object, Text, Text, IntWritable> {// 暂存每个传过来的词频计数,均为1,省掉重复申请空间private final static IntWritable one = new IntWritable(1);// 暂存每个传过来的词的值,省掉重复申请空间private Text word = new Text();// 核心map方法的具体实现,逐个<key,value>对去处理public void map(Object key, Text value, Context context)throws IOException, InterruptedException {// 用每行的字符串值初始化StringTokenizerStringTokenizer itr = new StringTokenizer(value.toString());// 循环取得每个空白符分隔出来的每个元素while (itr.hasMoreTokens()) {// 将取得出的每个元素放到word Text对象中word.set(itr.nextToken());// 通过context对象,将map的输出逐个输出context.write(word, one);}}
    }

    2.2 Reduce类编写

  • Reducer:是MapReduce计算框架中Reduce过程的封装

  • package com.tianliangedu.reducer;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    //reduce类,实现reduce函数
    public class IntSumReducer extendsReducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();//核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {//暂存每个key组中计算总和int sum = 0;//加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值for (IntWritable val : values) {//将key组中的每个词频数值sum到一起sum += val.get();}//将该key组sum完成的值放到result IntWritable中,使可以序列化输出result.set(sum);//将计算结果逐条输出context.write(key, result);}}

    2.3 Driver类编写

  • ➢ Configuration:与HDFS中的Configuration一致,负责参数的加载和传递
    ➢ Job:作业,是对一轮MapReduce任务的抽象,即一个MapReduce的执行全过程的管理类
    ➢ FileInputFormat:指定输入数据的工具类,用于指定任务的输入数据路径
    ➢ FileOutputFormat:指定输出数据的工具类,用于指定任务的输出数据路径
    package com.tianliangedu.driver;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import com.tianliangedu.mapper.MyTokenizerMapper;
    import com.tianliangedu.reducer.IntSumReducer;
    public class WordCountDriver {// 启动mr的driver方法public static void main(String[] args) throws Exception {// 得到集群配置参数Configuration conf = new Configuration();// 设置到本次的job实例中Job job = Job.getInstance(conf, "天亮WordCount");// 指定本次执行的主类是WordCountjob.setJarByClass(WordCountDriver.class);// 指定map类job.setMapperClass(MyTokenizerMapper.class);// 指定combiner类,要么不指定,如果指定,一般与reducer类相同job.setCombinerClass(IntSumReducer.class);// 指定reducer类job.setReducerClass(IntSumReducer.class);// 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 指定输入数据的路径FileInputFormat.addInputPath(job, new Path(args[0]));// 指定输出路径,并要求该输出路径一定是不存在的FileOutputFormat.setOutputPath(job, new Path(args[1]));// 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!System.exit(job.waitForCompletion(true) ? 0 : 1);}
    }

    2.4本地模拟分布式计算环境运行mapreduce

    鉴于远程运行进行代码测试的复杂性,以及其它新框架均开始支持本地local环境模拟分布式计算运行, 故mapreduce从2.x开始也已经支持本地环境

    具体做法请参见辅助资料集” 06-本地local环境模拟mapreduce并行计算的操作步骤”。

    2.5 Maven打包

    使用Maven命令,基于配置的Maven插件实现代码打包。

    2.6 上传到运行环境

    使用rz命令将打好的运行包上传到集群环境中。

    2.7 运行WordCount程序

    具体提交命令为:

    具体提交命令为:
    yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3

    2.8 查看执行过程

    Web访问地址为:http://cluster1.hadoop:8088/ui2/#/yarn-apps/apps

  • 2.9 查看执行结果

    3、标准代码实现

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//启动mr的driver类
public class WordCountDriver {//map类,实现map函数public static class MyTokenizerMapper extendsMapper<Object, Text, Text, IntWritable> {//暂存每个传过来的词频计数,均为1,省掉重复申请空间private final static IntWritable one = new IntWritable(1);//暂存每个传过来的词的值,省掉重复申请空间private Text word = new Text();//核心map方法的具体实现,逐个<key,value>对去处理public void map(Object key, Text value, Context context)throws IOException, InterruptedException {//用每行的字符串值初始化StringTokenizerStringTokenizer itr = new StringTokenizer(value.toString());//循环取得每个空白符分隔出来的每个元素while (itr.hasMoreTokens()) {//将取得出的每个元素放到word Text对象中word.set(itr.nextToken());//通过context对象,将map的输出逐个输出context.write(word, one);}}}//reduce类,实现reduce函数public static class IntSumReducer extendsReducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();//核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {//暂存每个key组中计算总和int sum = 0;//加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值for (IntWritable val : values) {//将key组中的每个词频数值sum到一起sum += val.get();}//将该key组sum完成的值放到result IntWritable中,使可以序列化输出result.set(sum);//将计算结果逐条输出context.write(key, result);}}//启动mr的driver方法public static void main(String[] args) throws Exception {//得到集群配置参数Configuration conf = new Configuration();//设置到本次的job实例中Job job = Job.getInstance(conf, "天亮WordCount");//通过指定相关字节码对象,找到所属的主jar包job.setJarByClass(WordCountDriver.class);//指定map类job.setMapperClass(MyTokenizerMapper.class);//指定combiner类,要么不指定,如果指定,一般与reducer类相同job.setCombinerClass(IntSumReducer.class);//指定reducer类job.setReducerClass(IntSumReducer.class);//指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//指定输入数据的路径FileInputFormat.addInputPath(job, new Path(args[0]));//指定输出路径,并要求该输出路径一定是不存在的FileOutputFormat.setOutputPath(job, new Path(args[1]));//指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapReduce Shell应用

1、MapReduce的二级命令

mapred称为一级命令,直接输入mapred回车,即可查看二级命令:

2、MapReduce的三级命令

输入一级命令mapred后,再任意输入一个二级命令,即可查看三级命令:

3、MapReduce shell应用

  • 查看当前正在执行的job任务

  • 先提交一个WordCount任务,然后使用mapred job -list查看任务列表

  • 终止(kill)一个任务的执行

由于某种原因,要立即终止某任务的执行,则使用mapred job -kill job-id。

构造场景:先提交一个WordCount job,然后通过kill job-id来终止任务:

  • 查看一个job的日志

使用mapred shell命令,通过job-id可以查看job的工作日志。

命令格式为:mapred job -logs job-id:

MapReduce技术特征
1、向“外”横向扩展,而非向“上”纵向扩展
➢ 集群的构建完全选用价格便宜、易于扩展的低端商用服务器,而非价格昂贵不易扩展的商用服务
➢ 大规模数据处理和大规模数据存储的需要,讲求集群综合能力,而非单台机器处理能力,横向增加机器节点数据量
2、失效被认为是常态
➢ 使用大量普通服务器,节点硬件和软件出错是常态
➢ 具备多种有效的错误检测和恢复机制,在某个计算节点失效后会自动转移到别的计算节点。某个任务节点失败后其他节点能够无缝接管失效节点的计算任务
➢ 当失效节点恢复后自动无缝加入集群,不需要管理员人工进行系统配置
3、移动计算,把处理向数据迁移(数据本地性)
➢ 采用代码/数据互定位的功能,计算和数据在同一个机器节点或者是同一个机架中,发挥数据本地化特点
➢ 可避免跨机器节点或是机架传输数据,提高运行效率
4、顺序处理数据、避免随机访问数据
➢ 磁盘的顺序访问远比随机访问快得多,因此MapReduce设计为面向顺序式大规模数据的磁盘访问处理
➢ 利用集群中的大量数据存储节点同时访问数据,实现面向大数据集批处理的高吞吐量的并行处理
5、推测执行
➢ 一个作业由若干个Map任务和Reduce任务构成,整个作业完成的时间取决于最慢的任务的完成时间。由于节点硬件、软件问题,某些任务可能运行很慢
➢ 采用推测执行机制,发现某个任务的运行速度远低于任务平均速度,会为慢的任务启动一个备份任务,同时运行。哪个先运行完,采用哪个结果。
6、平滑无缝的可扩展性
➢ 可弹性的增加或减少集群计算节点来调节计算能力
➢ 计算的性能随着节点数的增加保持接近线性程度的增长
7、为应用开发隐藏系统底层细节
➢ 并行编程有很多困难,需要考虑多线程中复杂繁琐的细节,诸如分布式存储管理、数据分发、数据通信和同步、计算结果收集等细节问题。
➢ MapReduce提供了一种抽象机制将程序员与系统层细节隔离开,程序员只需关注业务,其他具体执行交由框架处理即可。

分布式计算框架Map/reduce相关推荐

  1. 分布式计算框架Hadoop核心组件

    Hadoop作为成熟的分布式计算框架在大数据生态领域已经使用多年,本文简要介绍Hadoop的核心组件MapReduce.YARN和HDFS,以加深了解. 1.Hadoop基本介绍 Hadoop是分布式 ...

  2. 分布式计算框架Hadoop核心组件概述

    Hadoop作为成熟的分布式计算框架在大数据生态领域已经使用多年,本文简要介绍Hadoop的核心组件MapReduce.YARN和HDFS,以加深了解. 1.Hadoop基本介绍 Hadoop是分布式 ...

  3. [ZZ]Map/Reduce hadoop 细节

    转自:Venus神庙原文:http://www.cnblogs.com/duguguiyu/archive/2009/02/28/1400278.html 分布式计算(Map/Reduce) 分布式计 ...

  4. Hadoop简介(1):什么是Map/Reduce

    看这篇文章请出去跑两圈,然后泡一壶茶,边喝茶,边看,看完你就对hadoop整体有所了解了. Hadoop简介 Hadoop就是一个实现了Google云计算系统的开源系统,包括并行计算模型Map/Red ...

  5. 用通俗易懂的大白话讲解Map/Reduce原理

    Hadoop简介 Hadoop就是一个实现了Google云计算系统的开源系统,包括并行计算模型Map/Reduce,分布式文件系统HDFS,以及分布式数据库Hbase,同时Hadoop的相关项目也很丰 ...

  6. Hadoop学习:Map/Reduce初探与小Demo实现

    一.    概念知识介绍 Hadoop MapReduce是一个用于处理海量数据的分布式计算框架.这个框架攻克了诸如数据分布式存储.作业调度.容错.机器间通信等复杂问题,能够使没有并行 处理或者分布式 ...

  7. Hadoop大数据原理(3) - 分布式计算框架MapReduce

    文章目录 1. 大数据的通用计算 2 MapReduce编程模型 3. MapReduce计算框架 3.1 三类关键进程 大数据应用进程 JobTracker进程 TaskTracker进程 3.2 ...

  8. MapReduce基本概述——分布式计算框架

    目录 一.MapReduce的基本概念 二.MapReduce的设计思想 三.MapReduce特点 四.MapReduce编程规范 一.MapReduce的基本概念 MapReduce是一个分布式计 ...

  9. 分布式系列之分布式计算框架Flink深度解析

    Flink作为主流的分布式计算框架,满足批流一体.高吞吐低时延.大规模复杂计算.高可靠的容错和多平台部署能力.本文简要介绍了Flink中的数据流处理流程以及基本部署架构和概念,以加深对分布式计算平台的 ...

最新文章

  1. 闽台“物联网+”产业融合方兴未艾
  2. Android 通过反射让SQlite建表
  3. tinyint对应什么数据类型_学习西门子S7-200系列PLC不得不掌握的数据类型
  4. Java 技术小图谱
  5. HDU 6029(思维)
  6. js温故而知新11(AJAX)——学习廖雪峰的js教程
  7. ASP.NET Core 实战:使用 Docker 容器化部署 ASP.NET Core + MySQL + Nginx
  8. virtualbox安装android6.0并设置分辨率为1920x1080x32
  9. 数据结构与算法面试题80道(32)
  10. 程序员,你怎么这么忙?为什么天天熬夜加班?
  11. ASP.NET 路由
  12. 2015湖南省选集训DAY5——work(BZOJ4177)
  13. 语音识别现在发展到什么阶段了?
  14. python接口自动化测试二十六:使用pymysql模块链接数据库
  15. multisim安装后无法连接数据库_如何解决multisim无法安装的问题
  16. 虚拟机VMware16安装教程
  17. 什么是Android手机
  18. 在线进行立体几何画图——GeoGebra
  19. java jdk安装失败_图文解答Java JDK9.0安装失败的原因,附带处理方法
  20. Rasa课程、Rasa培训、Rasa面试、Rasa实战系列之Typo Robustness

热门文章

  1. 算法复杂度-渐进分析
  2. 七大设计原则之单一职责原则应用
  3. linux docker ps没有东西,Docker ps 命令
  4. 千宗版权案缠身 三年亏掉21亿 资本会为喜马拉雅买单吗?
  5. 论面向组合子程序设计方法 之 微步毂纹生
  6. kubernetes集群配置Cgroups驱动
  7. 数字化创新助力医药行业发展:智慧医疗与数字药房系统技术突破
  8. 别让我思考,和广告牌设计
  9. 00024__pclint
  10. React搜索框历史搜索记录——数组前插