一、MapReduce编程思想

学些MapRedcue主要是学习它的编程思想,在MR的编程模型中,主要思想是把对数据的运算流程分成map和reduce两个阶段:

Map阶段:读取原始数据,形成key-value数据(map方法)。即,负责数据的过滤分发

Reduce阶段:把map阶段的key-value数据按照相同的key进行分组聚合(reduce方法)。即,数据的计算归并

它其实是一种数据逻辑运算模型,对于这样的运算模型,有一些成熟的具体软件实现,比如hadoop中的mapreduce框架、spark等,例如在hadoop的mr框架中,对map阶段的具体实现是map task,对reduce阶段的实现是reduce task。这些框架已经为我们提供了一些通用功能的实现,让我们专注于数据处理的逻辑,而不考虑分布式的具体实现,比如读取文件、写文件、数据分发等。我们要做的工作就是在这些编程框架下,来实现我们的具体需求。

下面我们先介绍一些map task和reduce task中的一些具体实现:

二、MapTask和ReduceTask

2.1 Map Task

读数据:利用InputFormat组件完成数据的读取。

    InputFormat-->TextInputFormat 读取文本文件的具体实现

            -->SequenceFileInputFormat 读取Sequence文件

            -->DBInputFormat 读数据库

处理数据:这一阶段将读取到的数据按照规则进行处理,生成key-value形式的结果。maptask通过调用用Mapper类的map方法实现对数据的处理。

分区:这一阶段主要是把map阶段产生的key-value数据进行分区,以分发给不同的reduce task来处理,使用的是Partitioner类。maptask通过调用Partitioner类的getPartition()方法来决定如何划分数据给不同的reduce task。

排序:这一阶段,对key-value数据做排序。maptask会按照key对数据进行排序,排序时调用key.compareTo()方法来实现对key-value数据排序。

2.2 Reduce Task

读数据:这一阶段通过http方式从maptask产生的数据文件中下载属于自己的“区”的数据。由于一个区的数据可能来自多个maptask,所以reduce还要把这些分散的数据进行合并(归并排序)

处理数据:一个reduce task中,处理刚才下载到自己本地的数据。通过调用GroupingComparator的compare()方法来判断文件中的哪些key-value属于同一组。然后将这一组数传给Reducer类的reduce()方法聚合一次。

输出结果:调用OutputFormat组件将结果key-value数据写出去。

    Outputformat --> TextOutputFormat 写文本文件(会把一个key-value对写一行,分隔符为制表符\t

          --> SequenceFileOutputFormat 写Sequence文件(直接将key-value对象序列化到文件中)

          --> DBOutputFormat

下面介绍下利用MapReduce框架下的一般编程过程。我们要做的 工作就是把我们对数据的处理逻辑加入到框架的业务逻辑中。我们编写的MapReduce的job客户端主要包括三个部分,Mapper 、 Reducer和JobSubmitter,三个部分分别完成MR程序的map逻辑、reduce逻辑以及将我们编写的job程序提交给集群。下面分别介绍这三个部分如何实现。

三、Hadoop中MapReduce框架下的一般编程步骤

Mapper:创建类,该类要实现Mapper父类,复写read()方法,在方法内实现当前工程中的map逻辑。

Reducer:创建类,继承Reducer父类,复写reduce()方法,方法内实现当前工程中的reduce逻辑。

jobSubmitter:这是job在集群上实际运行的类,主要是通过main方法,封装job相关参数,并把job提交。jobsubmitter内一般包括以下操作

step1:创建Configuration对象,并通过创建的对象对集群进行配置,同时支持用户自定义一些变量并配置。这一步有些像我们集群搭建的时候对$haoop_home/etc/hadoop/*下的一些文件进行的配置。

step2:获得job对象,并通过job对象对我们job运行进行一些配置。例如,设置集群运行的jar文件、设置实际执行map和reduce的类等,下面列出一些必要设置和可选设置。

        Configuration conf = new Configuration(); //创建集群配置对象。Job job = Job.getInstance(conf);//根据配置对象获取一个job客户端实例。job.setJarByClass(JobSubmitter.class);//设置集群上job执行的类job.setMapperClass(FlowCountMapper.class);//设置job执行时使用的Mapper类job.setReducerClass(FlowCountReducer.class);//设置job执行时使用的Reducer类job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("I:\\hadooptest\\input"));FileOutputFormat.setOutputPath(job, new Path("I:\\hadooptest\\output_pri"));//设置maptask做数据分发时使用的分发逻辑类,如果不指定,默认使用hashparjob.setPartitionerClass(ProvincePartitioner.class);job.setNumReduceTasks(4);//自定义的分发逻辑下,可能产生n个分区,所以reducetask的数量需要是nboolean res = job.waitForCompletion(true);System.exit(res ? 0:-1);

一般实践中,可以定义一个类,其中添加main方法对job进行提交,并在其中定义静态内部类maper和reduce类。

四、MapReduce框架中的可自定义项

<不小心删除以后就没有再补充了,挺重要的。。。。补上吧。。。。>

总结,你要把bean写到文本吗?重写toString方法

要传输吗?实现Writable接口

要排序吗?实现writablecompareble接口

遇到一些复杂的需求,需要我们自定义实现一些组件

2.1 自定义序列化数据类型

MapReduce框架为我们提供了基本数据类型的序列化类型,如String的Text类型,int的IntWritalbe类型,null的NullWritable类型等。但是有时候会有一些我们自定义的类型需要我们在map和reduce之间进行传输或者需要写到hdfs上。hadoop提供了自己的序列化机制,实现自定义类型的序列化和反序列化将自定义的类实现hadoop提供的Writable接口。

自定义类实现Writable接口,实现readFields(in)write(out)方法。

同时,重写toString()方法,可以自定义在写到文件系统时候写入的字段内容。

     * hadoop系统在序列化该类的对象时要调用的方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(upFlow);out.writeUTF(phone);out.writeInt(dFlow);out.writeInt(amountFlow);}/*** hadoop系统在反序列化该类的对象时要调用的方法*/@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readInt();this.phone = in.readUTF();this.dFlow = in.readInt();this.amountFlow = in.readInt();}@Overridepublic String toString() {return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;}

