文章目录

  • 实验汇总
    • MapReduce计算案例
      • 序列化
        • Writable
        • 常用的Writable类
        • 自定义Writable
        • 序列号案例分析
        • 总结
    • 排序
      • 排序方式
      • 自定义排序
      • 全排序案例
      • 区内排序
    • TextInputFormat切片
      • CombineTextInputFormat切片机制
        • 代码
        • CombineTextInputFormat 切片机制的原理
    • FileInputFormat实现类
      • TextInputFormat
      • KeyValueTextInputFormat
        • KeyValueTextInputFormat具体实现样例
      • NLFileInputFormat
        • NLFileInputFormat具体实现样例
      • ==自定义InputFormat==
        • 样例
        • ==自定义InputFormat==
        • ==自定义RecordReader类==
        • Mapper
        • Reduce
        • Job
        • 总结
    • Partition分区
      • 自定义Partition流程
      • 案例
    • Combiner
      • 自定义Combiner方法
      • 案例
      • 总结
    • GroupingComparator 分组(辅助排序)
      • 案例
  • 自己瞎写的公众号与博客

实验汇总

MapReduce计算案例

序列化

序列化:把结构化对象转换为字节流。

反序列化

java序列化Serializable

Writable

hadoop有自己的序列化机制Writable

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {/** * Serialize the fields of this object to <code>out</code>.* write 是把每个对象序列化到输出流* @param out <code>DataOuput</code> to serialize this object into.* @throws IOException*/void write(DataOutput out) throws IOException;/** * Deserialize the fields of this object from <code>in</code>.  * * <p>For efficiency, implementations should attempt to re-use storage in the * existing object where possible.</p>** 输入流字节反序列化* @param in <code>DataInput</code> to deseriablize this object from.* @throws IOException*/void readFields(DataInput in) throws IOException;
}
  • 让实体类实现Writable接口

    • MR的任意key必须实现WritableComparable接口

    • WritableComparables can be compared to each other, typically via Comparators. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.

      @InterfaceAudience.Public
      @InterfaceStability.Stable
      public interface WritableComparable<T> extends Writable, Comparable<T> {}
      

常用的Writable类

  • Text相当于String的Writable

    • @Stringable
      @InterfaceAudience.Public
      @InterfaceStability.Stable
      public class Text extends BinaryComparableimplements WritableComparable<BinaryComparable> {}
      

自定义Writable

/*** @author whai_luo* @Date: 2020/5/20 13:16*/
public class FlowBean implements Writable {private Long upFlow;private Long downFlow;private Long sumFlow;/*** 序列化方法* @param out* @throws IOException*/public void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/*** 反序列化方法* @param in* @throws IOException*/public void readFields(DataInput in) throws IOException {this.upFlow = in.readLong();this.downFlow = in.readLong();this.sumFlow = in.readLong();}//记得加上get、set、构造方法、equals等方法!
}

