Project structure:

The code used throughout this example is listed below.

The code for WordCountMapper is as follows:

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * This Mapper comes from hadoop-mapreduce-client-core-2.8.0.jar.
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN    : the type of the keys read by the framework; with the default InputFormat the key is the starting byte offset of a line, so it is a Long
 * VALUEIN  : the type of the values read by the framework; with the default InputFormat the value is the content of one line, so it is a String
 * KEYOUT   : the key type returned by the user-defined logic, decided by the business logic; in this word count program the output key is a word, so it is a String
 * VALUEOUT : the value type returned by the user-defined logic, decided by the business logic; in this word count program the output value is a word count, so it is an Integer
 *
 * However, JDK types such as String and Long serialize inefficiently, so Hadoop defines its own serialization framework to speed this up.
 * Therefore, any data that needs to be serialized in a Hadoop program (written to disk or sent over the network) must use a type that implements Hadoop's serialization framework:
 *
 * Long    ----> LongWritable
 * String  ----> Text
 * Integer ----> IntWritable
 * Null    ----> NullWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * This is the user-defined business-logic method called by MapTask, one of the main programs of the MapReduce framework.
     * MapTask drives the InputFormat to read data (KEYIN, VALUEIN); every KV pair it reads is passed into this map method once.
     * With the default InputFormat, the key here is the starting offset of a line and the value is the content of that line.
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
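
The Javadoc above explains why Hadoop uses Writable types instead of plain JDK types. As a small aside, the following is a minimal illustrative sketch (an assumed demo class, not part of the original project) that round-trips a (Text, IntWritable) pair through a byte stream the same way the framework serializes map output when spilling to disk or shuffling over the network:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableRoundTripDemo {
    public static void main(String[] args) throws Exception {
        // Serialize a Text key and an IntWritable value, as the framework would
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new Text("hello").write(out);
        new IntWritable(1).write(out);

        // Deserialize them back, in the same order they were written
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Text key = new Text();
        IntWritable value = new IntWritable();
        key.readFields(in);
        value.readFields(in);
        System.out.println(key + " -> " + value.get());   // prints: hello -> 1
    }
}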

The code for WordCountReducer is as follows:

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * ReduceTask calls the reduce method we wrote.
     * Each ReduceTask receives a portion of the data output by all MapTasks of the previous (map) phase
     * (a KV pair goes to this ReduceTask if key.hashCode() % numReduceTasks == this task's number).
     * When the ReduceTask processes the KV data it received, it calls our reduce method like this:
     * it first groups all received KV pairs by key (keys that compare equal form one group),
     * then passes the key of the first KV pair in a group as the reduce method's key argument,
     * and passes all the values of that group to the values argument as an iterator.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new IntWritable(count));
    }
}
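
The comment above says a KV pair goes to the reduce task whose number equals key.hashCode() % numReduceTasks. That is essentially what Hadoop's default HashPartitioner does; a minimal sketch of that behaviour (an illustrative class, not part of the original project) looks roughly like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// A sketch equivalent to the default hash-based partitioning used when no custom partitioner is set
public class DefaultStylePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask the sign bit so a negative hashCode still yields a partition in [0, numReduceTasks)
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}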

The code for WordCountDriver is as follows:

package cn.toto.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * This class is the client used to specify the many parameters the word count job needs at runtime:
 * for example, which components to use for reading the input data and writing the result,
 * which class implements the map-phase business logic and which class implements the reduce-phase business logic,
 * the path of the jar containing the word count job,
 * ....
 * Preparation before running:
 * 1. Export the current project as wordcount.jar
 * 2. Prepare /home/toto/software/wordcount/input/a.txt, whose content looks like:
 *        The true
 *        nobility is
 *        in being
 *        superior to
 *        your previous
 *        self guess
 * 3. Upload /home/toto/software/wordcount to HDFS with: hadoop fs -put wordcount /wordcount
 *
 * as well as various other required parameters.
 * Run the job with:
 *     hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 * which is equivalent to:
 *     java -cp wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 * except that "hadoop jar" puts all jars on the Hadoop classpath onto the application classpath and then runs cn.toto.bigdata.mr.wc.WordCountDriver.
 *
 * Finally, check the result with: hadoop fs -cat /wordcount/output/part-r-00000
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop:9000");
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");*/
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program is located
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Tell the framework the output data types of our mapper and reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Tell the framework which format components to use for reading the input and writing the output
        // TextInputFormat is the MapReduce framework's built-in component for reading text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework where the files to process are located;
        // note that /wordcount/input/ must already exist in HDFS (uploaded in the preparation step)
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input/"));

        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Preparation before running:

1. Export the current project as wordcount.jar

2. Prepare /home/toto/software/wordcount/input/a.txt, whose content looks like:

The true

nobility is

in being

superior to

your previous

self guess

3. Upload /home/toto/software/wordcount to HDFS with: hadoop fs -put wordcount /wordcount

Finally, run the job with:

hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver

After execution, the word counts can be viewed with hadoop fs -cat /wordcount/output/part-r-00000 (as noted in the Javadoc above).

B: Running WordCount locally with a Combiner (the main change is in WordCountDriver). The code is as follows:

package cn.toto.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * The same client as the WordCountDriver above, configured here for a local run:
 * fs.defaultFS is left commented out (local file system) and the input/output paths point at the E: drive.
 * The main change is job.setCombinerClass(WordCountReducer.class), which enables map-side pre-aggregation.
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://hadoop:9000");
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");*/
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program is located
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Use a Combiner. The benefit is that each map task pre-aggregates (sums) its own output
        // and passes the partial sums to the reducer, which then only has to add them up.
        // This offloads work from the reducer and reduces the data shuffled across the network.
        // Because a Combiner follows the same programming contract as a Reducer (it extends Reducer),
        // in this scenario WordCountReducer can be set directly as the Combiner.
        job.setCombinerClass(WordCountReducer.class);

        // Tell the framework the output data types of our mapper and reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Tell the framework which format components to use for reading the input and writing the output
        // TextInputFormat is the MapReduce framework's built-in component for reading text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework where the files to process are located
        FileInputFormat.setInputPaths(job, new Path("e:/wordcount/input/"));

        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("e:/wordcount/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Preparation:

Prepare e:/wordcount/input/a.txt on the E: drive with the following content:

The true
nobility is
in being
superior to
your previous
self guess
No great
discovery
was ever
made without
a bold
Knowledge will
give you
power but
character respect
The sun
is just
rising in
the morning
of another
day I
I figure
life is
a gift
and I
don't intend
on wasting

Right-click the code above and run it, then open E:\wordcount\output\part-r-00000 to see the result, which contains:

I    3
Knowledge   1
No  1
The 2
a   2
and 1
another 1
being   1
bold    1
but 1
character   1
day 1
discovery   1
don't  1
ever    1
figure  1
gift    1
give    1
great   1
guess   1
in  2
intend  1
is  3
just    1
life    1
made    1
morning 1
nobility    1
of  1
on  1
power   1
previous    1
respect 1
rising  1
self    1
sun 1
superior    1
the 1
to  1
true    1
was 1
wasting 1
will    1
without 1
you 1
your    1

After all of the steps above, the program is complete.

Summary:

3. The Combiner in MapReduce

(1) A Combiner is a component of an MR program besides the Mapper and Reducer.

(2) The Combiner's parent class is Reducer.

(3) The difference between a Combiner and a Reducer is where they run: the Combiner runs on the node of each map task, while the Reducer receives the output of all Mappers across the cluster.

(4) The point of a Combiner is to locally aggregate each map task's output, reducing the amount of data transferred over the network.

Implementation steps:

1. Define a custom combiner class that extends Reducer and override its reduce method (see the sketch after this summary).

2. Set it on the job: job.setCombinerClass(CustomCombiner.class)

(5) A Combiner may only be applied if it does not change the final business result; moreover, the Combiner's output KV types must match the Reducer's input KV types.


Use Combiners with great care: during a MapReduce job the combiner may or may not be invoked, and it may be invoked once or several times. The guiding principle is therefore that the business result must be identical whether or not the combiner runs.
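
As referenced in step 1 above, a combiner is just a Reducer subclass. A minimal sketch of a standalone combiner for this word count job could look like the following (this class is not part of the original project, which simply reuses WordCountReducer as the combiner; it duplicates the same summing logic):

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * A combiner runs on the map side and pre-aggregates the (word, 1) pairs emitted by each map task
 * before they are shuffled to the reducers. Its input KV types must equal the map output types,
 * and its output KV types must equal the reducer's input types.
 */
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new IntWritable(count));
    }
}

It would then be registered in the driver with job.setCombinerClass(WordCountCombiner.class).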

===============================================================================

Traffic (flow) statistics and custom-class serialization example:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * For a custom class to be used by MapReduce it must be serializable: implement WritableComparable.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNbr;
    private long upFlow;
    private long dFlow;
    private long sumFlow;

    /**
     * The serialization framework calls the no-arg constructor when it creates an instance during deserialization.
     */
    public FlowBean() {
    }

    /**
     * Serialization method.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNbr);
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method. Note: the fields must be read in the same order in which they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNbr = in.readUTF();
        this.upFlow = in.readLong();
        this.dFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // Sort in descending order of total flow
    @Override
    public int compareTo(FlowBean o) {
        return (int) (o.getSumFlow() - this.sumFlow);
    }

    public void set(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public void set(String phoneNbr, long upFlow, long dFlow) {
        this.phoneNbr = phoneNbr;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getdFlow() { return dFlow; }
    public void setdFlow(long dFlow) { this.dFlow = dFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
    public String getPhoneNbr() { return phoneNbr; }
    public void setPhoneNbr(String phoneNbr) { this.phoneNbr = phoneNbr; }

    @Override
    public String toString() {
        return "FlowBean [phoneNbr=" + phoneNbr + ", upFlow=" + upFlow + ", dFlow=" + dFlow + ", sumFlow=" + sumFlow + "]";
    }
}
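
To see the descending order that compareTo encodes, here is a small illustrative sketch (an assumed demo class, not part of the original project) that sorts two beans the same way the shuffle sorts FlowBean keys:

package cn.toto.bigdata.mr.wc.flowsum;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowBeanSortDemo {
    public static void main(String[] args) {
        FlowBean a = new FlowBean();
        a.set("13726230503", 2481, 24681);   // sumFlow = 27162
        FlowBean b = new FlowBean();
        b.set("13826544101", 264, 0);        // sumFlow = 264

        List<FlowBean> beans = new ArrayList<FlowBean>();
        beans.add(b);
        beans.add(a);

        // compareTo returns o.sumFlow - this.sumFlow, so sorting yields descending total flow
        Collections.sort(beans);
        for (FlowBean bean : beans) {
            System.out.println(bean);   // prints the 27162 bean first, then the 264 bean
        }
    }
}
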
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSum {

    // Passing our own custom object as the KV value is allowed, but the class must implement Hadoop's serialization mechanism (implements Writable)
    public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line that was read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            // (the phone number is the second column; the flow columns are indexed from the end because the URL/category columns may be empty)
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        FlowBean v = new FlowBean();

        /**
         * The key the reduce method receives is the first phone number of a group like <phone a, bean><phone a, bean><phone a, bean>;
         * the values it receives is an iterator over all the beans in that group.
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long upFlowCount = 0;
            long dFlowCount = 0;
            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                dFlowCount += bean.getdFlow();
            }
            v.set(upFlowCount, dFlowCount);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         */
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program is located
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(FlowSum.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Tell the framework the output data types of our mapper and reducer
        /*
         * job.setMapOutputKeyClass(Text.class);
         * job.setMapOutputValueClass(FlowBean.class);
         */
        // If the map output types are the same as the final output types, only the following two lines are needed
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // These two are the framework's default input/output components, so the two lines can be omitted
        /*
         * job.setInputFormatClass(TextInputFormat.class);
         * job.setOutputFormatClass(TextOutputFormat.class);
         */

        // Tell the framework where the files to process are located
        FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/input/"));

        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Sort the summarized flow data in descending order of total flow.
 * Precondition: the input is the already-summarized result file (the output of FlowSum).
 */
public class FlowSumSort {

    public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        FlowBean k = new FlowBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String phoneNbr = fields[0];
            long upFlowSum = Long.parseLong(fields[1]);
            long dFlowSum = Long.parseLong(fields[2]);
            // The bean is used as the map output KEY so the shuffle sorts records by FlowBean.compareTo (descending sumFlow)
            k.set(upFlowSum, dFlowSum);
            v.set(phoneNbr);
            context.write(k, v);
        }
    }

    public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException {
            context.write(phoneNbrs.iterator().next(), bean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumSort.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Tell the framework the output data types of our mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework where the files to process are located (note: this is the output directory of FlowSum)
        FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/output/"));

        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/sortout/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Setting up a local run:

1. Set the environment variable HADOOP_HOME=E:\learnTempFolder\hadoop-2.7.3

2. Download the Windows 10 builds of winutils.exe and hadoop.dll (e.g. from a CSDN resource) and place them at E:\learnTempFolder\hadoop-2.7.3\bin\winutils.exe and E:\learnTempFolder\hadoop-2.7.3\bin\hadoop.dll


3. Prepare the data to process:

The data file HTTP_20130313143750.dat contains records like:

1363157985066    13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200
1363157993044   18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  视频网站    15  12  1527    2106    200
1363157995074   84138413    5C-0E-8B-8C-E8-20:7DaysInn  120.197.40.4    122.72.52.12        20  16  4116    1432    200
1363157993055   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157995033   15920133257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20  20  3156    2936    200
1363157983019   13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200
1363157984041   13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站点统计    24  9   6960    690 200
1363157973098   15013685858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜索引擎    28  27  3659    3538    200
1363157986029   15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站点统计    3   3   1938    180 200
1363157992093   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   13602846565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 综合门户    15  12  1938    2910    200
1363157995093   13922314466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 综合门户    57  102 7335    110349  200
1363157986072   18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜索引擎    21  18  9531    2412    200
1363157990043   13925057413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200
1363157988072   13760778710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   13726238888 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   13560436666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200

4. First run FlowSum (right-click, run as a Java application).

The generated file is E:\learnTempFolder\flow\output\part-r-00000, with the following content:

13480253104  180 180 360
13502468823 7335    110349  117684
13560436666 1116    954 2070
13560439658 2034    5892    7926
13602846565 1938    2910    4848
13660577991 6960    690 7650
13719199419 240 0   240
13726230503 2481    24681   27162
13726238888 2481    24681   27162
13760778710 120 120 240
13826544101 264 0   264
13922314466 3008    3720    6728
13925057413 11058   48243   59301
13926251106 240 0   240
13926435656 132 1512    1644
15013685858 3659    3538    7197
15920133257 3156    2936    6092
15989002119 1938    180 2118
18211575961 1527    2106    3633
18320173382 9531    2412    11943
84138413    4116    1432    5548

5. Then run FlowSumSort (do not delete the part-r-00000 file generated above, since it is the input of this job).

The file produced by this run contains:

13502468823  7335    110349  117684
13925057413 11058   48243   59301
13726230503 2481    24681   27162
18320173382 9531    2412    11943
13560439658 2034    5892    7926
13660577991 6960    690 7650
15013685858 3659    3538    7197
13922314466 3008    3720    6728
15920133257 3156    2936    6092
84138413    4116    1432    5548
13602846565 1938    2910    4848
18211575961 1527    2106    3633
15989002119 1938    180 2118
13560436666 1116    954 2070
13926435656 132 1512    1644
13480253104 180 180 360
13826544101 264 0   264
13719199419 240 0   240

Of course, we can also sum and sort in a single job and write the result to a specified output directory. The code is as follows:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneStepSumSort {

    public static class OneStepSumSortMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line that was read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(phoneNbr, upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class OneStepSumSortReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // Collect the summarized beans in a TreeMap, which keeps them ordered by FlowBean.compareTo (descending sumFlow)
        TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            int upCount = 0;
            int dCount = 0;
            for (FlowBean bean : values) {
                upCount += bean.getUpFlow();
                dCount += bean.getdFlow();
            }
            FlowBean sumBean = new FlowBean();
            sumBean.set(key.toString(), upCount, dCount);
            Text text = new Text(key.toString());
            treeMap.put(sumBean, text);
        }

        // cleanup runs once after all reduce calls; emit the sorted entries here
        @Override
        protected void cleanup(Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
            Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
            for (Entry<FlowBean, Text> ent : entrySet) {
                context.write(ent.getValue(), ent.getKey());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OneStepSumSort.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(OneStepSumSortMapper.class);
        job.setReducerClass(OneStepSumSortReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Tell the framework the output data types of our mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework where the files to process are located
        FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));

        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("E:/flow/sortout/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Check the result under "E:/flow/sortout/":

(Note: FlowBean.compareTo compares only sumFlow, so phone numbers with identical totals collide in the reducer's TreeMap. For example, 13726230503 and 13726238888 both total 27162, which is why the listing below has only 18 lines and shows one key paired with the other's bean.)

13502468823  FlowBean [phoneNbr=13502468823, upFlow=7335, dFlow=110349, sumFlow=117684]
13925057413 FlowBean [phoneNbr=13925057413, upFlow=11058, dFlow=48243, sumFlow=59301]
13726238888 FlowBean [phoneNbr=13726230503, upFlow=2481, dFlow=24681, sumFlow=27162]
18320173382 FlowBean [phoneNbr=18320173382, upFlow=9531, dFlow=2412, sumFlow=11943]
13560439658 FlowBean [phoneNbr=13560439658, upFlow=2034, dFlow=5892, sumFlow=7926]
13660577991 FlowBean [phoneNbr=13660577991, upFlow=6960, dFlow=690, sumFlow=7650]
15013685858 FlowBean [phoneNbr=15013685858, upFlow=3659, dFlow=3538, sumFlow=7197]
13922314466 FlowBean [phoneNbr=13922314466, upFlow=3008, dFlow=3720, sumFlow=6728]
15920133257 FlowBean [phoneNbr=15920133257, upFlow=3156, dFlow=2936, sumFlow=6092]
84138413    FlowBean [phoneNbr=84138413, upFlow=4116, dFlow=1432, sumFlow=5548]
13602846565 FlowBean [phoneNbr=13602846565, upFlow=1938, dFlow=2910, sumFlow=4848]
18211575961 FlowBean [phoneNbr=18211575961, upFlow=1527, dFlow=2106, sumFlow=3633]
15989002119 FlowBean [phoneNbr=15989002119, upFlow=1938, dFlow=180, sumFlow=2118]
13560436666 FlowBean [phoneNbr=13560436666, upFlow=1116, dFlow=954, sumFlow=2070]
13926435656 FlowBean [phoneNbr=13926435656, upFlow=132, dFlow=1512, sumFlow=1644]
13480253104 FlowBean [phoneNbr=13480253104, upFlow=180, dFlow=180, sumFlow=360]
13826544101 FlowBean [phoneNbr=13826544101, upFlow=264, dFlow=0, sumFlow=264]
13926251106 FlowBean [phoneNbr=13719199419, upFlow=240, dFlow=0, sumFlow=240]

6. Set up partitions by phone number so that different phone-number prefixes end up in different output files. The approach is as follows:

A: Below is the custom partitioner; the code is:

package cn.toto.bigdata.mr.wc.flowsum;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * A custom partitioner must extend Partitioner.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        provinceMap.put("138", 0);
        provinceMap.put("139", 1);
        provinceMap.put("136", 2);
        provinceMap.put("137", 3);
        provinceMap.put("135", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Look up the partition by the first three digits of the phone number; unknown prefixes go to partition 5
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        return 5;
    }
}
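
A quick way to sanity-check the mapping is to call getPartition directly; keys whose first three digits are not in provinceMap fall into partition 5. A minimal sketch (an assumed demo class, not part of the original project):

package cn.toto.bigdata.mr.wc.flowsum;

import org.apache.hadoop.io.Text;

public class ProvincePartitionerDemo {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        // The FlowBean value is not used by getPartition, so null is fine here
        System.out.println(partitioner.getPartition(new Text("13926435656"), null, 6)); // 1 (prefix 139)
        System.out.println(partitioner.getPartition(new Text("13726230503"), null, 6)); // 3 (prefix 137)
        System.out.println(partitioner.getPartition(new Text("18320173382"), null, 6)); // 5 (prefix 183, not mapped)
    }
}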

B: Test the custom partitioner:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumProvince {

    public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line that was read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(phoneNbr, upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            int upCount = 0;
            int dCount = 0;
            for (FlowBean bean : values) {
                upCount += bean.getUpFlow();
                dCount += bean.getdFlow();
            }
            FlowBean sumBean = new FlowBean();
            sumBean.set(key.toString(), upCount, dCount);
            context.write(key, sumBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumProvince.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumProvinceMapper.class);
        job.setReducerClass(FlowSumProvinceReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Tell the framework the output data types of our mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Use our custom partitioner in the shuffle so records are partitioned by phone-number prefix;
        // the custom map defines 5 prefixes plus a default, so we must not have fewer than 6 partitions
        job.setPartitionerClass(ProvincePartitioner.class);

        // Set the number of reduce tasks
        job.setNumReduceTasks(6);

        // Tell the framework where the files to process are located
        FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));

        // Tell the framework where to write the result; delete the output directory first if it already exists
        Path out = new Path("E:/flow/provinceout/");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

C: Preparation needed to run: the input is the same HTTP_20130313143750.dat data file shown in step 3 above, placed under E:/flow/input/ (the input path used in the driver).


After running, the job produces one output file per reduce task (part-r-00000 through part-r-00005); for example, part-r-00001 contains the totals for the phone numbers whose prefix 139 maps to partition 1, and so on for the other prefixes.
