2019独角兽企业重金招聘Python工程师标准>>>

1. Writable序列化

序列化就是把内存中的对象,转换成字节序列 (或其他数据传输协议) 以便于存储 (持久化) 和网络传输。

反序列化就是将收到字节序列 (或其他数据传输协议) 或者是硬盘的持久化数据,转换成内存中的对象。

Java 的序列化是一个重量级序列化框架 (Serializable) ,一个对象被序列化后,会附带很多额外的信息 (各种校验信息,header,继承体系等) ,不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制 (Writable),精简、高效。

1.1. 常用的数据序列化类型

常用的数据类型对应的 Hadoop 数据序列化类型

Java类型 Hadoop Writable类型
boolean   BooleanWritable
byte  ByteWritable
int  IntWritable
float  FloatWritable
long LongWritable
double DoubleWritable
string Text
map MapWritable
array ArrayWritable

1.2. 自定义bean对象实现序列化接口

自定义 bean 对象要想序列化传输,必须实现序列化接口,需要注意以下7项:
(1) 必须实现 Writable 接口;
(2) 反序列化时,需要反射调用空参构造函数,所以必须有空参构造;
(3) 重写序列化方法;
(4) 重写反序列化方法;
(5) 注意反序列化的顺序和序列化的顺序完全一致;
(6) 要想把结果显示在文件中,需要重写 toString(),且用 "\t" 分开,方便后续用;
(7) 如果需要将自定义的 bean 放在 key 中传输,则还需要实现 comparable 接口,因为 mapreduce 过程中的 shuffle 过程一定会对 key 进行排序。

// 1 必须实现Writable接口
public class FlowBean implements Writable {private long upFlow;private long downFlow;private long sumFlow;//2 反序列化时,需要反射调用空参构造函数,所以必须有public FlowBean() {super();}/*** 3重写序列化方法* * @param out* @throws IOException*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/*** 4 重写反序列化方法 * 5 注意反序列化的顺序和序列化的顺序完全一致* * @param in* @throws IOException*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}// 6要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}//7 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序@Overridepublic int compareTo(FlowBean o) {// 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;}
}

2. InputFormat数据切片机制

2.1. FileInputFormat切片机制

2.1.1. job 的提交流程源码详解

waitForCompletion()
submit();
// 1 建立连接connect(); // 1)创建提交job的代理new Cluster(getConfiguration());// (1)判断是本地yarn还是远程initialize(jobTrackAddr, conf); // 2 提交job
submitter.submitJobInternal(Job.this, cluster)// 1) 创建给集群提交数据的Stag路径Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// 2)获取jobid ,并创建job路径JobID jobId = submitClient.getNewJobID();// 3)拷贝jar包到集群copyAndConfigureFiles(job, submitJobDir);   rUploader.uploadFiles(job, jobSubmitDir);// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);maps = writeNewSplits(job, jobSubmitDir);input.getSplits(job);// 5)向Stag路径写xml配置文件
writeConf(conf, submitJobFile);conf.writeXml(out);// 6)提交job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

2.1.2. FileInputFormat 源码解析 (input.getSplits(job))

(1) 找到你数据存储的目录;
(2) 开始遍历处理 (规划切片) 目录下的每一个文件;
(3) 遍历第一个文件 ss.txt;
      A、获取文件大小 fs.sizeOf(ss.txt);
      B、计算切片大小 computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128M;
      C、默认情况下,切片大小 =blocksize;
      D、开始切,形成第 1 个切片:ss.txt—0:128M 第 2 个切片 ss.txt—128:256M 第 3 个切片 ss.txt—256M:300M (每次切片时,都要判断切完剩下的部分是否大于块的 1.1 倍,不大于 1.1 倍就划分一块切片);
      E、将切片信息写到一个切片规划文件中;
      F、整个切片的核心过程在 getSplit() 方法中完成;
      G、数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等;
      H、注意:block 是 HDFS 上物理上存储的存储的数据,切片是对数据逻辑上的划分;
(4) 提交切片规划文件到 yarn 上,yarn 上的 MrAppMaster 就可以根据切片规划文件计算开启 maptask 个数。

2.1.3. FileInputFormat 中默认的切片机制

(1) 简单地按照文件的内容长度进行切片;
(2) 切片大小,默认等于 block 大小;
(3) 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片。
比如待处理数据有两个文件:

