作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor

1.背景介绍

PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。

使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。

2.准备工作

创建E-MapReduce 3.23.0以上版本的Hadoop集群。

下载并编译E-MapReduce-SDK包

git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git

cd aliyun-emapreduce-sdk

git checkout -b master-2.x origin/master-2.x

mvn clean package -DskipTests

编译完后, assembly/target目录下会生成emr-datasources_shaded_${version}.jar,其中${version}为sdk的版本。

数据源

本文采用Loghub作为数据源,有关日志采集、日志解析请参考日志服务。

3.统计PV/UV

一般场景下需要将统计出的PV/UV以及相应的统计时间存入Redis。其他一些业务场景中,也会只保存最新结果,用新的结果不断覆盖更新旧的数据。以下首先介绍第一种情况的操作流程。

3.1启动客户端

命令行启动streaming-sql客户端

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

也可以创建SQL语句文件,通过streaming-sql -f的方式运行。

3.1定义数据表

数据源表定义如下

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)

USING loghub

OPTIONS(

sls.project=${sls.project},

sls.store=${sls.store},

access.key.id=${access.key.id},

access.key.secret=${access.key.secret},

endpoint=${endpoint});

其中,数据源表包含user_ip和__time__两个字段,分别代表用户的IP地址和loghub上的时间列。OPTIONS中配置项的值根据实际配置。

结果表定义如下

CREATE TABLE redis_sink

USING redis

OPTIONS(

table='statistic_info',

host=${redis_host},

key.column='user_ip');

其中,user_ip对应数据中的用户IP字段,配置项${redis_host}的值根据实际配置。

3.2创建流作业

CREATE SCAN loghub_scan

ON loghub_source

USING STREAM

OPTIONS(

watermark.column='__time__',

watermark.delayThreshold='10 second');

CREATE STREAM job

OPTIONS(

checkpointLocation=${checkpoint_location})

INSERT INTO redis_sink

SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval

FROM loghub_scan

GROUP BY TUMBLING(__time__, interval 1 minute), window;

4.3查看统计结果

最终的统计结果如下图所示

可以看到,每隔一分钟都会生成一条数据,key的形式为表名:interval,value为pv和uv的值。

3.4实现覆盖更新

将结果表的配置项key.column修改为一个固定的值,例如定义如下

CREATE TABLE redis_sink

USING redis

OPTIONS(

table='statistic_info',

host=${redis_host},

key.column='statistic_type');

创建流作业的SQL改为

CREATE STREAM job

OPTIONS(

checkpointLocation='/tmp/spark-test/checkpoint')

INSERT INTO redis_sink

SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval

FROM loghub_scan

GROUP BY TUMBLING(__time__, interval 1 minute), window;

最终的统计结果如下图所示

可以看到,Redis中值保留了一个值,这个值每分钟都被更新,value包含pv、uv和interval的值。

4.总结

本文简要介绍了使用Streaming SQL结合Redis实现流式处理中统计PV/UV的需求。后续文章,我将介绍Spark Streaming SQL的更多内容

阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