View Code

2.2 自定义排序规则

MapReduce中提供了一个排序机制,map worker 和reduce worker ,都会对数据按照key的大小来排序,所以map和reduce阶段输出的记录都是经过排序的(按照key排序)。我们在实践中有时候需要对计算出来的结果进行排序,比如一个这样的需求:计算每个页面访问次数,并按照访问量倒序输出。我们可以在统计了每个页面访问次数之后进行排序,但是我们还可以直接应用MR自身的排序特性,在MR处理的时候按照我们的需求进行排序。这时候就需要我们自定义排序规则。

自定义类,实现WritableComparable接口,实现其中的compareTo()方法,在其中自定义排序的规则。同时一般还要实现readFields(in) 和write(out)和toString()方法。

public class PageCount implements WritableComparable<PageCount>{private String page;private int count;public void set(String page, int count) {this.page = page;this.count = count;}public String getPage() {return page;}public void setPage(String page) {this.page = page;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}@Overridepublic int compareTo(PageCount o) {return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(this.page);out.writeInt(this.count);}@Overridepublic void readFields(DataInput in) throws IOException {this.page= in.readUTF();this.count = in.readInt();}@Overridepublic String toString() {return this.page + "," + this.count;}}

View Code

总结:

实现Writable接口,是为了bean能够传输,能够写到文件系统中。

实现WritableComparable还为了bean能够按照你定义的规则进行排序。

2.2 自定义分区规则

我们知道,map计算出来的结果会分发给不同的reduce任务去进一步处理。MR中提供了一个默认的数据分发规则,会按照map的输出中的key的hashcode,然后模除reduce task的数量,模除的结果就是数据的分区。我们可以通过自定义map数据分发给reduce的规则,实现把数据按照自己的需求记录到不同的数据中。比如实现这样的需求,有一个通话记录的文件,按照归属地分别存储数据。

自定义类,继承Partitioner父类(类的泛型为MapTask的输出的key,value的类型),重写 getPartition(<>key, <>value, int numPartitions) 方法,在其中自定义分区的规则,方法返回计算出来的分区数。MapTask每处理一行数据都会调用getPartition方法。因此最好不要在方法中创建可以给很多数据行共同使用的对象。在jobsubmitter中,设置maptask在做数据分区时使用的分区逻辑类, job.setPartitonerClass(your.class) ,同时注意设置reduceTask的任务数量为我们在分区逻辑中定义的规则下回产生的分区数量, job.setNumReduceTasks(numOfPartition); 

/*** 本类是提供给MapTask用的* MapTask通过这个类的getPartition方法,来计算它所产生的每一对kv数据该分发给哪一个reduce task* @author ThinkPad**/
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{static HashMap<String,Integer> codeMap = new HashMap<>();static{codeMap.put("135", 0);codeMap.put("136", 1);codeMap.put("137", 2);codeMap.put("138", 3);codeMap.put("139", 4);}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {Integer code = codeMap.get(key.toString().substring(0, 3));return code==null?5:code;}}