file1.txt    320M
file2.txt    10M

经过 FileInputFormat 的切片机制运算后,形成的切片信息如下:

file1.txt.split1--  0~128
file1.txt.split2--  128~256
file1.txt.split3--  256~320
file2.txt.split1--  0~10M

获取切片信息 API:

// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName();

2.1.4. FileInputFormat 切片大小的参数配置

通过分析源码,在 FileInputFormat 中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));  
切片主要由这几个值来运算决定
mapreduce.input.fileinputformat.split.minsize=1 默认值为 1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值 Long.MAXValue
因此,默认情况下,切片大小 =blocksize。
maxsize (切片最大值):参数如果调得比 blocksize 小,则会让切片变小,而且就等于配置的这个参数的值。
minsize (切片最小值):参数调的比 blockSize 大,则可以让切片变得比 blocksize 还大。

2.2. CombineTextInputFormat切片机制

关于大量小文件的优化策略
1. 默认情况下 TextInputformat 对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 maptask,这样如果有大量小文件,就会产生大量的 maptask,处理效率极其低下。
2. 优化策略
(1) 最好的办法,在数据处理系统的最前端 (预处理/采集),将小文件先合并成大文件,再上传到 HDFS 做后续分析;
(2) 补救措施:如果已经是大量小文件在 HDFS 中了,可以使用另一种 InputFormat 来做切片 (CombineTextInputFormat),它的切片逻辑跟 TextFileInputFormat 不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 maptask;
(3) 优先满足最小切片大小,不超过最大切片大小。

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

3. 具体实现步骤

//如果不设置 InputFormat,它默认用的是 TextInputFormat.classjob.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

2.3. 自定义InputFormat

1. 采用自定义 InputFormat 的方式,处理输入小文件的问题。

(1) 自定义一个 InputFormat;
(2) 改写 RecordReader,实现一次读取一个完整文件封装为 KV;
(3) 在输出时使用 SequenceFileOutPutFormat 输出合并文件。

2. 程序实现

