MapReduce程序之Index串联案例
Index串联案例
现有三个文件 a.html b.html c.html ,里面分别记录着单词,如下图所示,现需要计算出 每个单词在每个文件出现的次数,格式如下: Hello a.html-4 b.html-8 c.html-10
a.html
hello tom
hello jim
hello kitty
hello rose
b.html
hello jerry
hello jim
hello kitty
hello jack
c.html
hello jerry
hello java
hello c++
hello c++
hello hello
需求分析:
需要 Hello a.html-4 b.html-8 c.html-10 很明显最后,输出的单词作为key,value是一个字符串 这个字符串是拼接而成,且涉及到文件名和各自的聚合个数
我们能够轻而易举的计算出来的是 每个单词 出现的个数 如:Hello 10 java 20 ...
很明显 一个mapreduce是计算不出来的,所以我们采用两个mapreduce程序串联的写法进行计算
第一个mapredue
1)map:以单词-文件名作为key 个数作为value输出 到reduce
2)reduce:聚合求出
Hello-a.html 10
Hello-b.html 20
Hello-c.html 30
第二个mapreduce
1)map: 读取第一个mapreduce输出的文件 进行切割 Hello a.html 10 以Hello为key 以a.html-10拼接为value输出给reduce
2)reduce :拿到的数据格式应为: Hello(a.html-10,b.html-20,c.html-30)循环迭代器 拼接完成最终结果输出
第一个MapReduce程序
文件名的获取关键:重写父类的setup方法
String fileName = null;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {FileSplit fs = (FileSplit) context.getInputSplit();fileName = fs.getPath().getName();}
如下图:在进行循环读取文件数据之前,先执行的setup方法,所以同一个maptask中 fileName都是同一个
mapreduce程序:
package cn.doit19.hadoop.review.index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author:tom* @Date:Created in 19:56 2020/11/17*/
@SuppressWarnings("all")
public class Index1 {//Index案例 串联案例static class Index1Map extends Mapper<LongWritable, Text, Text, IntWritable> {Text k = new Text();String fileName = null;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {FileSplit fs = (FileSplit) context.getInputSplit();fileName = fs.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {try {//读一行执行一次String line = value.toString();//line的内容//hello tom//hello jim//hello kitty//hello roseString[] words = line.split("\\s+");//{hello,tom }for (String word : words) {//拼接单词+文件名k.set(word + "-" + fileName);//输出结果到reducecontext.write(k, new IntWritable(1));}} catch (Exception e) {e.printStackTrace();}}}static class Index1Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {try {// key + 迭代器 内容如下// Hello-a.html (1,1,1,1,1) Hello-b.html(1,1,1,1,1)int count = 0;for (IntWritable value : values) {count++;}context.write(key, new IntWritable(count));} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] args) throws Exception {//初始化配置对象Configuration conf = new Configuration();//创建job对象Job job = Job.getInstance(conf);//设置map task 类job.setMapperClass(Index1Map.class);//设置reduce task 类job.setReducerClass(Index1Reducer.class);//设置map输出类型 kvjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置reduce 最终输出类型 kvjob.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置reduce 数量
// job.setNumReduceTasks(2);//设置输入路径FileInputFormat.setInputPaths(job, new Path("E:\\MR\\index"));//设置输出路径FileOutputFormat.setOutputPath(job, new Path("E:\\MR\\In\\index1"));//提交任务boolean s = job.waitForCompletion(true);}
}
mapreduce程序输出结果:
第二个MapReduce程序
package cn.doit19.hadoop.review.index;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author:tom* @Date:Created in 21:03 2020/11/17*/
@SuppressWarnings("all")
public class Index2 {static class Index2Map extends Mapper<LongWritable, Text, Text, Text> {Text k = new Text();Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {try {//读一行执行一次String line = value.toString();//line的内容// Hello-a.html 10 Hello-b.html 20String[] words = line.split("\\s+");//{Hello-a.html,10 }String[] split = words[0].split("-");k.set(split[0]);//拼接valuev.set(split[1] + "-" + words[1]);//输出结果到reducecontext.write(k, v);} catch (Exception e) {e.printStackTrace();}}}static class Index2Reducer extends Reducer<Text, Text, Text, Text> {Text v = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {StringBuilder sb = new StringBuilder();for (Text value : values) {sb.append(value + " ");}v.set(sb.toString().trim());context.write(key, v);}}public static void main(String[] args) throws Exception {//初始化配置对象Configuration conf = new Configuration();//创建job对象Job job = Job.getInstance(conf);//设置map task 类job.setMapperClass(Index2.Index2Map.class);//设置reduce task 类job.setReducerClass(Index2.Index2Reducer.class);//设置map输出类型 kvjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//设置reduce 最终输出类型 kvjob.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//设置reduce 数量
// job.setNumReduceTasks(2);//设置输入路径FileInputFormat.setInputPaths(job, new Path("E:\\MR\\In\\index1"));//设置输出路径FileOutputFormat.setOutputPath(job, new Path("E:\\MR\\out\\index2"));//提交任务boolean s = job.waitForCompletion(true);}}
输出结果:
更多学习、面试资料尽在微信公众号:Hadoop大数据开发
MapReduce程序之Index串联案例相关推荐
- 超详细MapReduce程序实现WordCount案例
一.案例准备 1.首先在本地创建两个文件,即文件A和文件B touch A B 2.在文件A和文件B中分别添加以下内容 A: China is my motherland I love China B ...
- MapReduce程序之序列化原理与Writable案例
[TOC] MapReduce程序之序列化原理与Writable案例 前言 在编写MapReduce程序时,我们会发现,对于MapReduce的输入输出数据(key-value),我们只能使用Hado ...
- mapreduce程序本地运行,单词统计案例
mapreduce程序本地运行单词统计案例,输入输出数据放在本地 集群模式运行:https://blog.csdn.net/weixin_43614067/article/details/108400 ...
- hadoop的python框架指南_Python之——用Mrjob框架编写Hadoop MapReduce程序(基于Hadoop 2.5.2)...
转载请注明出处:http://blog.csdn.net/l1028386804/article/details/79056120 一.环境准备想了解如何使用原生Python编写MapReduce程序 ...
- 从零开始学习Hadoop--第2章 第一个MapReduce程序
1.Hadoop从头说 1.1 Google是一家做搜索的公司 做搜索是技术难度很高的活.首先要存储很多的数据,要把全球的大部分网页都抓下来,可想而知存储量有多大.然后,要能快速检索网页,用户输入几个 ...
- MapReduce综合学习含Wordcount案例
文章目录 MapReduce简介 MapTask ReduceTask Mapper阶段解读 Reducer阶段解读 MapReduce适用的问题 MapReduce的特点 MapReduce基本思想 ...
- 大数据日志分析项目mapreduce程序
总体思路: 使用flume将服务器上的日志传到hadoop上面,然后使用mapreduce程序完成数据清洗,统计pv,visit模型.最后使用azkaban定时执行程序. 用户每次登录根据sessio ...
- 大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例
1 大数据处理的常用方法 大数据处理目前比较流行的是两种方法,一种是离线处理,一种是在线处理,基本处理架构如下: 在互联网应用中,不管是哪一种处理方式,其基本的数据来源都是日志数据,例如对于web应用 ...
- 婚恋交友平台小程序制作开发代码案例
这个是目前比较常见的婚恋交友平台小程序制作开发代码案例解析,很多功能大家都可以参考借鉴,比如关注功能,会员付费功能,权限设置等功能. 上几张图片看看效果 首页部分 class DiaryControl ...
- Hadoop详解(三)——MapReduce原理和执行过程,远程Debug,Writable序列化接口,MapReduce程序编写
MapReduce概述 MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. MR由两个阶段组成:Map和Reduce,用户只需要实现map()和Re ...
最新文章
- Python 类函数迭代器
- spring项目属性注入和bean管理xml 注入一般属性和集合属性
- android 获取元素的下标_Appium中定位方式by_android_uiautomator
- Mybatis中使用Dao实现类实现增删改查【实际开发中使用代理dao】
- 前端学习(2831):小程序事件绑定
- 一步步编写操作系统 30 cpu的分支预测简介
- c# 构造sql语句
- CentOS 6.6系统安装
- 【恋上数据结构】计数排序
- 2021-04-23 商业文章版权协议分类
- 错误 对‘pcl::console::print(pcl::console::VERBOSITY_LEVEL, char const*, ...)’未定义的引用
- LA 2218 Triathlon(半平面交)
- SD卡无法格式化怎么办?恢复SD卡这样做
- 第十四届恩智浦智能车室外电磁比赛总结
- Spring Boot 2.x基础教程:进程内缓存的使用与Cache注解详解
- JPEG压缩原理详解
- 浅析Tier和Layer的区别
- 创建oracle数据库到达梦数据库的dblink
- 昆石VOS2009/VOS3000 2.1.6.00 操作指南
- 从荣耀MagicV看折叠屏手机的现状及未来发展