四、MapReduce API介绍

  • 一般MapReduce都是由Mapper, Reducer 及main 函数组成。
  • Mapper程序一般完成键值对映射操作;
  • Reducer 程序一般完成键值对聚合操作;
  • Main函数则负责组装Mapper,Reducer及必要的配置;
  • 高阶编程还涉及到设置输入输出文件格式、设置Combiner、Partitioner优化程序等;

4.1、MapReduce程序模块 : Main 函数

4.2、MapReduce程序模块: Mapper

  • org.apache.hadoop.mapreduce.Mapper

4.3、MapReduce程序模块: Reducer

  • org.apache.hadoop.mapreduce.Reducer

五、MapReduce实例

5.1、流程(Mapper、Reducer、Main、打包运行)

  1. 参考WordCount程序,修改Mapper;
  2. 直接复制 Reducer程序;
  3. 直接复制Main函数,并做相应修改;
  4. 编译打包 ;
  5. 上传Jar包;
  6. 上传数据;
  7. 运行程序;
  8. 查看运行结果;

5.2、实例1:按日期访问统计次数:

1、参考WordCount程序,修改Mapper;
(这里新建一个java程序,然后把下面(1、2、3步代码)复制到类里)

    public static class SpiltMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();//value: email_address | datepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] data = value.toString().split("\\|",-1);  //word.set(data[1]);   //context.write(word, one);}}

2、直接复制 Reducer程序;

    public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}

3、直接复制Main函数,并做相应修改;

public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(CountByDate.class);   //我们的主类是CountByDatejob.setMapperClass(SpiltMapper.class);  //mapper:我们修改为SpiltMapperjob.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}

4、编译打包 (jar打包)



build出现错误及解决办法:


完成

5/6、上传jar包&数据
email_log_with_date.txt数据包链接:https://pan.baidu.com/s/1HfwHCfmvVdQpuL-MPtpAng
提取码:cgnb

上传数据包(注意开启hdfs):

上传OK(浏览器:master:50070查看)

7、运行程序
(注意开启yarn)

上传完成后:

(master:8088)


8、查看结果
(master:50070)


5.3、实例2:按用户访问次数排序

Mapper、Reducer、Main程序
SortByCountFirst.Mapper