(1) 自定义 InputFormat

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class WholeFileInputformat extends FileInputFormat<NullWritable, BytesWritable>{@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {// 1 定义一个自己的recordReaderWholeRecordReader recordReader = new WholeRecordReader();// 2 初始化recordReaderrecordReader.initialize(split, context);return recordReader;}
}

(2) 自定义 RecordReader

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {private FileSplit split;private Configuration configuration;private BytesWritable value = new BytesWritable();private boolean processed = false;@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {// 获取传递过来的数据this.split = (FileSplit) split;configuration = context.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!processed) {// 1 定义缓存byte[] contents = new byte[(int) split.getLength()];// 2 获取文件系统Path path = split.getPath();FileSystem fs = path.getFileSystem(configuration);// 3 读取内容FSDataInputStream fis = null;try {// 3.1 打开输入流fis = fs.open(path);// 3.2 读取文件内容IOUtils.readFully(fis, contents, 0, contents.length);// 3.3 输出文件内容value.set(contents, 0, contents.length);} catch (Exception e) {} finally {IOUtils.closeStream(fis);}processed = true;return true;}return false;}@Overridepublic NullWritable getCurrentKey() throws IOException, InterruptedException {return NullWritable.get();}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return processed?1:0;}@Overridepublic void close() throws IOException {}
}

(3) InputFormatDriver 处理流程

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class InputFormatDriver {static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {private Text filenameKey;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {// 获取切片信息InputSplit split = context.getInputSplit();// 获取切片路径Path path = ((FileSplit) split).getPath();// 根据切片路径获取文件名称filenameKey = new Text(path.toString());}@Overrideprotected void map(NullWritable key, BytesWritable value, Context context)throws IOException, InterruptedException {// 文件名称为keycontext.write(filenameKey, value);}}public static void main(String[] args) throws Exception {args = new String[] { "e:/input", "e:/output11" };Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(InputFormatDriver.class);job.setInputFormatClass(WholeFileInputFormat.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setMapperClass(SequenceFileMapper.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

3. MapTask工作机制

3.1. 问题引出

maptask 的并行度决定 map 阶段的任务处理并发度,进而影响到整个 job 的处理速度。那么,mapTask 并行任务是否越多越好呢?

3.2. MapTask并行度决定机制

一个 job 的 map 阶段 MapTask 并行度(个数),由客户端提交 job 时的切片个数决定。

3.3. MapTask工作机制

(1) Read 阶段:Map Task 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。
(2) Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的 key/value。
(3) Collect 阶段:在用户编写 map() 函数中,当数据处理完成后,一般会调用 OutputCollector.collect() 输出结果。在该函数内部,它会将生成的 key/value 分区 (调用 Partitioner),并写入一个环形内存缓冲区中。
(4) Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序;
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out (N表示当前溢写次数) 中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作;
步骤3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当期内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
(4) Combine 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件 output/file.out 中,同时生成相应的索引文件 output/file.out.index。
在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 io.sort.factor (默认100) 个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

4. Shuffle机制

Mapreduce 确保每个 reducer 的输入都是按键排序的。系统执行排序的过程 (即将 map 输出作为输入传给 reducer) 称为 shuffle。

4.1. MapReduce 工作流程

4.1.1. 流程示意图

4.1.2. 流程详解

上面的流程是整个 mapreduce 最全工作流程,但是 shuffle 过程只是从第 7 步开始到第 16 步结束,具体 shuffle 过程详解,如下:
(1) maptask 收集我们的 map() 方法输出的 kv 对,放到内存缓冲区中;
(2) 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件;
(3) 多个溢出文件会被合并成大的溢出文件;
(4) 在溢出过程中,及合并的过程中,都要调用 partitoner 进行分组和针对 key 进行排序;
(5) reducetask 根据自己的分区号,去各个 maptask 机器上取相应的结果分区数据;
(6) reducetask 会取到同一个分区的来自不同 maptask 的结果文件,reducetask 会将这些文件再进行合并(归并排序);
(7) 合并成大文件后,shuffle 的过程也就结束了,后面进入 reducetask 的逻辑运算过程(从文件中取出一个一个的键值对 group,调用用户自定义的 reduce() 方法)。

4.1.3 注意

Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。
缓冲区的大小可以通过参数调整,参数:io.sort.mb  默认100M。

4.2. partition分区

4.2.1 问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

4.2.2. 默认分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}

默认分区是根据 key 的 hashCode 对 reduceTasks 个数取模得到的。用户没法控制哪个 key 存储到哪个分区。

4.2.3. 自定义Partitioner步骤

(1) 自定义类继承 Partitioner,重写 getPartition() 方法。

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {// 1 获取电话号码的前三位String preNum = key.toString().substring(0, 3);int partition = 4;// 2 判断是哪个省if ("136".equals(preNum)) {partition = 0;}else if ("137".equals(preNum)) {partition = 1;}else if ("138".equals(preNum)) {partition = 2;}else if ("139".equals(preNum)) {partition = 3;}return partition;}
}

(2) 在 job 驱动中,设置自定义 partitioner。

job.setPartitionerClass(CustomPartitioner.class)

(3) 自定义 partition 后,要根据自定义 partitioner 的逻辑设置相应数量的 reduce task。

job.setNumReduceTasks(5);

4.2.4. 注意

如果 reduceTask 的数量大于 getPartition 的结果数,则会多产生几个空的输出文件 part-r-000xx;
如果 reduceTask 的数量大于 1 且小于 getPartition 的结果数,则有一部分分区数据无处安放,会报 Exception;
如果 reduceTask 的数量等于1,则不管 mapTask 端输出多少个分区文件,最终结果都交给这一个 reduceTask,最终也就只会产生一个结果文件 part-r-00000;

例如:假设自定义分区数为5,则
(1) job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
(2) job.setNumReduceTasks(2);会报错
(3) job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

4.3. 排序

4.3.1. 基本概念

排序是 MapReduce 框架中最重要的操作之一。Map Task 和 Reduce Task 均会对数据 (按照 key )进行排序。该操作属于 Hadoop 的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
对于 Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。
对于 Reduce Task,它从每个 Map Task 上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task 统一对内存和磁盘上的所有数据进行一次合并。

4.3.2. 排序的分类

(1) 部分排序
MapReduce 根据输入记录的键对数据集排序。保证输出的每个文件内部排序。
(2) 全排序
如何用 Hadoop 产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了 MapReduce 所提供的并行架构。
替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建 3 个分区,在第一分区中,记录的单词首字母 a-g,第二分区记录单词首字母 h-n, 第三分区记录单词首字母 o-z。
(3) 辅助排序( GroupingComparator 分组)
Mapreduce 框架在记录到达 reducer 之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的 map 任务且这些 map 任务在不同轮次中完成时间各不相同。一般来说,大多数 MapReduce 程序会避免让 reduce 函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

4.3.3. 自定义排序 WritableComparable

(1) 原理分析
bean 对象实现 WritableComparable 接口重写 compareTo() 方法,就可以实现排序。
(2) 案例
统计每个手机号的流量,并按总量排序。

A、改造 FlowBean 对象,添加比较功能。

package com.mr.sort
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;public class FlowBean implements WritableComparable<FlowBean> {private long upFlow;private long downFlow;private long sumFlow;// 反序列化时,需要反射调用空参构造函数,所以必须有public FlowBean() {super();}public FlowBean(long upFlow, long downFlow) {super();this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}public void set(long upFlow, long downFlow) {this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}/*** 序列化方法* @param out* @throws IOException*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/*** 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致* @param in* @throws IOException*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {// 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;}
}

B、Map 方法优化为一个对象,reduce 方法则直接输出结果即可,驱动函数根据输入输出重写配置即可。

package com.mr.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCountSort {static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{FlowBean bean = new FlowBean();Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 1 拿到的是上一个统计程序输出的结果,已经是各手机号的总流量信息String line = value.toString();// 2 截取字符串并获取电话号、上行流量、下行流量String[] fields = line.split("\t");String phoneNbr = fields[0];long upFlow = Long.parseLong(fields[1]);long downFlow = Long.parseLong(fields[2]);// 3 封装对象bean.set(upFlow, downFlow);v.set(phoneNbr);// 4 输出context.write(bean, v);}}static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{@Overrideprotected void reduce(FlowBean bean, Iterable<Text> values, Context context)throws IOException, InterruptedException {context.write(values.iterator().next(), bean);}}public static void main(String[] args) throws Exception {// 1 获取配置信息,或者job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 6 指定本程序的jar包所在的本地路径job.setJarByClass(FlowCountSort.class);// 2 指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowCountSortMapper.class);job.setReducerClass(FlowCountSortReducer.class);// 3 指定mapper输出数据的kv类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);// 4 指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 5 指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));Path outPath = new Path(args[1]);
//      FileSystem fs = FileSystem.get(configuration);
//      if (fs.exists(outPath)) {
//          fs.delete(outPath, true);
//      }FileOutputFormat.setOutputPath(job, outPath);// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

C、将程序打成 jar 包,然后拷贝到 Hadoop 集群中执行 FlowCountSort 程序,并查看执行结果。

# 执行程序
hadoop jar FlowCountSort.jar com.mr.sort.FlowCountSort /input/flowcount/ /output/flowcount/# 查看程序结果
hadoop fs -cat /output/flowcount/part-r-00000

4.4. GroupingComparator分组

功能:对 reduce 阶段的数据根据某一个或几个字段进行分组。

案例:求每个订单中最贵的商品

4.4.1. 需求

有如下订单数据,现在需要求出每一个订单中最贵的商品。

订单原始数据订单id         商品id    成交金额
Order_0000001   Pdt_01  222.8
Order_0000001   Pdt_05  25.8
Order_0000002   Pdt_03  522.8
Order_0000002   Pdt_04  122.4
Order_0000002   Pdt_05  722.4
Order_0000003   Pdt_01  222.8
Order_0000003   Pdt_02  33.8预期结果数据part-r-00000 文件
Order_0000001 222.8part-r-00001 文件
Order_0000002 722.4part-r-00002 文件
Order_0000003 222.8

将上述订单原始数据的数据部分写入 GroupingComparatorTest.txt 文件中,之后作为程序的输入文件,预期得到三个输出数据结果文件 part-r-00000、part-r-00001、part-r-00002。

4.4.2. 分析

(1) 利用“订单 id 和成交金额”作为 key,可以将 map 阶段读取到的所有订单数据按照 id 分区,按照金额排序,发送到 reduce。
(2) 在 reduce 端利用 groupingcomparator 将订单 id 相同的 kv 聚合成组,然后取第一个即是最大值。

4.4.3. 实现

(1) 定义订单信息 OrderBean

package com.test.mapreduce.order;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;public class OrderBean implements WritableComparable<OrderBean> {private String orderId;private double price;public OrderBean() {super();}public OrderBean(String orderId, double price) {super();this.orderId = orderId;this.price = price;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}@Overridepublic void readFields(DataInput in) throws IOException {this.orderId = in.readUTF();this.price = in.readDouble();}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(orderId);out.writeDouble(price);}@Overridepublic int compareTo(OrderBean o) {// 1 先按订单id排序(从小到大)int result = this.orderId.compareTo(o.getOrderId());if (result == 0) {// 2 再按金额排序(从大到小)result = price > o.getPrice() ? -1 : 1;}return result;}@Overridepublic String toString() {return orderId + "\t" + price ;}
}

(2) OrderSortMapper 处理流程

package com.test.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{OrderBean bean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {// 1 获取一行数据String line = value.toString();// 2 截取字段String[] fields = line.split("\t");// 3 封装beanbean.setOrderId(fields[0]);bean.setPrice(Double.parseDouble(fields[2]));// 4 写出context.write(bean, NullWritable.get());}
}

(3) 编写 OrderSortReducer 处理流程

package com.test.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{@Overrideprotected void reduce(OrderBean bean, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {// 直接写出context.write(bean, NullWritable.get());}
}

(4) 编写 OrderSortDriver 处理流程

package com.test.mapreduce.order;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class OrderSortDriver {public static void main(String[] args) throws Exception {// 1 获取配置信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置jar包加载路径job.setJarByClass(OrderSortDriver.class);// 3 加载map/reduce类job.setMapperClass(OrderSortMapper.class);job.setReducerClass(OrderSortReducer.class);// 4 设置map输出数据key和value类型job.setMapOutputKeyClass(OrderBean.class);job.setMapOutputValueClass(NullWritable.class);// 5 设置最终输出数据的key和value类型job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);// 6 设置输入数据和输出数据路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 10 设置reduce端的分组job.setGroupingComparatorClass(OrderSortGroupingComparator.class);// 7 设置分区job.setPartitionerClass(OrderSortPartitioner.class);// 8 设置reduce个数job.setNumReduceTasks(3);// 9 提交boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

(5) 编写 OrderSortPartitioner 处理流程

package com.test.mapreduce.order;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class OrderSortPartitioner extends Partitioner<OrderBean, NullWritable>{@Overridepublic int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}

(6) 编写 OrderSortGroupingComparator 处理流程

package com.test.mapreduce.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class OrderSortGroupingComparator extends WritableComparator {protected OrderSortGroupingComparator() {super(OrderBean.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean abean = (OrderBean) a;OrderBean bbean = (OrderBean) b;// 将orderId相同的bean都视为一组return abean.getOrderId().compareTo(bbean.getOrderId());}
}

4.5. Combiner合并

4.5.1. 基本概念

(1) combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
(2) combiner 组件的父类就是 Reducer
(3) combiner 和 reducer 的区别在于运行的位置:Combiner 是在每一个 maptask 所在的节点运行,Reducer 是接收全局所有 Mapper 的输出结果;
(4) combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量。
(5) combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来。

Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4Reducer
(3+5+7+2+6)/5=23/5    不等于    (5+4)/2=9/2

4.5.2. 案例实操

需求:统计单词计数的过程中对每一个 maptask 的输出进行局部汇总,以减小网络传输量即采用 Combiner 功能。
方案一:增加一个 WordcountCombiner 类继承 Reducer,然后在 WordcountDriver 驱动类中指定 combiner。

package com.test.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int count = 0;for(IntWritable v :values){count = v.get();}context.write(key, new IntWritable(count));}
}// 驱动类中指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountCombiner.class);

方案二:将 WordcountReducer 作为 combiner 在 WordcountDriver 驱动类中指定,即直接在驱动类中指定 WordCountReducer 作为 combine。

// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordcountReducer.class);

本文为原创文章,如果对你有一点点的帮助,别忘了点赞哦!比心!如需转载,请注明出处,谢谢!

转载于:https://my.oschina.net/aibinxiao/blog/3003240

Hadoop之MapReduce理论篇01相关推荐

  1. Hadoop之mapreduce 实例三

    Hadoop之mapreduce 实例三 转载于:https://www.cnblogs.com/chaoren399/archive/2013/01/04/2844503.html

  2. hadoop和python的关系_Python 的 map 和 reduce 和 Hadoop 的 MapReduce 有什么关系?

    先说结论.Python 的 map 和 reduce 是Python的内置函数,而 Hadoop 的 MapReduce 是一个计算框架. 两者之间没有直接的关系.但是他们的部分计算操作思想是类似的. ...

  3. 解密回声消除技术之一(理论篇)

    http://hulong988.blog.51cto.com 解密回声消除技术之一(理论篇) 2009-06-11 22:24:58 标签:语音 职场 休闲 通讯 原创作品,允许转载,转载时请务必以 ...

  4. 【机器学习】Logistic Regression 的前世今生(理论篇)

    Logistic Regression 的前世今生(理论篇) 本博客仅为作者记录笔记之用,不免有很多细节不对之处. 还望各位看官能够见谅,欢迎批评指正. 博客虽水,然亦博主之苦劳也. 如需转载,请附上 ...

  5. 手撕 CNN 经典网络之 VGGNet(理论篇)

    2014年,牛津大学计算机视觉组(Visual Geometry Group)和Google DeepMind公司一起研发了新的卷积神经网络,并命名为VGGNet.VGGNet是比AlexNet更深的 ...

  6. Hadoop中mapreduce作业日志是如何生成的

    摘要:本篇博客介绍了hadoop中mapreduce类型的作业日志是如何生成的.主要介绍日志生成的几个关键过程,不涉及过多细节性的内容. 本文分享自华为云社区<hadoop中mapreduce作 ...

  7. hadoop之MapReduce学习教程

    hadoop之MapReduce学习 MapReduce概述 MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发"基于Hadoop的数据分析应用" ...

  8. hadoop之mapreduce教程+案例学习(二)

    第3章 MapReduce框架原理 目录 第3章 MapReduce框架原理 3.1 InputFormat数据输入 3.1.1 切片与MapTask并行度决定机制 3.1.2 Job提交流程源码和切 ...

  9. (超详细)大数据Hadoop之MapReduce组件

    一.MapReduce 简介 1.1 MapReduce的概述 在Hadoop生态圈中,MapReduce属于核心,负责进行分布式计算. MapReduce 核心功能是将用户编写的业务逻辑代码和自带默 ...

最新文章

  1. python读取html内容 dom获取_python学习笔记十三 JS,Dom(进阶篇)
  2. TX2更新源失败的问题
  3. Save could not be completed. Eclipse国际化的问题解决
  4. 取消MySQL timestamp列默认ON UPDATE CURRENT_TIMESTAMP
  5. 第二章:系统困境之 试图通过线性努力获得线性增长
  6. 使用视觉信息,为什么能把移动机器人的空间位置信息记录下来
  7. python字符串_python字符串格式化
  8. 【RabbitMQ】5、RabbitMQ任务分发机制
  9. 数据结构 链式哈希表(Hash Table)的接口定义与实现分析(完整代码)
  10. php位运算符与逻辑运算_位运算符及PHP中位运算的应用笔记
  11. 面试题--------3、string stringbuffer stringbuilder的区别
  12. 辨别 优盘 真假 [金士顿]
  13. Intel 内部指令 --- AVX和AVX2学习笔记
  14. 哈尔滨工业大学车万翔:自然语言处理新范式
  15. word-break 换行
  16. 公众号题库系统接口-网课答案解析接口
  17. 使用appendChild和insertBefore
  18. 最新友盟微信,QQ与微博分享集成方案
  19. 本地计算机 上的 OracleOraDb11g_home1TNSListener 服务启动后停止故障解决办法
  20. 快播3在线安装程序变身快播下载器 vb 源代码以及调用方法

热门文章

  1. 使用npm安装一些包失败了的看过来(npm国内镜像介绍)
  2. C#中字符串转换成枚举类型的方法
  3. Linux 操作memcache命令行
  4. 数据绑定控件Reperter
  5. 微软在位Azure自动机器学习服务释无程序代码网页UI
  6. 在PyCharm中自动添加文件头、时间日期等信息
  7. Collections.unmodifiableCollection
  8. httpd-2.4.9.tar.bz2的编译安装配置以及CGI、虚拟主机、https、mod_deflate、mod_status的实现。...
  9. What's the difference between forever and for good?
  10. 中琛源主要的产品是什么