MapperReducer
用java代码实现wordcount
新建Java类MRMapper
package MR_wc;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class MRMapper extends Mapper<LongWritable,Text,Text,IntWritable> {IntWritable ONE =new IntWritable(1);@Overrideprotected void setup(Context context) throws IOException, InterruptedException {/*super.setup(context);*/System.out.println("-----------Mapper.setup--------------");}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {/*super.map(key, value, context);*/System.out.println("-----------Mapper.map--------------");// 获取内容,转为小写final String line =value.toString().toLowerCase();// 按照分隔符进行拆分final String[] splits = line.split(" ");//输出for (String word : splits){context.write(new Text(word),ONE);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {/*super.cleanup(context);*/System.out.println("-----------Mapper.cleanup--------------");}
}
新建Java类MRReducer
package MR_wc;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MRReducer extends Reducer<Text,IntWritable,Text,IntWritable>{@Overrideprotected void setup(Context context) throws IOException, InterruptedException {System.out.println("-----------Reducer.setup--------------");}@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {System.out.println("-----------Reducer.reduce--------------");//初始值 countint count=0;for (IntWritable value : values){count += value.get();}context.write(key,new IntWritable(count));}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {System.out.println("-----------Reducer.cleanup--------------");}
}
新建Java类MRDriver(连接MRMapper和MRReducer)
package MR_wc;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MRDrive {public static void main(String[] args) throws Exception {String input = "data/wc";String output = "out_mr_wc";final Configuration configuration=new Configuration();//1.获取job对象final Job job =Job.getInstance(configuration);//1.5 删除输出文件夹FileUtils.deleteTarget(configuration,output);//2.设置classjob.setJarByClass(MRDrive.class);//3.设置 Mapper 和 Reducerjob.setMapperClass(MRMapper.class);job.setReducerClass(MRReducer.class);// 4.设置 Mapper 阶段输出数据的 kv 类型,对应就是MRMapper 的第三 四个参数的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5.设置 Reducer 阶段输出数据的 kv 类型,对应就是MRReducer 的第三 四个参数的类型job.setOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置combinerjob.setCombinerClass(MRReducer.class);//6. 设置输入输出的路径FileInputFormat.setInputPaths(job,new Path(input));FileOutputFormat.setOutputPath(job,new Path(output));//7.提交jobfinal boolean result = job.waitForCompletion(true);System.exit(result ? 0:1);}}
新建Java类FileUtils(删除输出文件夹)
package MR_wc;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;public class FileUtils {public static void deleteTarget(Configuration configuration, String output) throws Exception{final FileSystem fileSystem = FileSystem.get(configuration);final Path path =new Path(output);if(fileSystem.exists(path)){fileSystem.delete(path,true);}}}
运行结果
hadoop解决数据倾斜的方法。
什么是数据倾斜?
比如一个文件, a b c
a 1亿个 b 1个 c 1个
1.combiner
a 1亿个 (a,1亿)
b 1个 (b,1)
c 1个 (c,1)
减少数据的网络传输
但是avg不适合,如果导致数据倾斜的key分布在很多不同的文件,不同mapper,这种方法就不适合了。
100mapper每 个mapper里1万个a
2.导致数据倾斜的key分布在很多不同的文件的时候
2.1 局部聚合+全局聚合
第一次map:对于导致数据倾斜的key,加上一个随机数前缀
1_a,2_a,3_a...
这样的话,本来相同的key是不是也会被分到多个reducer中进行局部聚合
第二次map:去掉key的随机前缀,进行全局聚合
思想:两次MR,第一.次将key|随机散列到不同的reducer进行处理,达到负载均衡的目的
第二次再根据去掉key的随机前缀,按照原本的key进行reduce处理
2.2 增加reducr数,提高并行度
job.setNumReduceTasks(int);
比如本来a,b,c只有三个reduce,现在我将reduce数改为30个,是不是也相当于处理a的reducer就多了
job.setNumReduceTasks(0);
设置成0,reducer就不输出了,直接将map的结果写在输出文件中
2.3 实现自定义分区
partitioner:按照某种规则(可以自定义)对map输出的数据进行分区操作
默认的是HashPartitioner job.setPartitionerClass (HashPartitioner.class) ;
顺序:map => partitioner => reduce
根据数据分布情况,自定义散列函数,将key均匀分配到不同的reducer
shuffle 优化
1. map端
1.1 减少输入的文件个数,对小文件进行合并
1.2 combiner
2. I/O
数据传输时进行压缩
3. reduce端
3.1 设置map reduce共存,mapred-site.xml
mapreduce.job.reduce.slowstart.completedmaps 默认0.05
也就是说map在执行到5%的时候开始为reduce进行申请资源,开始执行reduce操作
3.2 尽量少用reduce
reduce会产生大量的网络消耗,但是该用还是要用
3.3 增加reducer,提高并行度
4.整体
4.1 合理的设置map数和reduce数
4.2 mr on yarn 进行参数调优 container
4.3 加配置(机器,内存等)
一些其他面试题
1.是否可以只要map没有reduce?
可以
2.是否可以只有reduce没有map?
可以
作业
1.combiner不适合做avg,为什么?
用combiner做avg计算可能会得到错误的结果。
例如,计算1950年一段时期的平均温度:
map(1950,0,20,10) map(1950,25,15)
正常: reduce(O, 20, 10,25, 15) =14
利用组合函数combiner (avg) : reduce (avg ( map(0,20,10)),avg ( map(25,15)) ) =》avg (10,20)=15, 结果就不对了!所以平均值就不适用combiner了.
2.有哪几种压缩方式?分别的优缺点和适用场景。
(1)Gzip
优点
压缩解压速度快 , 压缩率高 , hadoop本身支持
处理压缩文件时方便 , 和处理文本一样
大部分linux 系统自带 Gzip 命令 , 使用方便
缺点
不支持切片
使用场景
文件压缩后在130M以内 (一个块大小) , 都可以使用 GZip 压缩(因为Gzip唯一的缺点是不能切片)
总结 : 不需要切片的情况下 可以使用
(2)BZip2
优点
压缩率高(高于Gzip)
可以切片
hadoop自带 使用方便
缺点
压缩解压速度超级慢
使用场景
不要求压缩速率 ,但是对压缩率有要求的情况下 比如备份历史记录 , 备份文件
或者 输出的文件较大 , 处理后的数据需要减少磁盘存储并且以后使用数据的情况较少 (解压 / 压缩的情况较少)
对于单个文件 较大 ,又想压缩减少磁盘空间 , 并且兼容之前的应用程序的情况
总结 : 对于压缩解压速度没有要求的情况下
(3)Lzo
优点
压缩解压速度比较快 , 压缩率也可以
支持切片 是hadoop 比较流行的压缩格式
可以在linux 下安装 lzo命令 使用方便
缺点
压缩率比Gzip低一些
hadoop 本身不支持, 需要自己安装
使用Lzo 格式的文件时需要做一些特殊处理(为了支持 Split 需要建立索引 , 还需要家将 InputFormat 指定为Lzo 格式 [特殊]
使用场景
压缩以后还大于 200M 的文件 , 且文件越大 Lzo 的优势越明显
(原因很简单 , 四种压缩方式 只有BZip2 , Gzip 支持切片 , 然后 BZip2 你懂的 , 速度贼慢 , 只能用于特定的场景, 所以 Lzo 是比较经常用的 )
总结 : 压缩后文件还是比较大 需要切片的情况下 推荐使用
(4)Snappy
优点
高压缩解压速度 , 压缩率还可以
缺点
不能切片
压缩率比Gzip小
hadoop本身不支持 需要安装
使用场景
当Mapeduce的Map阶段输出的数据比较大的时候 , 作为Map到Reduce的中间数据的压缩格式
作为一个MapReduce作业的输出和另一个MapReduce的输入
总结 : 因为 压缩率不怎么样 还不能切片 , 所以在一般的作为输入文件压缩时可以用 GZip 和 Lzo 都比Snappy
3.yarn参数调优。
下载
wget http://public-repo-1.hortonworks.com/HDP/tools/2.6.0.3/hdp_manual_install_rpm_helper_files-2.6.0.3.8.tar.gz
解压
tar -zxvf hdp_manual_install_rpm_helper_files-2.6.0.3.8.tar.gz
4.打包运行在服务器,可以参考官网的例子。
5.map数和reduce数由什么决定?
map数量由splitSize来决定的,reduce数量,由partition决定。
MapperReducer相关推荐
- mapper-reducer word count 实例
统计一个文件里各单词的个数,假设这个文件很大. 原理如下图: 编写代码: WCMapper.java package zengmg.hadoop.mr.wordcount;import java.io ...
- 通过MapReduce统计每个单子在每个文件中出现的次数(FileSplit的使用),单词作为key,所在文本和次数作为value进行统计
代码如下: package cn.toto.bigdata.mr.index;import java.io.IOException;import org.apache.hadoop.conf.Conf ...
- 使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner
工程结构: 在整个案例过程中,代码如下: WordCountMapper的代码如下: package cn.toto.bigdata.mr.wc; import java.io.IOException ...
- Hadoop之MRjob入门
Hadoop之MRjob入门 一.mrjob的安装 Hadoop 的各 Python 框架对比 使用 mrjob 实现词组统计 **启动 Hadoop 集群** **启动 hadoop 集群:** 代 ...
- 快学Big Data -- Hadoop(十三)
Hadoop总结 谁说大象不能跳舞,大象能跳舞啊!!!!不过跳起来是笨重的...... 概述 Hadoop 是一个性能.可靠性.可扩展性.可管理性的软件,为以后的分布式打下了基础,接下来咱们好好的深刨 ...
- 大数据Hadoop之HDFS和MapReduce_02_01
Hadoop 主要由HDFS和MapReduce 引擎两部分组成.最底部是HDFS,它存储hadoop集群中所有存储节点上的文件.HDFS 的上一层是MapReduce 引擎,该引擎由JobTrack ...
- 基于虚拟化的混合云集群——基于集群管理监控系统
摘要 本博文将详细的介绍本人在实现过程中的所经历的项目,同时对象将进行的详细的说明.将对项目的背景,项目的技术,项目的架构设计,项目的难点,项目的开发周期,项目中详细的功能的开发,项目中本人的主要工作 ...
- 视频教程-新版全面系统完整的电信客服综合案例教程-大数据
新版全面系统完整的电信客服综合案例教程 张长志技术全才.擅长领域:区块链.大数据.Java等.10余年软件研发及企业培训经验,曾为多家大型企业提供企业内训如中石化,中国联通,中国移动等知名企业.拥有丰 ...
- MapReduce内部shuffle过程详解(Combiner的使用)
Maptask调用一个组件FileInputFormat FileInputFormat有一个最高层的接口 --> InputFormat 我们不需要去写自己的实现类,使用的就是内部默认的组件: ...
最新文章
- AWS re-Invent最新发布AI产品解析:场景为王
- SocketAPI,CAsyncSocket,CSocket内幕及其用法
- mysql慢查询分析
- 织梦php远程连接数据库,用PHP连接Oracle for NT 远程数据库
- 高橋君とカード / Tak and Cards(AtCoder-2037)
- VS2019调试查看变量_单片机编程软件一点通,IAR单片机编程软件工程调试方法
- 事件触发控制_基于事件触发机制的直流微电网多混合储能系统分层协调控制方法...
- 【体系结构】buffer cache的个人理解
- 检测编码并制作一切UTF-8
- 中间件配置文件-redis
- 关于哈希表,你该了解这些!
- JNI基础:JNI数据类型和类型描述符
- 系统分析员应具备的能力
- 浅谈混频器之镜像频率
- 2021-10-13爬虫requests总结
- Liunx学习笔记 - 07 - 02 正则表达式与文件格式化处理
- 最快下载微软必应Bing搜索背景图片的方法
- SAP寄售业务会计凭证的分配码原理分析
- @技术文章为什么要写得简练
- 服务器虚拟化课程总结,虚拟化与云计算课程总结报告