Partitioner

public class JobSubmitter {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(JobSubmitter.class);job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);// 设置参数:maptask在做数据分区时,用哪个分区逻辑类  (如果不指定,它会用默认的HashPartitioner)job.setPartitionerClass(ProvincePartitioner.class);// 由于我们的ProvincePartitioner可能会产生6种分区号,所以,需要有6个reduce task来接收job.setNumReduceTasks(6);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\flow\\input"));FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\flow\\province-output"));job.waitForCompletion(true);}}

JobSubmitter

2.3 自定义分组规则

MapTask每调用一次map就会产生一个k-v,多次调用后,生成多个k-v,具有相同key的的记录称为一组,会存入一个partition中,注意一个patition可以包含多个组。

一个ReduceTask处理一个partition,在处理的时候 ,按照key的顺序进行。调用一次reduce会聚合一组数据,就是reduce方法中传入的一个Itetor。为了确认一个分区中的两条记录是不是同一个组,会调用一个工具类GroupingCompatator的compare(01,02)方法,用来判断两个key是否相同,如果两个key相等,则为同一组。利用这样的机制,我们可以自定义一个分组规则。

自定义类,实现 WritableComparator 类实现 compare 方法,在其中告知MapTask如何判断两个 记录是不是属于同一个组。调用父类构造函数,指定比较的类。

public class OrderIdGroupingComparator extends WritableComparator {pbulic OrderIdGroupingComparator(){//通过构造函数指定要比较的类super(OrderBean.class, true);//
     }@Overridepublic int compare(WritableComparable a, WritableComparable b) {//参数中将来会传入我们自定义的继承了WritableComparable的bean,把a、b向下转型为我们自定义类型的bean,才能比较a和bOrderBean o1 = (OrderBean)a;OrderBean o2 = (OrderBean)b;return o1.getOrderId().compareTo(o2.getOrderID);//id相同就是同一组
    }
}

View Code

在jobSubmiter中指定分组规则,

job.setGroupingComparatorClass(OrderIdGroupingComparator.class);

注意:关于区分分区和分组:

分区比分组的范围更加大。分区是指,在map task结束之后,中间结果数据会被分给哪些reduce task,而分组是指,同一个分区中(即一个reduce task处理的数据中)数据的分组。在默认的计算分区的方法中,不同key的hash code对reduce task取模计算出来的结果可能相同,这样的数据会被分到同一个分区;这一个分区中的key的haashcode不同,这样就在一个区中分了不同组。

那么什么时候使用分区,什么时候使用分组呢?

再如在计算每个订单中总金额最大的3笔中的案例中,可以考虑进行倒序排序,然后取前三;按照id进行倒序排序吗?不现实,因为订单id太多,不可能启动那么多的reduce task。那么就要把多个订单的数据存储到第一个分区中,同时保证同一个订单的数据全部在一个分区中,这时候,就需要自定义分区规则(保证同一订单中的数据在同一个分区),但是又要分组排序,所以这时候就需要自定义分组规则(保证该分区中同一订单在一组,不同订单在不同组)

2.3自定义MapTask的局部聚合规则

默认情况下,map计算的结果逐条保存到磁盘中,传输给reduce之后也是分条的记录,这样可能造成一个问题就是如果某个分区下的数据较多,而有的分区下数据较少,就导致出现reduce task之间任务量差距较大,即出现数据倾斜的情况。一个解决办法是在形成map结果文件的时候进行一次局部聚合。

使用Combiner组件可以实现在每个MapTask中对数据进行一次局部聚合。这个局部聚合的逻辑其实和Reducer的逻辑是一样的,都是对map计算出的kv数据进行聚合,只不过如果是maptask来调用我们定义的Reducer实现类,则聚合的是当前这个maptask运行的结果,如果是reducetask来调用我们定义的Reducer实现类,则聚合的是全部maptask的运行结果。

定义类局部聚合类XXCombationer,继承Rducer复写reduce方法,在方法中实现具体的聚合逻辑;在jobSubmitter的job中设置mapTask端的局部聚合类为我们定义的类 job.setCombinerClass(XXCombiner.class) 。

2.4 控制输入输出格式。。。

五、MR程序的调试、执行方式

5.1 提交到linux运行

5.2 Win本地执行

转载于:https://www.cnblogs.com/Jing-Wang/p/10886890.html

