测试数据的下载地址为:http://pan.baidu.com/s/1gdgSn6r


一、文件分析

  首先可以用文本编辑器打开一个HTTP_20130313143750.dat的二进制文件,这个文件的内容是我们的手机日志,文件的内容已经经过了优化,格式比较规整,便于学习研究,感兴趣的读者可以尝试一下。

  我从中截取文件中的一行记录内容进行分析:

1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com

2     4    27    2 481    24681    200

  该日志文件的每个记录,一共有11个字段每个字段的含义如下图1.1所示。

图 1.1

二、思路分析

  我们要统计这个文件中,同一手机号的流量汇总。而我们可以从图1.1中发现,记录中有四个字段以不同的形式表示手机的流量,这时你会想到什么呢?-----那就是面向对象的概念,我们可以自定义一个类来代表一个自定义类型去包含这几个值,用类中的属性,来表示这几个字段,来方面我们对数据的操作。

  现在我们按照MapReduce的分布式计算模型,分析一下如何实现我们的任务。首先我们有未经过处理的原始文件(相当于<k1,v1>),这个文件里存储着我需要的数据就是,那就是一个手机的流量的汇总数据(相当于<k3,v3>),而要从原始数据获得我们最终想要的数据,这中间需要经过一个过程,对原始数据进行初步加工处理,形成中间结果(相当于<k2,V2>),而<K2,V2>这时候代表什么呢?不难看出,将所有的原始数据经过map()函数的分组排序处理后,得到一个中间结果,这个中间结果是一个键值对<K2,V2>,而这里的K2应该就是电话号码,V2就是我们的自定义类型表示手机流量,最后将中间数据经过reduce()函数的归一化处理,得到我们的最终结果。

三、编程实现

1. 代码如下

  1 package mapreduce;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.io.Writable;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Reducer;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 20
 21 public class KpiApp {
 22     static final String INPUT_PATH = "hdfs://hadoop:9000/wlan";
 23     static final String OUT_PATH = "hdfs://hadoop:9000/out";
 24     public static void main(String[] args) throws Exception{
 25         final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
 26
 27         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1 指定输入文件路径
 28
 29         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
 30
 31         job.setMapperClass(MyMapper.class);//1.2指定自定义的Mapper类
 32
 33         job.setMapOutputKeyClass(Text.class);//指定输出<k2,v2>的类型
 34         job.setMapOutputValueClass(KpiWritable.class);
 35
 36         job.setPartitionerClass(HashPartitioner.class);//1.3 指定分区类
 37         job.setNumReduceTasks(1);
 38
 39         //1.4 TODO 排序、分区
 40
 41         //1.5  TODO (可选)合并
 42
 43         job.setReducerClass(MyReducer.class);//2.2 指定自定义的reduce类
 44
 45         job.setOutputKeyClass(Text.class);//指定输出<k3,v3>的类型
 46         job.setOutputValueClass(KpiWritable.class);
 47
 48         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));//2.3 指定输出到哪里
 49
 50         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类
 51
 52         job.waitForCompletion(true);//把代码提交给JobTracker执行
 53     }
 54
 55     static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{
 56         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
 57             final String[] splited = value.toString().split("\t");
 58             final String msisdn = splited[1];
 59             final Text k2 = new Text(msisdn);
 60             final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]);
 61             context.write(k2, v2);
 62         };
 63     }
 64
 65     static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{
 66         /**
 67          * @param    k2    表示整个文件中不同的手机号码
 68          * @param    v2s    表示该手机号在不同时段的流量的集合
 69          */
 70         protected void reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
 71             long upPackNum = 0L;
 72             long downPackNum = 0L;
 73             long upPayLoad = 0L;
 74             long downPayLoad = 0L;
 75
 76             for (KpiWritable kpiWritable : v2s) {
 77                 upPackNum += kpiWritable.upPackNum;
 78                 downPackNum += kpiWritable.downPackNum;
 79                 upPayLoad += kpiWritable.upPayLoad;
 80                 downPayLoad += kpiWritable.downPayLoad;
 81             }
 82
 83             final KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");
 84             context.write(k2, v3);
 85         };
 86     }
 87 }
 88
 89 class KpiWritable implements Writable{
 90     long upPackNum;
 91     long downPackNum;
 92     long upPayLoad;
 93     long downPayLoad;
 94
 95     public KpiWritable(){}
 96
 97     public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad){
 98         this.upPackNum = Long.parseLong(upPackNum);
 99         this.downPackNum = Long.parseLong(downPackNum);
100         this.upPayLoad = Long.parseLong(upPayLoad);
101         this.downPayLoad = Long.parseLong(downPayLoad);
102     }
103
104
105     @Override
106     public void readFields(DataInput in) throws IOException {
107         this.upPackNum = in.readLong();
108         this.downPackNum = in.readLong();
109         this.upPayLoad = in.readLong();
110         this.downPayLoad = in.readLong();
111     }
112
113     @Override
114     public void write(DataOutput out) throws IOException {
115         out.writeLong(upPackNum);
116         out.writeLong(downPackNum);
117         out.writeLong(upPayLoad);
118         out.writeLong(downPayLoad);
119     }
120
121     @Override
122     public String toString() {
123         return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
124     }
125 }

