spark sql uv_使用Spark Streaming SQL进行PV/UV统计-阿里云开发者社区
作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor
1.背景介绍
PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。
使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。
2.准备工作
创建E-MapReduce 3.23.0以上版本的Hadoop集群。
下载并编译E-MapReduce-SDK包
git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git
cd aliyun-emapreduce-sdk
git checkout -b master-2.x origin/master-2.x
mvn clean package -DskipTests
编译完后, assembly/target目录下会生成emr-datasources_shaded_${version}.jar,其中${version}为sdk的版本。
数据源
本文采用Loghub作为数据源,有关日志采集、日志解析请参考日志服务。
3.统计PV/UV
一般场景下需要将统计出的PV/UV以及相应的统计时间存入Redis。其他一些业务场景中,也会只保存最新结果,用新的结果不断覆盖更新旧的数据。以下首先介绍第一种情况的操作流程。
3.1启动客户端
命令行启动streaming-sql客户端
streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar
也可以创建SQL语句文件,通过streaming-sql -f的方式运行。
3.1定义数据表
数据源表定义如下
CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)
USING loghub
OPTIONS(
sls.project=${sls.project},
sls.store=${sls.store},
access.key.id=${access.key.id},
access.key.secret=${access.key.secret},
endpoint=${endpoint});
其中,数据源表包含user_ip和__time__两个字段,分别代表用户的IP地址和loghub上的时间列。OPTIONS中配置项的值根据实际配置。
结果表定义如下
CREATE TABLE redis_sink
USING redis
OPTIONS(
table='statistic_info',
host=${redis_host},
key.column='user_ip');
其中,user_ip对应数据中的用户IP字段,配置项${redis_host}的值根据实际配置。
3.2创建流作业
CREATE SCAN loghub_scan
ON loghub_source
USING STREAM
OPTIONS(
watermark.column='__time__',
watermark.delayThreshold='10 second');
CREATE STREAM job
OPTIONS(
checkpointLocation=${checkpoint_location})
INSERT INTO redis_sink
SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
FROM loghub_scan
GROUP BY TUMBLING(__time__, interval 1 minute), window;
4.3查看统计结果
最终的统计结果如下图所示
可以看到,每隔一分钟都会生成一条数据,key的形式为表名:interval,value为pv和uv的值。
3.4实现覆盖更新
将结果表的配置项key.column修改为一个固定的值,例如定义如下
CREATE TABLE redis_sink
USING redis
OPTIONS(
table='statistic_info',
host=${redis_host},
key.column='statistic_type');
创建流作业的SQL改为
CREATE STREAM job
OPTIONS(
checkpointLocation='/tmp/spark-test/checkpoint')
INSERT INTO redis_sink
SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
FROM loghub_scan
GROUP BY TUMBLING(__time__, interval 1 minute), window;
最终的统计结果如下图所示
可以看到,Redis中值保留了一个值,这个值每分钟都被更新,value包含pv、uv和interval的值。
4.总结
本文简要介绍了使用Streaming SQL结合Redis实现流式处理中统计PV/UV的需求。后续文章,我将介绍Spark Streaming SQL的更多内容
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
spark sql uv_使用Spark Streaming SQL进行PV/UV统计-阿里云开发者社区相关推荐
- mysql sql rowcount_ORACLE中的SQL%ROWCOUNT与MySQL中的ROW_COUNT()的一点异同-阿里云开发者社区...
MySQL的ROW_COUNT()和ORACLE中的SQL%ROWCOUNT函数作用并不完全相同.从作用上来说,两者都是返回前一个SQL进行UPDATE,DELETE,INSERT操作所影响的行数,但 ...
- sql盲注 解决_sql盲注-和sql盲注相关的内容-阿里云开发者社区
<白帽子讲WEB安全>学习笔记之第7章 注入攻击 第7章 注入攻击 SQL注入的两个条件:1,用户可以控制输入:2,原本执行的SQL语句并接了用户输入的数据. 7.1 sql注入 SQL注 ...
- 2008 go server sql 批处理_SQL Server 让你的数据来去自如——批处理-阿里云开发者社区...
比如说,我们现在需要建立一个数据库(create database),再建立一个表(create table),如果表的字段很少,手动添加就可以,一个一个插入到表中. 那么如果字段很多怎么办呢?一个一 ...
- python中引入sql的优点_引用sql-和引用sql相关的内容-阿里云开发者社区
bboss持久层改进支持模块sql配置文件引用其它模块sql配置文件中sql语句 bboss持久层改进支持模块sql配置文件引用其它模块sql配置文件中sql语句. 具体使用方法如下: <pro ...
- canal同步mysql到kafka_使用Canal同步MySQL数据到Kafka 得到的数据中sql字段无值-问答-阿里云开发者社区-阿里云...
这个应该跟你的binlog记录模式有关系,binlog有3中模式,ROW(行模式), Statement(语句模式), Mixed(混合模式)三种模式的用法如下: ROW(行模式):记录那条数据修改了 ...
- hive底层原理 sql执行过程_Hive mapreduce SQL实现原理——SQL最终分解为MR任务,而group by在MR里和单词统计MR没有区别了-阿里云开发者社区...
转自:http://blog.csdn.net/sn_zzy/article/details/43446027 SQL转化为MapReduce的过程 了解了MapReduce实现SQL基本操作之后,我 ...
- dms mysql定义变量_数据管理DMS:自建MySQL数据库 全量SQL诊断功能发布啦!-阿里云开发者社区...
MySQL的用户都面临都一个难题,异常或者故障问题难定位,很多时候都靠"猜". 如果比较幸运,异常正在发生,我们还可以获取到会话.引擎状态等信息: 如果没有异常现场,要找到根因,除 ...
- cyq.data 连接mysql_CYQ.Data V5 文本数据库支持SQL语句操作(实现原理解说)-阿里云开发者社区...
CYQ.Data V5版本的文本数据库,以前有过相关的介绍:周末一起用文本数据库玩玩Code First 数据的存储,是基于json格式或xml格式的,而实现的原理,也有一篇介绍: CYQ.Data ...
- spark sql uv_使用Spark Streaming SQL进行PV/UV统计
作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor 1.背景介绍 PV/UV统计是流式分析一个常见的场景.通过PV可以对访问的网站 ...
最新文章
- 扎心!程序员旅行却只能紧紧抱着电脑加班?
- 后端:请谨慎使用Arrays.asList、ArrayList的subList
- 本来中午打算应付下随便吃点,可是连盐都没有放的辣椒炒蛋实在是令人不快...
- 今天研究 Client本来是关联的Expression接口,笔记记录一下。
- Linux环境编译时报错/lib64/libdl.so.2: could not read symbols: Invalid operation
- I00009 用1生成回文数
- Linux系统命令行中vim编辑器取消高亮显示
- 科普:什么是人工智能
- java extjs combobox_Extjs 教程三 “combobox”
- 红警win10黑屏和不显示菜单栏问题 只有声音没有图像的解决
- 利用ode45求解含控制量并且控制量为离散点的动力学方程
- 无人机在计算机专业的应用,嵌入式计算机在无人机系统的应用
- 产品架构图到底是怎么“画”出来的?
- 链表之反转链表,万金油的解题方法(java求解)
- 10进制转2进制,js实现
- JS分子结构编辑器:基于SMILES来绘制分子结构,类似于Marvin JS
- 基于SpringBoot+Vue 实现的OA自动化办公系统
- 科研项目管理系统——应用效果
- Yimin Xiao
- 【转】FLASH经典问答