The following examples are mainly based on these two data files:

#students.txt

/*
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
1500100004,葛德曜,24,男,理科三班
1500100005,宣谷芹,22,女,理科五班
1500100006,边昂雄,21,男,理科二班
1500100007,尚孤风,23,女,文科六班
1500100008,符半双,22,女,理科六班
1500100009,沈德昌,21,男,理科一班
1500100010,羿彦昌,23,男,理科六班
1500100011,宰运华,21,男,理科三班
1500100012,梁易槐,21,女,理科一班
1500100013,逯君昊,24,男,文科二班
1500100014,羿旭炎,23,男,理科五班
1500100015,宦怀绿,21,女,理科一班
1500100016,潘访烟,23,女,文科一班
1500100017,高芷天,21,女,理科五班
1500100018,骆怜雪,21,女,文科六班
1500100019,娄曦之,24,男,理科三班
1500100020,杭振凯,23,男,理科四班
1500100021,连鸿晖,22,男,理科六班
1500100022,薄运珧,23,男,文科四班
1500100023,东鸿畴,23,男,理科二班
1500100024,湛慕卉,22,女,文科二班
1500100025,翁飞昂,22,男,文科四班
……
*/

#score.txt

/*
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137
1500100001,1000004,29
1500100001,1000005,85
1500100001,1000006,52
1500100002,1000001,139
1500100002,1000002,102
1500100002,1000003,44
1500100002,1000004,18
1500100002,1000005,46
1500100002,1000006,91
1500100003,1000001,48
1500100003,1000002,132
1500100003,1000003,41
1500100003,1000007,32
1500100003,1000008,7
1500100003,1000009,99
1500100004,1000001,147
1500100004,1000002,69
1500100004,1000003,37
1500100004,1000007,87
1500100004,1000008,21
1500100004,1000009,60
1500100005,1000001,105
……
*/
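
Before the jobs themselves, a small illustrative sketch of how one line of each file is parsed. The field labels in the comments are inferred from the sample data above and are not defined anywhere in the original post:

public class FieldLayoutSketch {
    public static void main(String[] args) {
        // Hypothetical labels, inferred from the sample rows above
        String studentLine = "1500100001,施笑槐,22,女,文科六班";
        String[] stu = studentLine.split(",");
        // stu[0]=id, stu[1]=name, stu[2]=age, stu[3]=gender, stu[4]=class
        System.out.println(stu[3] + " / " + stu[4]); // 女 / 文科六班

        String scoreLine = "1500100001,1000001,98";
        String[] sc = scoreLine.split(",");
        // sc[0]=student id, sc[1]=subject id, sc[2]=score
        System.out.println(sc[0] + " -> " + sc[2]); // 1500100001 -> 98
    }
}

The mappers below rely on exactly these positions: index 3 for gender, index 4 for class in students.txt, and indexes 0 and 2 for id and score in score.txt.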

Counting the number of students of each gender

package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo2GenderCnt {
    // Map side
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            String gender = splits[3];
            // Use the gender as the key and 1 as the value
            context.write(new Text(gender), new IntWritable(1));
        }
    }

    // Reduce side
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int cnt = 0;
            // Count the number of students per gender
            for (IntWritable value : values) {
                cnt += value.get();
            }
            context.write(key, new IntWritable(cnt));
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // This name shows up in the YARN web UI
        job.setJobName("Demo2GenderCnt");
        job.setJarByClass(Demo2GenderCnt.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Configure the input and output paths
        FileInputFormat.addInputPath(job, new Path("/student/input"));
        // The output path must not be created in advance; if the directory already exists the job fails,
        // so check for it (and delete it) through the HDFS Java API
        Path outPath = new Path("/student/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Wait for the job to finish
        job.waitForCompletion(true);

        /**
         * 1. Prepare the data: upload students.txt to /student/input on HDFS
         *    hdfs dfs -mkdir -p /student/input
         *    hdfs dfs -put students.txt /student/input/
         * 2. Submit the MapReduce job
         *    hadoop jar Hadoop-1.0.jar com.shujia.MapReduce.Demo2GenderCnt
         * 3. View logs / kill the job
         *    yarn logs -applicationId application_1644480440500_0006
         *    yarn application -kill application_1644480440500_0007
         */
    }
}
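
With the default single reducer, the gender counts land in one result file, so (assuming the job ran against the paths configured above) they can be checked with something like hdfs dfs -cat /student/output/part-r-00000; each line is the gender and the count separated by a tab, TextOutputFormat's default separator.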

Computing each student's total score

package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo3SumScore {
    // Map side
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            String id = splits[0];
            String score = splits[2];
            // Use the student id as the key and the score as the value
            context.write(new LongWritable(Long.parseLong(id)), new IntWritable(Integer.parseInt(score)));
        }
    }

    // Reduce side
    public static class MyReducer extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
        @Override
        protected void reduce(LongWritable key, Iterable<IntWritable> values, Reducer<LongWritable, IntWritable, LongWritable, IntWritable>.Context context) throws IOException, InterruptedException {
            int sumScore = 0;
            // Add up the student's total score
            for (IntWritable value : values) {
                sumScore += value.get();
            }
            context.write(key, new IntWritable(sumScore));
        }
    }

    // Driver
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("Demo3SumScore");
        job.setJarByClass(Demo3SumScore.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Configure the input and output paths
        FileInputFormat.addInputPath(job, new Path("/student/score/input"));
        // The output path must not be created in advance; if the directory already exists the job fails,
        // so check for it (and delete it) through the HDFS Java API
        Path outPath = new Path("/student/score/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Wait for the job to finish
        job.waitForCompletion(true);

        /**
         * 1. Prepare the data: upload score.txt to /student/score/input on HDFS
         *    hdfs dfs -mkdir -p /student/score/input
         *    hdfs dfs -put score.txt /student/score/input/
         * 2. Submit the MapReduce job
         *    hadoop jar Hadoop-1.0.jar com.shujia.MapReduce.Demo3SumScore
         * 3. View logs / kill the job
         *    yarn logs -applicationId application_1644480440500_0006
         *    yarn application -kill application_1644480440500_0007
         */
    }
}
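
Because Demo3SumScore leaves the output separator at its default, each line under /student/score/output is the student id and the total score separated by a tab. The join job below depends on exactly that: a line that contains a comma is treated as a student record, and a line without one is split on \t and treated as a total-score record.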

Joining students with their total scores

package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo4Join {
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
            // Work out which input file this record comes from
            String v = value.toString();
            if (v.contains(",")) {
                // Student record
                String[] stuSplits = v.split(",");
                long id = Long.parseLong(stuSplits[0]);
                String name = stuSplits[1];
                String clazz = stuSplits[4];
                context.write(new LongWritable(id), new Text(name + "," + clazz + "|"));
            } else {
                // Total-score record
                String[] sumScoreSplit = v.split("\t");
                context.write(new LongWritable(Long.parseLong(sumScoreSplit[0])), new Text(sumScoreSplit[1] + "#"));
            }
        }
    }

    public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Reducer<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
            String stuV = "";
            String sumScoreV = "";
            for (Text value : values) {
                String v = value.toString();
                if (v.contains("|")) {
                    // Student record
                    stuV = v.replace("|", "");
                } else {
                    // Total-score record
                    sumScoreV = v.replace("#", "");
                }
            }
            context.write(key, new Text(stuV + "," + sumScoreV));
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Set the separator between the output key and value
        // (newer Hadoop versions read this from mapreduce.output.textoutputformat.separator)
        conf.set("mapred.textoutputformat.separator", ",");
        Job job = Job.getInstance(conf);
        job.setJobName("Demo4Join");
        job.setJarByClass(Demo4Join.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Configure the input and output paths
        FileInputFormat.addInputPath(job, new Path("/student/input"));
        FileInputFormat.addInputPath(job, new Path("/student/score/output"));
        // The output path must not be created in advance; if the directory already exists the job fails,
        // so check for it (and delete it) through the HDFS Java API
        Path outPath = new Path("/student/join/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Wait for the job to finish
        job.waitForCompletion(true);

        /**
         * Create the directory
         * hdfs dfs -mkdir -p /student/join/output
         * Submit the job
         * hadoop jar Hadoop-1.0.jar com.shujia.MapReduce.Demo4Join
         */
    }
}
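
A note on the design: the mapper tags student records with a trailing "|" and total-score records with a trailing "#" so the reducer can tell the two apart after the shuffle, and the separator set on the Configuration makes the final output fully comma-separated (id,name,class,totalScore). An id that appears in only one of the two inputs still produces an output line, just with an empty field, so this behaves roughly like a full outer join on the student id.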

Map-side filtering

package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo5MRFilter {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            // Keep only the students in class 文科三班
            String clazz = value.toString().split(",")[4];
            if ("文科三班".equals(clazz)) {
                context.write(value, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Set the separator between the output key and value
        // (newer Hadoop versions read this from mapreduce.output.textoutputformat.separator)
        conf.set("mapred.textoutputformat.separator", ",");
        Job job = Job.getInstance(conf);
        job.setJobName("Demo5MRFilter");
        job.setJarByClass(Demo5MRFilter.class);

        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Configure the input and output paths
        FileInputFormat.addInputPath(job, new Path("/student/input"));
        // The output path must not be created in advance; if the directory already exists the job fails,
        // so check for it (and delete it) through the HDFS Java API
        Path outPath = new Path("/student/filter/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Wait for the job to finish
        job.waitForCompletion(true);

        /**
         * hdfs dfs -mkdir -p /student/filter/output
         * hadoop jar Hadoop-1.0.jar com.shujia.MapReduce.Demo5MRFilter
         */
    }
}
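
Since Demo5MRFilter defines no Reducer, Hadoop falls back to the identity reducer with a single reduce task, which only copies the filtered lines through the shuffle. An optional simplification (not in the original code) is to call job.setNumReduceTasks(0), so the job runs map-only and the mapper output is written straight to HDFS without any shuffle.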

Combiner pre-aggregation

#A combiner is essentially a reduce that runs on the map side, aggregating each map task's output before it is shuffled

package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Count how many times each word occurs
public class Demo6WordCountCombiner {
    // Map phase
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * @param key     the Map-side input key -> the byte offset of the line
         * @param value   the Map-side input value -> one line of data
         * @param context the MapReduce context -> gives access to job parameters and state, and sends the Map output on to the Reduce phase
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // Custom map logic
            String vStr = value.toString();
            // Split on spaces to extract each word
            String[] words = vStr.split(" ");
            // Turn every word into a k-v pair
            /**
             * hadoop hive hbase spark flink
             * ====>
             * hadoop 1
             * hive 1
             * hbase 1
             * spark 1
             * flink 1
             */
            for (String word : words) {
                Text keyOut = new Text(word);
                IntWritable valueOut = new IntWritable(1);
                // Send the k-v pair on through the context
                context.write(keyOut, valueOut);
            }
        }
    }

    // Custom Combiner
    public static class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // Same aggregation as the Reducer, applied locally to each map task's output
            int sum = 0; // running count for the current word
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Emit the partial count
            context.write(key, new IntWritable(sum));
        }
    }

    // Reduce phase
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * @param key     one key of the grouped Map output, i.e. a single word
         * @param values  all the values sharing that key, as an iterable
         * @param context the MapReduce context, used here to write the result to HDFS
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // Custom reduce logic
            int sum = 0; // total count for the current word
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Write the final count to HDFS
            context.write(key, new IntWritable(sum));
        }
    }

    // Driver (wires the Map and Reduce phases together)
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Create the configuration
        Configuration conf = new Configuration();
        // Create a Job instance
        Job job = Job.getInstance(conf);
        // Basic job settings
        job.setJobName("Demo6WordCountCombiner");
        // Tell the job which class to run
        job.setJarByClass(Demo6WordCountCombiner.class);

        // Map-side configuration: output key/value types and the Mapper class
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapperClass(MyMapper.class);
        // Set the Combiner
        job.setCombinerClass(MyCombiner.class);

        // Reduce-side configuration: output key/value types and the Reducer class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);

        // Configure the input and output paths
        FileInputFormat.addInputPath(job, new Path("/wordCount/input"));
        // The output path must not be created in advance; if the directory already exists the job fails,
        // so check for it (and delete it) through the HDFS Java API
        Path outPath = new Path("/wordCount/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Wait for the job to finish
        job.waitForCompletion(true);

        /**
         * 1. Prepare the data: upload words.txt to /wordCount/input on HDFS
         *    hdfs dfs -mkdir -p /wordCount/input
         *    hdfs dfs -put words.txt /wordCount/input
         * 2. Submit the MapReduce job
         *    hadoop jar Hadoop-1.0.jar com.shujia.MapReduce.Demo6WordCountCombiner
         */
    }
}

#Note: combiner pre-aggregation is only safe for operations that can be applied to partial results without changing the answer, such as Max, Min and Sum; see the sketch below for how to handle an average
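
To see why, consider an average: averaging per-map averages generally gives a different (wrong) result than averaging all values at once, and a combiner may run zero, one or many times on each map task's output. The hypothetical sketch below (the class name, output path and the "sum,count" text encoding are my own illustration, not part of the original post) computes each student's average score over score.txt in a combiner-safe way: the combiner only merges partial (sum, count) pairs, and the division happens exactly once, in the reducer.

package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Hypothetical example (not from the original post): average score per student over score.txt
public class Demo7AvgScoreCombiner {
    public static class AvgMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            // Emit "score,1" so that sums and counts can be merged independently later
            context.write(new Text(splits[0]), new Text(splits[2] + ",1"));
        }
    }

    // The combiner keeps the (sum, count) encoding, so running it 0, 1 or many times is safe
    public static class AvgCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            long sum = 0, cnt = 0;
            for (Text v : values) {
                String[] parts = v.toString().split(",");
                sum += Long.parseLong(parts[0]);
                cnt += Long.parseLong(parts[1]);
            }
            context.write(key, new Text(sum + "," + cnt));
        }
    }

    public static class AvgReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            long sum = 0, cnt = 0;
            for (Text v : values) {
                String[] parts = v.toString().split(",");
                sum += Long.parseLong(parts[0]);
                cnt += Long.parseLong(parts[1]);
            }
            // Only here, after all partial pairs are merged, is the division performed
            context.write(key, new DoubleWritable((double) sum / cnt));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJobName("Demo7AvgScoreCombiner");
        job.setJarByClass(Demo7AvgScoreCombiner.class);
        job.setMapperClass(AvgMapper.class);
        job.setCombinerClass(AvgCombiner.class);
        job.setReducerClass(AvgReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path("/student/score/input"));
        FileOutputFormat.setOutputPath(job, new Path("/student/score/avg_output"));
        job.waitForCompletion(true);
    }
}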
