Apache Hadoop使用JAVA语言编程,对于使用java语言的程序员来说,这很友好。一般而言,一个hadoop程序主要分为两个部分:Map和Reduce。

我们可以观察一下其执行流:

Map →\to→ Combiner →\to→ Partitioner →\to→ Sort →\to→ Reduce

下面我们以一个例子来阐述wordcount的执行流如何发生。

假设一个输入文本如下:


ja dj nag
dkda ng nd
ddnjg  ndj

先不考虑mapreduce,若我们需要进行单词计数,本能的,我们希望将每个单词拎出来,一一进行计数。那么我们编程序,自然也希望先将文本中的单词一个个拆出来,然后再进行统计。

那什么是mapreduce呢?简单来说,将这个文本拆成n份(Map阶段,做拆分),然后让n个工人对这n份文本单独进行统计,最后将n个人的结果进行合计(Reduce阶段,进行统计),则得出这样的结果。

有了这样的认识,我们依次来看wordcount的代码:

Mapper: map程序是需要编程实现了。

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {    private final static IntWritable one = new IntWritable(1);    private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {        StringTokenizer itr = new StringTokenizer(value.toString());        while (itr.hasMoreTokens()) {            word.set(itr.nextToken());            context.write(word, one);        }    }}

上述代码展示的就是WordCount中的Map阶段。首先,我们建立继承自Mapper类的TokenizerMapper类,尖括号中的四个类型分别表示:输入的Key,输入的Value,输出的Key,输出的Value,的类型。

在Hadoop中,输入以文件形式输入,一般输入的Key就是行号,故为Object类型。输入的Value为该行的内容,一般为文本,故为Text类型。

在wordcount中,我们希望中间结果应该统计好的,每小份文本中的单词数量,形如:


ja 1

dj 1

nag 1

……


因而,输入的结果的Key应该是单词,类型为Text,输出为正数,类型为IntWritable。

有人可能纳闷,这个程序和普通的java程序差别也不大嘛。

确实差别不大。仔细想想,一个人统计单词和n个人统计单词,统计单词的动作难道不应该是一样的嘛?所以思考这个程序的时候,我们应该要想象这个程序是运行在n个机器上的,就像n个人在统计单词一样。



Reducer: reduce程序实现如下:

public static class IntSumReducer        extends Reducer<Text, IntWritable, Text, IntWritable> {    private IntWritable result = new IntWritable();    public void reduce(Text key, Iterable<IntWritable> values, Context context    ) throws IOException, InterruptedException {        int sum = 0;        for (IntWritable val : values) {            sum += val.get();        }        result.set(sum);        context.write(key, result);    }
}

Reducer就是进行一个统计工作。输入来自Mapper的输出,所以Key是单词,即为Text类型。Value为单词数量,即IntWritable。输出的理解和Mapper的理解相同,这里也就不赘述。

有一点需要说明,输入的Value是迭代器,形式上看应该类似于如下形式:

k 1 2 3 4

表示有4个Mapper程序给该Reducer结点发送了k的数量,也就是说有4个工人统计到了k,值分别为1个,2个,3个,4个。Reducer端对这些量进行统计,得到结果。

至此,WordCount的程序代码基本上也就理解结束了,但是从执行流程来看,还有许多没被我们发现的部分:



首先是Combiner。可以把他看成一个小的Reducer,他仅在当前Mapper结点上进行统计,什么意思?

观察Mapper程序,你会发现,Mapper得到的结果应该是类似于:


k 1

k 1

b 1

b 1


可以看到,Map仅作拆分,并不会做统计。但是想想,如果我们直接将10000个甚至更多的 k 1 发送到Reducer上,是极其带宽的,如果我们只发一个 k 100000 传输速度会快很多。基于这样的想法,我们利用了一个Combiner程序。在WordCount中,Combiner和Reducer的程序应该是相同的,所以不用重新写。当然,有的程序中可能并不需要Combiner,因而Combiner的加入不能改变Map的输出格式。



Prititioner部分,他是默认执行的部分。有的人会疑惑,服务器是怎么保证将相同关键字的对发送到一个服务器上的。这个就是由Partitioner完成。他有默认的程序,就是按照key进行分类,但是也可以人工定制。它这部分完成的工作,称为Shuffle。



Sort部分也是默认执行的。Combiner(如果有该过程)或者Mapper(没有Combiner过程)结束后,系统就会自动启动Sort函数,按关键字排序,我们也可以通过在Partitioner部分改变key来改变排序的依据。



在主函数部分:

首先完成配置,给Job命名。

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");

之后,先指定该工程的类,再依次指出各执行流的流(Sort不用,其他若有则要设置):

job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

最后设置输出格式,以及输入输出路径:

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

需要提醒:

输出文件夹实现不能存在(这个没有原因,工程经验得出的结果)。

整个过程变量变化为:

Text →\to→ →\to→ key list(value) →\to→ results

