作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor


1.背景介绍

PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。
使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。

2.准备工作

  • 创建E-MapReduce 3.23.0以上版本的Hadoop集群。

  • 下载并编译E-MapReduce-SDK包

git clone git@github.com:aliyun/aliyun-emapreduce-sdk.gitcd aliyun-emapreduce-sdkgit checkout -b master-2.x origin/master-2.xmvn clean package -DskipTests

编译完后, assembly/target目录下会生成emr-datasources_shaded_${version}.jar,其中${version}为sdk的版本。

  • 数据源

本文采用Loghub作为数据源,有关日志采集、日志解析请参考日志服务。

3.统计PV/UV

一般场景下需要将统计出的PV/UV以及相应的统计时间存入Redis。其他一些业务场景中,也会只保存最新结果,用新的结果不断覆盖更新旧的数据。以下首先介绍第一种情况的操作流程。

3.1启动客户端

命令行启动streaming-sql客户端

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

也可以创建SQL语句文件,通过streaming-sql -f的方式运行。

3.1定义数据表

数据源表定义如下

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)USING loghubOPTIONS(sls.project=${sls.project},sls.store=${sls.store},access.key.id=${access.key.id},access.key.secret=${access.key.secret},endpoint=${endpoint});

其中,数据源表包含user_ip和__time__两个字段,分别代表用户的IP地址和loghub上的时间列。OPTIONS中配置项的值根据实际配置。
结果表定义如下

CREATE TABLE redis_sinkUSING redisOPTIONS(table='statistic_info',host=${redis_host},key.column='user_ip');

其中,user_ip对应数据中的用户IP字段,配置项${redis_host}的值根据实际配置。

3.2创建流作业

CREATE SCAN loghub_scanON loghub_sourceUSING STREAMOPTIONS(watermark.column='__time__',watermark.delayThreshold='10 second');
CREATE STREAM jobOPTIONS(checkpointLocation=${checkpoint_location})INSERT INTO redis_sinkSELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS intervalFROM loghub_scanGROUP BY TUMBLING(__time__, interval 1 minute), window;

4.3查看统计结果

最终的统计结果如下图所示

可以看到,每隔一分钟都会生成一条数据,key的形式为表名:interval,value为pv和uv的值。

3.4实现覆盖更新

将结果表的配置项key.column修改为一个固定的值,例如定义如下

CREATE TABLE redis_sinkUSING redisOPTIONS(table='statistic_info',host=${redis_host},key.column='statistic_type');

创建流作业的SQL改为

CREATE STREAM jobOPTIONS(checkpointLocation='/tmp/spark-test/checkpoint')INSERT INTO redis_sinkSELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS intervalFROM loghub_scanGROUP BY TUMBLING(__time__, interval 1 minute), window;

最终的统计结果如下图所示

可以看到,Redis中值保留了一个值,这个值每分钟都被更新,value包含pv、uv和interval的值。

4.总结

本文简要介绍了使用Streaming SQL结合Redis实现流式处理中统计PV/UV的需求。后续文章,我将介绍Spark Streaming SQL的更多内容。

猜你喜欢

1、重磅|Spark Delta Lake 现在由Linux基金会托管,将成为数据湖的开放标准

2、云栖大会 | Apache Spark 3.0 和 Koalas 最新进展

3、原创干货 | 史上最全的大数据学习资源(Awesome Big Data)

4、Spark Delta Lake 0.4.0 发布,支持 Python API 和部分 SQL

过往记忆大数据微信群,请添加微信:fangzhen0219,备注【进群】

