Partitioner

public abstract class Partitioner<KEY, VALUE>

The default HashPartitioner computes the target partition as:

(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

The naive form, key.hashCode() % numReduceTasks, can yield a negative index when hashCode() is negative; masking with Integer.MAX_VALUE clears the sign bit so the result is always non-negative.
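
For reference, Hadoop's built-in HashPartitioner is essentially just that formula wrapped in a class (paraphrased from the Hadoop source):

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    // Clear the sign bit so the result is always in [0, numReduceTasks).
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}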

Per our requirement, we want 3 partitions:
13*             -> partition 0
15*             -> partition 1
everything else -> partition 2

How the reducer count relates to the partition count:
reducers = partition count: works correctly
reducers > partition count: produces many empty, useless files (e.g., 200 reducers -> 200 output files, of which 199 may be empty)
reducers < partition count: fails

Custom partitioner

package com.ccj.pxj.phone.partition;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Access implements Writable {
    private String phone;
    private long up;
    private long down;
    private long sum;

    public Access() {}

    public Access(String phone, long up, long down) {
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = up + down;
    }

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }
    public long getUp() { return up; }
    public void setUp(long up) { this.up = up; }
    public long getDown() { return down; }
    public void setDown(long down) { this.down = down; }

    // Serialize the fields in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    // Deserialize the fields in exactly the order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    @Override
    public String toString() {
        return phone + "\t" + up + "\t" + down + "\t" + sum;
    }
}
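
A quick way to sanity-check a custom Writable is to serialize and deserialize it in memory. A minimal sketch (this test harness is our own, not part of the original job code):

package com.ccj.pxj.phone.partition;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class AccessRoundTripTest {
    public static void main(String[] args) throws IOException {
        Access original = new Access("13812345678", 100L, 200L);

        // Write the object to an in-memory byte stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Read it back and confirm the fields survived the round trip.
        Access copy = new Access();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(original);
        System.out.println(copy); // should print the same line
    }
}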
package com.ccj.pxj.phone.partition;
import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SerDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/phone_data .txt";
        String output = "out";
        // 1) Get the Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        // 2) Set the main class this job runs
        job.setJarByClass(SerDriver.class);
        // 3) Set the Mapper and Reducer
        job.setMapperClass(MyMaper.class);
        job.setReducerClass(MyReduce.class);
        // 4) Set the output types of the Mapper stage
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);
        // Set the custom partitioner
        job.setPartitionerClass(PxjPartition.class);
        // Set the number of reducers => determines the number of output files
        job.setNumReduceTasks(3);
        // 5) Set the output types of the Reducer stage
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Access.class);
        // 6) Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMaper extends Mapper<LongWritable, Text, Text, Access> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split("\t");
            String phone = data[1];
            // Upstream traffic
            long up = Long.parseLong(data[data.length - 3]);
            // Downstream traffic
            long down = Long.parseLong(data[data.length - 2]);
            context.write(new Text(phone), new Access(phone, up, down));
        }
    }

    public static class MyReduce extends Reducer<Text, Access, NullWritable, Access> {
        @Override
        protected void reduce(Text key, Iterable<Access> values, Context context) throws IOException, InterruptedException {
            long ups = 0;
            long downs = 0;
            for (Access value : values) {
                ups += value.getUp();
                downs += value.getDown();
            }
            context.write(NullWritable.get(), new Access(key.toString(), ups, downs));
        }
    }
}
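
Every driver here calls FileUtils.deleteOutput to clear the output directory before the job runs, but that helper's source is not shown in the post. A plausible implementation (the name and signature come from the calls above; the body is our guess):

package com.ccj.pxj.phone.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileUtils {
    // Delete the output directory if it already exists, so re-runs do not
    // fail with FileAlreadyExistsException.
    public static void deleteOutput(Configuration conf, String output) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(output);
        if (fs.exists(path)) {
            fs.delete(path, true); // true => recursive
        }
    }
}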
package com.ccj.pxj.phone.partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PxjPartition extends Partitioner<Text, Access> {
    @Override
    public int getPartition(Text text, Access access, int numPartition) {
        String phone = text.toString();
        /* First attempt (wrong: equals() compares the whole number, not its prefix):
        if ("13".equals(phone)) {
            return 0;
        } else if ("15".equals(phone)) {
            return 1;
        } else {
            return 2;
        }
        */
        // Route by phone-number prefix: 13* -> 0, 15* -> 1, everything else -> 2.
        if (phone.startsWith("13")) {
            return 0;
        } else if (phone.startsWith("15")) {
            return 1;
        } else {
            return 2;
        }
    }
}
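
The partitioner can be exercised locally without running a job. A minimal sketch (our own snippet, not from the original post):

package com.ccj.pxj.phone.partition;

import org.apache.hadoop.io.Text;

public class PxjPartitionCheck {
    public static void main(String[] args) {
        PxjPartition p = new PxjPartition();
        // The Access value is unused by getPartition, so null is fine here.
        System.out.println(p.getPartition(new Text("13812345678"), null, 3)); // 0
        System.out.println(p.getPartition(new Text("15912345678"), null, 3)); // 1
        System.out.println(p.getPartition(new Text("18612345678"), null, 3)); // 2
    }
}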

Notes:
The number of reducers determines the number of output files.
reducers = partition count: works correctly
reducers > partition count: produces many empty, useless files (e.g., 200 reducers -> 200 files, 199 of them empty)
reducers < partition count: throws an error
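
How that plays out for the 3-way partitioner above (a sketch; the single-reducer special case and the exact exception are based on Hadoop's documented behavior, so treat the details as approximate):

// With PxjPartition returning partitions 0..2:
job.setNumReduceTasks(3); // matches the partition count: three files, each with data
job.setNumReduceTasks(5); // runs, but partitions 3 and 4 receive nothing: two empty files
job.setNumReduceTasks(2); // getPartition() can return 2 for a 2-reducer job: "Illegal partition" at runtime
job.setNumReduceTasks(1); // special case: with a single reducer the partitioner is bypassed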

Combiner

map: runs inside the MapTask
reduce: runs inside the ReduceTask
combiner: also runs inside the MapTask
Its parent class is Reducer.
It performs a partial summary / local aggregation of each MapTask's output.
Its business logic is exactly the same as the Reducer's. (This only works when that logic is associative and commutative, such as a sum; computing an average in a combiner would give wrong results.)

Without a combiner:
Combine input records=0
Combine output records=0
With one:
Combine input records=12
Combine output records=7
package com.ccj.pxj.commine.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reuse a single IntWritable(1) instead of allocating one per word.
    private final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(",");
        for (String word : words) {
            context.write(new Text(word), ONE);
        }
    }
}
package com.ccj.pxj.commine.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
package com.ccj.pxj.commine.wc;
import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WCDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/1.txt";
        String output = "out1";
        // 1) Get the Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        // 2) Set the main class this job runs
        job.setJarByClass(WCDriver.class);
        // 3) Set the Mapper and Reducer
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        // 4) Set the output types of the Mapper stage
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the Combiner: the Reducer class is reused because the logic is identical
        job.setCombinerClass(WCReducer.class);
        // 5) Set the output types of the Reducer stage
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6) Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Sorting

In MapReduce, the built-in key types are not only Writable
but also sortable.
A WritableComparable must implement three methods: write, readFields, and compareTo.

Total sort: analogous to SQL's ORDER BY
Sort within each partition: analogous to Hive's SORT BY

package com.ccj.pxj.sort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/** Custom serializable class that also defines a sort order. */
public class Traffic implements WritableComparable<Traffic> {
    private String phone;
    private long up;
    private long down;
    private long sum;

    public Traffic() {}

    public Traffic(String phone, long up, long down) {
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = up + down;
    }

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }
    public long getUp() { return up; }
    public void setUp(long up) { this.up = up; }
    public long getDown() { return down; }
    public void setDown(long down) { this.down = down; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    @Override
    public String toString() {
        return phone + "\t" + up + "\t" + down + "\t" + sum;
    }

    // Sort by total traffic in descending order.
    @Override
    public int compareTo(Traffic o) {
        return this.sum > o.sum ? -1 : 1;
    }
}
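
One caveat: this compareTo never returns 0, even for equal sums, which technically violates the compareTo contract. It works for this job, but a safer variant (our suggestion, not from the original post) would be:

@Override
public int compareTo(Traffic o) {
    // Descending by total traffic; fall back to the phone number
    // so equal sums still compare consistently.
    int bySum = Long.compare(o.sum, this.sum);
    return bySum != 0 ? bySum : this.phone.compareTo(o.phone);
}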
package com.ccj.pxj.sort;
import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class AllSortDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/phone_data .txt";
        String output = "out";
        // 1) Get the Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        // 2) Set the main class this job runs
        job.setJarByClass(AllSortDriver.class);
        // 3) Set the Mapper and Reducer
        job.setMapperClass(MyMaper.class);
        job.setReducerClass(MyReduce.class);
        // 4) Set the output types of the Mapper stage:
        //    Traffic is the key, so the framework sorts on its compareTo
        job.setMapOutputKeyClass(Traffic.class);
        job.setMapOutputValueClass(Text.class);
        // 5) Set the output types of the Reducer stage (Text key, Traffic value)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Traffic.class);
        // 6) Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMaper extends Mapper<LongWritable, Text, Traffic, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split("\t");
            String phone = data[1];
            // Upstream traffic
            long up = Long.parseLong(data[data.length - 3]);
            // Downstream traffic
            long down = Long.parseLong(data[data.length - 2]);
            context.write(new Traffic(phone, up, down), new Text(phone));
        }
    }

    public static class MyReduce extends Reducer<Traffic, Text, Text, Traffic> {
        @Override
        protected void reduce(Traffic key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(new Text(value), key);
            }
        }
    }
}
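
Because this job keeps the default single reducer, all map output passes through one sorted stream, so the lone output file is globally ordered by total traffic: the ORDER BY analogue mentioned above.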
Sorting within partitions
The Traffic class is unchanged from the total-sort example above; the same WritableComparable is reused here.
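
The driver below registers PhonePartitioner, whose source does not appear in the original post. A plausible reconstruction, mirroring PxjPartition but keyed on Traffic (since Traffic is now the map output key):

package com.ccj.pxj.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhonePartitioner extends Partitioner<Traffic, Text> {
    @Override
    public int getPartition(Traffic traffic, Text text, int numPartition) {
        String phone = traffic.getPhone();
        // Same routing rule as PxjPartition: 13* -> 0, 15* -> 1, others -> 2.
        if (phone.startsWith("13")) {
            return 0;
        } else if (phone.startsWith("15")) {
            return 1;
        } else {
            return 2;
        }
    }
}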
package com.ccj.pxj.sort;
import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PartitionSortDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/phone_data .txt";
        String output = "out";
        // 1) Get the Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        // 2) Set the main class this job runs
        job.setJarByClass(PartitionSortDriver.class);
        // 3) Set the Mapper and Reducer
        job.setMapperClass(MyMaper.class);
        job.setReducerClass(MyReduce.class);
        // Partition by phone prefix, then sort within each partition
        job.setPartitionerClass(PhonePartitioner.class);
        job.setNumReduceTasks(3);
        // 4) Set the output types of the Mapper stage
        job.setMapOutputKeyClass(Traffic.class);
        job.setMapOutputValueClass(Text.class);
        // 5) Set the output types of the Reducer stage (Text key, Traffic value)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Traffic.class);
        // 6) Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMaper extends Mapper<LongWritable, Text, Traffic, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split("\t");
            String phone = data[1];
            // Upstream traffic
            long up = Long.parseLong(data[data.length - 3]);
            // Downstream traffic
            long down = Long.parseLong(data[data.length - 2]);
            context.write(new Traffic(phone, up, down), new Text(phone));
        }
    }

    public static class MyReduce extends Reducer<Traffic, Text, Text, Traffic> {
        @Override
        protected void reduce(Traffic key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(new Text(value), key);
            }
        }
    }
}

OutputFormat

A very common class of requirement: write records out to particular files according to some rule.
package com.ccj.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class PxjRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream pxjout;
    FSDataOutputStream wfyout;
    FileSystem fileSystem;

    public PxjRecordWriter(TaskAttemptContext context) {
        try {
            fileSystem = FileSystem.get(context.getConfiguration());
            // Two fixed output files; records are routed between them by content.
            pxjout = fileSystem.create(new Path("out/pxj"));
            wfyout = fileSystem.create(new Path("out/wfy"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Route by content: lines containing "pxj" go to one file, the rest to the other.
        // Note: this writes raw bytes with no separator, which is why the reducer
        // in the driver appends "\r\n" to each key.
        if (key.toString().contains("pxj")) {
            pxjout.write(key.toString().getBytes());
        } else {
            wfyout.write(key.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(pxjout);
        IOUtils.closeStream(wfyout);
    }
}
package com.ccj.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PxjDataOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new PxjRecordWriter(job);
    }
}
package com.ccj.outputformat;
import com.ccj.pxj.phone.utils.FileUtils;
import com.ccj.pxj.sort.PhonePartitioner;
import com.ccj.pxj.sort.Traffic;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PxjDataDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/log.txt";
        String output = "out";
        // 1) Get the Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        // 2) Set the main class this job runs
        job.setJarByClass(PxjDataDriver.class);
        // 3) Set the Mapper and Reducer
        job.setMapperClass(MyMaper.class);
        job.setReducerClass(MyReduce.class);
        // job.setPartitionerClass(PhonePartitioner.class);
        // job.setNumReduceTasks(3);
        // 4) Set the output types of the Mapper stage
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5) Set the output types of the Reducer stage
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Plug in the custom OutputFormat
        job.setOutputFormatClass(PxjDataOutputFormat.class);
        // 6) Set the input and output paths; the output path is still required
        //    even though PxjRecordWriter writes its own files (the committer
        //    uses it, e.g. for the _SUCCESS marker)
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // 7) Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMaper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static class MyReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            for (NullWritable value : values) {
                // Append a line break because PxjRecordWriter writes raw bytes
                context.write(new Text(key + "\r\n"), value);
            }
        }
    }
}

How do you get a RecordWriter<Text, NullWritable> in the first place?
==> Crib from the source. TextOutputFormat.getRecordWriter ends with:

return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);

The TaskAttemptContext is a grab-bag: everything goes into it, and everything can be pulled back out of it (here, the Configuration used to open a FileSystem).
The source code, and the unit tests inside it, are the best learning resources we have; that code has industrial-grade reference value.
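
For comparison, a simplified sketch of how TextOutputFormat builds its writer (condensed from the Hadoop source; details such as compression handling are omitted):

// Simplified from TextOutputFormat.getRecordWriter (compression omitted)
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException {
    Configuration conf = job.getConfiguration();
    String keyValueSeparator = conf.get("mapreduce.output.textoutputformat.separator", "\t");
    // Ask FileOutputFormat for the per-task output file (e.g. part-r-00000)
    Path file = getDefaultWorkFile(job, "");
    FileSystem fs = file.getFileSystem(conf);
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
}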

Author: pxj (潘陈)
Date: 2020-01-18
