1. Word Count

To some extent this example reflects the original motivation behind MapReduce: analyzing log files.

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Called once per line of the input split: the key is the offset of the
    // line within the file, the value is the line's content.
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Called once per group; within a group the keys are identical and there
    // may be several values.
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            sum = sum + i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // config.set("fs.defaultFS", "hdfs://node1:8020");
        // config.set("yarn.resourcemanager.hostname", "node1");
        // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outpath = new Path("/usr/output/wc");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job completed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
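Because summing counts is associative and commutative, the reducer can also be reused as a combiner so that each map task pre-aggregates its own counts before the shuffle. This is an optional addition, not part of the original listing; a minimal sketch of the extra driver line:

            // Optional: pre-aggregate counts on the map side to cut shuffle traffic.
            // WordCountReducer works unchanged as a combiner because its input and
            // output types match the map output types.
            job.setCombinerClass(WordCountReducer.class);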

2. Data Deduplication

The goal is that any value occurring more than once in the raw data appears exactly once in the output file.

The natural approach is to send every record of a given value to the same reduce task; no matter how many times the value occurs, it only needs to be written once to the final result.

Only a small change to the word count program is needed.

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Emit the whole line as the key; duplicate lines collapse into one group.
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}
public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    // Each group corresponds to one distinct line; write it out exactly once.
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // config.set("fs.defaultFS", "hdfs://node1:8020");
        // config.set("yarn.resourcemanager.hostname", "node1");
        config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("dedup");
            job.setMapperClass(DedupMapper.class);
            job.setReducerClass(DedupReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outpath = new Path("/usr/output/dedup");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job completed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
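The combiner trick mentioned for word count applies here too, and pays off even more: because the reducer simply emits each distinct key once, registering it as a combiner removes duplicates on the map side before anything is shuffled. Again an optional, hedged addition to the driver, not part of the original listing:

            // Optional: drop duplicate lines map-side before the shuffle.
            job.setCombinerClass(DedupReducer.class);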

3. Sorting

Sort the contents of the input files.

Each line of an input file is a single number, i.e. one data item.

Each output line must contain two numbers separated by a space: the second number is an original value and the first is that value's rank.

Sample input:

file1:

2

32

654

32

15

765

65223

file2:

5956

22

650

92

file3:

26

54

6

Sample output:

1 2

2 6

3 15

4 22

5 26

6 32

7 32

8 54

9 92

10 650

11 654

12 765

13 5956

14 65223

Design idea:

We can rely on the sorting that MapReduce performs by default during the shuffle, rather than implementing sorting ourselves.

Key points:

1. The data to be sorted are emitted as the keys of the map tasks.

2. A custom Partitioner is needed to make the output globally ordered. The idea is to use the quotient of the maximum input value divided by the number of partitions as the range width, so the partition boundaries are 1x, 2x, ..., (numPartitions-1)x that quotient; once every partition has been sorted locally, concatenating the partitions in order yields a globally sorted result. For example, with a maximum of 65223 and 5 partitions the width is 65223 / 5 = 13044, so keys up to 13044 go to partition 0, keys in (13044, 26088] to partition 1, and so on.

3. The reducer receives <key, value-list> and writes the key once for each element in the value list (so repeated values keep their multiplicity), pairing every occurrence with a running rank counter.

package hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Sort {

    public static class SortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {

        private NullWritable nw = NullWritable.get();

        @Override
        protected void map(Object key, Text value,
                Mapper<Object, Text, IntWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Each input line holds one number; emit it as the key so the
            // framework sorts it during the shuffle.
            context.write(new IntWritable(Integer.parseInt(value.toString().trim())), nw);
        }
    }

    public static class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {

        private IntWritable counter = new IntWritable(1);

        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> values,
                Reducer<IntWritable, NullWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Write the key once per occurrence, preceded by its rank.
            for (NullWritable nw : values) {
                context.write(counter, key);
                counter = new IntWritable(counter.get() + 1);
            }
        }
    }

    public static class SortPartitioner extends Partitioner<IntWritable, NullWritable> {

        // numPartitions equals the number of reduce tasks.
        @Override
        public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
            int maxNumber = 65223;
            int bound = maxNumber / numPartitions;
            int keyNumber = key.get();
            for (int i = 0; i < numPartitions; ++i) {
                if (keyNumber <= (i + 1) * bound) {
                    return i;
                }
            }
            // Keys above numPartitions * bound (possible because of integer
            // division) go to the last partition to keep the global order.
            return numPartitions - 1;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Sort.class);
        job.setJobName("sort");
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(5);
        job.setPartitionerClass(SortPartitioner.class);

        String inputFile = "/home/jinzhao/dataset/input";
        String outputFile = "/home/jinzhao/dataset/output";
        FileInputFormat.setInputPaths(job, new Path(inputFile));
        Path output = new Path(outputFile);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        job.waitForCompletion(true);
    }
}
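Two caveats about this listing. First, the rank counter in SortReducer is local to each reduce task, so with five reducers the ranks restart at 1 in each output file; reproducing the single globally numbered sample output requires one reduce task or a post-processing pass. Second, SortPartitioner hard-codes the maximum value 65223, which only works for this particular data set. A common refinement is to pass the maximum through the job configuration and let the partitioner implement Configurable so it can read the value at runtime. The sketch below only illustrates that idea and is not part of the original post; the property name "sort.max.value" and the class name are made up:

    // Hypothetical variant of SortPartitioner: the maximum value comes from the
    // job configuration instead of being hard-coded. Requires
    // org.apache.hadoop.conf.Configurable; the framework calls setConf() when it
    // instantiates the partitioner.
    public static class ConfigurableSortPartitioner
            extends Partitioner<IntWritable, NullWritable> implements Configurable {

        private Configuration conf;
        private int maxNumber;

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
            // Fall back to the sample data's maximum if the property is unset.
            this.maxNumber = conf.getInt("sort.max.value", 65223);
        }

        @Override
        public Configuration getConf() {
            return conf;
        }

        @Override
        public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
            int bound = Math.max(1, maxNumber / numPartitions);
            // A monotonic mapping from key ranges to partition indices keeps the
            // concatenated output globally ordered.
            return Math.min(key.get() / bound, numPartitions - 1);
        }
    }

The driver would then call conf.setInt("sort.max.value", ...) with the data's real maximum before building the Job and register the class with job.setPartitionerClass(ConfigurableSortPartitioner.class).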

4. Single-Table Join

Join the child-parent table with itself to produce a grandchild-grandparent table.

Sample input:

file:

child parent

Tom Lucy

Tom Jack

Jone Lucy

Jone Jack

Lucy Mary

Lucy Ben

Jack Alice

Jack Jesse

Terry Alice

Terry Jesse

Philip Terry

Philip Alma

Mark Terry

Mark Alma

Sample output:

file:

grandchild grandparent

Tom Alice

Tom Jesse

Jone Alice

Jone Jesse

Tom Mary

Tom Ben

Jone Mary

Jone Ben

Philip Alice

Philip Jesse

Mark Alice

Mark Jesse

package hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class stlink {

    private static boolean flag = true;

    public static class stlinkMapper extends Mapper<Object, Text, Text, Text> {

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Each line is "child<TAB>parent"; skip the header line.
            String[] names = value.toString().trim().split("\t");
            if (names[0].compareTo("child") != 0) {
                // Emit the relation twice: once keyed by the child (tagged with
                // its parent) and once keyed by the parent (tagged with its child).
                context.write(new Text(names[0]), new Text("parent:" + names[1]));
                context.write(new Text(names[1]), new Text("child:" + names[0]));
            }
        }
    }

    public static class stlinkReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            if (flag) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                flag = false;
            }
            // For one person, collect all of their children and all of their
            // parents, then emit the cross product: each child is a grandchild
            // of each parent.
            List<String> children = new ArrayList<String>();
            List<String> parents = new ArrayList<String>();
            for (Text t : values) {
                String[] kv = t.toString().split(":");
                if (kv[0].compareTo("child") == 0) {
                    children.add(kv[1]);
                } else {
                    parents.add(kv[1]);
                }
            }
            for (String c : children) {
                for (String p : parents) {
                    context.write(new Text(c), new Text(p));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job stlinkJob = Job.getInstance(conf);
        stlinkJob.setJarByClass(stlink.class);
        stlinkJob.setJobName("single table link");
        stlinkJob.setMapperClass(stlinkMapper.class);
        stlinkJob.setReducerClass(stlinkReducer.class);
        stlinkJob.setOutputKeyClass(Text.class);
        stlinkJob.setOutputValueClass(Text.class);
        stlinkJob.setMapOutputKeyClass(Text.class);
        stlinkJob.setMapOutputValueClass(Text.class);

        Path input = new Path("/home/jinzhao/dataset/input");
        Path output = new Path("/home/jinzhao/dataset/output");
        FileInputFormat.setInputPaths(stlinkJob, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(stlinkJob, output);
        stlinkJob.waitForCompletion(true);
    }
}
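A small caveat on the static flag used to print the "grandchild grandparent" header: each reduce task runs in its own JVM with its own copy of flag, so the trick is only reliable with a single reducer. A more conventional way to emit a header exactly once per reduce task is to override setup() in the reducer; the following is a hedged alternative sketch, not part of the original code:

        // Alternative to the static flag: the framework calls setup() once per
        // reduce task before any reduce() calls, so the header is written
        // exactly once per output file.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            context.write(new Text("grandchild"), new Text("grandparent"));
        }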

5. Multi-Table Join

Join the factory table with the address table on the address ID to produce factoryname-addressname pairs.

Sample input:

factory:

factoryname addressed

Beijing Red Star 1

Shenzhen Thunder 3

Guangzhou Honda 2

Beijing Rising 1

Guangzhou Development Bank 2

Tencent 3

Bank of Beijing 1

address:

1 Beijing

2 Guangzhou

3 Shenzhen

4 Xian

package hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class mtlink {

    private static boolean flag = true;

    public static class mtlinkMapper extends Mapper<Object, Text, Text, Text> {

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String str = value.toString();
            // Skip the header lines of both tables.
            if (str.contains("factoryname") || str.contains("addressname")) {
                return;
            }
            String[] infos = str.trim().split(" ");
            if (infos[0].charAt(0) >= '0' && infos[0].charAt(0) <= '9') {
                // Address table: "addressID addressname" -> key on the ID,
                // tag the value as coming from the right-hand table.
                context.write(new Text(infos[0]), new Text("right:" + strCombine(infos, "right")));
            } else {
                // Factory table: "factoryname addressID" -> key on the ID,
                // tag the value as coming from the left-hand table.
                context.write(new Text(infos[infos.length - 1]), new Text("left:" + strCombine(infos, "left")));
            }
        }

        // Rejoin the multi-word name, dropping the ID column on the left or
        // right end depending on which table the record came from.
        private String strCombine(String[] strs, String direction) {
            StringBuilder sb = new StringBuilder();
            if (direction.compareTo("right") == 0) {
                for (int i = 1; i < strs.length; ++i) {
                    sb.append(strs[i] + " ");
                }
            } else {
                for (int i = 0; i < strs.length - 1; ++i) {
                    sb.append(strs[i] + " ");
                }
            }
            return sb.toString().trim();
        }
    }

    public static class mtlinkReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            if (flag) {
                context.write(new Text("factoryname"), new Text("addressname"));
                flag = false;
            }
            // Collect every factory that shares this address ID and the single
            // matching address name, then emit one line per factory.
            List<String> companies = new ArrayList<String>();
            String place = "huoxing";   // placeholder if no address record matches
            for (Text t : values) {
                String[] kv = t.toString().trim().split(":");
                if (kv[0].compareTo("right") == 0) {
                    place = kv[1];
                } else {
                    companies.add(kv[1]);
                }
            }
            for (String s : companies) {
                context.write(new Text(s), new Text(place));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job mtlinkJob = Job.getInstance(conf);
        mtlinkJob.setJarByClass(mtlink.class);
        mtlinkJob.setJobName("multiple tables link");
        mtlinkJob.setMapperClass(mtlinkMapper.class);
        mtlinkJob.setReducerClass(mtlinkReducer.class);
        mtlinkJob.setOutputKeyClass(Text.class);
        mtlinkJob.setOutputValueClass(Text.class);

        Path input = new Path("/home/jinzhao/dataset/input");
        Path output = new Path("/home/jinzhao/dataset/output");
        FileInputFormat.setInputPaths(mtlinkJob, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(mtlinkJob, output);
        mtlinkJob.waitForCompletion(true);
    }
}
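The mapper above tells the two tables apart by checking whether the first token starts with a digit, which would break if a factory name ever began with a number. An alternative, not used in the original post, is to give each table its own mapper and register them with MultipleInputs. The sketch below is illustrative only: FactoryMapper and AddressMapper are hypothetical classes that would emit ("addressID", "left:factoryname") and ("addressID", "right:addressname") respectively, and the two input paths are placeholders.

        // Requires: import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
        //           import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
        // Inside main(), replacing FileInputFormat.setInputPaths(...) and
        // mtlinkJob.setMapperClass(...):
        MultipleInputs.addInputPath(mtlinkJob, new Path("/home/jinzhao/dataset/factory"),
                TextInputFormat.class, FactoryMapper.class);
        MultipleInputs.addInputPath(mtlinkJob, new Path("/home/jinzhao/dataset/address"),
                TextInputFormat.class, AddressMapper.class);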
