1.架构说明

在上6节当中,我们已经完成了从ods层到dwd层的转换,包括日志数据和业务数据,下面我们开始做dwm层的任务。

DWM 层主要服务 DWS,因为部分需求直接从 DWD 层到DWS 层中间会有一定的计算量,而且这部分计算的结果很有可能被多个 DWS 层主题复用,所以部分 DWD 会形成一层 DWM,我们这里主要涉及业务:

  • 访问UV计算
  • 跳出明细计算
  • 订单宽表
  • 支付宽表

因为实时计算与离线不同,实时计算的开发和运维成本都是非常高的,要结合实际情况考虑是否有必要象离线数仓一样,建一个大而全的中间层。如果没有必要大而全,这时候就需要大体规划一下要实时计算出的指标需求了。把这些指标以主题宽表的形式输出就是我们的 DWS 层。

统计主题 需求指标 输出方式 计算来源 来源层级
访客 pv 可视化大屏 page_log直接可求 dwd
uv 可视化大屏 需要用page_log过滤去重 dwm
跳出率 可视化大屏 需要用page_log行为判断 dwm
进入页面数 可视化大屏 需要识别开始访问标识 dwd
连续访问时长 可视化大屏 page_log直接可求 dwd
商品 点击 多维分析 page_log直接可求 dwd
收藏 多维分析 收藏表 dwd
加入购物车 多维分析 购物车表 dwd
下单 可视化大屏 订单宽表 dwm
支付 多维分析 支付宽表 dwm
退款 多维分析 退款表 dwd
评论 多维分析 评论表 dwd
地区 pv 多维分析 page_log直接可求 dwd
uv 多维分析 需要page_log过滤去重 dwm
下单 可视化大屏 订单宽表 dwm
关键词 搜索关键词 可视化大屏 page_log直接可求 dwd
点击商品关键词 可视化大屏 商品主题下单再次聚合 dws
下单商品关键词 可视化大屏 商品主题下单再次聚合 dws

2. 访客UV计算

UV,全称是 Unique Visitor,即独立访客,对于实时计算中,也可以称为 DAU(Daily Active User),即每日活跃用户,因为实时计算中的uv通常是指当日的访客数。那么如何从用户行为日志中识别出当日的访客,那么有两点:

  • 其一,是识别出该访客打开的第一个页面,表示这个访客开始进入我们的应用
  • 其二,由于访客可以在一天中多次进入应用,所以我们要在一天的范围内进行去重

代码,新建任务UniqueVisitApp.java,我们要从kafka的ods层消费数据,主题为:dwd_page_log

package com.zhangbao.gmall.realtime.app.dwm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;/*** @author: zhangbao* @date: 2021/9/12 19:51* @desc: uv 计算**/
public class UniqueVisitApp {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//设置并行度env.setParallelism(4);//设置检查点env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointTimeout(60000);env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/uniqueVisit"));//指定哪个用户读取hdfs文件System.setProperty("HADOOP_USER_NAME","zhangbao");//从kafka读取数据源String sourceTopic = "dwd_page_log";String group = "unique_visit_app_group";FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);//数据转换SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(obj -> JSON.parseObject(obj));jsonObjDs.print("jsonObjDs >>>");try {env.execute("task uniqueVisitApp");} catch (Exception e) {e.printStackTrace();}}
}

测试从kafka消费数据

  • 启动服务:zk,kf,logger.sh ,hadoop
  • 运行任务:BaseLogTask.java,UniqueVisitApp.java
  • 执行日志生成服务器
  • 查看控制台输出

目前任务执行流程

UniqueVisitApp程序接收到的数据

{"common": {"ar": "440000","uid": "48","os": "Android 11.0","ch": "xiaomi","is_new": "0","md": "Sumsung Galaxy S20","mid": "mid_9","vc": "v2.1.134","ba": "Sumsung"},"page": {"page_id": "login","during_time": 4621,"last_page_id": "good_detail"},"ts": 1631460110000
}

3. 核心过滤流程

从kafka的ods层取出数据之后,就该做具体的uv处理了。

1.首先用 keyby 按照 mid 进行分组,每组表示当前设备的访问情况

2.分组后使用 keystate 状态,记录用户进入时间,实现 RichFilterFunction 完成过滤

3.重写 open 方法用来初始化状态