给出完整的WordCount代码:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {    public static class TokenizerMapper            extends Mapper<Object, Text, Text, IntWritable> {        private final static IntWritable one = new IntWritable(1);        private Text word = new Text();        public void map(Object key, Text value, Context context        ) throws IOException, InterruptedException {            StringTokenizer itr = new StringTokenizer(value.toString());            while (itr.hasMoreTokens()) {                word.set(itr.nextToken());                context.write(word, one);            }        }    }    public static class IntSumReducer            extends Reducer<Text, IntWritable, Text, IntWritable> {        private IntWritable result = new IntWritable();        public void reduce(Text key, Iterable<IntWritable> values,                           Context context        ) throws IOException, InterruptedException {            int sum = 0;            for (IntWritable val : values) {                sum += val.get();            }            result.set(sum);            context.write(key, result);        }    }    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf, "word count");        job.setJarByClass(WordCount.class);        job.setMapperClass(TokenizerMapper.class);        job.setCombinerClass(IntSumReducer.class);        job.setReducerClass(IntSumReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        FileInputFormat.addInputPath(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        System.exit(job.waitForCompletion(true) ? 0 : 1);    }
}

对应我上述的输入,得出的输出为:

d   1
da 1
dj 1
dk 1
dnjg 1
ja 1
nag  1
nd 1
ndj  1
ng 1

理解WordCount相关推荐

  1. 如何理解wordcount

    执行wordCount代码之前需要先导入几个包,找到haoop安装目录下的share和mapreduce目录底下包,如图: wordCount中的计算框架mapreduce的具体流程: shuffli ...

  2. Redis之Vs Memcached

    面试官会问问你 redis 和 memcached 的区别,但是 memcached 是早些年各大互联网公司常用的缓存方案,但是现在近几年基本都是 redis,没什么公司用 memcached 了. ...

  3. spark系列3:spark入门编程与介绍

    3. Spark 入门 目标 通过理解 Spark 小案例, 来理解 Spark 应用 理解编写 Spark 程序的两种常见方式 spark-shell spark-submit Spark 官方提供 ...

  4. Spark-----Spark 与 Hadoop 对比,Spark 集群搭建与示例运行,RDD算子简单入门

    目录 一.Spark 概述 1.1. Spark是什么 1.2. Spark的特点(优点) 1.3. Spark组件 1.4. Spark和Hadoop的异同 二.Spark 集群搭建 2.1. Sp ...

  5. (1)Hadoop 的第一个程序 WordCount 理解

    Hadoop 的第一个程序 WordCount 理解 map and Reduce 相关概念 Map 将数据拆分成一个个键值对, reduce 负责将一个个键值对进行归集, 最后统计出结果 machi ...

  6. Hadoop-MapReduce+HDFS文件格式和压缩格式+split和Maptask关系+WordCount剖析+shuffle理解

    一. MapReduce on Yarn流程 1. 什么是MapReduce MapReduce是一个计算框架,核心思想是"分而治之",表现形式是有个输入(input),mapre ...

  7. WordCount的理解与MapReduce的执行过程

    WordCount的入门 WordCount是最常见.最基本的一个需求,例如进行词频统计.用户访问记录统计.如果数据量非常小的情况下,使用单机.批处理的方式就可以很快得到结果.但是如果数据量非常大,数 ...

  8. 初学Hadoop之图解MapReduce与WordCount示例分析

    Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS) ...

  9. MapReduce—第一个WordCount程序

    Mapper类 我们自定义MyMapper类并继承Mapper, import java.io.IOException;import org.apache.hadoop.io.IntWritable; ...

最新文章

  1. vim+cscope+ctags打造属于自己的IDE
  2. Jasperreport之与Springboot整合
  3. HashMap多线程并发问题分析
  4. 在php里让字体划过变色,鼠标划过字体时如何用css来实现字体变色?(代码实测)...
  5. 简记用ArcGIS处理某项目需求中数据的步骤
  6. Spring Cloud实战小贴士:Feign的继承特性(伪RPC模式)
  7. 内存检测_Android native内存检测工具介绍
  8. 函数指针数组指针+结构体数组
  9. (8)Verilog include 头文件使用路径(FPGA不积跬步101)
  10. 你绝对想不到,会Linux的程序员,到底有多吃香!
  11. erlang开发工具之intellij idea基本使用
  12. RHCE之DHCP配置详解
  13. ashx实现ajax功能遇到的浏览器缓存问题
  14. 运维人必知必会的Zabbix核心命令
  15. AOP(execution表达式)
  16. python关系图谱_利用Python+Gephi构建LOL全英雄间的关联图谱
  17. 静态网页设计——春节
  18. 异常检测方法——DBSCAN、孤立森林、OneClassSVM、LOF、同比环比、正态分布、箱线图
  19. 电脑系统重装怎么找回文件 重装系统文件怎么找回来
  20. Everything Is Generated In Equal Probability(HDU - 6595,概率期望)

热门文章

  1. Android Studio安装及环境配置教程
  2. 电池容量足够低如何触发自动关机(Riogrande platformQualcom platform)
  3. Vue路由传参的几种方式
  4. 泛微OA_E9之明细表选择框,选项带出主表审批人
  5. 自己在家怎么染发步骤,染发步骤与技巧
  6. 哪个品牌的蓝牙耳机质量比较好?内行推荐四款好用的蓝牙耳机
  7. Win11的两个实用技巧系列之nvidia控制面板没反应和闪退解决方法
  8. 【测试】微信群聊的测试用例
  9. bmap、百度地图设置手型和十字架型
  10. 初试只考计算机网络的学校,2020计算机考研全国985/211高校初试科目统计分析汇总...