序列号案例分析

  • 需求:统计每一个手机号耗费的总上行流量、下行流量、总流量

  • 输入数据格式

    7 13560436666 120.196.100.99 1116 954 200
    Id 手机号码 网络ip 上行流量 下行流量 网络状态码

    • Map阶段:抽取一行,切分字段,抽取手机号、上行、下行流量三个字段。

      • 以手机号为key,bean对象(已实现序列号接口)为value输出给Reduce

      • 期望输出给Reduce:

        13560436666 1116 954 2070
        手机号码 上行流量 下行流量 总流量(通过Map计算)

    • Reduce阶段:累积同一个号码的上行、下行流量得到总的流量。

      • 期望输出的最终结果:

        13560436666 1116 + 954 = 2070
        手机号码 上行流量 下行流量 总流量

  • 代码

    • FlowBean代码在上面噢!
    • Mapper
    /***  输入数据格式:*   key为字符长度索引*   value: 7    13560436666  120.196.100.99    1116    954        200*          id    手机号码           网络 ip     上行流量   下行流量   网络状态码** mapper输出 <134....,1116,954>***  期望输出数据格式* 13560436666      1116               954        2070* 手机号码          上行流量            下行流量    总流量** @author whai_luo* @Date: 2020/5/20 13:26*/
    public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取value值String line = value.toString();//切割获取字段String[] fields = line.split(" ");//封装对象//0为id 1为手机号码 ...String phoneNum = fields[1];//获取上下行流量Long upFlow = Long.valueOf(fields[fields.length-3]);Long downFlow = Long.valueOf(fields[fields.length-2]);//封装上传下载流量到FlowBeanFlowBean flowBean = new FlowBean(upFlow,downFlow);System.out.println("Mapper过程:电话号码"+phoneNum+"流量flowBean:"+flowBean);//mapper输出 <134....,1116,954>//<134....,FlowBean>context.write(new Text(phoneNum), flowBean);}
    }
    
    • Reduce
    /**** 输入格式* Text         FlowBean* <15858128962,flowbean1>* <15858128962,flowbean2>* <13682952534,flowbean3>* <1368224.....>** 汇总流量*        输出格式**  Text :         13560436666*                  手机号码**  FlowBean  :     1116          954     2070*                 上行流量       行流量    总流量** @author whai_luo* @Date: 2020/5/20 13:41*/
    public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {/*** 相同的phoneNum进入同一个Reducer* @param key phoneNum* @param values beanFlow多个的集合* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {Long sum_upFlow= Long.valueOf(0);Long sum_downFlow= Long.valueOf(0);//遍历bean进行统计累加for (FlowBean value : values) {sum_downFlow+=value.getDownFlow();sum_upFlow+=value.getDownFlow();}//获取的总和流量进行封装FlowBean resultflowBean = new FlowBean(sum_upFlow,sum_downFlow);//输出context.write(key, resultflowBean); }
    }
    
    • Job
    /*** @author whai_luo* @Date: 2020/5/20 13:51*/
    public class FlowSumDriver {public static void main(String[] args) throws Exception {//输入输出路径根据电脑设置args = new String[]{"E:/phone_data.txt","E:/"};//配置信息 job对象实例Configuration conf = new Configuration();Job job = Job.getInstance(conf);//入口jar包本地路径job.setJarByClass(FlowSumDriver.class);//MapperReduce配置job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);//输出格式指定job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//输入输出路径配置FileInputFormat.setInputPaths(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));//将job和相关jar和类交给yarn运行System.exit(job.waitForCompletion(true)?0:1);}
    }
    
    • 输出结果

      hadoop jar wordcount.jar cn.mr.WordCount [Inputpath] [OutputPath]

      13736230513 2481 24681
      13846544121 200 1432
      13956435636 200 200
      13966251146 404 200
      18271575951 1527 2106
      84188413 4116 1432
      13590439668 954 200
      15910133277 3156 2936
      13729199489 200 200
      13630577991 6960 690
      15043685818 3659 3538
      15959002129 1938 180
      13560439638 200 500
      13470253144 200 200
      13682846555 2910 200
      13992314666 3008 3720
      13509468723 7335 110349
      18390173782 9531 2412
      13975057813 11058 48243
      13768778790 200 200
      13568436656 2481 24681
      13568436656 954 200

总结

1、什么是序列化,其使用场景是什么?

Java中序列化是将对象转换为字节码的过程,字节码能够更好得进行数据的传输,字节码文件在传输过程能够很好的保持完整性。

反序列化是将字节码文件转换为原来对象的方法

序列化的使用场景:跨平台使用,网络中流的形式将对象进行传输,序列化的字节码数据进行储存。

2、简述 hadoop 实现自定义对象序列化的步骤?

本实例中自定义FlowBean

(1)实现writable接口,实现其中的方法write为序列号、化方法,readFileds为反序列号方法

(2)默认空参构造方法,java反射通过字节码反射生成原来对象,需要空参构造方法。

(3)重写write方法,让bean中需要进行序列化的字段写入DataOutput中

(4)重写readFileds方法,读取输入的DataInput,赋值于本实例,进行反序列化

(5)复写toString()

(6)根据要求写入构造方法

排序

Hadoop默认对key按照字典顺序快速排序;

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序方式

  • 部分排序 根据输入的键排序,保证输出的每个文件内部有序。
  • 全排序 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
  • 辅助排序 在Reduce中对key进行分组,接收key为bean对象时让字段相同的key进入同一个reduce()方法
  • 二次排序 自定义排序,如果compareTo中判断条件为两个即为二次排序

自定义排序

通过bean对象实现WritableComparable接口重写compareTo方法实现排序。

@Override
public int compareTo(FlowBean o) {return Long.compare(o.sumFlow,this.sumFlow);//        int result;//        //按流量大小倒序排列//        if (sumFlow > bean.getSumFlow()){//            //大//            result = -1;//        }else if (sumFlow < bean.getSumFlow()){//            //小//            result = 1;//        }else {//            //相等//            result = 0;//        }//        return result;
}

全排序案例

  • bean

    public class FlowBean implements WritableComparable<FlowBean> {private long upFlow;private long downFlow;private long sumFlow;// 反序列化时,需要反射调用空参构造函数,所以必须有//getset/**
    * 序列化方法
    * @param out
    * @throws IOException
    */@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/**
    * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致
    * @param in
    * @throws IOException*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {int result;// 按照总流量大小,倒序排列if (sumFlow > bean.getSumFlow()) {result = -1;}else if (sumFlow < bean.getSumFlow()) {result = 1;}else {result = 0;}return result; }
    }
    
  • Mapper Reduce:参考如上案例

  • Job

    public static void main(String[] args) throws Exception {//配置信息 job对象实例Configuration conf = new Configuration();Job job = Job.getInstance(conf);//入口jar包本地路径job.setJarByClass(FlowSumDriver.class);//MapperReduce配置job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);//输出格式指定job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定自定义数据分区//        job.setPartitionerClass(ProvincePartitioner.class);////        //同时指定相应数量的 reduce task//        job.setNumReduceTasks(5);//输入路径for (int i = 0; i < args.length-1 ; i++) {FileInputFormat.setInputPaths(job,new Path(args[i]));}FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));//将job和相关jar和类交给yarn运行System.exit(job.waitForCompletion(true)?0:1);
    }
    
  • 总结

    bean类实现比较的接口,在通过key、value的输入输出时会自动使用该方法来比较。

区内排序

即分区(Partition)+排序的组合

  • 在如上全排序基础上,增加分区代码即可

    /*** @author whai_luo* @Date: 2020/6/3 13:09*/
    public class ProvincePartitioner extends Partitioner<FlowBean,Text> {@Overridepublic int getPartition(FlowBean flowBean,Text text, int numPartitions) {//获取前3位号码String preNum = text.toString().substring(0, 3);int partition = 4;//判断省份if ("136".equals(preNum)) {partition = 0;}else if ("137".equals(preNum)) {partition = 1;}else if ("138".equals(preNum)) {partition = 2;}else if ("139".equals(preNum)) {partition = 3;}//返回结果return partition;}
    }
    
  • job

    //指定自定义数据分区
    job.setPartitionerClass(ProvincePartitioner.class);//同时指定相应数量的 reduce task
    job.setNumReduceTasks(5);
    
  • 执行结果

    排序后实现了对不同号码进入不同文件的分类,并实现流量排序。

TextInputFormat切片

hadoop框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。

CombineTextInputFormat切片机制

  • 输入格式

    5个文件 0.5M + 1.12M + 0.4M + 0.9M + 0.04M

代码

public class MapReduceWCDriver {/*** CombineTextInputFormat 切片机制*      框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会* 是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的* MapTask,处理效率极其低下。**      1、应用场景:*          CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划*          到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。**      2、虚拟存储切片最大值设置*          CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m*              注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。**      3、切片机制*              生成切片过程包括:虚拟存储过程和切片过程二部分。*          切片过程:*               (a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独*                   形成一个切片。*               (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。**              / *   setMaxInputSplitSize 值为 4M*                        8.02MB = 4M + (2.01M + 2.01 M)**                      (1.7M,5.1M,3.4M,6.8M)=(1.7M)(2.55M、2.55M)(*              * /*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {//配置信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);//job设置主类(本类)job.setJarByClass(MapReduceWCDriver.class);//Mapper reduce设置job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);//map输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//最终输出job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 如果不设置 InputFormat,它默认用的是 TextInputFormat.classjob.setInputFormatClass(CombineTextInputFormat.class);//虚拟存储切片最大值设置 4mCombineTextInputFormat.setMaxInputSplitSize(job,1024*1024*4);//输入输出路径设置for (int i = 0; i < args.length-1; i++) {FileInputFormat.addInputPath(job,new Path(args[i]));}FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));//提交/***  Submit the job to the cluster and wait for it to finish.*/boolean b = job.waitForCompletion(true);/*** Terminates the currently running Java Virtual Machine.*/System.exit(b?0:1);}
  • 不做任何处理,运行 WordCount 案例程序,观察切片个数为5

    即5个文件5个输出

  • MapReduceWCDriver增加上述代码后输出结果为1

    调整切片大小为4M时只有1个输出

  • MapReduceWCDriver再次修改如下,结果为3

    调整切片大小为1M时有3个输出

// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 1m
CombineTextInputFormat.setMaxInputSplitSize(job,1024*1024*1);

CombineTextInputFormat 切片机制的原理

  1. 虚拟储存过程

    通过设置setMaxInputSplitSize设置切片输入最大值,输入文件时,这个最大值与文件的大小比较,当小于这个最大值时,自动划分为一个块;当大于这个最大值但小于最大值的2倍,则划分为2个等大小的块;当大于2倍最大值时,分割为一个最大值块,剩余的文件分割为2个大小相得的块。

  2. 实际切片过程

    从(1)中获取到的虚拟存储过程的文件,当虚拟存储文件大于或等于设置的最大值则会形成一个切片块,当小于则会和下一个虚拟存储文件合并形成一个新的切片

FileInputFormat实现类

  • 常见的FileInputFormat实现类

    • TextInputFormat 按行读取
    • KeyValueTextInputFormat
    • NLineInputFormat
    • CombineTextInputFormat
    • 自定义InputFormat
  • FIleInputFormat

FileInputFormat is the base class for all file-based InputFormats. This provides a generic implementation of {@link #getSplits(JobConf, int)}. Subclasses of FileInputFormat can also override the {@link #isSplitable(FileSystem, Path)} method to ensure input-files are not split-up and are processed as a whole by {@link Mapper}s.

/** * A base class for file-based {@link InputFormat}.*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

TextInputFormat

TextInputFormat是==默认的FileInputFormat实现类==。

  • 按行读取每条记录。键是存储该行在整个文件中的

  • 起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),

/**
An {@link InputFormat} for plain text files.  Files are broken into lines.
Either linefeed(换行) or carriage-return(回车) are used to signal end of line.  Keys are
the position in the file, and values are the line of text..
对于纯文本文件。文件被分成几行。换行或回车都用来表示行结束了。键是文件中的位置,值是文本行*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text>implements JobConfigurable {...}

KeyValueTextInputFormat

/**
An {@link InputFormat} for plain text files. Files are broken into lines.
Either linefeed or carriage-return are used to signal end of line. Each line
is divided into key and value parts by a separator byte. If no such a byte
exists, the key will be the entire line and value will be empty.
每一行会通过设定的separator byte被分为kV两个部分,如果没有设置byte则key为一整行、value为空。
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text>implements JobConfigurable {

KeyValueTextInputFormat具体实现样例

  • 需求:统计输入文件中每一行第一个单词相同的行数。

  • Mapper

    通过设置分割符为空格,直接将每一行第一个单词作为key,value为1输出给reduce

@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {//写出context.write(key,new LongWritable(1));
}
  • Reduce

    统计,相同key(即初始第一个单词相同的会进入同一个reduce),只需要用一个全局变量v统计次数即可。

LongWritable v = new LongWritable();
/**
input: <[key],1>
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {//初始为0long sum = 0L;for (LongWritable value : values) {sum += value.get(); //获取数目}v.set(sum);//统计结果输出context.write(key,v);
}
  • Job
//配置文件设置切割符 为空格
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," ");
// 使用 KeyValueTextInputFormat处理记录数
job.setInputFormatClass(KeyValueTextInputFormat.class);
/* @author whai_luo* @Date: 2020/5/27 12:51*/
public class MapperReduceDriver {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();//配置文件设置切割符 为空格conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," ");/*每 一 行 均 为 一 条 记 录 , 被 分 隔 符 分 割 为 key , value 。 可 以 通 过 在 驱 动 类 中 设 置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t"); 来设定分隔符。默认分隔符是 tab ( \t )。以下是一个示例,输入是一个包含 4 条记录的分片。其中——> 表示一个(水平方向的)制表符。*/Job job = Job.getInstance(conf);//设置jar包位置,省略....// 使用 KeyValueTextInputFormat处理记录数job.setInputFormatClass(KeyValueTextInputFormat.class);//输入路径for (int i = 0; i < args.length-1 ; i++) {FileInputFormat.setInputPaths(job,new Path(args[i])); }FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));//job提交job.waitForCompletion(true);}
}
  • 输出

NLFileInputFormat

  • 按照NLFileInputFormat指定的行数划分切片,切片数目=总行数/N

NLineInputFormat which splits N lines of input as one split.
In many “pleasantly” parallel applications, each process/mapper
processes the same input file (s), but with computations are
controlled by different parameters.(Referred to as “parameter sweeps”).
One way to achieve this, is to specify a set of parameters
(one set per line) as input in a control file
(which is the input path to the map-reduce application,
where as the input dataset is specified via a config variable in JobConf.).
The NLineInputFormat can be used in such applications, that splits
the input file such that by default, one line is fed as
a value to one map task, and key is the offset.
i.e. (k,v) is (LongWritable, Text).
The location hints will span the whole mapred cluster.

NLFileInputFormat具体实现样例

  • 需求:对每个单词进行个数统计,要求根据每个输入文件的行数来规定输出多少个切片。要求每三行放入一个切片中。

  • Mapper

    划分每个单词,并循环输出<[key],1>,相同key进入同一个Reduce

    private Text k = new Text();
    private LongWritable v = new LongWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1.获取一行String line = value.toString();//2.切割String[] splits = line.split(" ");//3.循环写出for (int i = 0; i < splits.length ; i++) {k.set(splits[i]);context.write(k,v);}
    }
    
  • Reduce

    统计每个单词1的累加为value

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {//初始为0long sum = 0;for (LongWritable value : values) {sum += value.get();}v.set(sum);//统计结果输出context.write(key,v);
    }
    
  • Job 设置切片

    //设置每个切片 InputSplit 中划分三条记录
    NLineInputFormat.setNumLinesPerSplit(job, 3);
    //使用 NLineInputFormat 处理记录数
    job.setInputFormatClass(NLineInputFormat.class);
    

自定义InputFormat

在企业开发中, Hadoop 框架自带的 InputFormat 类型不能满足所有应用场景,需要自定义 InputFormat 来解决实际问题。

自定义 InputFormat 步骤如下:

  • 自定义一个类继承FileInputFormat。
  • 改写RecordReader,实现一次读取一个完整文件封装为KV。
  • 在输出时使用SequenceFileOutPutFormat输出合并文件。

样例

  • 需求: 将多个小文件合并成一个 SequenceFile 文件(SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对的文件格式),SequenceFile 里面存储着多个文件,存储的形式为文件路径+名称为 key,文件内容为 value。

  • 实现过程

    1. 自定义一个类继承 FileInputFormat 。

      • 重写==isSplitable()==方法,返回false不可切割
      • 重写createRecordReader(),创建自定义的RecordReader对象,并初始化
    2. 改写 RecordReader ,实现一次读取一个完整文件封装为 KV 。

      • 采用IO流一次读取一个文件输出到value中,因为设置了不可切片,最终把所有文

      件都封装到了value中

      • 获取文件路径信息+名称,并设置key
    3. 在输出时使用 SequenceFileOutPutFormat 输出合并文件。

如下图:

自定义InputFormat

// 定义类继承 FileInputFormat
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{/**默认为每一行切割,现在改为不切割*/@Overrideprotected boolean isSplitable(JobContext context, Pathfilename) {return false;//返回false不可切割}/*** Create a record reader for a given split. The framework will call* {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before* the split is used.* @param split the split to be read* @param context the information about the task* @return a new record reader* @throws IOException* @throws InterruptedException*/@Overridepublic RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {WholeRecordReader recordReader = new WholeRecordReader();recordReader.initialize(split, context);return recordReader;}
}

自定义RecordReader类

  • 一次读取一个文件输出到value中,由于如上isSplitable()已经返回false,故直接不分割文件。
  • 获取文件的路径信息与名称作为一个key
/*** @author whai_luo* @Date: 2020/6/2 14:12*/
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {private FileSplit split;private Configuration configuration;//返回结果值private Text k = new Text();private BytesWritable v = new BytesWritable();private boolean isProgress = true;/*** 切片获取,内容初始化初始化方法,框架会在读取切片数据之前调用此方法,因此,一些在RecordReader工作时需要使用的资源可以在此方法中进行初始化(这些资源必须是可以在inputSplit和taskAttemptContext中可以获取到的)。* @param split 当前RecordReader正在处理的切片* @param context 当前Job的上下文,可以通过此对象获取job的配置对象* @throws IOException* @throws InterruptedException*/@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {this.split = (FileSplit) split;configuration= context.getConfiguration();}/*** key*    文件路径* value*    文件此方法用于读取下一组key-value,类似于迭代器,如果读到数据返回true * Read the next key, value pair.* @return true if a key/value pair was read* @throws IOException* @throws InterruptedException*/@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {//isProgressif (isProgress) {//1.自定义缓存区byte[] contexts = new byte[(int) split.getLength()];FSDataInputStream fsDataInputStream = null;FileSystem fileSystem = null;try {//2.读取文件系统//文件路径Path path = split.getPath();//文件系统获取fileSystem= path.getFileSystem(configuration);//3.从文件系统读取数据fsDataInputStream = fileSystem.open(path);//4.读取文件到缓存区 readFully全部读取IOUtils.readFully(fsDataInputStream,contexts,0,contexts.length);//5.输出文件内容 valuesv.set(contexts,0,contexts.length);//6.文件路径+名称获取String name = split.getPath().toString();//输出k和vk.set(name);} catch (IOException e) {e.printStackTrace();}finally {//流关闭IOUtils.closeStream(fsDataInputStream);}isProgress=false;return true;}return false;}
}

Mapper

直接输出

@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key, value);
}

Reduce

直接输出

/*** @author whai_luo*/
public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text,BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {context.write(key,values.iterator().next());}
}

Job

job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

1.InputFormat是用于处理各种数据源的。下面实现自定义的InputFormat,数据源是来自于内存。
1.1 在程序的job.setInputFormatClass(MySelfInputFormat.class);
1.2 实现InputFormat extends InputFormat<k,v>,实现其中的2个方法,分别是getSplits(…)和createRecordReader(…)
1.3 getSplits(…)返回的是java.util.List,里面中的每个元素是InputSplit。每个InputSpilt对应一个mapper任务。
1.4 InputSplit是对原始海量数据源的划分。本例中是在内存中产生数据,封装到InputSplit中。
1.5 InputSplit封装的必须是hadoop数据类型,实现Writable接口。
1.6 RecordReader读取每个InputSplit中的数据,解析成一个个的<k,v>,供map处理。
1.7 RecordReader有4个核心方法,分别是initialize(…),nextKeyValue(),getCurrentKey()和getCurrentValue()。
1.8 initialize(…)的重要性在于拿到InputSplit和定义临时变量。
1.9 nextKeyValue(…)方法的每次调用可以获得key和value值
1.10 当nextKeyValue(…)调用后,紧接着调用getCurrentKey()和getCurrentValue()。
————————————————
版权声明:本文为CSDN博主「波哥的技术积累」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/woshisap/article/details/42275305

总结

总结来说,InputFormat是将文件切片----->再转化为对转交给Mapper处理。在InputFormat类中只有两个方法,一个负责切片,一个返回能将切片信息转化为相应的键值对的对象

而当使用KeyValueInputFormat并设置分隔符后,Mapper以分隔符前的内容作为Key来接收,以分隔符后面的内容作为Value来接收。那么在数据提交到Mapper之前,数据就必须被格式化为满足Mapper接收的格式,这个工作就是由InputFormat来完成的,而InputFormat实际上并不能完成这项工作,而是创建一个RecordReader来完成这项转换工作

Partition分区

  • 默认Partition 默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
/** * Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {public void configure(JobConf job) {}/** Use {@link Object#hashCode()} to partition. */public int getPartition(K2 key, V2 value,int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

自定义Partition流程

  1. 继承Partition,重写getPartition()方法
  2. job设置Partition
  3. job设置ReduceTask数量
  • 分区数量

    • 如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
    • 如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
    • 如 果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000;

例如:假设自定义分区数为5,则

job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件

job.setNumReduceTasks(2); 会报错

job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

案例

需求:安照要求将文本内容输出到不同文件中。

  • 输入数据 电话号码文本

    13736230513 2481 24681

    13956435636 132 1512

    13846544121 264 0

    13630577991 6960 690

    13560439638 918 4938

  • 期望输出 5个输出文件

  • Partition分区

    /*** @author whai_luo* @Date: 2020/6/3 13:09*/
    public class ProvincePartitioner extends Partitioner<Text,FlowBean> {@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {//获取前3位号码String preNum = text.toString().substring(0, 3);int partition = 4;//判断省份if ("136".equals(preNum)) {partition = 0;}else if ("137".equals(preNum)) {partition = 1;}else if ("138".equals(preNum)) {partition = 2;}else if ("139".equals(preNum)) {partition = 3;}//返回结果return partition;}
    }
    
  • Mapper

    /***  输入数据格式:*   13736230513 2481 24681* 13956435636 132 1512* 13846544121 264 0* 13630577991 6960 690* 13560439638 918 4938*** @author whai_luo* @Date: 2020/5/20 13:26*/
    public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {FlowBean v = new FlowBean();Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取value值String line = value.toString();//切割获取字段String[] fields = line.split(" ");//封装对象//0为手机号码 ...String phoneNum = fields[0];//获取上下行流量long upFlow = Long.valueOf(fields[fields.length-2]);long downFlow = Long.valueOf(fields[fields.length-1]);//封装上传下载流量到FlowBeank.set(phoneNum);v.setDownFlow(downFlow);v.setUpFlow(upFlow);//mapper输出 <134....,1116,954>//<134....,FlowBean>context.write(k, v);}
    }
    
  • Reduce

  • Job

    public static void main(String[] args) throws Exception {//配置信息 job对象实例Configuration conf = new Configuration();Job job = Job.getInstance(conf);//入口jar包本地路径job.setJarByClass(FlowSumDriver.class);//MapperReduce配置job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);//输出格式指定job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定自定义数据分区job.setPartitionerClass(ProvincePartitioner.class);//同时指定相应数量的 reduce taskjob.setNumReduceTasks(5);//输入路径for (int i = 0; i < args.length-1 ; i++) {FileInputFormat.setInputPaths(job,new Path(args[i]));}FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));//将job和相关jar和类交给yarn运行System.exit(job.waitForCompletion(true)?0:1);}
    }
    
  • 输出结果

Combiner

  • Combiner是MR程序中Mapper和Reducer之外的一种组件。

  • Combiner组件的父类就是Reducer。

  • Combiner和Reducer的区别在于运行的位置

    • Combiner是在每一个MapTask所在的节点运行;
  • Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。

  • Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

自定义Combiner方法

  1. 自定义一个 Combiner 继承 Reducer,重写 Reduce 方法

  2. job设置job.setCombinerClass(....Combiner.class)

案例

  • 需求: 对每一个MapTask的输出局部汇总Combiner

  • 方案1 Combiner继承Reducer

/*** @author whai_luo* @Date: 2020/6/3 21:29*/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {IntWritable value = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {//汇总int sum = 0;for (IntWritable value : values) {sum += value.get();//每次加1}context.write(key, value);}
}
  • 方案2 将Reducer直接作为Combiner
// 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑
job.setCombinerClass(WordcountReducer.class);

总结

1、Hadoop MR 框架的 Combiner 机制有什么作用?

对Reduce前的MapTask的输出进行局部汇总,以减少网络传输量,“reduce的输入每个key所对应的value将是一大串1,但处理的文本很多时,这一串1已将占用很大的带宽,如果我们在map的输出给于reduce之前做一下合并或计算,那么传给reduce的数据就会少很多,减轻了网络压力。”

2 .自定义Combiner步骤

  1. Combiner继承Reduce,重写reduce()
  2. job设置Combiner

说白了,就是在Mapper后增加一个Combiner过程,构成整个MapTask,这个Combiner需要继承自Reduce

参考job内如下方法:

/*** Set the combiner class for the job.* @param cls the combiner to use* @throws IllegalStateException if the job is submitted*/
public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException {ensureState(JobState.DEFINE);conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}

GroupingComparator 分组(辅助排序)

案例

  • 需求:输出如下的每个订单中最贵的商品

    订单id 商品id 金额

    0000001 Pdt_01 222.8
    0000002 Pdt_05 722.4
    0000001 Pdt_02 33.8
    0000003 Pdt_06 232.8
    0000003 Pdt_02 33.8
    0000002 Pdt_03 522.8
    0000002 Pdt_04 122.4

    • 利用“订单 id 和成交金额”作为 key,可以将 Map 阶段读取到的所有订单数据按照 id 升序排序,如果 id 相同再按照金额降序排序,发送到 Reduce。
    • 在 Reduce 端利用 groupingComparator 将订单 id 相同的 kv 聚合成组,然后取第一个即是该订单中最贵商品,

  • bean

    注意compareTo,按照id升序排序,相同id,比较价格

    public class OrderBean implementsWritableComparable<OrderBean> {private int order_id;private double price;/*** 订单数据按* 照 id 升序排序,如果 id 相同再按照金额降序排序* @param o* @return*/@Overridepublic int compareTo(OrderBean o) {if (order_id > o.getOrder_id()){//this大,往后return 1;}else if (order_id < o.getOrder_id()){//this小,往前return -1;}else {//id相同 this.price大 往前return price>o.getPrice()?-1:1;}}@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(order_id);out.writeDouble(price);}@Overridepublic void readFields(DataInput in) throws IOException{order_id = in.readInt();price = in.readDouble();}
    }
    
  • Mapper

    /**** 输入数据* 0000001 Pdt_01 222.8* 0000002 Pdt_05 722.4* 0000001 Pdt_02 33.8* 0000003 Pdt_06 232.8* 0000003 Pdt_02 33.8* 0000002 Pdt_03 522.8* 0000002 Pdt_04 122.4* 输出<bean,null>* @author whai_luo* @Date: 2020/6/4 16:46*/
    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {OrderBean oBean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//切割数据String line = value.toString();String[] s = line.split("\t");//获取字段//封装beanoBean.setOrder_id(Integer.parseInt(s[0]));oBean.setPrice(Double.parseDouble(s[2]));context.write(oBean,NullWritable.get());}
    }
    
  • GroupingComparator

    我们需要欺骗Reducer 只靠order_id进行分组,不需要price

/****Reduce类中的reduce方法中key一样,values有多个,是什么情况下的key是一样的,* 能不能自定义。其实这就是 GroupingComparator分组(辅助排序)的作用。** 如果 id 相同再按照金额降序排序* 自定义分组规则 订单号一样的来到同一个分组中*分组:发生在数据调用reduce方法之前 相同key的作为一组去调用** 默认相同的key即OrderReducer相同才会分配到相同的Reducer* 我们需要欺骗Reducer 只靠order_id进行分组,不需要price** @author whai_luo* @Date: 2020/6/4 18:46*/
public class OrderSortGroupingComparator extends WritableComparator {/*** Compare two WritableComparables.** <p> The default implementation uses the natural ordering, calling {@link* Comparable#compareTo(Object)}.* @param a* @param b* @return*/@Overridepublic int compare(WritableComparable a,WritableComparable b) {OrderBean aBean = (OrderBean) a;OrderBean bBean = (OrderBean) b;int result;if (aBean.getOrder_id() > bBean.getOrder_id()) {result = 1;} else if (aBean.getOrder_id() < bBean.getOrder_id()) {result = -1;} else {result = 0;}return result;}/***  ??*/protected OrderSortGroupingComparator() {super(OrderBean.class,true);}}

自己瞎写的公众号与博客

普通二本数据科学与大数据技术专业菜鸟一个,望各位大神多多指导!互相学习进步!

whai的个人博客 whaifree.top 欢迎留言!

MapReduce强化实验相关推荐

  1. Hadoop安装实验及MapReduce编程实验指导

    实验环境:Red Hat 6.5.Hadoop-2.6.5  JDK1.7.0版本 具体参考实验指导书,本文档做辅助工作,详细命令请看教学实验指导书. 1.Hadoop安装实验 准备工作 配置主机名 ...

  2. MapReduce计数器实验

    一.实验目的 通过实验掌握基本的MapReduce编程方法: 掌握用MapReduce解决一些常见的数据处理问题,编写计数器程序. 二.实验平台 操作系统:Linux(建议CentOS6.5): Ha ...

  3. 实验1:MapReduce课程实验

    1.本地创建一个文件,"words.txt"   ,上传到hdfs:  "/neusoftin" 下 package hdfs;import java.io.I ...

  4. java mapreduce编程_Hadoop实验——MapReduce编程(1)

    实验目的 通过实验掌握基本的MapReduce编程方法. 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重.数据排序和数据挖掘等. 通过操作MapReduce的实验,模仿实验内容,深入 ...

  5. 【大数据实验】06:MapReduce操作

    MapReduce操作 OVERVIEW MapReduce操作 实验环境 一.WordCount单词计数 1.实验内容 2.实验原理 3.实验步骤 (1)启动Hadoop集群 (2)准备数据文件 ( ...

  6. Hadoop——实验七:MapReduce编程实践

    文章目录 一. 实验目的 二. 实验内容 三. 实验步骤及结果分析  1. 基于ubuntukylin14.04(7)版本,安装hadoop-eclipse-kepler-plugin-2.6.0.j ...

  7. c语言程序设计实验教学,C语言程序设计实验教学改革研究

    语言程序设计是目前各高校计算机及其相关专业的一门基础课程,也是其它工科专业学生必修的计算机基础课程之一,是学生学习程序设计的入门课程.同时,该课程是一门应用性很强的学科.随着计算机技术飞速发展,计算机 ...

  8. 重磅 | 教育部:2023年前将实验操作纳入考试,积极探索跨学科(STEAM)教育

    编者按:教育部发布的一系列政策文件,小编认为都是指向素质教育这个终极目标,只有不断推进教育创新才能培养出源源不断的创新人才,来应对人工智能时代的巨大挑战.这些政策将对于STEAM教育行业产生重大影响, ...

  9. 【华为ICT大赛2022-2023-----云赛道】加分项-沙箱实验流程及实验步骤

    参加云赛道的小伙伴们可要记着报名加分项,可是200分啊,点击查看视频步骤,轻轻松松拿到手!为我们大赛助力加油!!! 报名点击下方链接 一.华为云大赛平台 点击下面链接观看视频 华为ICT大赛[2022 ...

  10. 迷信、强化学习与认知的若干思考

    首先在这里祝大家情人节快乐! 说到迷信,我们可能并不陌生,迷信作为一个已经被我们贴上颜色标签的词,我们每个人都会有特殊的反应.说到强化学习,我们可能也并不陌生,作为人工智能博弈类的常用方法,至今还并未 ...

最新文章

  1. php mysql不大小写吗,PHP+MYSQL大小写有关问题
  2. poj2387TillCowsComHome Dijlstra
  3. 智·御未来 亚信安全巡展·2017即将起航
  4. 如何选择商业智能BI工具
  5. 通过创建制定版本react-native项目解决“Unable to resolve module `AccessibilityInfo` ”的问题...
  6. iframe父页面和子页面高度自适应
  7. markdown 书写代码
  8. DIV CSS设计时IE6、IE7、FF 与兼容性有关的特性
  9. 洛谷P2671 求和 数学 前缀和
  10. 服务器远程桌面 设置 时间,如何设置使远程桌面连接时间变长?
  11. 【精选】抽奖点名、随机抽奖PPT素材合集,年会、老师必备
  12. 使用Keras-yolov3来识别滑动验证码
  13. MCU通过UART实现OTA在线升级流程
  14. 傻妞机器人对接TG【无需QQ】
  15. 星光嵌入式WM8978音频模块发布
  16. 计算机网络.第三节课.笔记.信道复用、频分复用、统计时分复用、波分复用、时分复用、码分复用、双绞线带宽、双绞线
  17. Android 获取手机存储空间
  18. 惠普喷墨打印机卡纸了
  19. python读conf配置文件_python读写配置文件操作示例
  20. 线上展厅vr展厅虚拟解说 广州商迪

热门文章

  1. matlab 数组 冒号,matlab中冒号的用法
  2. F5学习——Part 1(基于LTM模块的基本组网模式)
  3. Android开发之隐示意图跳转
  4. ACE_TAO 017 ACE_wrappers\examples\Reactor
  5. html submit提交事件,submit方法与onsubmit事件
  6. 如何对CSDN中的博客和博客文章进行修改和管理
  7. 卡拉赞服务器延迟,卡拉赞开荒详细功略(前门)
  8. mysql服务器cpu使用率过高100%
  9. 转载大牛对Microsoft的认识
  10. OSChina 周三乱弹 ——carlos 你和你的电脑怎么过的幸福?