4.重写 filter 方法进行过滤

  • 可以直接筛掉 last_page_id 不为空的字段,因为只要有上一页,说明这条不是这个用户进入的首个页面。
  • 状态用来记录用户的进入时间,只要这个 lastVisitDate 是今天,就说明用户今天已经访问过了所以筛除掉。如果为空或者不是今天,说明今天还没访问过,则保留。
  • 因为状态值主要用于筛选是否今天来过,所以这个记录过了今天基本上没有用了,这里 enableTimeToLive 设定了 1 天的过期时间,避免状态过大。
package com.zhangbao.gmall.realtime.app.dwm;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.text.SimpleDateFormat;
import java.util.Date;/*** @author: zhangbao* @date: 2021/9/12 19:51* @desc: uv 计算**/public class UniqueVisitApp {public static void main(String[] args) {//webui模式,需要添加pom依赖StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();//设置并行度env.setParallelism(4);//设置检查点env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointTimeout(60000);env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/uniqueVisit"));//指定哪个用户读取hdfs文件System.setProperty("HADOOP_USER_NAME","zhangbao");//从kafka读取数据源String sourceTopic = "dwd_page_log";String group = "unique_visit_app_group";FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);DataStreamSource<String> kafkaDs = env.addSource(kafkaSource);//数据转换SingleOutputStreamOperator<JSONObject> jsonObjDs = kafkaDs.map(obj -> JSON.parseObject(obj));//按照设备id分组KeyedStream<JSONObject, String> keyByMid = jsonObjDs.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));//过滤SingleOutputStreamOperator<JSONObject> filterDs = keyByMid.filter(new RichFilterFunction<JSONObject>() {ValueState<String> lastVisitDate = null;SimpleDateFormat sdf = null;@Overridepublic void open(Configuration parameters) throws Exception {//初始化时间sdf = new SimpleDateFormat("yyyyMMdd");//初始化状态ValueStateDescriptor<String> lastVisitDateDesc = new ValueStateDescriptor<>("lastVisitDate", String.class);//统计日活dau,状态数据保存一天,过一天即失效StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();lastVisitDateDesc.enableTimeToLive(stateTtlConfig);this.lastVisitDate = getRuntimeContext().getState(lastVisitDateDesc);}@Overridepublic boolean filter(JSONObject jsonObject) throws Exception {//上一个页面如果有值,则不是首次访问String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");if(lastPageId != null && lastPageId.length()>0){return false;}//获取用户访问日期Long ts = jsonObject.getLong("ts");String mid = jsonObject.getJSONObject("common").getString("mid");String lastDate = sdf.format(new Date(ts));//获取状态日期String lastDateState = lastVisitDate.value();if(lastDateState != null && lastDateState.length()>0 && lastDateState.equals(lastDate)){System.out.println(String.format("已访问! mid:%s,lastDate:%s",mid,lastDate));return false;}else {lastVisitDate.update(lastDate);System.out.println(String.format("未访问! mid:%s,lastDate:%s",mid,lastDate));return true;}}});filterDs.print("filterDs >>>");try {env.execute("task uniqueVisitApp");} catch (Exception e) {e.printStackTrace();}}
}

注:1.在测试时,发现uv没有数据,所以把BaseLogTask任务的侧输出流改一下,如下图所示:

​ 2.webui模式添加pom依赖

<!-- flink webui -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.12.0</version>
</dependency>

4. 测试

  • 启动zk,kafka,logger.sh,hdfs,BaseLogTask,UniqueVisitApp
  • 执行流程
    • 模拟生成的日志jar >> nginx >> 日志采集服务 >> kafka(ods) >> baseLogApp(分流) >> kafka(dwd) >> UniqueVisitApp(独立访客) >> dwm_unique_visit

经测试,流程已通。

7.Flink实时项目之独立访客开发相关推荐

  1. 【Flink】需求实现之独立访客数量的计算 和 布隆过滤器的原理及使用

    文章目录 一 独立访客数量计算 二 布隆过滤器 1 什么是布隆过滤器 2 实现原理 (1)HashMap 的问题 (2)布隆过滤器数据结构 3 使用布隆过滤器去重 一 独立访客数量计算 public ...

  2. java基础巩固-宇宙第一AiYWM:为了维持生计,做项目经验之~【多用户关注共同的参数的统计功能】开发总结、再来个独立访客(Unique Visitor,简称UV)统计番外篇~整起

    实验室项目来了一个新需求:因为我们那个系统主要是人家用下位机把一些路面参数.空气湿度等参数测出来之后发送到我们阿里云服务器上的pg数据库中,然后我们对数据进行页面上的展示.然后公司那边说有个需求,就是 ...

  3. 唯一身份访问者(独立访客)与访问次数的区别

    唯一身份访问者(独立访客)与访问次数的区别 在进行网站分析前,对网站分析的基本度量的了解是非常必要的,这样才会深入理解网站分析,否则就会不知所云. 定义: 1.唯一身份访问者(Unique Visit ...

  4. 网站独立访客数UV的统计--海量数据去重

    问题描述:统计每一小时的网站独立访客数UV(Unique Visitor) 问题背景:这是尚硅谷大数据技术之电商用户行为数据分析的一道例题,武晟然老师讲授的方法是,自定义布隆过滤器进行UV统计.受到老 ...

  5. 网站统计中的PV(访问量):UV(独立访客):IP(独立IP)的定义与区别

    --------首先来看看ip.uv和pv的定义---------- PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次. UV(独立访客):即Unique Vis ...

  6. 独立IP、特产浏览量(PV)、访问次数(VV)、独立访客(UV)有什么区别?

    转自 http://blog.sina.com.cn/s/blog_a5fc76bb0101073a.html 访问次数(VV):记录所有访客1天内访问了多少次您的特产,相同的访客有可能多次访问您的特 ...

  7. 如何查看微信小程序的累计独立访客(UV)

    查看方式: 累计独立访客(UV)不低于 1000 点击左侧菜单栏的统计模块 3.选择数据看板-访问核心数据-,累计用户数就可以看到了 开通条件 累计独立访客(UV)不低于 1000 存在刷粉行为或有严 ...

  8. 4.Flink实时项目之数据拆分

    Python微信订餐小程序课程视频 https://edu.csdn.net/course/detail/36074 Python实战量化交易理财系统 https://edu.csdn.net/cou ...

  9. Java项目:springboot访客管理系统

    作者主页:夜未央5788 简介:Java领域优质创作者.Java项目.学习资料.技术互助 文末获取源码 项目介绍 springboot搭建的访客管理系统,针对高端基地做严格把控来访人员信息管理,用户后 ...

  10. 9.Flink实时项目之订单宽表

    1.需求分析 订单是统计分析的重要的对象,围绕订单有很多的维度统计需求,比如用户.地区.商品.品类.品牌等等.为了之后统计计算更加方便,减少大表之间的关联,所以在实时计算过程中将围绕订单的相关数据整合 ...

最新文章

  1. 近期活动盘点:工业大数据讲座、大数据自杀风险感知讲座、数据法学研讨会、海外学者短期讲学(12.3-12.13)
  2. 一次Binder通信最大可以传输多大的数据?
  3. WCF入门(三)——对象序列化
  4. python快速入门 pdf-十分钟快速入门python
  5. jvm性能调优实战 -57数据日志分析系统的OOM问题排查
  6. 干货 | 广电行业数字化时代的「数据破局」指南
  7. Maven插件之buildnumber-maven-plugin
  8. 关于c/c++/obj-c的混合使用 (2010-06-22 10:05:33)
  9. java ee maven_真正释放Maven和Java EE的强大功能
  10. Linux内核分析作业第八周
  11. if test 多条件_秒懂Python编程中的if __name__ == #x27;main#x27; 作用和原理
  12. RESTful Request:GET/PUT/DELETE/POST/HEAD/OPTIONS
  13. jstack会导致JVM停顿
  14. 文本数据抽取经验总结
  15. CocosBuilder 使用技巧
  16. 神逸之作:国产快速启动软件神品ALTRun
  17. 城市电子地图站点 推荐
  18. 天文观测理论——已知像素大小、焦距,求像素分辨率
  19. LinkedIn高级分析师王益:大数据时代的理想主义和现实主义(图灵访谈)
  20. 微信小程序九宫格抽奖

热门文章

  1. rhythmbox插件开发笔记1:简介入门
  2. 温故而知新(一)—— 再看RNN、LSTM、GRU
  3. Mqtt开发笔记:windows下C++ ActiveMQ客户端介绍、编译和使用
  4. 凛冬之翼---php写入数据库时汉字全部变为空白
  5. Cause: dx.jar is missing
  6. 我的美国CS面试经验分享
  7. 修改window本地hosts文件,修改域名指向
  8. EMQ荣获工信部第五届“绽放杯”5G 应用 征集大赛智慧金融专题一等奖
  9. 偏差(variation)的分类
  10. macbook air上安装ubuntu双系统