因Hadoop集群平台网络限制,只能在eclipse里先写好代码再提交jar到集群平台namenode上执行,不能实时调试,所以没有配置eclipse的hadoop开发环境,只是引入了hadoop的lib包。

eclipse的hadoop开发环境配置可参考:http://www.cnblogs.com/xia520pi/archive/2012/05/20/2510723.html

MapReduce的基础开发也是通过该博客系列学习到,很感谢!

1、数据去重
   在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后会交给reduce。 map将输入的value复制到输出数据的key上,并直接输出;
   经过shuffle,相同key形成<key,value-list>,作为reduce的输入;reduce将输入中的key复制到输出数据的key上,并直接输出。利用MapReduce对key的汇聚机制将重复的数据去掉。

1)在namenode系统tmp目录下新建输入文件file1.txt和file2.txt,上传到hadoop
         命令:hadoop fs -put /tmp/file1.txt /tmp/fjs/in
         注意:file1.txt和file2.txt中要有重复的行,可以看出去重效果
   2)编码DataDedup并导出mr.jar和上传到namenode系统tmp目录下
         注意:通过eclipse工程导出Runnable JAR file包含hadoop lib包,如通过Fatjar不要勾选one-JAR。

代码如下:

package com.data;import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class DataDedup {public static class Map extends Mapper<Object,Text,Text,Text>{private static Text line=new Text();//每行数据public void map(Object key,Text value,Context context) throws IOException,InterruptedException{line=value;context.write(line, new Text(""));}}public static class Reduce extends Reducer<Text,Text,Text,Text>{public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{context.write(key, new Text(""));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: datadedup <in> <out>");System.exit(2);}Job job = new Job(conf, "data dedup");job.setJarByClass(DataDedup.class);job.setNumReduceTasks(1);//设置reduce输入文件一个,方便查看结果,如设置为0就是不执行reduce,map就输出结果job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

3)执行jar并查看结果
         执行:yarn jar /tmp/mr.jar /tmp/fjs/in /tmp/fjs/out 
         注意:out目录不能先创建,hadoop会自己创建
         查看:hadoop fs -text /tmp/fjs/out/part-r-00000.bz2
        注意:这里执行的Hadoop集群平台reduce输出文件是bz2压缩
                   为方便观察结果,设置reduce数量为1,job.setNumReduceTasks(1)

2、数据排序
   MapReduce过程中按照key值排序的,如果key为封装int的IntWritable类型则按照数字大小对key排序,如果key为封装为String的Text类型,则按照字典顺序对字符串排序。
   在map中将读入的数据转化成IntWritable型,然后作为key值输出(value任意)。reduce拿到<key,value-list>后,将输入的key作为value输出,并根据value-list中元素的个数决定输出的次数。输出的key(即代码中的linenum)是一个全局变量,它统计当前key的位次。MapReduce过程中map和reduce即完成任务,不需要配置Combiner任务。
   1)在namenode系统tmp目录下新建输入文件file3.txt和file4.txt,上传到hadoop
        命令:hadoop fs -put /tmp/file3.txt /tmp/fjs/in1
        注意:file3.txt和file4.txt中每行输入整数,最好有重复;
   2)编码DataSort并导出mr.jar和上传到namenode系统tmp目录下
        注意:和DataDedup放在同一工程下,导出时,要选择DataSort作为主类

代码如下:

package com.data;import java.io.IOException;
import java.util.Iterator;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class DataSort {//map将输入中的value化成IntWritable类型,作为输出的keypublic static class Map extends Mapper<Object,Text,IntWritable,IntWritable>{private static IntWritable data=new IntWritable();public void map(Object key,Text value,Context context) throws IOException,InterruptedException{String line=value.toString();data.set(Integer.parseInt(line));context.write(data, new IntWritable(1));}}//reduce将输入中的key复制到输出数据的key上,//然后根据输入的value-list中元素的个数决定key的输出次数//用全局linenum来代表key的位次public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{private static IntWritable linenum = new IntWritable(1);public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{Iterator<IntWritable> itVal=values.iterator();while(itVal.hasNext()){//for(IntWritable val:values){            context.write(linenum, key);linenum = new IntWritable(linenum.get()+1);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: datasort <in> <out>");System.exit(2);}Job job = new Job(conf, "data sort");job.setJarByClass(DataSort.class);job.setNumReduceTasks(1);//设置reduce输入文件一个,方便查看结果,如设置为0就是不执行reduce,map就输出结果job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

3)执行jar并查看结果
        执行:yarn jar /tmp/mr.jar /tmp/fjs/in1 /tmp/fjs/out1 
       查看:hadoop fs -text /tmp/fjs/out1/part-r-00000.bz2

