【Case study】Compute basic website metrics (PV and UV) with Hive and Sqoop
1. PV: total number of page views
2. UV: de-duplicated visitor count
-》【Requirement】Compute the PV and UV for each of the 24 hours of a day
-》Build a multi-level partitioned table: first level by day, second level by hour
-》Step 1: analyze the requirement
-》Step 2: extract the time fields (day and hour)
-》Cleanse the time format, e.g. from 2015-08-28 18:10:00 extract the date and the hour
-》Keep only the useful fields: id, url, guid, trackTime
-》Step 3: analyze the data
-》Use SELECT statements
-》Step 4: export the result with Sqoop

-》Expected output
Date  Hour  PV  UV

Create the database:

create database track_log2;

Create the source table:

create table yhd_source2(
id              string,
url             string,
referer         string,
keyword         string,
type            string,
guid            string,
pageId          string,
moduleId        string,
linkId          string,
attachedInfo    string,
sessionId       string,
trackerU        string,
trackerType     string,
ip              string,
trackerSrc      string,
cookie          string,
orderCode       string,
trackTime       string,
endUserId       string,
firstLink       string,
sessionViewNo   string,
productId       string,
curMerchantId   string,
provinceId      string,
cityId          string,
fee             string,
edmActivity     string,
edmEmail        string,
edmJobId        string,
ieVersion       string,
platform        string,
internalKeyword string,
resultSum       string,
currentPage     string,
linkPosition    string,
buttonPosition  string
)
row format delimited fields terminated by '\t'
stored as textfile;

Tip: Shift+Alt drag (column/box editing in most editors) makes it easy to edit all the column definitions at once.

Load the data:

load data local inpath '/opt/datas/2015082818' into table yhd_source2;
load data local inpath '/opt/datas/2015082819' into table yhd_source2;
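
A quick sanity check after loading (optional, not in the original notes):

select count(*) from yhd_source2;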

Partitioning approach 1: static partitions

create table yhd_part1(
id string,
url string,
guid string
)
partitioned by (date string,hour string)
row format delimited fields terminated by '\t';

Load data into the partitioned table. The rows come from the cleansing table yhd_qingxi (created and populated in the next step), so build that table first:

insert into table yhd_part1 partition (date='20150828',hour='18') select id,url,guid from yhd_qingxi where date='28' and hour='18';
insert into table yhd_part1 partition (date='20150828',hour='19') select id,url,guid from yhd_qingxi where date='28' and hour='19';

Verify one partition:

select id,date,hour from yhd_part1 where date='20150828' and hour='18';
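
To list the partitions that now exist (a quick check, not in the original notes):

show partitions yhd_part1;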

Create a cleansing table that parses the time field and keeps only the date and hour parts:

create table yhd_qingxi(
id string,
url string,
guid string,
date string,
hour string
)
row format delimited fields terminated by '\t';

Extract the day and hour by substring:

insert into table yhd_qingxi select id,url,guid,substring(trackTime,9,2) date,substring(trackTime,12,2) hour from yhd_source2;
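
Hive's substring() is 1-based, so positions 9-10 hold the day and positions 12-13 hold the hour of a trackTime such as 2015-08-28 18:10:00; a quick check of the offsets (not part of the original notes):

select substring('2015-08-28 18:10:00',9,2)  as day_part,
       substring('2015-08-28 18:10:00',12,2) as hour_part;
-- day_part = '28', hour_part = '18'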

Partitioning approach 2: dynamic partitions

<property>
  <name>hive.exec.dynamic.partition</name>
  <value>true</value>
  <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
</property>

-》The default value is true, i.e. dynamic partitions are allowed.

<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>strict</value>
  <description>In strict mode, the user must specify at least one static partition in case the user accidentally overwrites all partitions.</description>
</property>

-》set hive.exec.dynamic.partition.mode=nonstrict;   -- use non-strict mode so every partition column can be dynamic

Create the table:

create table yhd_part2(
id string,
url string,
guid string
)
partitioned by (date string,hour string)
row format delimited fields terminated by '\t';

Run the dynamic-partition insert:

insert into table yhd_part2 partition (date,hour) select * from yhd_qingxi;

-》Instead of select * you may list the columns explicitly, but you must then write out every column (see the sketch below)
-》With select *, Hive reads all columns of yhd_qingxi and maps the trailing columns (date, hour) onto the partition columns in order
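
A minimal sketch of the explicit-column form, together with the session settings that enable dynamic partitioning (column names taken from the yhd_qingxi table above):

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

insert into table yhd_part2 partition (date,hour)
select id,url,guid,date,hour from yhd_qingxi;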

Compute the PV and UV statistics

PV (page views):

select date,hour,count(url) PV from yhd_part1 group by date,hour;

-》aggregated per day and hour
-》Result:
+-----------+-------+--------+--+
|   date    | hour  |   pv   |
+-----------+-------+--------+--+
| 20150828  | 18    | 64972  |
| 20150828  | 19    | 61162  |
+-----------+-------+--------+--+

UV (unique visitors):

select date,hour,count(distinct guid) UV from yhd_part1 group by date,hour; 

-》Result:
+-----------+-------+--------+--+
|   date    | hour  |   uv   |
+-----------+-------+--------+--+
| 20150828  | 18    | 23938  |
| 20150828  | 19    | 22330  |
+-----------+-------+--------+--+

endUserId vs guid
Visitor identity:
-》guest
-》member
Both guests and members have a guid, so guid is the right field for counting UV.
endUserId applies only to members who are logged in with an account.
Combine the PV and UV statistics in a single query:

create table if not exists result as select date,hour,count(url) PV ,count(distinct guid) UV from yhd_part1 group by date,hour; 

-》Result:
+--------------+--------------+------------+------------+--+
| result.date  | result.hour  | result.pv  | result.uv  |
+--------------+--------------+------------+------------+--+
| 20150828     | 18           | 64972      | 23938      |
| 20150828     | 19           | 61162      | 22330      |
+--------------+--------------+------------+------------+--+

Export the result to a MySQL table

First create a table in MySQL to hold the result set:

create table if not exists save2(
date varchar(30) not null,
hour varchar(30) not null,
pv   varchar(30) not null,
uv   varchar(30) not null,
primary key(date,hour)
);

Export to MySQL with Sqoop:

bin/sqoop export \
--connect \
jdbc:mysql://bigdata-senior02.ibeifeng.com:3306/sqoop \
--username root \
--password 123456 \
--table save2 \
--export-dir /user/hive/warehouse/track_log2.db/result \
--num-mappers 1 \
--input-fields-terminated-by '\001'

+----------+------+-------+-------+
| date     | hour | pv    | uv    |
+----------+------+-------+-------+
| 20150828 | 18   | 64972 | 23938 |
| 20150828 | 19   | 61162 | 22330 |
+----------+------+-------+-------+

Hive's default field delimiter is \001 (Ctrl-A). The result table was created with CTAS and therefore uses that default, so --input-fields-terminated-by '\001' matches it; if the table had been defined with a '\t' delimiter, the option would change accordingly.

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Requirement analysis:

UV statistics

Approach analysis:

1. The traditional approach puts every session_id into a Set (automatic de-duplication) and takes Set.size() as UV, but that only works on a single machine.

2. A workable distributed approach (similar to how WordCount counts distinct words):

bolt1 performs multi-threaded local aggregation via fieldsGrouping; the next-level bolt2 runs single-threaded, keeps each session_id and its count in a Map, and iterates over that map to produce:

PV, UV, and visit depth (the number of page views per session_id)

Statistics per date:

2014-05-01  UV count

De-duplication requirement analysis:

Since we de-duplicate, the state must be persisted somewhere:

1. In memory

A Map data structure (small and mid-sized companies)

2. A NoSQL distributed database such as HBase (large companies); a sketch of this option follows below
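
A minimal sketch of the HBase option, assuming an HBase 1.x client and a hypothetical table uv_dedup with column family f (none of these names appear in the original notes). Each (date, session_id) pair is written with checkAndPut, which succeeds only the first time the pair is seen, so a per-date UV counter can be incremented exactly once per session:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/** Hypothetical helper: HBase-backed session de-duplication for UV counting. */
public class HBaseUVDedup {

    private static final byte[] CF  = Bytes.toBytes("f");
    private static final byte[] COL = Bytes.toBytes("seen");

    private final Table table;

    public HBaseUVDedup() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        this.table = connection.getTable(TableName.valueOf("uv_dedup"));  // assumed table name
    }

    /** Returns true only the first time this (date, session_id) pair is seen. */
    public boolean markSession(String date, String sessionId) throws Exception {
        byte[] rowKey = Bytes.toBytes(date + "_" + sessionId);
        Put put = new Put(rowKey);
        put.addColumn(CF, COL, Bytes.toBytes(1L));
        // A null expected value means: apply the put only if the cell does not exist yet
        return table.checkAndPut(rowKey, CF, COL, null, put);
    }

    /** Increment the per-date UV counter; call only when markSession() returned true. */
    public long incrementUV(String date) throws Exception {
        return table.incrementColumnValue(Bytes.toBytes("UV_" + date), CF, COL, 1L);
    }
}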

Storm has a very wide range of application scenarios,

but the complexity it can handle is limited; jobs are usually aggregation-style.

Full data analysis is hard to do in Storm, but it is well suited to the work that precedes analysis: preprocessing the source data and writing it to a database.

Spout (data source):

package base;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Created by Administrator on 2016/10/7.
 * Data-source spout: generates simulated access-log lines.
 */
public class SourceSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    Queue<String> queue = new ConcurrentLinkedQueue<String>();
    SpoutOutputCollector collector = null;
    String str = null;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        try {
            this.collector = spoutOutputCollector;
            Random random = new Random();
            String[] hosts = { "www.taobao.com" };
            String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34",
                    "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
            String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52",
                    "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49",
                    "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
            // Pre-generate 100 log lines in the form: host \t session_id \t time
            for (int i = 0; i < 100; i++) {
                queue.add(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {}

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void nextTuple() {
        // Emit only while there is data left in the queue
        if (queue.size() > 0) {
            collector.emit(new Values(queue.poll()));
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void ack(Object o) {
        System.out.println("spout ack:" + o.toString());
    }

    @Override
    public void fail(Object o) {
        System.out.println("spout fail:" + o.toString());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Format bolt:

package user_visit;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import tools.DataFmt;

import java.util.Map;

/**
 * Created by Administrator on 2016/10/8.
 * Compared with IRichBolt, IBasicBolt does not require calling ack/fail explicitly.
 */
public class FmtLogBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    String eachLog = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext) {}

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        eachLog = tuple.getString(0);
        if (eachLog != null && eachLog.length() > 0) {
            // Emit (date, session_id) parsed from the raw log line
            basicOutputCollector.emit(new Values(
                    DataFmt.getCountDate(eachLog.split("\t")[2], DataFmt.date_short),
                    eachLog.split("\t")[1]));
        }
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // These field names are how the next bolt looks the values up
        outputFieldsDeclarer.declare(new Fields("date", "session_id"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Local aggregation bolt:

package user_visit;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by Administrator on 2016/10/8.
 * Counts the page views of each session_id (local aggregation).
 */
public class DeepVisitBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    // Map holding the locally aggregated counts
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext) {}

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String dateString = tuple.getStringByField("date");
        String session_id = tuple.getStringByField("session_id");
        // To de-duplicate, the value being de-duplicated must be part of the map key
        Integer count = counts.get(dateString + "_" + session_id);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(dateString + "_" + session_id, count);
        // This is only a local aggregation; send it downstream for the global total
        basicOutputCollector.emit(new Values(dateString + "_" + session_id, count));
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("date_session_id", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Global aggregation bolt:

package user_visit;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import tools.DataFmt;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Created by Administrator on 2016/10/6.
 * Single-task bolt that merges all partial counts into the global PV and UV.
 */
public class UVSumBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    String cur_date = null;
    long beginTime = System.currentTimeMillis();
    long endTime = 0;
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext) {
        cur_date = DataFmt.getCountDate("2014-01-07", DataFmt.date_short);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        try {
            endTime = System.currentTimeMillis();
            long PV = 0;    // total page views
            long UV = 0;    // number of distinct sessions after de-duplication
            String dateSession_id = tuple.getString(0);
            Integer countInteger = tuple.getInteger(1);
            // If the tuple belongs to a later day than the current one, roll over to that day
            if (!dateSession_id.startsWith(cur_date)
                    && DataFmt.parseDate(dateSession_id.split("_")[0]).after(DataFmt.parseDate(cur_date))) {
                cur_date = dateSession_id.split("_")[0];
                counts.clear();
            }
            counts.put(dateSession_id, countInteger);
            if (endTime - beginTime >= 2000) {
                // Walk the key set: each key is one distinct session (UV),
                // and its latest count contributes to the total page views (PV)
                Iterator<String> i2 = counts.keySet().iterator();
                while (i2.hasNext()) {
                    String key = i2.next();
                    if (key != null) {
                        if (key.startsWith(cur_date)) {
                            UV++;
                            PV += counts.get(key);
                        }
                    }
                }
                System.out.println("PV=" + PV + "; UV=" + UV);
            }
        } catch (Exception e) {
            throw new FailedException("SumBolt fail!");
        }
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
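
The original notes end with the bolts; below is a minimal wiring sketch so the data flow is explicit. The class name UVTopo, the parallelism hints, and the local-mode run are assumptions, but the components and the fieldsGrouping on (date, session_id), which keeps every session on one DeepVisitBolt task while a single UVSumBolt task does the global merge, follow the design described above.

package user_visit;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import base.SourceSpout;

public class UVTopo {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new SourceSpout(), 1);
        // Parse each raw log line into (date, session_id)
        builder.setBolt("fmtLog", new FmtLogBolt(), 4)
               .shuffleGrouping("spout");
        // Local aggregation: fieldsGrouping keeps the same session_id on the same task
        builder.setBolt("deepVisit", new DeepVisitBolt(), 4)
               .fieldsGrouping("fmtLog", new Fields("date", "session_id"));
        // Global aggregation: one task merges all partial counts
        builder.setBolt("uvSum", new UVSumBolt(), 1)
               .shuffleGrouping("deepVisit");

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("uv-topology", conf, builder.createTopology());
        Thread.sleep(30000);          // let the demo run in local mode for a while
        cluster.killTopology("uv-topology");
        cluster.shutdown();
    }
}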
