---------------------------------------------------------------------------------------------------------------

[版权申明:本文系作者原创,转载请注明出处]

文章出处:http://blog.csdn.net/sdksdk0/article/details/51628874

作者:朱培

---------------------------------------------------------------------------------------------------------------

本文是结合hadoop中的mapreduce来对用户数据进行分析,统计用户的手机号码、上行流量、下行流量、总流量的信息,同时可以按照总流量大小对用户进行分组排序等。是一个非常简洁易用的hadoop项目,主要用户进一步加强对MapReduce的理解及实际应用。文末提供源数据采集文件和系统源码。

本案例非常适合hadoop初级人员学习以及想入门大数据、云计算、数据分析等领域的朋友进行学习。

一、待分析的数据源

以下是一个待分析的文本文件,里面有非常多的用户浏览信息,保扩用户手机号码,上网时间,机器序列号,访问的IP,访问的网站,上行流量,下行流量,总流量等信息。这里只截取一小段,具体文件在文末提供下载链接。

二、基本功能实现

想要统计出用户的上行流量、下行流量、总流量信息,我们需要建立一个bean类来对数据进行封装。于是新建应该Java工程,导包,或者直接建立一个MapReduce工程。在这里面建立一个FlowBean.java文件。
        private long upFlow;private long dFlow;private long sumFlow;

然后就是各种右键生成get,set方法,还要toString(),以及生成构造函数,(千万记得要生成一个空的构造函数,不然后面进行分析的时候会报错)。