package demo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;public class SortByCountFirst {//1、修改Mapperpublic static class SpiltMapperextends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();//value: email_address | datepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] data = value.toString().split("\\|",-1);word.set(data[0]);context.write(word, one);}}//2、直接复制 Reducer程序,不用修改public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}//3、直接复制Main函数,并做相应修改;public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: demo.SortByCountFirst <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "sort by count first ");job.setJarByClass(SortByCountFirst.class);   //我们的主类是CountByDatejob.setMapperClass(SpiltMapper.class);  //mapper:我们修改为SpiltMapperjob.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

SortByCountSecond.Mapper

package demo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;public class SortByCountSecond {//1、修改Mapperpublic static class SpiltMapperextends Mapper<Object, Text, IntWritable, Text> {private IntWritable count = new IntWritable(1);private Text word = new Text();//value: email_address \t countpublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] data = value.toString().split("\t",-1);word.set(data[0]);count.set(Integer.parseInt(data[1]));context.write(count,word);}}//2、直接复制 Reducer程序,不用修改public static class ReverseReducerextends Reducer<IntWritable,Text,Text,IntWritable> {public void reduce(IntWritable key, Iterable<Text> values,Context context) throws IOException, InterruptedException {for (Text val : values) {context.write(val,key);}}}//3、直接复制Main函数,并做相应修改;public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: demo.SortByCountFirst <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "sort by count first ");job.setJarByClass(SortByCountSecond.class);   //我们的主类是CountByDatejob.setMapperClass(SpiltMapper.class);  //mapper:我们修改为SpiltMapper
//        job.setCombinerClass(IntSumReducer.class);job.setReducerClass(ReverseReducer.class);job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

然后打包上传

yarn jar sortbycount.jar demo.SortByCountSecond -Dmapreduce.job.queuename=prod email_log_with_date.txt sortbycountfirst_output00
yarn jar sortbycount.jar demo.SortByCountSecond -Dmapreduce.job.queuename=prod email_log_with_date.txt sortbycountfirst_output00 sortbycountsecond_output00

学习笔记Hadoop(十四)—— MapReduce开发入门(2)—— MapReduce API介绍、MapReduce实例相关推荐

  1. Spring Security技术栈学习笔记(十四)使用Spring Social集成QQ登录验证方式

    上一篇文章<Spring Security技术栈开发企业级认证与授权(十三)Spring Social集成第三方登录验证开发流程介绍>主要是介绍了OAuth2协议的基本内容以及Spring ...

  2. 深度学习笔记(十四)—— 超参数优化[Hyperparameter Optimization]

      这是深度学习笔记第十四篇,完整的笔记目录可以点击这里查看.      训练神经网络会涉及到许多超参数设置.神经网络中最常见的超参数包括: the initial learning rate lea ...

  3. Windows保护模式学习笔记(十四)—— 阶段测试

    Windows保护模式学习笔记(十四)-- 阶段测试 题目一 解题步骤 题目二 解题步骤 题目一 描述:给定一个线性地址,和长度,读取内容 int ReadMemory(OUT BYTE* buffe ...

  4. OpenCV学习笔记(十四):重映射:remap( )

    OpenCV学习笔记(十四):重映射:remap( ) 图像的坐标映射是通过原图像与目标图像之间建立一种映射关系,这种映射关系有两种,一种是计算原图像任意像素在映射后图像的坐标位置,另一种是计算变换后 ...

  5. QT学习笔记(十四):QLayout的属性介绍

    QT学习笔记(十四):QLayout的属性介绍 主要包括QBoxLayout.和QGridLayout以及QFormLayout等的参数类似. 我主要说明一下QGridLayout在QtDesigne ...

  6. MATLAB学习笔记(十四)

    MATLAB学习笔记(十四) 一.线性方程组求解 1.1 直接法 1.1.1 利用左除运算符 1.1.2 利用矩阵分解 1.2 迭代法 1.2.1 雅可比(Jacobi)迭代法 1.2.2 高斯-赛德 ...

  7. python数据挖掘学习笔记】十四.Scipy调用curve_fit实现曲线拟合

    #2018-03-28 10:02:08 March Wednesday the 13 week, the 087 day SZ SSMR python数据挖掘学习笔记]十四.Scipy调用curve ...

  8. Cty的Linux学习笔记(十四)

    Linux学习笔记--第十四篇 环境变量配置文件: /etc/profile:预设了几个重要的变量,例如PATH,USER,LOGNAME,MAIL,INPUTRC,HOSTNAME,HISTSIZE ...

  9. Slicer学习笔记(十四)vtkMRMLVolumeNode 与 vtkImageData

    Slicer学习笔记(十四)vtkMRMLVolumeNode 与 vtkImageData 1.Adding in MRML to Slicer 2.MRML Scene 3.MRML Nodes ...

  10. javaSE学习笔记——第十四天正则表达式、Math类、System类、Data类、Calendar类等

    javaSE学习第十四天 java知识 正则表达式的概述和简单使用 A:正则表达式 是指一个用来描述或者匹配一系列符合某个语法规则的字符串的单个字符串.其实就是一种规则.有自己特殊的应用. 作用:比如 ...

最新文章

  1. react循环key值_React源码揭秘(三):Diff算法详解
  2. 二十一、Hadoop学记笔记————kafka的初识
  3. 关于CSS3圆角矩形的一些学习探讨
  4. 单片微型计算机系统应用和开发特点,单片微机原理与应用(第2版)
  5. jquery 判断点击次数_jquery编程开发实现点击页面计算点击次数
  6. E: Sub-process /usr/bin/dpkg returned an error code (1) Ubuntu安装apt-get命令报错
  7. .NET文件上传的大小限制配置
  8. bash脚本编程之条件判断、条件测试
  9. 国内交易平台关闭了,教你如何把Zcash(zec)兑换成人民币
  10. 华为手机怎么使用新系统鸿蒙,华为手机鸿蒙系统如何退回EMU
  11. SPSS中的数据分析—信度效度检验【1】
  12. python 爬取学信网登录页面
  13. 华为服务器修改snmp用户,华为S5700交换机Telnet、SSH、SNMP基础远程管理配置
  14. ElasticStack搭建Java日志收集分析,并将其构建docker镜像-学习笔记
  15. 数字化时代,全方位解读商业智能BI
  16. linux内核熵池,快速的给内核熵池喂随机数
  17. IMWeb提升营Day5
  18. 【博客汇总】UV打印机常见问题汇总(更新中...)
  19. Mirai源码分析报告 (1)
  20. 牛客刷题错题(一)——测试知识

热门文章

  1. 系统试运行报告是谁写的_最新标准:水污染源在线监测系统(CODCr、NH3N 等)安装技术规范(1)...
  2. 关联规则(Association Rules)笔记
  3. 数学建模大赛赛题解析:Mathorcup高校数学建模挑战赛-环形穿梭车的设计与调度
  4. 机器学习中的矩阵向量求导(二) 矩阵向量求导之定义法
  5. CNN(Convolutional Neural Networks)算法
  6. linux下面子目录绑定域名的方法,.htaccess绑定子域名到子目录方法
  7. tf_geometric的安装
  8. Matplotlib实例教程(七)密度图
  9. 微服务接口限流的设计与思考(附GitHub框架源码)
  10. lvs为何不能完全替代DNS轮询--转