spark sql uv_使用Spark Streaming SQL进行PV/UV统计相关推荐

  1. spark sql uv_使用Spark Streaming SQL进行PV/UV统计-阿里云开发者社区

    作者:关文选,花名云魄,阿里云E-MapReduce 高级开发工程师,专注于流式计算,Spark Contributor 1.背景介绍 PV/UV统计是流式分析一个常见的场景.通过PV可以对访问的网站 ...

  2. 使用Spark Streaming SQL基于时间窗口进行数据统计

    1.背景介绍 流式计算一个很常见的场景是基于事件时间进行处理,常用于检测.监控.根据时间进行统计等系统中.比如埋点日志中每条日志记录了埋点处操作的时间,或者业务系统中记录了用户操作时间,用于统计各种操 ...

  3. Spark四大组件包括Spark Streaming、Spark SQL、Spark MLlib和Spark GraphX。

    Spark四大组件包括Spark Streaming.Spark SQL.Spark MLlib和Spark GraphX.它们的主要应用场景是: Spark Streaming: Spark Str ...

  4. 【Spark】一条 SQL 在 Apache Spark 之旅(上)

    1.概述 转载学习加深印象:一条 SQL 在 Apache Spark 之旅(上) Spark SQL 是 Spark 众多组件中技术最复杂的组件之一,它同时支持 SQL 查询和 DataFrame ...

  5. spark(day06-spark算法、Spark Sql)

    案例 处理u.data文件用户id 电影id 用户打分要求基于u.data文件,建立推荐系统模型,为789号用户推荐10部电影建模时,k的取值10~50之间,迭代次数:5~20次之间 λ:0.01~0 ...

  6. 学习笔记Spark(七)—— Spark SQL应用(2)—— Spark DataFrame基础操作

    二.Spark DataFrame基础操作 2.1.DataFrame DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表. 数据集的每一列都带有名称和类 ...

  7. 学习笔记Spark(六)—— Spark SQL应用(1)—— Spark SQL简介、环境配置

    一.Spark SQL简介 1.1.Spark SQL特性 Spark SQL是Spark Core之上的一个组件,它引入了一个称为SchemaRDD的新- 数据抽象,它为结构化和半结构化数据提供支持 ...

  8. 如何查询spark版本_掌握Spark SQL中的查询执行

    了解您的查询计划 自从Spark 2.x以来,由于SQL和声明性DataFrame API,在Spark中查询数据已成为一种奢侈. 仅使用几行高级代码就可以表达非常复杂的逻辑并执行复杂的转换. API ...

  9. Spark学习笔记(7)---Spark SQL学习笔记

    Spark SQL学习笔记 Spark SQL学习笔记设计到很多代码操作,所以就放在github, https://github.com/yangtong123/RoadOfStudySpark/bl ...

最新文章

  1. trogan连接不上_解决连接不上网(Connection not connected).doc
  2. DT技术助力企业创新转型
  3. rust(64)-指针类型(1)
  4. 187. Repeated DNA Sequences重复的DNA子串序列
  5. 从框架源码中学习创建型设计模式
  6. cpu上干硅脂怎么清理_笔记本电脑散热硅脂的正确涂法
  7. vue component created没有触发_面试!面试!面试!vue常见面试题。
  8. c语言编写一个多位数的倒数
  9. 牛客SQL22 统计各个部门的工资记录数
  10. elasticsearch 随笔
  11. js中的call及apply
  12. 简单无聊的Minecraft主世界与地狱坐标转换器
  13. java中的约瑟夫问题_Java 解决约瑟夫问题
  14. 教你如何设置让Excel窗口总是在最前面
  15. 基于Open vSwitch搭建虚拟路由器
  16. svg动画 html,30个超棒的 SVG 动画展示【上篇】
  17. 根据出生日期获取农历信息
  18. 毫米和像素怎么换算_图片的像素和毫米之间的换算关系
  19. c#语言定义文档pdf,C#如何更改Word的语言设置.pdf
  20. BZOJ3730 震波+BZOJ4372 烁烁的游戏(动态点分治)

热门文章

  1. P4175 [CTSC2008]网络管理(整体二分)
  2. P3978 [TJOI2015]概率论(生成函数)
  3. Codeforces Round #606 (Div. 2, based on Technocup 2020 Elimination Round 4) 构造
  4. Codeforces Round #686 (Div. 3) F. Array Partition 二分 + 线段树
  5. HDU - 4497 GCD and LCM 数论gcd
  6. CF1479D Odd Mineral Resource
  7. The 2019 ICPC Asia Shanghai Regional Contest
  8. P2498 [SDOI2012]拯救小云公主
  9. L2-006 树的遍历
  10. [CQOI2012] 局部极小值(状压DP + 容斥 + 搜索)