完整代码如下:
package cn.tf.flow;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;public class FlowBean  implements WritableComparable<FlowBean>{private long upFlow;private long dFlow;private long sumFlow;public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getdFlow() {return dFlow;}public void setdFlow(long dFlow) {this.dFlow = dFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public FlowBean(long upFlow, long dFlow) {super();this.upFlow = upFlow;this.dFlow = dFlow;this.sumFlow = upFlow+dFlow;}@Overridepublic void readFields(DataInput in) throws IOException {upFlow=in.readLong();dFlow=in.readLong();sumFlow=in.readLong();}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dFlow);out.writeLong(sumFlow);}public FlowBean() {super();}@Overridepublic String toString() {return  upFlow + "\t" + dFlow + "\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {return this.sumFlow>o.getSumFlow() ? -1:1;}}

然后就是这个统计的代码了,新建一个FlowCount.java.在这个类里面,我直接把Mapper和Reduce写在同一个类里面了,如果按规范的要求应该是要分开写的。

在mapper中,获取后面三段数据的值,所以我的这里length-2,length-3.
       public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到这行的内容转成stringString line = value.toString();String[] fields = StringUtils.split(line, "\t");try {if (fields.length > 3) {// 获得手机号及上下行流量字段值String phone = fields[1];long upFlow = Long.parseLong(fields[fields.length - 3]);long dFlow = Long.parseLong(fields[fields.length - 2]);// 输出这一行的处理结果,key为手机号,value为流量信息beancontext.write(new Text(phone), new FlowBean(upFlow, dFlow));} else {return;}} catch (Exception e) {}}}

在reduce中队数据进行整理,统计

public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {long upSum = 0;long dSum = 0;for (FlowBean bean : values) {upSum += bean.getUpFlow();dSum += bean.getdFlow();}FlowBean resultBean = new FlowBean(upSum, dSum);context.write(key, resultBean);}}
最后在main方法中调用执行。
public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(FlowCount.class);job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
当然啦,还需要先在你的hdfs根目录中建立/flow/data数据,然后我那个用户的数据源上传上去。
 bin/hadoop fs -mkdir -p /flow/databin/hadoop fs -put HTTP_20130313143750.dat /flow/databin/hadoop jar  ../lx/flow.jar
把上面这个MapReduce工程打包成一个jar文件,然后用hadoop来执行这个jar文件。例如我放在~/hadoop/lx/flow.jar,然后再hadoop安装目录中执行
bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCount  /flow/data  /flow/output

最后执行结果如下:


在这整过过程中,我们是有yarnchild的进程在执行的,如下图所示:当整个过程执行完毕之后yarnchild也会自动退出。

三、按总流量从大到小排序

如果你上面这个基本操作以及完成了的话,按总流量排序就非常简单了。我们新建一个FlowCountSort.java.

全部代码如下:

package cn.tf.flow;import java.io.IOException;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCountSort {public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line=value.toString();String[] fields=StringUtils.split(line,"\t");String phone=fields[0];long upSum=Long.parseLong(fields[1]);long dSum=Long.parseLong(fields[2]);FlowBean sumBean=new FlowBean(upSum,dSum);context.write(sumBean, new Text(phone));}
}public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{//进来的“一组”数据就是一个手机的流量bean和手机号@Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(), key);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(FlowCountSort.class);job.setMapperClass(FlowCountSortMapper.class);job.setReducerClass(FlowCountSortReducer.class);job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}}

这个主要就是使用了FlowBean.java中的代码来实现的,主要是继承了WritableComparable<FlowBean>接口来实现,然后重写了compareTo()方法。

@Overridepublic int compareTo(FlowBean o) {return this.sumFlow>o.getSumFlow() ? -1:1;}

按照同样的方法对这个文件打成jar包,然后使用hadoop的相关语句进行执行就可以了。

bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort  /flow/output  /flow/sortoutput

结果图:

四、按用户号码区域进行分类

流量汇总之后的结果需要按照省份输出到不同的结果文件中,需要解决两个问题:

1、如何让mr的最终结果产生多个文件: 原理:MR中的结果文件数量由reduce
  task的数量绝对,是一一对应的 做法:在代码中指定reduce task的数量
 
 
  2、如何让手机号进入正确的文件 原理:让不同手机号数据发给正确的reduce task,就进入了正确的结果文件
  要自定义MR中的分区partition的机制(默认的机制是按照kv中k的hashcode%reducetask数)
  做法:自定义一个类来干预MR的分区策略——Partitioner的自定义实现类

主要代码与前面的排序是非常类似的,只要在main方法中添加如下两行代码就可以了。

          //指定自定义的partitionerjob.setPartitionerClass(ProvincePartioner.class);job.setNumReduceTasks(5);

这里我们需要新建一个ProvincePartioner.java来处理号码分类的逻辑。

public class ProvincePartioner extends Partitioner<Text, FlowBean>{private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();static {provinceMap.put("135", 0);provinceMap.put("136", 1);provinceMap.put("137", 2);provinceMap.put("138", 3);      }@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {String prefix = key.toString().substring(0, 3);Integer partNum = provinceMap.get(prefix);if(partNum == null) partNum=4;return partNum;}}

执行方法和前面也是一样的。从执行的流程中我们可以看到这里启动了5个reduce task,因为我这里数据量比较小,所以只启动了一个map task。

到这里,整个用户流量分析系统就全部结束了。关于大数据的更多内容,欢迎关注。点击左上角头像下方“点击关注".感谢您的支持!

数据源下载地址:http://download.csdn.net/detail/sdksdk0/9545935

源码项目地址:https://github.com/sdksdk0/HDFS_MapReduce

大数据实战:用户流量分析系统相关推荐

  1. 毕业设计 - 题目:基于大数据的用户画像分析系统 数据分析 开题

    文章目录 1 前言 2 用户画像分析概述 2.1 用户画像构建的相关技术 2.2 标签体系 2.3 标签优先级 3 实站 - 百货商场用户画像描述与价值分析 3.1 数据格式 3.2 数据预处理 3. ...

  2. 《Spark商业案例与性能调优实战100课》第6课:商业案例之通过Spark SQL实现大数据电影用户行为分析

    <Spark商业案例与性能调优实战100课>第6课:商业案例之通过Spark SQL实现大数据电影用户行为分析 package com.dt.spark.sparksqlimport or ...

  3. 大数据之电商分析系统(一)

    大数据之电商分析系统(一) 一:项目介绍 ​ 本项目来源于企业级电商网站的大数据统计分析平台, 该平台以 Spark 框架为核心, 对电商网站的日志进行离线和实时分析.该大数据分析平台对电商网站的各种 ...

  4. 基于大数据的网站日志分析系统

    本文没有任何代码,只有各个模块工作的大体机制和整体流程.算是一个科普文吧,我也对原理一知半解. 基于大数据的网站日志分析系统 1. 日志数据格式 1.1 访问日志 1.1.1 log_format 1 ...

  5. 大数据平台网站日志分析系统

    1:大数据平台网站日志分析系统,项目技术架构图: 2:大数据平台网站日志分析系统,流程图解析,整体流程如下: ETL即hive查询的sql; 但是,由于本案例的前提是处理海量数据,因而,流程中各环节所 ...

  6. 大数据平台日志存储分析系统解决方案

    大数据平台日志存储分析系统是在大数据平台下,针对业务系统产生的日志记录进行存储和分析.日志数据来自ElasticSearch存储的日志历史数据,并将需要存储和分析的日志数据从ElasticSearch ...

  7. 大数据案例--电信日志分析系统

    目录 一.项目概述 1.概述 二.字段解释分析 1.数据字段 2.应用大类 3.应用小类 三.项目架构 四.数据收集清洗 1.数据收集 2.数据清洗 五.Sqoop使用 1.简介 2.Sqoop安装步 ...

  8. 大数据之“用户行为分析”

    编者按:本文由卢东明为36氪撰写,是大数据系列文章的第2篇.卢东明是SAP公司全球数据库解决方案亚太区技术总监:拥有长达 20 年数据库.数据仓库开发管理经验. 这几年,几家电商的价格战打得不亦乐乎, ...

  9. 腾讯QQ大数据:用户增长分析——用户流失预警

    1,前言:针对用户增长分析这个课题,本文主要从用户防流失的角度,阐述如何基于QQ社交网络数据构建用户流失预警模型,找出高潜流失用户,用于定向开展运营激活,从而有效控制用户流失风险,提升大盘用户的留存率 ...

最新文章

  1. Sublime Text 2/3 Package Control 安装方法(Install Package)
  2. Spring Cloud【Finchley】-16 Zuul的路由配置
  3. 【腾讯面试题】Nginx
  4. 100道JS构造函数面试题
  5. java基础进阶(文件列表,线程,线程组)编程实例(4篇)
  6. html 拼接onmouseout,HTML onmouseout事件用法及代码示例
  7. 王一博、张艺兴等多位明星起诉医美平台更美App
  8. Python给指定文件打上数字签名
  9. canvas 绘制直线 并选中_javascript自学记录:canvas绘图
  10. 大数据可视化有哪些作用和优点
  11. IOS中通知中心NSNotificationCenter应用总结
  12. 计算机图形学设计线宽代码,计算机图形学画圆并改变线宽.pdf
  13. 【地理坐标系、大地坐标系与地图投影与重投影详解】
  14. eleme 分页组件更新
  15. 又一个中国男人荣获巨奖!拿奖拿的手软,却坦言“我对诺奖没有兴趣”...
  16. python中matplotlib的plot函数
  17. cannot connect to X server
  18. 鸿蒙系统 基于安卓,鸿蒙系统(鸿蒙OS),不同于安卓,是面向未来的更广泛的系统...
  19. 小米手机升级后便签内容没了如何找回
  20. linux系统磁盘分区查看,linux下磁盘查看和分区

热门文章

  1. 【Java SE】this引用注意事项
  2. JS向数组添加元素,插入数据
  3. JS数组添加元素的三种方法
  4. qnap+qBitorrent
  5. CentOS7mini版给yum更换阿里云仓库
  6. 免费可用官方天气API推荐
  7. 拼多多售后有哪些处理技巧?店盈通电商解答
  8. fastjson 修改多层嵌套的Json数据
  9. python遍历文件夹排序_python 顺序读取文件夹下面的文件(自定义排序方式)
  10. 原型模式 与 建造者模式