View Code

2 .运行结果如下

图 3.1

转载于:https://www.cnblogs.com/sunddenly/p/3995662.html

Hadoop日记Day13---使用hadoop自定义类型处理手机上网日志相关推荐

  1. hadoop自定义类型注意问题

    自定义类型要实现WritableComparable 接口,(之前只实现Writable  ,结果报错) 问题的主要原因是因为自定义类型在Partitioners 阶段要用到hashCode() 方法 ...

  2. java emr_java – EMR – 在Hadoop(和YARN)中使用自定义日志记录appender

    在我们的EMR集群中,我们使用自定义log4j-appender和log4j.properties来允许我们将日志转发到Splunk并让我们做一些魔术,提供的库和配置不知道如何操作. 在EMR 3.x ...

  3. 零起步的Hadoop实践日记(搭建hadoop和hive)

    2014-3-10 [需求] 接受的工作需要处理海量数据,第一步先用工具做一些运营数据的产出,考虑采用hadoop方便以后跟随数据量变大可以补充机器,而不用动统计逻辑. 当前的hadoop社区非常活跃 ...

  4. Hadoop入门基础教程 Hadoop之单词计数

    单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的src/exampl ...

  5. 【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

    Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS) ...

  6. Hadoop集群搭 Hadoop分布式文件系统架构和设计

    Hadoop集群搭建 先决条件 确保在你集群中的每个节点上都安装了所有必需软件. 获取Hadoop软件包. 安装 安装Hadoop集群通常要将安装软件解压到集群内的所有机器上. 通常,集群里的一台机器 ...

  7. CENTOS上的网络安全工具(十二)走向Hadoop(4) Hadoop 集群搭建

    目录 〇.踩坑指南 1.OpenJDK的版本 2.WEB用户 一.克隆虚拟机 二.配置主机名和网络 1.配置网络 2.设置主机名 3.将主机关系对应名写入host文件 三.配置免密SSH访问 1.本机 ...

  8. 【Hadoop生态圈】1.Hadoop入门教程及集群环境搭建

    文章目录 1.简介 2.环境准备 3.安装hadoop 3.修改Hadoop配置文件 3.1.hadoop-env.sh配置 3.2.core-site.xml配置 3.3.hdfs-site.xml ...

  9. Hadoop学习系列之Hadoop、Spark学习路线(很值得推荐)

    Hadoop学习系列之Hadoop.Spark学习路线(很值得推荐) 文章出自:http://www.cnblogs.com/zlslch/p/5448857.html 1 Java基础: 视频方面: ...

最新文章

  1. PCL:点云中的超体素数据
  2. liunx内核编译安装
  3. 不能解决,复选框在request对象获取的信息后显示在用户信息里面为中文的选项名...
  4. 985女研究生连算法都不会,还面试什么大厂!
  5. cad线加粗怎么设置_原来CAD的线条还可以这样加粗!还能修改初始单位!太实用了...
  6. Echarts 自定义数据视图
  7. EasyUI学习笔记8:MIS开发利器_ datagrid插件(下)(终结篇)
  8. linux 修改 java 内存_Linux 和 Windows修改Java虚拟机内存大小
  9. c语言磁盘文件只有写没读,C语言的磁盘文件问题
  10. 怎么把pdf转换为html,如何将PDF转换成HTML网页格式呢?
  11. Anaconda3的安装
  12. flutter 序列化_如何在Flutter中序列化对象
  13. 虚拟机安装ubuntu18.04及其srs服务器的搭建
  14. 理解 zookeeper
  15. OKHttp 的使用
  16. mysql查询男生基本情况_MySQL(一)基本查询
  17. csm和uefi_【一点资讯】关于CSM和UEFI你要知道的一些事 www.yidianzixun.com
  18. 计算机30秒自动更换的桌面软件,无需任何软件即可在计算机上的多个桌面墙纸之间自动切换...
  19. python打印星号组成的三角形_Python利用for循环打印星号三角形的案例
  20. python编写英文字典_python如何制作英文字典

热门文章

  1. 回复博友:初学ERP的建议
  2. GitHub 推出安全新功能,帮助开源软件发现漏洞和机密信息
  3. 2018年6月26日笔记
  4. 使用gulp+browser-sync搭建Sass自动化编译以及自动刷新所需要的插件
  5. dyld Library not loaded @rpath libswiftCore dylib 解决
  6. MyBatis 实现关联表查询
  7. Java内嵌Groovy脚本引擎进行业务规则剥离(一)
  8. memcache获取所有内存数据
  9. Directx11教程40 纹理映射(10)
  10. SQLite指南(0) 表和索引的文件存储结构