用java代码实现wordcount

新建Java类MRMapper

package MR_wc;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class MRMapper extends Mapper<LongWritable,Text,Text,IntWritable> {IntWritable ONE =new IntWritable(1);@Overrideprotected void setup(Context context) throws IOException, InterruptedException {/*super.setup(context);*/System.out.println("-----------Mapper.setup--------------");}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {/*super.map(key, value, context);*/System.out.println("-----------Mapper.map--------------");// 获取内容,转为小写final String line =value.toString().toLowerCase();// 按照分隔符进行拆分final String[] splits = line.split(" ");//输出for (String word : splits){context.write(new Text(word),ONE);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {/*super.cleanup(context);*/System.out.println("-----------Mapper.cleanup--------------");}
}

新建Java类MRReducer

package MR_wc;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MRReducer extends Reducer<Text,IntWritable,Text,IntWritable>{@Overrideprotected void setup(Context context) throws IOException, InterruptedException {System.out.println("-----------Reducer.setup--------------");}@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {System.out.println("-----------Reducer.reduce--------------");//初始值 countint count=0;for (IntWritable value : values){count += value.get();}context.write(key,new IntWritable(count));}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {System.out.println("-----------Reducer.cleanup--------------");}
}

新建Java类MRDriver(连接MRMapper和MRReducer)

package MR_wc;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MRDrive {public static void main(String[] args) throws Exception {String input = "data/wc";String output = "out_mr_wc";final Configuration configuration=new Configuration();//1.获取job对象final Job job =Job.getInstance(configuration);//1.5 删除输出文件夹FileUtils.deleteTarget(configuration,output);//2.设置classjob.setJarByClass(MRDrive.class);//3.设置 Mapper 和 Reducerjob.setMapperClass(MRMapper.class);job.setReducerClass(MRReducer.class);// 4.设置 Mapper 阶段输出数据的 kv 类型,对应就是MRMapper 的第三 四个参数的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//5.设置 Reducer 阶段输出数据的 kv 类型,对应就是MRReducer 的第三 四个参数的类型job.setOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置combinerjob.setCombinerClass(MRReducer.class);//6. 设置输入输出的路径FileInputFormat.setInputPaths(job,new Path(input));FileOutputFormat.setOutputPath(job,new Path(output));//7.提交jobfinal boolean result = job.waitForCompletion(true);System.exit(result ? 0:1);}}

新建Java类FileUtils(删除输出文件夹)

package MR_wc;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;public class FileUtils {public static  void deleteTarget(Configuration configuration, String output) throws Exception{final FileSystem fileSystem = FileSystem.get(configuration);final Path path =new Path(output);if(fileSystem.exists(path)){fileSystem.delete(path,true);}}}

运行结果

hadoop解决数据倾斜的方法。

什么是数据倾斜?

比如一个文件, a b c

a 1亿个  b 1个   c 1个

1.combiner

a  1亿个  (a,1亿)

b  1个        (b,1)

c  1个        (c,1)

减少数据的网络传输

但是avg不适合,如果导致数据倾斜的key分布在很多不同的文件,不同mapper,这种方法就不适合了。

100mapper每 个mapper里1万个a

2.导致数据倾斜的key分布在很多不同的文件的时候

2.1   局部聚合+全局聚合

第一次map:对于导致数据倾斜的key,加上一个随机数前缀

1_a,2_a,3_a...

这样的话,本来相同的key是不是也会被分到多个reducer中进行局部聚合

第二次map:去掉key的随机前缀,进行全局聚合

思想:两次MR,第一.次将key|随机散列到不同的reducer进行处理,达到负载均衡的目的

第二次再根据去掉key的随机前缀,按照原本的key进行reduce处理

2.2   增加reducr数,提高并行度

job.setNumReduceTasks(int);

比如本来a,b,c只有三个reduce,现在我将reduce数改为30个,是不是也相当于处理a的reducer就多了

job.setNumReduceTasks(0);

设置成0,reducer就不输出了,直接将map的结果写在输出文件中

2.3   实现自定义分区

partitioner:按照某种规则(可以自定义)对map输出的数据进行分区操作

默认的是HashPartitioner  job.setPartitionerClass (HashPartitioner.class) ;

顺序:map => partitioner => reduce

根据数据分布情况,自定义散列函数,将key均匀分配到不同的reducer

shuffle 优化

1.  map端

1.1  减少输入的文件个数,对小文件进行合并

1.2  combiner

2.  I/O

数据传输时进行压缩

3.  reduce端

3.1        设置map  reduce共存,mapred-site.xml

mapreduce.job.reduce.slowstart.completedmaps  默认0.05

也就是说map在执行到5%的时候开始为reduce进行申请资源,开始执行reduce操作

3.2        尽量少用reduce

reduce会产生大量的网络消耗,但是该用还是要用

3.3        增加reducer,提高并行度

4.整体

4.1        合理的设置map数和reduce数

4.2        mr on yarn        进行参数调优  container

4.3        加配置(机器,内存等)

一些其他面试题

1.是否可以只要map没有reduce?

可以

2.是否可以只有reduce没有map?

可以

作业

1.combiner不适合做avg,为什么?

用combiner做avg计算可能会得到错误的结果。

例如,计算1950年一段时期的平均温度:

map(1950,0,20,10) map(1950,25,15)

正常: reduce(O, 20, 10,25, 15) =14

利用组合函数combiner (avg) : reduce (avg ( map(0,20,10)),avg ( map(25,15)) ) =》avg (10,20)=15, 结果就不对了!所以平均值就不适用combiner了.

2.有哪几种压缩方式?分别的优缺点和适用场景。

(1)Gzip
优点
压缩解压速度快 , 压缩率高 , hadoop本身支持
处理压缩文件时方便 , 和处理文本一样
大部分linux 系统自带 Gzip 命令 , 使用方便
缺点
不支持切片
使用场景
文件压缩后在130M以内 (一个块大小) , 都可以使用 GZip 压缩(因为Gzip唯一的缺点是不能切片)
总结 : 不需要切片的情况下 可以使用

(2)BZip2
优点
压缩率高(高于Gzip)
可以切片
hadoop自带 使用方便
缺点
压缩解压速度超级慢
使用场景
不要求压缩速率 ,但是对压缩率有要求的情况下 比如备份历史记录 , 备份文件
或者 输出的文件较大 , 处理后的数据需要减少磁盘存储并且以后使用数据的情况较少 (解压 / 压缩的情况较少)
对于单个文件 较大 ,又想压缩减少磁盘空间 , 并且兼容之前的应用程序的情况
总结 : 对于压缩解压速度没有要求的情况下

(3)Lzo
优点
压缩解压速度比较快 , 压缩率也可以
支持切片 是hadoop 比较流行的压缩格式
可以在linux 下安装 lzo命令 使用方便
缺点
压缩率比Gzip低一些
hadoop 本身不支持, 需要自己安装
使用Lzo 格式的文件时需要做一些特殊处理(为了支持 Split 需要建立索引 , 还需要家将 InputFormat 指定为Lzo 格式 [特殊]
使用场景
压缩以后还大于 200M 的文件 , 且文件越大 Lzo 的优势越明显
(原因很简单 , 四种压缩方式 只有BZip2 , Gzip 支持切片 , 然后 BZip2 你懂的 , 速度贼慢 , 只能用于特定的场景, 所以 Lzo 是比较经常用的 )
总结 : 压缩后文件还是比较大 需要切片的情况下 推荐使用

(4)Snappy
优点
高压缩解压速度 , 压缩率还可以
缺点
不能切片
压缩率比Gzip小
hadoop本身不支持 需要安装
使用场景
当Mapeduce的Map阶段输出的数据比较大的时候 , 作为Map到Reduce的中间数据的压缩格式
作为一个MapReduce作业的输出和另一个MapReduce的输入
总结 : 因为 压缩率不怎么样 还不能切片 , 所以在一般的作为输入文件压缩时可以用 GZip 和 Lzo 都比Snappy

3.yarn参数调优。

下载

wget http://public-repo-1.hortonworks.com/HDP/tools/2.6.0.3/hdp_manual_install_rpm_helper_files-2.6.0.3.8.tar.gz

解压

tar -zxvf hdp_manual_install_rpm_helper_files-2.6.0.3.8.tar.gz

4.打包运行在服务器,可以参考官网的例子。

5.map数和reduce数由什么决定?

map数量由splitSize来决定的,reduce数量,由partition决定。

MapperReducer相关推荐

  1. mapper-reducer word count 实例

    统计一个文件里各单词的个数,假设这个文件很大. 原理如下图: 编写代码: WCMapper.java package zengmg.hadoop.mr.wordcount;import java.io ...

  2. 通过MapReduce统计每个单子在每个文件中出现的次数(FileSplit的使用),单词作为key,所在文本和次数作为value进行统计

    代码如下: package cn.toto.bigdata.mr.index;import java.io.IOException;import org.apache.hadoop.conf.Conf ...

  3. 使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

    工程结构: 在整个案例过程中,代码如下: WordCountMapper的代码如下: package cn.toto.bigdata.mr.wc; import java.io.IOException ...

  4. Hadoop之MRjob入门

    Hadoop之MRjob入门 一.mrjob的安装 Hadoop 的各 Python 框架对比 使用 mrjob 实现词组统计 **启动 Hadoop 集群** **启动 hadoop 集群:** 代 ...

  5. 快学Big Data -- Hadoop(十三)

    Hadoop总结 谁说大象不能跳舞,大象能跳舞啊!!!!不过跳起来是笨重的...... 概述 Hadoop 是一个性能.可靠性.可扩展性.可管理性的软件,为以后的分布式打下了基础,接下来咱们好好的深刨 ...

  6. 大数据Hadoop之HDFS和MapReduce_02_01

    Hadoop 主要由HDFS和MapReduce 引擎两部分组成.最底部是HDFS,它存储hadoop集群中所有存储节点上的文件.HDFS 的上一层是MapReduce 引擎,该引擎由JobTrack ...

  7. 基于虚拟化的混合云集群——基于集群管理监控系统

    摘要 本博文将详细的介绍本人在实现过程中的所经历的项目,同时对象将进行的详细的说明.将对项目的背景,项目的技术,项目的架构设计,项目的难点,项目的开发周期,项目中详细的功能的开发,项目中本人的主要工作 ...

  8. 视频教程-新版全面系统完整的电信客服综合案例教程-大数据

    新版全面系统完整的电信客服综合案例教程 张长志技术全才.擅长领域:区块链.大数据.Java等.10余年软件研发及企业培训经验,曾为多家大型企业提供企业内训如中石化,中国联通,中国移动等知名企业.拥有丰 ...

  9. MapReduce内部shuffle过程详解(Combiner的使用)

    Maptask调用一个组件FileInputFormat FileInputFormat有一个最高层的接口 --> InputFormat 我们不需要去写自己的实现类,使用的就是内部默认的组件: ...

最新文章

  1. AWS re-Invent最新发布AI产品解析:场景为王
  2. SocketAPI,CAsyncSocket,CSocket内幕及其用法
  3. mysql慢查询分析
  4. 织梦php远程连接数据库,用PHP连接Oracle for NT 远程数据库
  5. 高橋君とカード / Tak and Cards(AtCoder-2037)
  6. VS2019调试查看变量_单片机编程软件一点通,IAR单片机编程软件工程调试方法
  7. 事件触发控制_基于事件触发机制的直流微电网多混合储能系统分层协调控制方法...
  8. 【体系结构】buffer cache的个人理解
  9. 检测编码并制作一切UTF-8
  10. 中间件配置文件-redis
  11. 关于哈希表,你该了解这些!
  12. JNI基础:JNI数据类型和类型描述符
  13. 系统分析员应具备的能力
  14. 浅谈混频器之镜像频率
  15. 2021-10-13爬虫requests总结
  16. Liunx学习笔记 - 07 - 02 正则表达式与文件格式化处理
  17. 最快下载微软必应Bing搜索背景图片的方法
  18. SAP寄售业务会计凭证的分配码原理分析
  19. @技术文章为什么要写得简练
  20. 服务器虚拟化课程总结,虚拟化与云计算课程总结报告

热门文章

  1. 什么时候跳槽,为什么离职,你想好了么?
  2. unbuntu 中部署jenkins
  3. JavaScript在线教程网址
  4. javascript-原生javascript实现类似节奏大师小游戏
  5. Win10免费升级 Win11 有时间限制
  6. poi操作word模板替换数据并且导出word
  7. android电量伪装原理,安卓电量伪装神器
  8. 网站域名https显示证书错误如何解决
  9. 第三周项目4 穷举法
  10. 华为力推自研AI芯片,还记得大明湖畔的寒武纪吗?