MapReduce Programming Template

1. Define a custom Mapper class that extends Mapper and override the map method:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // e.g. for a line like "aa,bb,cc": split on the comma, then emit each word as the key with 1 as the value
        String[] split = value.toString().split(",");
        for (String word : split) {
            Text text = new Text(word);
            IntWritable num = new IntWritable(1);
            context.write(text, num);
        }
    }
}
// Hadoop counterparts of the Java data types
Java        Hadoop
int         IntWritable
long        LongWritable
String      Text
byte        ByteWritable
double      DoubleWritable
float       FloatWritable
boolean     BooleanWritable
null        NullWritable

If a custom class needs to be serialized and deserialized by Hadoop, it can do so by implementing the Writable interface.

When overriding the map method, if intermediate processing converts values into plain Java types, the results must be converted back to Hadoop types before they are written to the Context object.
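For example, a minimal sketch (added here for illustration, not part of the original template) of wrapping plain Java values back into Hadoop writables before writing them out:

// inside map(): work with plain Java types, then wrap them back into Hadoop types
String word = value.toString().trim();                  // Text -> String
int count = 1;                                          // plain Java int
context.write(new Text(word), new IntWritable(count));  // String/int -> Text/IntWritable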

2. Define a custom Reducer class that extends Reducer and override the reduce method

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Iterate over the values and sum them up.
        // The MapReduce framework has already grouped the map output by key, so
        // Iterable<IntWritable> values is the iterator of counts for one word,
        // e.g. {"aa": [1,1,1,1,1]}; adding up the 1s gives the total count for "aa".
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

3. Driver: the main entry point that wires the mapper and reducer together

(1) Create the Configuration and a Job

(2) Specify the mapper class and the reducer class

(3) Set the map output key/value types and the reduce output key/value types

(4) Use FileInputFormat to point the job at the input location on HDFS

(5) Use FileOutputFormat to point the job at the output location on HDFS

(6) Submit the job with job.waitForCompletion(true); true means progress information is printed

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    // default HDFS address
    conf.set("fs.defaultFS", "hdfs://master:9000");
    // parameter for cross-platform job submission from Windows
    // conf.set("mapreduce.app-submission.cross-platform", "true");
    conf.set("mapred.job.tracker", "master:54311");
    // user used to access the cluster
    System.setProperty("HADOOP_USER_NAME", "root");
    // create a job
    Job job = Job.getInstance(conf);
    job.setJarByClass(test.class);
    job.setMapperClass(CountMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // input and output paths; delete the output directory if it already exists
    FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/data/stu_score_sub.csv"));
    Path out = new Path("hdfs://master:9000/output");
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(out)) {
        fs.delete(out, true);
    }
    FileOutputFormat.setOutputPath(job, out);
    job.waitForCompletion(true);
}

MapReduce Scenarios in Practice

1. Multiple inputs

Compute each student's total score for each subject
chinese.txt

english.txt

math.txt

Student.java

Note on the serialization/deserialization mechanism: once you define a custom class, if its objects are to be transferred inside Hadoop, the class must implement Hadoop's Writable interface, and the fields simply need to be written out and read back in the same order.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class Student implements Writable {
    private String name;
    private Integer chinese;
    private Integer english;
    private Integer math;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getChinese() { return chinese; }
    public void setChinese(Integer chinese) { this.chinese = chinese; }
    public Integer getEnglish() { return english; }
    public void setEnglish(Integer english) { this.english = english; }
    public Integer getMath() { return math; }
    public void setMath(Integer math) { this.math = math; }

    // deserialization: read the fields back in the same order they were written
    @Override
    public void readFields(DataInput input) throws IOException {
        this.name = input.readUTF();
        this.chinese = input.readInt();
        this.english = input.readInt();
        this.math = input.readInt();
    }

    // serialization
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(name);
        output.writeInt(chinese);
        output.writeInt(english);
        output.writeInt(math);
    }

    @Override
    public String toString() {
        return "Student [name=" + name + ", chinese=" + chinese + ", english=" + english + ", math=" + math + "]";
    }
}

Mapper code

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Total each person's monthly scores for each of the three subjects.
 * key   : name
 * value : Student
 * Map   : maps the input data
 *
 * Number of Mappers = number of input splits
 */
public class ScoreMapper extends Mapper<LongWritable, Text, Text, Student> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Student>.Context context)
            throws IOException, InterruptedException {
        // each line looks like "1 zhang 89" -> month name score
        String lineContent = value.toString();
        String[] datas = lineContent.split(" ");
        String name = datas[1];
        String score = datas[2];

        // the file name tells us which subject the score belongs to
        FileSplit split = (FileSplit) context.getInputSplit();
        Student student = new Student();
        if (split.getPath().getName().equals("chinese.txt")) {
            student.setChinese(Integer.valueOf(score));
            student.setEnglish(0);
            student.setMath(0);
        } else if (split.getPath().getName().equals("math.txt")) {
            student.setMath(Integer.valueOf(score));
            student.setChinese(0);
            student.setEnglish(0);
        } else if (split.getPath().getName().equals("english.txt")) {
            student.setEnglish(Integer.valueOf(score));
            student.setChinese(0);
            student.setMath(0);
        }
        student.setName(name);
        context.write(new Text(name), student);
    }
}

Usage of the FileSplit class used above

// inside a Mapper, the split is obtained from the context rather than constructed directly
FileSplit fs = (FileSplit) context.getInputSplit();
String pathname = fs.getPath().getName();                 // file name of this split
int depth = fs.getPath().depth();                         // depth of the path
fs.getClass();                                            // the runtime class
long length = fs.getLength();                             // length of the split
SplitLocationInfo[] locationInfo = fs.getLocationInfo();  // location info
String[] locations = fs.getLocations();                   // hosts holding the split

Reducer code

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ScoreReducer extends Reducer<Text, Student, Text, Student> {
    @Override
    protected void reduce(Text key, Iterable<Student> values, Reducer<Text, Student, Text, Student>.Context context)
            throws IOException, InterruptedException {
        Student student = new Student();
        student.setName(key.toString());
        Integer chinese = 0;
        Integer english = 0;
        Integer math = 0;
        for (Student stu : values) {
            chinese = chinese + stu.getChinese();
            english = english + stu.getEnglish();
            math = math + stu.getMath();
        }
        student.setChinese(chinese);
        student.setEnglish(english);
        student.setMath(math);
        context.write(key, student);
    }
}

Driver code

public class ScoreDriver {
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ScoreDriver.class);
        job.setMapperClass(ScoreMapper.class);
        job.setReducerClass(ScoreReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Student.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Student.class);
        // read every file under the input path; the result directory must not exist yet
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.76.131:9000/score"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.76.131:9000/score/result"));
        job.waitForCompletion(true);
    }
}

2. Sorting

Sort the movies by popularity
惊天破 72
机械师2 83
奇异博士 67
但丁密码 79
比利林恩的中场战事 84
侠探杰克:永不回头 68
龙珠Z:复活的弗利萨 79
长城 56

MapReduce sorts map output by key, so the bean (PO) class is used as the key.

MovieBean.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class MovieBean implements WritableComparable<MovieBean> {
    private String name;
    private Integer hotNum;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getHotNum() { return hotNum; }
    public void setHotNum(Integer hotNum) { this.hotNum = hotNum; }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.name = input.readUTF();
        this.hotNum = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(this.name);
        output.writeInt(this.hotNum);
    }

    @Override
    public String toString() {
        return "MovieBean [name=" + name + ", hotNum=" + hotNum + "]";
    }

    // descending order: compare the other object's value against this one's
    @Override
    public int compareTo(MovieBean o) {
        // return Integer.compare(o.getHotNum(), this.getHotNum());
        return o.getHotNum() - this.getHotNum();
    }
}

Implement the WritableComparable interface and override compareTo() to define the ordering. Since the driver below does not set a Reducer, the default (identity) reducer runs with a single reduce task, and the framework's sort of the map output keys is what produces the sorted result.

Mapper code

public class SortMapper extends Mapper<LongWritable, Text, MovieBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, MovieBean, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] datas = line.split(" ");
        MovieBean movieBean = new MovieBean();
        movieBean.setName(datas[0]);
        movieBean.setHotNum(Integer.valueOf(datas[1]));
        context.write(movieBean, NullWritable.get());
    }
}

Driver code

public class SortDriver {
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(MovieBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.76.131:9000/sort"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.76.131:9000/sort/result"));
        job.waitForCompletion(true);
    }
}

3. Multi-stage MR processing: chaining multiple jobs

A moderately complex processing pipeline often needs several MapReduce programs chained together; multi-job chaining can be implemented with the MapReduce framework's JobControl.

On top of the first MR stage, a second job is added to process the output of the first job.
Example:
Compute each person's total income over 3 months and sort the result.

First MR: compute each person's total income
Second MR: sort by income

The code below sets up two MapReduce jobs over the Flow data, SumMR and SortMR, with a dependency between them: the output of SumMR is the input of SortMR, so SortMR may only start after SumMR has finished.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Job jobsum = Job.getInstance(conf);
    jobsum.setJarByClass(RunManyJobMR.class);
    jobsum.setMapperClass(FlowSumMapper.class);
    jobsum.setReducerClass(FlowSumReducer.class);
    jobsum.setMapOutputKeyClass(Text.class);
    jobsum.setMapOutputValueClass(Flow.class);
    jobsum.setCombinerClass(FlowSumReducer.class);
    jobsum.setOutputKeyClass(Text.class);
    jobsum.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(jobsum, "d:/flow/input");
    FileOutputFormat.setOutputPath(jobsum, new Path("d:/flow/output12"));

    Job jobsort = Job.getInstance(conf);
    jobsort.setJarByClass(RunManyJobMR.class);
    jobsort.setMapperClass(FlowSortMapper.class);
    jobsort.setReducerClass(FlowSortReducer.class);
    jobsort.setMapOutputKeyClass(Flow.class);
    jobsort.setMapOutputValueClass(Text.class);
    jobsort.setOutputKeyClass(NullWritable.class);
    jobsort.setOutputValueClass(Flow.class);
    FileInputFormat.setInputPaths(jobsort, "d:/flow/output12");
    FileOutputFormat.setOutputPath(jobsort, new Path("d:/flow/sortoutput12"));

    ControlledJob sumcj = new ControlledJob(jobsum.getConfiguration());
    ControlledJob sortcj = new ControlledJob(jobsort.getConfiguration());
    sumcj.setJob(jobsum);
    sortcj.setJob(jobsort);
    // declare the dependency: sortcj runs only after sumcj succeeds
    sortcj.addDependingJob(sumcj);

    JobControl jc = new JobControl("flow sum and sort");
    jc.addJob(sumcj);
    jc.addJob(sortcj);
    // JobControl implements Runnable: run it in its own thread and poll until all jobs finish
    Thread jobThread = new Thread(jc);
    jobThread.start();
    while (!jc.allFinished()) {
        Thread.sleep(500);
    }
    jc.stop();
}

4. TopN algorithm - custom GroupComparator

Input file format

algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85

Output file format

With k=3, the output is split into 4 files by course; each file holds the names and average scores of the top 3 students for that course.

algorithm huangjiaju的成绩:62.0
algorithm liutao的成绩:56.57
algorithm huanglei的成绩:55.89

The comparison class MyCom, which implements the Comparable interface

static class MyCom implements Comparable<MyCom> {
    // first field is the name, second is the average score
    private String tname;
    private Double tscore;

    // generated getters and setters
    public String getTname() { return tname; }
    public void setTname(String tname) { this.tname = tname; }
    public Double getTscore() { return tscore; }
    public void setTscore(Double tscore) { this.tscore = tscore; }

    @Override
    public int compareTo(MyCom o) {
        // compare by average score
        return this.tscore.compareTo(o.getTscore());
    }
}

Map code

static class TopMaper extends Mapper<LongWritable, Text, Text, Text> {
    // input is <offset, one line of text>, output is <Text, Text>
    private Text mkey = new Text();
    private Text mvalue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("map");
        // split the current line on ","
        String[] lines = value.toString().split(",");
        // filter out malformed lines first (course, name and at least one score are required)
        if (lines == null || lines.length < 3) {
            return;
        }
        // the first field is the course, used as the output key
        mkey.set(lines[0]);
        // sum every field from index [2] onwards as a double
        double sum = 0;
        for (int i = 2; i < lines.length; i++) {
            sum += new Double(lines[i]);
        }
        // DecimalFormat keeps two decimal places
        DecimalFormat df = new DecimalFormat("0.00");
        // output value is "name,averageScore"
        mvalue.set(lines[1] + "," + df.format(sum / (lines.length - 2)));
        context.write(mkey, mvalue);
    }
}

Reduce code

static class TopReduceer extends Reducer<Text, Text, Text, Text> {
    private Text rvalue = new Text();

    @Override
    protected void reduce(Text mkey, Iterable<Text> iter, Context context) throws IOException, InterruptedException {
        System.out.println("reduce");
        // collect the MyCom objects in a List backed by an ArrayList
        List<MyCom> slist = new ArrayList<>();
        // walk through the incoming "name,averageScore" values
        for (Text it : iter) {
            String[] lines = it.toString().split(",");
            MyCom c = new MyCom();
            c.setTname(lines[0]);               // name
            c.setTscore(new Double(lines[1]));  // average score
            slist.add(c);
        }
        // Collections.sort sorts the list in ascending order
        Collections.sort(slist);
        // Collections.reverse flips it, giving descending order
        Collections.reverse(slist);
        // top-k: output the 3 records with the highest average score
        for (int k = 0; k < 3; k++) {
            MyCom s = slist.get(k);
            rvalue.set(s.getTname() + "的成绩:" + s.getTscore());
            context.write(mkey, rvalue);
        }
    }
}

The main method

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration cfg = new Configuration();
    Job job = Job.getInstance(cfg);
    // class that contains the main method
    job.setJarByClass(Topk.class);
    job.setMapperClass(TopMaper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(TopReduceer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // 4 reduce tasks instead of the default 1; without a custom Partitioner,
    // records are partitioned by the map output key, one partition per reduce task
    job.setNumReduceTasks(4);
    // input and output paths
    FileInputFormat.addInputPath(job, new Path("d:\\mr\\input\\grade.txt"));
    FileOutputFormat.setOutputPath(job, new Path("d:\\mr\\outtopk"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

5. Global counters

The following example uses global counters to count the total number of words and the total number of lines across all files under a directory.

package com.mr.counter;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CounterWordCount {

    enum CouterWordCountC {
        COUNT_WORDS, COUNT_LINES
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CounterWordCount.class);
        job.setMapperClass(WCCounterMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        Path inputPath = new Path("d:/wordcount/input");
        FileInputFormat.setInputPaths(job, inputPath);
        // map-only job: no reduce tasks
        job.setNumReduceTasks(0);
        Path outputPath = new Path("d:/wordcount/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }

    private static class WCCounterMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // count lines: input is read line by line, so each map() call means one more line
            context.getCounter(CouterWordCountC.COUNT_LINES).increment(1L);
            String[] words = value.toString().split(" ");
            for (String word : words) {
                // count words: +1 for every word seen
                context.getCounter(CouterWordCountC.COUNT_WORDS).increment(1L);
            }
        }
    }
}
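The counter totals appear in the job's console output when it finishes. To also read them programmatically in the driver, something along these lines works (a small sketch added for illustration; it is not part of the original example):

boolean ok = job.waitForCompletion(true);
// once the job has finished, the counters are available from the Job object
long lines = job.getCounters().findCounter(CouterWordCountC.COUNT_LINES).getValue();
long words = job.getCounters().findCounter(CouterWordCountC.COUNT_WORDS).getValue();
System.out.println("lines=" + lines + ", words=" + words);
System.exit(ok ? 0 : 1);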

6. MapJoin

MapJoin is suitable when one of the two data sets being joined is small. The idea is to load the small data set entirely into memory and index it by the join key. The large data set is then used as the MapTask input, and every record passed to map() is matched directly against the in-memory index; the join result is written out by key. This approach uses Hadoop's DistributedCache to distribute the small data set to every compute node: each node running a map task loads it into memory and builds an index on the join key.
We have two data files, movies.dat and ratings.dat.

Sample records:

Movies.dat ---- fields: movieid, moviename, movietype

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance

Ratings.dat ---- fields: userid, movieid, rate, timestamp

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968

Select * from movie a join ratings b on a.movieid = b.movieid

The two tables are to be joined so that the final output contains the following six fields:

movieid, userid, rate, moviename, movietype, timestamp

Step 1: define the MovieRate bean, which makes sorting and serializing the data easier

package com.mr.mymapjoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class MovieRate implements WritableComparable<MovieRate> {
    private String movieid;
    private String userid;
    private int rate;
    private String movieName;
    private String movieType;
    private long ts;

    public String getMovieid() { return movieid; }
    public void setMovieid(String movieid) { this.movieid = movieid; }
    public String getUserid() { return userid; }
    public void setUserid(String userid) { this.userid = userid; }
    public int getRate() { return rate; }
    public void setRate(int rate) { this.rate = rate; }
    public String getMovieName() { return movieName; }
    public void setMovieName(String movieName) { this.movieName = movieName; }
    public String getMovieType() { return movieType; }
    public void setMovieType(String movieType) { this.movieType = movieType; }
    public long getTs() { return ts; }
    public void setTs(long ts) { this.ts = ts; }

    public MovieRate() {
    }

    public MovieRate(String movieid, String userid, int rate, String movieName, String movieType, long ts) {
        this.movieid = movieid;
        this.userid = userid;
        this.rate = rate;
        this.movieName = movieName;
        this.movieType = movieType;
        this.ts = ts;
    }

    @Override
    public String toString() {
        return movieid + "\t" + userid + "\t" + rate + "\t" + movieName + "\t" + movieType + "\t" + ts;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movieid);
        out.writeUTF(userid);
        out.writeInt(rate);
        out.writeUTF(movieName);
        out.writeUTF(movieType);
        out.writeLong(ts);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.movieid = in.readUTF();
        this.userid = in.readUTF();
        this.rate = in.readInt();
        this.movieName = in.readUTF();
        this.movieType = in.readUTF();
        this.ts = in.readLong();
    }

    @Override
    public int compareTo(MovieRate mr) {
        int it = mr.getMovieid().compareTo(this.movieid);
        if (it == 0) {
            return mr.getUserid().compareTo(this.userid);
        } else {
            return it;
        }
    }
}

Step 2: write the MapReduce program

package com.mr.mymapjoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MovieRatingMapJoinMR {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        // job.setJarByClass(MovieRatingMapJoinMR.class);
        job.setJar("/home/hadoop/mrmr.jar");
        job.setMapperClass(MovieRatingMapJoinMRMapper.class);
        job.setMapOutputKeyClass(MovieRate.class);
        job.setMapOutputValueClass(NullWritable.class);
        // job.setReducerClass(MovieRatingMapJoinMReducer.class);
        // job.setOutputKeyClass(MovieRate.class);
        // job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);

        String minInput = args[0];
        String maxInput = args[1];
        String output = args[2];
        // the large data set is the normal MapReduce input
        FileInputFormat.setInputPaths(job, new Path(maxInput));
        Path outputPath = new Path(output);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        // the small data set is shipped to every node through the distributed cache
        URI uri = new Path(minInput).toUri();
        job.addCacheFile(uri);

        boolean status = job.waitForCompletion(true);
        System.exit(status ? 0 : 1);
    }

    static class MovieRatingMapJoinMRMapper extends Mapper<LongWritable, Text, MovieRate, NullWritable> {
        // holds every key-value pair parsed from the small data set
        private static Map<String, Movie> movieMap = new HashMap<String, Movie>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] localCacheFilePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            String myfilePath = localCacheFilePaths[0].toString();
            System.out.println(myfilePath);
            URI[] cacheFiles = context.getCacheFiles();
            System.out.println(cacheFiles[0].toString());

            BufferedReader br = new BufferedReader(new FileReader(myfilePath));
            // each line read here is one movie record from the cached file
            String line = "";
            while (null != (line = br.readLine())) {
                String[] splits = line.split("::");
                movieMap.put(splits[0], new Movie(splits[0], splits[1], splits[2]));
            }
            IOUtils.closeStream(br);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("::");
            String userid = splits[0];
            String movieid = splits[1];
            int rate = Integer.parseInt(splits[2]);
            long ts = Long.parseLong(splits[3]);
            String movieName = movieMap.get(movieid).getMovieName();
            String movieType = movieMap.get(movieid).getMovieType();
            MovieRate mr = new MovieRate(movieid, userid, rate, movieName, movieType, ts);
            context.write(mr, NullWritable.get());
        }
    }
}
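The mapper above references a Movie class that is not shown in the original; a minimal sketch consistent with how it is used (a three-argument constructor plus getMovieName/getMovieType) might look like this:

// Assumed helper bean; the field names are inferred from the mapper code above.
public class Movie {
    private String movieid;
    private String movieName;
    private String movieType;

    public Movie(String movieid, String movieName, String movieType) {
        this.movieid = movieid;
        this.movieName = movieName;
        this.movieType = movieType;
    }

    public String getMovieid() { return movieid; }
    public String getMovieName() { return movieName; }
    public String getMovieType() { return movieType; }
}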

7. The simplest wordcount

Test data:

zhangyong zhangrui zhangqin
zhangyong zhangrui zhangqin
zhangyong zhangrui zhangqin

Mapper class

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // get one line of input
        String line = value.toString();
        // split the line into words; the source data shows the fields are space-separated
        String[] arr = line.split(" ");
        for (String str : arr) {
            context.write(new Text(str), new LongWritable(1));
        }
    }
}

Reducer class

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // variable holding how many times this word occurred
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }
        // output: key is the word, value is how many times it appeared in the map phase
        context.write(key, new LongWritable(sum));
    }
}

Driver class

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // current default configuration
        Configuration conf = new Configuration();
        // job object representing this MR job
        Job job = Job.getInstance(conf);
        // entry class of the program
        job.setJarByClass(WordCountDriver.class);
        // Mapper and Reducer classes for this job
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // HDFS directory to analyse and output directory
        FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/mapreduce"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/mapreduce"));
        if (!job.waitForCompletion(true)) {
            return;
        }
    }
}

8. Finding the highest temperature per year

Test data:

2329999919500515070000
9909999919500515120022
9909999919500515180011
9509999919490324120111
6509999919490324180078

Code

public class HeightMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // one record
        String line = value.toString();
        // the year
        String year = line.substring(8, 12);
        // the temperature (parsed to an int)
        int t = Integer.parseInt(line.substring(18, 22));
        context.write(new Text(year), new LongWritable(t));
    }
}

public class HeightReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text year, Iterable<LongWritable> t, Context context) throws IOException, InterruptedException {
        // keep the highest temperature seen for this year
        long max = 0;
        for (LongWritable data : t) {
            if (max < data.get()) {
                max = data.get();
            }
        }
        context.write(year, new LongWritable(max));
    }
}

public class HeightDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HeightDriver.class);
        job.setMapperClass(HeightMapper.class);
        job.setReducerClass(HeightReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/wendu/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/wendu"));
        job.waitForCompletion(true);
    }
}

9. Partitioning with multiple outputs

Test data:

13901000123 zs bj 343
13202111011 ww sh 456
13901000123 zs bj 1024
13207551234 ls sz 758

Partitioner class

public class AddPartitioner extends Partitioner<Text, PartFlowBean> {
    @Override
    public int getPartition(Text text, PartFlowBean flowBean, int numPartitioner) {
        String addr = flowBean.getAddr();
        if (addr.equals("bj")) {
            return 0;   // written to part-r-00000
        } else if (addr.equals("sh")) {
            return 1;   // written to part-r-00001
        } else {
            return 2;   // written to part-r-00002
        }
    }
}

The MR code

public class PartFlowMapper extends Mapper<LongWritable, Text, Text, PartFlowBean> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        /**
         * [13901000123, zs, bj, 343]
         * phone = 13901000123; name = zs; addr = bj; flow = 343;
         */
        String[] info = line.split(" ");
        PartFlowBean flowBean = new PartFlowBean();
        flowBean.setPhone(info[0]);
        flowBean.setName(info[1]);
        flowBean.setAddr(info[2]);
        flowBean.setFlow(Integer.parseInt(info[3]));
        context.write(new Text(flowBean.getName()), flowBean);
    }
}

public class PartFlowReducer extends Reducer<Text, PartFlowBean, PartFlowBean, NullWritable> {
    @Override
    public void reduce(Text key, Iterable<PartFlowBean> values, Context context) throws IOException, InterruptedException {
        PartFlowBean result = new PartFlowBean();
        for (PartFlowBean value : values) {
            result.setPhone(value.getPhone());
            result.setName(value.getName());
            result.setAddr(value.getAddr());
            // accumulate the flow of all records with the same name
            result.setFlow(result.getFlow() + value.getFlow());
        }
        context.write(result, NullWritable.get());
    }
}

public class PartFlowDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(PartFlowDriver.class);
        job.setMapperClass(PartFlowMapper.class);
        job.setReducerClass(PartFlowReducer.class);
        /**
         * The two settings below must both be present, otherwise the custom partitioning does not take effect.
         */
        job.setPartitionerClass(AddPartitioner.class);   // partitioner class
        job.setNumReduceTasks(3);                        // number of partitions / reduce tasks
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PartFlowBean.class);
        job.setOutputKeyClass(PartFlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/partition"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/partition"));
        job.waitForCompletion(true);
    }
}
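PartFlowBean itself is not shown in the original; a minimal sketch consistent with the getters/setters used above could look like the following (it has to implement Writable to be usable as a map output value, and its toString drives the output format shown below):

// Assumed bean; fields and toString format are inferred from the code and the sample output.
public class PartFlowBean implements Writable {
    private String phone;
    private String name;
    private String addr;
    private int flow;

    public String getPhone() { return phone; }
    public void setPhone(String phone) { this.phone = phone; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }
    public int getFlow() { return flow; }
    public void setFlow(int flow) { this.flow = flow; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeUTF(name);
        out.writeUTF(addr);
        out.writeInt(flow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        name = in.readUTF();
        addr = in.readUTF();
        flow = in.readInt();
    }

    @Override
    public String toString() {
        return "FlowBean{phone='" + phone + "', name='" + name + "', addr='" + addr + "', flow=" + flow + "}";
    }
}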

Output:

part-r-00000

FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}

part-r-00001

FlowBean{phone='13202111011', name='ww', addr='sh', flow=456}

part-r-00002

FlowBean{phone='13207551234', name='ls', addr='sz', flow=758}

10. Partitioning with total ordering

82 239 231
23 22 213
123 232 124
213 3434 232
4546 565 123
231 231

Partitioner class

/**
 * @Author zhangyong
 * @Date 2020/4/14 9:39
 * @Version 1.0
 * Total ordering:
 * write the numbers in the file above into three files, by number of digits:
 * 0-99        go to file 1
 * 100-999     go to file 2
 * 1000 and up go to file 3
 */
public class AutoPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        String num = String.valueOf(key.get());
        if (num.matches("[0-9][0-9]") || num.matches("[0-9]")) {
            return 0;   // one or two digits: 0-99
        } else if (num.matches("[0-9][0-9][0-9]")) {
            return 1;   // three digits: 100-999
        } else {
            return 2;   // everything else
        }
    }
}
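The original only shows the partitioner. A minimal mapper and driver wiring, assuming the input numbers are whitespace-separated as in the sample data, might look like this (SortAllMapper and the commented driver lines are illustrative, not from the original):

// Hypothetical mapper: emit each number as an IntWritable key so that AutoPartitioner
// can route it to a partition, and the framework sorts the keys within each partition.
public static class SortAllMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String s : value.toString().trim().split("\\s+")) {
            if (!s.isEmpty()) {
                int n = Integer.parseInt(s);
                context.write(new IntWritable(n), new IntWritable(n));
            }
        }
    }
}

// Hypothetical driver settings: three reduce tasks, one per partition/output file.
// job.setMapperClass(SortAllMapper.class);
// job.setMapOutputKeyClass(IntWritable.class);
// job.setMapOutputValueClass(IntWritable.class);
// job.setPartitionerClass(AutoPartitioner.class);
// job.setNumReduceTasks(3);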

11. Recommending possible friends

Test data:

tom rose
tom jim
tom smith
tom lucy
rose tom
rose lucy
rose smith
jim tom

First Mapper class

public class OneFriendMapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * The input key and value follow from the file contents.
     * The output key and value are name -> friend, one friendship relation per record.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // one line of input
        String line = value.toString();
        // the person
        String name = line.split(" ")[0];
        // one of their friends
        String friend = line.split(" ")[1];
        context.write(new Text(name), new Text(friend));
    }
}

First Reducer class

public class OneFriendReducer extends Reducer<Text, Text, Text, IntWritable> {
    /**
     * The input key and value must match the mapper's output.
     * Output is Text and IntWritable:
     * 1 marks a direct friendship, 2 marks a possible (second-degree) friendship.
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        ArrayList<String> friendList = new ArrayList<>();
        // direct friendships
        for (Text value : values) {
            friendList.add(value.toString());
            if (key.toString().compareTo(value.toString()) < 0) {
                context.write(new Text(key + "-" + value), new IntWritable(1));
            } else {
                context.write(new Text(value + "-" + key), new IntWritable(1));
            }
        }
        // people who might know each other: every pair of friends of the same person
        for (int i = 0; i < friendList.size(); i++) {
            for (int j = 0; j < friendList.size(); j++) {
                String friend1 = friendList.get(i);
                String friend2 = friendList.get(j);
                if (friend1.compareTo(friend2) < 0) {
                    context.write(new Text(friend1 + "-" + friend2), new IntWritable(2));
                }
            }
        }
    }
}

Second Mapper class

public class TwoFriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // one line of the first job's output
        String line = value.toString();
        // the pair of people
        String friendInfo = line.split("\t")[0];
        // the depth of the relationship (1 = already friends, 2 = possible friends)
        int deep = Integer.parseInt(line.split("\t")[1]);
        context.write(new Text(friendInfo), new IntWritable(deep));
    }
}

Second Reducer class

public class TwoFriendReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        Boolean flag = true;
        /**
         * Only output the pair if the flag stays true.
         * The task asks for people who might know each other, i.e. pairs that only appear with value 2,
         * so as soon as a value of 1 (already direct friends) shows up, the flag is set to false.
         */
        for (IntWritable value : values) {
            if (value.get() == 1) {
                flag = false;
            }
        }
        if (flag) {
            context.write(key, NullWritable.get());
        }
    }
}

Driver class

public class FriendDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // first MapReduce round: processing classes and input/output
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(FriendDriver.class);
        job1.setMapperClass(OneFriendMapper.class);
        job1.setReducerClass(OneFriendReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        // input and output paths
        FileInputFormat.setInputPaths(job1, new Path("hdfs://anshun115:9000/friend"));
        FileOutputFormat.setOutputPath(job1, new Path("hdfs://anshun115:9000/result/friend"));

        // only run the code below once the first MapReduce round has completed
        if (job1.waitForCompletion(true)) {
            Job job2 = Job.getInstance(conf);
            // Mapper of the second job
            job2.setMapperClass(TwoFriendMapper.class);
            job2.setMapOutputKeyClass(Text.class);
            job2.setMapOutputValueClass(IntWritable.class);
            // Reducer of the second job
            job2.setReducerClass(TwoFriendReducer.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(NullWritable.class);
            /**
             * Input/output paths of the second job.
             * The input path here is the output path of the first job.
             * Note that these calls must be given the current job (job2); passing the first
             * job by mistake leads to a "path does not exist" error at runtime.
             */
            FileInputFormat.setInputPaths(job2, new Path("hdfs://anshun115:9000/result/friend"));
            FileOutputFormat.setOutputPath(job2, new Path("hdfs://anshun115:9000/result/friend2"));
            // note that job2 is the one being submitted here
            job2.waitForCompletion(true);
        }
    }
}

12. Custom file names and multiple outputs

public class two {

    public static class TWOMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            // course
            String clazz = split[0];
            // name
            String name = split[1];
            // total score
            double zf = 0;
            for (int i = 2; i < split.length; i++) {
                double fen = Double.parseDouble(split[i]);
                zf += fen;
            }
            // average score, rounded to two decimal places
            double v = zf / (split.length - 2);
            BigDecimal bd = new BigDecimal(v);
            BigDecimal bd1 = bd.setScale(2, BigDecimal.ROUND_HALF_UP);
            context.write(new LongWritable(bd1.longValue()), new Text(clazz + "," + name));
        }
    }

    public static class TWOReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
        private MultipleOutputs<Text, NullWritable> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // initialise MultipleOutputs when the reducer starts
            mos = new MultipleOutputs<Text, NullWritable>(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Reducer<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                // course
                String clazz = value.toString().split(",")[0];
                // name
                String name = value.toString().split(",")[1];
                // route each record to the named output matching its course
                if (clazz.equals("computer"))
                    mos.write("computer", clazz + "," + name + "," + key, NullWritable.get());
                if (clazz.equals("english"))
                    mos.write("english", clazz + "," + name + "," + key, NullWritable.get());
                if (clazz.equals("algorithm"))
                    mos.write("algorithm", clazz + "," + name + "," + key, NullWritable.get());
                if (clazz.equals("math"))
                    mos.write("math", clazz + "," + name + "," + key, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // the original snippet used job and conf without creating them; they are set up here so the driver is complete
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(two.class);
        job.setMapperClass(TWOMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(TWOReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/data/stu_score_sub.csv"));
        Path out = new Path("hdfs://master:9000/output2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        // output path and the named outputs used by MultipleOutputs
        FileOutputFormat.setOutputPath(job, out);
        MultipleOutputs.addNamedOutput(job, "computer", TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, "english", TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, "algorithm", TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, "math", TextOutputFormat.class, Text.class, NullWritable.class);
        job.waitForCompletion(true);
    }
}