3、总结:
   通过MapReduce开发之词汇统计和排序以及数据去重和排序,进一步理解Map-Reduce的计算框架,其代码处理流程通俗描述为:
   1)MapReduce对输入的文件分片并按照行提取<key,value>,作为Map的输入,其中key是每行的偏移量,value就是每一行的数据;
   2)MapReduce调用自定义的map函数来处理输入的<key,value>并以<key,value>输出加工后的数据;
   3)MapReduce开始shuffle处理map输出的<key,value>,形成<key,value-list>,汇集相同key的值为list;
   4)最后MapReduce调用自定义的reduce函数来处理输入的<key,value-list>,并以<key,value>输出结果;
   整个步骤就是:分片-map-汇聚-reduce,在框架之上定义自己的map和reduce函数来加工数据。

MapReduce基础开发之二数据去重和排序相关推荐

  1. MapReduce基础开发之十二ChainMapper和ChainReducer使用

    1.需求场景:    过滤无意义的单词后再进行文本词频统计.处理流程是: 1)第一个Map使用无意义单词数组过滤输入流: 2)第二个Map将过滤后的单词加上出现一次的标签: 3)最后Reduce输出词 ...

  2. MapReduce基础开发之六Map多输入

    在实际MapReduce开发中,会遇到需要数据多输入路径并有对应的Map函数来处理,这需要MultipleInputs.addInputPath(job, path, inputFormatClass ...

  3. MapReduce基础开发之四参数传递

    Map和Reduce函数是在各节点进行,如果要在MapReduce数据加工中使用共同参数,要如何传参呢?方法有二: 1.Configuration类的set和get的方法读取xml/txt文件设置或自 ...

  4. MapReduce基础开发之十读写ORC File

    1.ORC File Orc是Hive特有的一种列式存储的文件格式,它有着非常高的压缩比和读取效率,因此很快取代了之前的RCFile,成为Hive中非常常用的一种文件格式. 2.编译ORC Jar包 ...

  5. MapReduce基础开发之七Hive外部表分区

    MapReduce输出的数据在实际应用中会加载在不同子目录下,比如按照日期.地区等,然后统一到外部表,这就需要用到hive表的分区. 假设输出的数据放在/tmp/fjs/dpi父目录下,下面分别有三个 ...

  6. MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs

    为利用Hadoop集群平台的分布存储和计算能力,基于MapReduce将ftp文件分布式下载并上传到HDFS中. 1.文件移动流程:ftp服务器->datanode本地目录->HDFS目录 ...

  7. MapReduce基础开发之三字段处理并输出Hive表

    1.MR设计和开发    1)设计:      输入:用户名 | 数字ip | 时间戳 |  url      MR处理:正则表达式匹配url,满足则解析url并转换ip和时间戳,      输出:用 ...

  8. MapReduce基础开发之一词汇统计和排序(wordcount)

    统计/var/log/boot.log中含k的字符的数量,并对含k的字符按照数量排序.需分两个job完成,一个用来统计,一个用来排序. 一.统计 1.上传文件到hadoop:    1)新建文件夹:h ...

  9. MapReduce基础开发之十一DistributedCache使用

    1.需求场景:    过滤无意义的单词后再进行文本词频统计.处理流程是: 1)预定义要过滤的无意义单词保存成文件,保存到HDFS中: 2)程序中将该文件定位为作业的缓存文件,使用Distributed ...

最新文章

  1. 常用 CSS 选择器
  2. js数组中的splice()方法
  3. 非标准配置linux,剖析非标准波特率的设置和使用于Linux操作系统中
  4. OpenGL material light材质灯光的实例
  5. Oracle12c:安装后新建用户及其默认表空间,并创建表测试
  6. UWP开发细节记录:判断文件类型
  7. 简述java在安卓开发中的应用_Java 自定义注解在安卓开发中的简单运用
  8. python 实现的键盘记录器 小功能
  9. jquerymobile iscrollview
  10. Android系统信息获取 之八:WIFI设备和WIFI信号信息获取
  11. linux 提示libaio.so.1,解决Mysql报错缺少libaio.so.1
  12. 【无线链路】无线发射功率以及接收灵敏度
  13. oCPC实践录 | oCPC产品设计与出价原理(1)
  14. bigworld游戏服务器架构参考
  15. geoserver发布TIF格式瓦片地图
  16. 春日里有skycc营销软件相陪
  17. OSChina 周四乱弹 ——快速辨别妹子有无男友!
  18. PHP之MySQL笔记4
  19. 光纤仿真相关参数——光纤损耗、数值孔径、归一化参数
  20. 海思芯片部署MPP并验证功能

热门文章

  1. c语言 五子棋 悔棋代码,跪求C语言五子棋悔棋部分实现
  2. ubuntu14安装mysql5.6_ubuntu14.04安装mysql5.6.37
  3. js实现撤销恢复_我们常用的撤销和恢复功能,它们使用了什么设计模式实现吗?...
  4. VPS批量管理软件--远程桌面批量管理
  5. 好用的netcat工具
  6. Spring-配置bean的方法(工厂方法和Factorybean)【转】
  7. 在visual studio中使用git版本系统(zz)
  8. JAVA共通関数--文字列に空白を追加する
  9. 错误:由于系统启用了内核调试器,因此不可能进行调试解决方案
  10. 理解ROS话题---ROS学习第5篇