spark sql uv_使用Spark Streaming SQL进行PV/UV统计-阿里云开发者社区相关推荐

  1. mysql sql rowcount_ORACLE中的SQL%ROWCOUNT与MySQL中的ROW_COUNT()的一点异同-阿里云开发者社区...

    MySQL的ROW_COUNT()和ORACLE中的SQL%ROWCOUNT函数作用并不完全相同.从作用上来说,两者都是返回前一个SQL进行UPDATE,DELETE,INSERT操作所影响的行数,但 ...

  2. sql盲注 解决_sql盲注-和sql盲注相关的内容-阿里云开发者社区

    <白帽子讲WEB安全>学习笔记之第7章 注入攻击 第7章 注入攻击 SQL注入的两个条件:1,用户可以控制输入:2,原本执行的SQL语句并接了用户输入的数据. 7.1 sql注入 SQL注 ...

  3. 2008 go server sql 批处理_SQL Server 让你的数据来去自如——批处理-阿里云开发者社区...

    比如说,我们现在需要建立一个数据库(create database),再建立一个表(create table),如果表的字段很少,手动添加就可以,一个一个插入到表中. 那么如果字段很多怎么办呢?一个一 ...

  4. python中引入sql的优点_引用sql-和引用sql相关的内容-阿里云开发者社区

    bboss持久层改进支持模块sql配置文件引用其它模块sql配置文件中sql语句 bboss持久层改进支持模块sql配置文件引用其它模块sql配置文件中sql语句. 具体使用方法如下: <pro ...

  5. canal同步mysql到kafka_使用Canal同步MySQL数据到Kafka 得到的数据中sql字段无值-问答-阿里云开发者社区-阿里云...

    这个应该跟你的binlog记录模式有关系,binlog有3中模式,ROW(行模式), Statement(语句模式), Mixed(混合模式)三种模式的用法如下: ROW(行模式):记录那条数据修改了 ...

  6. hive底层原理 sql执行过程_Hive mapreduce SQL实现原理——SQL最终分解为MR任务,而group by在MR里和单词统计MR没有区别了-阿里云开发者社区...

    转自:http://blog.csdn.net/sn_zzy/article/details/43446027 SQL转化为MapReduce的过程 了解了MapReduce实现SQL基本操作之后,我 ...

  7. dms mysql定义变量_数据管理DMS:自建MySQL数据库 全量SQL诊断功能发布啦!-阿里云开发者社区...

    MySQL的用户都面临都一个难题,异常或者故障问题难定位,很多时候都靠"猜". 如果比较幸运,异常正在发生,我们还可以获取到会话.引擎状态等信息: 如果没有异常现场,要找到根因,除 ...

  8. cyq.data 连接mysql_CYQ.Data V5 文本数据库支持SQL语句操作(实现原理解说)-阿里云开发者社区...

    CYQ.Data V5版本的文本数据库,以前有过相关的介绍:周末一起用文本数据库玩玩Code First 数据的存储,是基于json格式或xml格式的,而实现的原理,也有一篇介绍: CYQ.Data ...

  9. spark sql uv_使用Spark Streaming SQL进行PV/UV统计

    作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor 1.背景介绍 PV/UV统计是流式分析一个常见的场景.通过PV可以对访问的网站 ...

最新文章

  1. 扎心!程序员旅行却只能紧紧抱着电脑加班?
  2. 后端:请谨慎使用Arrays.asList、ArrayList的subList
  3. 本来中午打算应付下随便吃点,可是连盐都没有放的辣椒炒蛋实在是令人不快...
  4. 今天研究 Client本来是关联的Expression接口,笔记记录一下。
  5. Linux环境编译时报错/lib64/libdl.so.2: could not read symbols: Invalid operation
  6. I00009 用1生成回文数
  7. Linux系统命令行中vim编辑器取消高亮显示
  8. 科普:什么是人工智能
  9. java extjs combobox_Extjs 教程三 “combobox”
  10. 红警win10黑屏和不显示菜单栏问题 只有声音没有图像的解决
  11. 利用ode45求解含控制量并且控制量为离散点的动力学方程
  12. 无人机在计算机专业的应用,嵌入式计算机在无人机系统的应用
  13. 产品架构图到底是怎么“画”出来的?
  14. 链表之反转链表,万金油的解题方法(java求解)
  15. 10进制转2进制,js实现
  16. JS分子结构编辑器:基于SMILES来绘制分子结构,类似于Marvin JS
  17. 基于SpringBoot+Vue 实现的OA自动化办公系统
  18. 科研项目管理系统——应用效果
  19. Yimin Xiao
  20. 【转】FLASH经典问答

热门文章

  1. pv=nrt_中学物理之pV=nRT应用总结篇
  2. 南非数字货币应用潜力巨大 小试牛刀审慎探索
  3. 矿大计算机考研上岸分数,2021--2022中国矿业大学动力工程考研上岸心得及分数线报录比...
  4. 中国石油大学《化工原理二》第三阶段在线作业
  5. zk-03-Zookeeper部署和运行
  6. 【转】我在赶集网工作的两个月总结
  7. 如何将计算机桌面屏幕放大,怎么能把电脑屏幕放大
  8. C语言 打飞机 小游戏
  9. FPGA|通过AS下载固化
  10. ZK-SNARKS | 创建第一个零知识snark电路