hadoop编程:分析CSDN注册邮箱分布情况

本文博客链接:http://blog.csdn.net/jdh99,作者:jdh,转载请注明.

环境:

主机:Ubuntu10.04

hadoop版本:1.2.1

开发工具:eclipse4.4.0

说明:

要求:原始数据共6428632条,分析不同邮箱的注册情况,并按使用人数从大到小排序。

分析:hadoop自带一个排序,是按key值来进行排序的。要按值(value)进行排序,需要二次排序。

步骤:

1.job1:统计不同注册邮箱的使用人数,用默认的key值排序,保存在HDFS系统中

2.job2:对job1的输出进行二次排序,按值从大到小排序

结果输出:

使用人数在1W以上的邮箱共有24个:

qq.com    1976196
163.com    1766927
126.com    807895
sina.com    351596
yahoo.com.cn    205491
hotmail.com    202948
gmail.com    186843
sohu.com    104736
yahoo.cn    87048
tom.com    72365
yeah.net    53295
21cn.com    50710
vip.qq.com    35119
139.com    29207
263.net    24779
sina.com.cn    19156
live.cn    18920
sina.cn    18601
yahoo.com    18454
foxmail.com    16432
163.net    15176
msn.com    14211
eyou.com    13372
yahoo.com.tw    10810


源代码:

JOB1:统计不同注册邮箱的人数

CsdnData.java

package com.bazhangkeji.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class CsdnData
{public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: csdndata <in> <out>");System.exit(2);}Job job = new Job(conf, "csdndata");job.setJarByClass(CsdnData.class);job.setMapperClass(MapData.class);job.setReducerClass(ReducerData.class); job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapData.java

package com.bazhangkeji.hadoop;
import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;public class MapData extends Mapper<Object, Text, Text, IntWritable>
{IntWritable one = new IntWritable(1);Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringBuffer str_in = new StringBuffer();StringBuffer str_out = new StringBuffer();int index = 0;//初始化字符串str_in.setLength(0);str_out.setLength(0);str_in.append(value.toString());//获得邮箱的起始位置index = str_in.toString().lastIndexOf('@');if (index != -1){word.set(str_in.toString().substring(index + 1).trim().toLowerCase());context.write(word, one);}}
}

ReducerData.java

package com.bazhangkeji.hadoop;
import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;public class ReducerData extends Reducer<Text,IntWritable,Text,IntWritable>
{IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}
}

JOB2:对job1的输出进行二次排序,按值从大到小排序

SortSecond.java

package com.bazhangkeji.hadoop2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class SortSecond
{public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: csdndata <in> <out>");System.exit(2);}Job job = new Job(conf, "sortsecond");job.setJarByClass(SortSecond.class);job.setMapperClass(MapSecond.class);job.setReducerClass(ReduceSecond.class); job.setSortComparatorClass(SortMy.class); //设置自定义二次排序策略job.setOutputKeyClass(KeyMy.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

MapSecond.java

package com.bazhangkeji.hadoop2;
import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;public class MapSecond extends Mapper<LongWritable, Text, KeyMy, IntWritable>
{IntWritable one = new IntWritable(1);Text word = new Text();KeyMy keymy = new KeyMy();public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String str_in = value.toString();int index = 0;index = str_in.indexOf('\t');if (value.toString().length() > 3 && index != -1){String str1 = str_in.substring(0, index);String str2 = str_in.substring(index + 1);if (str1.length() != 0 && str2.length() != 0){one.set(Integer.parseInt(str2));word.set(str1);keymy.setFirstKey(word);keymy.setSecondKey(one);context.write(keymy, one);}}}
}

ReduceSecond.java