MapReduce编程实践相关推荐

  1. MapReduce 编程实践:统计对象中的某些属性

    文章目录 1. 生成数据 2. 编写实体类 3. Mapper类 4. Reducer类 5. Driver类 6. 运行 参考书:<Hadoop大数据原理与应用> 相关文章:MapRed ...

  2. MapReduce 编程实践

    文章目录 1. MapReduce 作业流程 2. 实践 2.1 启动 hadoop 2.2 创建 java 项目 2.3 MapReduce shell 2.4 MapReduce Web UI 3 ...

  3. Hadoop——实验七:MapReduce编程实践

    文章目录 一. 实验目的 二. 实验内容 三. 实验步骤及结果分析  1. 基于ubuntukylin14.04(7)版本,安装hadoop-eclipse-kepler-plugin-2.6.0.j ...

  4. MapReduce编程实践之自定义数据类型

    一:任务描述 自定义数据类型完成手机流量的分析 二:example data 格式为:记录报告时间戳.手机号码.AP mac.AC mac.访问的网址.网址种类.上行数据包数.下行数据包数.上行总流量 ...

  5. MapReduce编程初级实践

    MapReduce编程初级实践 1.通过实验掌握基本的MapReduce编程方法: 2.掌握用MapReduce解决一些常见的数据处理问题,包括数据去重.数据排序和数据挖掘等. 1.编程实现文件合并和 ...

  6. MapReduce编程规范及实践(流量统计)

    一.MapReduce编码规范 Map阶段2个步骤 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步 自定义 Map 逻辑, 将第一步的结果转换成 ...

  7. 实验5 MapReduce初级编程实践(Python实现)

    一.实验目的 通过实验掌握基本的 MapReduce 编程方法: 掌握用 MapReduce 解决一些常见数据处理问题的方法,包括数据合并.数据去重.数据排序和数据挖掘等. 二.实验平台 操作系统:U ...

  8. MapReduce初级编程实践

    1. 实验目的和要求 1.1 实验目的 通过实验掌握基本的MapReduce编程方法: 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重.数据排序和数据挖掘等. 1.2 实验软硬件 ...

  9. 《深入理解大数据:大数据处理与编程实践》一一3.3 HDFS文件存储组织与读写...

    本节书摘来自华章计算机<深入理解大数据:大数据处理与编程实践>一书中的第3章,第3.3节,作者 主 编:黄宜华(南京大学)副主编:苗凯翔(英特尔公司),更多章节内容可以访问云栖社区&quo ...

最新文章

  1. EdgeGallery — MECM — 系统架构
  2. 不擅长物理科学计算机吗,物理难学否?答案因人而异,高二同学3 + 3选科莫要太随意...
  3. flex布局:子子元素过大撑开了设定flex:1的子元素的解决方案
  4. 【Linux系统编程】vfork() 函数详解
  5. 8086条件转移指令JE,JZ
  6. javascript和var之间的区别?
  7. mysql语句错误怎么查找_为什么我的mysql语句一直报错,找不到错误,望各位大佬指点一番...
  8. C#实现请求服务器,类似于asp下的getHTTPPage(url)功能
  9. NPAPI中返回一个字串的正确做法
  10. 联想x3850x6从u盘引导_联想笔记本bios设置u盘启动教程
  11. react-native之react-native-vector-icons
  12. 批量重命名图片,去除括号
  13. 公众号推送长图最佳尺寸_公众号10W 排版攻略,长图无缝拼接一步做到!
  14. 抛砖引玉系列:Android简易实现录屏软件。
  15. B/S架构与C/S架构
  16. 大数据产业链包括哪几个环节,具体包含哪些内容
  17. css为何不支持父元素选择器
  18. 开发游戏音频程序——MP3的播放
  19. 如何设置tab缩进为4个字符
  20. 基于VR技术的输电线路巡检仿真系统

热门文章

  1. 英文版Ubuntu 16.04系统如何解决gedit中文显示乱码的问题
  2. 彻底搞定C指针-——第五篇:函数参数的传递
  3. vmware下找不到hgfs
  4. volatile的作用
  5. C++笔记——指针数组/数组指针
  6. 夫妻一方信用卡逾期,另外一方会受到牵连吗?
  7. QQ在屏幕边缘自动隐藏不能正常显示
  8. 乐高收割机器人_乐高圣诞45002作品来啦!——驯鹿拉雪橇、圣诞树、圣诞棒棒糖...
  9. linux下python脚本print中文显示不正确_在终端上运行python脚本,没有打印或显示-为什么?...
  10. python函数增强代码可读性_写Python必须知道的这几个代码技巧!你会吗?