package com.bazhangkeji.hadoop2;
import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;public class ReduceSecond extends Reducer<KeyMy,IntWritable,Text,IntWritable>
{IntWritable result = new IntWritable();public void reduce(KeyMy key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {context.write(key.getFirstKey(), key.getSecondKey());}
}

KeyMy.java

package com.bazhangkeji.hadoop2;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*** 自定义组合键*/
public class KeyMy implements WritableComparable<KeyMy>{private static final Logger logger = LoggerFactory.getLogger(KeyMy.class);private Text firstKey;private IntWritable secondKey;public KeyMy() {this.firstKey = new Text();this.secondKey = new IntWritable();}public Text getFirstKey() {return this.firstKey;}public void setFirstKey(Text firstKey) {this.firstKey = firstKey;}public IntWritable getSecondKey() {return this.secondKey;}public void setSecondKey(IntWritable secondKey) {this.secondKey = secondKey;}@Overridepublic void readFields(DataInput dateInput) throws IOException {// TODO Auto-generated method stubthis.firstKey.readFields(dateInput);this.secondKey.readFields(dateInput);}@Overridepublic void write(DataOutput outPut) throws IOException {this.firstKey.write(outPut);this.secondKey.write(outPut);}/*** 自定义比较策略* 注意:该比较策略用于 mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,* 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整)*/@Overridepublic int compareTo(KeyMy KeyMy) {logger.info("-------KeyMy flag-------");return this.firstKey.compareTo(KeyMy.getFirstKey());}
}

SortMy.java

package com.bazhangkeji.hadoop2;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*** 自定义二次排序策略*/
public class SortMy extends WritableComparator {private static final Logger logger = LoggerFactory.getLogger(SortMy.class);public SortMy() {super(KeyMy.class,true);}@Overridepublic int compare(WritableComparable KeyMyOne,WritableComparable KeyMyOther) {logger.info("---------enter SortMy flag---------");KeyMy c1 = (KeyMy) KeyMyOne;KeyMy c2 = (KeyMy) KeyMyOther;return c2.getSecondKey().get()-c1.getSecondKey().get();//0,负数,正数}
}


参考资料:

1.《hadoop权威指南》

2.  http://zengzhaozheng.blog.51cto.com/8219051/1379271

hadoop编程:分析CSDN注册邮箱分布情况相关推荐

  1. 一种分析代金券使用分布情况的方法python实现版(下)

    上一篇文章中已经找到了用户ID和领取代金券的数量,并按降序排序,下面要做的就简单了,有了前面的经验,我们照抄三份,一份去分析交易的情况,一份去查询用户黑名单库,最后一份去查看用户注册的时间和注册时使用 ...

  2. 电子邮件地址怎么注册填写?手机怎么注册邮箱地址?

    作为主流的交流沟通工具之一,电子邮箱可以说是人手一个甚至多个的了,一些重要的信息.文件等都可以使用电子邮箱来传输,不仅安全性更强,还能长期保存重要的资料.每个人的电子邮箱地址都是不一样的,就像手机号码 ...

  3. matlab的sinxx,用MATLAB程序编程:分析方程f(x)=sinx-x/2=0正根的分布情况,并用二分法求正根近似值,使误差不超过0.01....

    在来matlab里面输入edit fun.m:在弹出的窗口自输入bai以下内du容:function f=fun(x,y):f=x.^2+sin(x.*y)+2*y;:保存一下zhi.最后在matla ...

  4. 数据中心分布情况和业务占比分析

    我国数据中心的市场规模高速增长.受"互联网+".大数据战略.数字经济等国家政策的指引,以及云计算.移动互联网.物联网.大数据.人工智能等快速发展的驱动,我国数据中心的业务收入呈现连 ...

  5. 最新 android系统 设备 分布情况,CNCERT 2018年第一季度国内操作系统及浏览器占比情况分析...

    原标题:CNCERT 2018年第一季度国内操作系统及浏览器占比情况分析 国家互联网应急中心(以下简称CNCERT)对2018年第一季度国内网络访问情况进行了抽样分析,重点针对操作系统及浏览器占比情况 ...

  6. 最新 android系统 设备 分布情况,2019年第二季度国内操作系统及浏览器占比情况分析...

    国家互联网应急中心(以下简称CNCERT)对2019年第二季度国内网络访问情况进行了抽样分析,重点针对操作系统及浏览器占比情况进行统计,发现以下特点: 1.通过移动终端上网的用户数量多于通过PC终端上 ...

  7. Hadoop源代码分析

    http://wenku.baidu.com/link?url=R-QoZXhc918qoO0BX6eXI9_uPU75whF62vFFUBIR-7c5XAYUVxDRX5Rs6QZR9hrBnUdM ...

  8. Python 爬取 6000 篇文章分析 CSDN 是如何进入微信 500 强的

    CSDN 小姐姐们恭祝所有朋友新年快乐! 作者 | 罗昭成,设计 | 张藐,责编 | 唐小引 出品 | CSDN(ID:CSDNnews) 亲爱的小伙伴们,马上就到 2019 年了,你的 2018 年 ...

  9. 用 Python 分析 CSDN 小姐姐一年都做了啥

    点击上方"CSDN",选择"置顶公众号" 关键时刻,第一时间送达! [CSDN 编者按]今天是 2017 年最后一天,在此向所有程序员朋友们道一声「新年快乐」, ...

最新文章

  1. Linux磁盘阵列技术详解(二)--raid 1创建
  2. rdf mysql持久化l_Redis进阶(数据持久化RDF和AOF)
  3. Spring LDAP
  4. 修手机时创意被剽窃,男子向苹果索赔7万亿!是认真的吗?
  5. Hyper-V Server 存储空间
  6. ToStringBuilder.reflectionToString
  7. Python数学建模 空间插值
  8. 从根源上解决libc.so.6版本问题 /lib64/libc.so.6:version 'GLIBC_XXX' not found
  9. 学习经历与求职经历分享
  10. TZOJ 数据结构实验:一元多项式相加
  11. spring应用手册-IOC(XML配置实现)-(8)-bean中的scop属性
  12. ANSYS 有限元分析 后处理 General Postproc
  13. Spring boot集成Redis实现sessions共享时,sessions过期时间问题分析
  14. java project、maven project项目打成可运行的jar包
  15. 八、基于多源数据建成区提取——Landsat数据大气校正
  16. 软件构造习题课一的要点记录
  17. S4A+Arduino互动媒体基础教程 第一节 Arduino连接S4A
  18. java中的repo什么意思,repo是什么意思什么梗 repo的含义及出处
  19. EmWin学习课堂_小白EmWin_EmWin快速入门_EmWin动态内存,显示和触摸屏_EmWin基础配置
  20. selenium爬取淘宝店铺数据

热门文章

  1. 线上服务器突然崩了!?Jenkins 服务器中挖坑病毒解决方案
  2. 华为服务器做系统蓝屏,服务器安装2008r2后蓝屏
  3. 怎样使用MindMapper中的聚焦功能
  4. 为什么空集是集合的子集_空集为什么是任何集合的子集和非任何空集的真子集呢...
  5. Python 下载视频出错 you-get: [error] oops, something went wrong.
  6. win10装win7遇到的问题。
  7. android 桌面快捷方式,Android应用开发之(如何自动在桌面创建快捷方式)
  8. Module3:Alice in Wonderland
  9. A Game of Thrones(30)
  10. 爬取天猫店铺列表页的所有数据