文章目录

  • 一 DWD层需求分析及实现思路
    • 1 分层需求分析
    • 2 每层的职能
    • 3 DWD层职能详细介绍
      • (1)用户行为日志数据
      • (2)业务数据
        • 4 DWD层数据准备实现思路
  • 二 环境搭建
    • 1 创建maven工程
    • 2 修改配置文件
      • (1)添加依赖
      • (2) 添加配置文件
    • 3 创建如下包结构
  • 三 准备用户行为日志-DWD层
    • 1 主要任务
    • 2 分区、分组和分流
    • 3 代码实现
      • (1)接收Kafka数据,并进行转换
        • a 封装操作Kafka的工具类,并提供获取kafka消费者的方法(读)
        • b Flink调用工具类读取数据的主程序
        • c 测试

一 DWD层需求分析及实现思路

1 分层需求分析

建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。

这里从kafka的ods层读取用户行为日志以及业务数据,并进行简单处理,写回到kafka作为dwd层。

2 每层的职能

分层 数据描述 生成计算工具 存储媒介
ODS 原始数据,日志和业务数据 日志服务器,maxwell kafka
DWD 根据数据对象为单位进行分流,比如订单、页面访问等等。 FLINK kafka
DWM 对于部分数据对象进行进一步加工,比如独立访问、跳出行为。依旧是明细数据。 FLINK kafka
DIM 维度数据 FLINK HBase
DWS 根据某个维度主题将多个事实数据轻度聚合,形成主题宽表。 FLINK Clickhouse
ADS 把Clickhouse中的数据根据可视化需要进行筛选聚合。 Clickhouse SQL 可视化展示

3 DWD层职能详细介绍

(1)用户行为日志数据

根据日志的不同类别做分流。

前端埋点中的数据全部放在kafka中ods_base_log主题中,如启动日志,页面访问日志,曝光日志等。虽然同是日志,但是却分为不同的种类,将来做数据统计时,全部从这一个主题中获取数据不方便。所以需要从ods_base_log主题中将数据取出来,根据日志的类型,将不同类型的数据放到不同的主题中,进行分流操作,如启动日志放到启动主题中,曝光日志放到曝光主题中,页面日志放到日志主题中。

(2)业务数据

根据业务数据的类型(维度 or 事实)做分流。

MySQL存储的业务数据中有很多张表,这些表分为两类,一类是事实表,一类是维度表。在采集数据时,只要业务数据发生变化就会通过maxwell采集到kafka的ods_base_db_m主题中,并没有区分事实和维度。如果是事实数据,希望将其放到kafka的不同单独主题中,如订单主题,订单明细主题,支付主题等。对于维度数据,不适合存放在kafka中,kafka不适合做长期存储,默认存储7天。海量数据的分析计算,同样不适合存放到MySQL中,因为在做分析计算时要不停的进行查询操作,给业务数据库造成很大的压力,且MySQL对于大量数据的查询,性能也较差。

在使用维度数据时,需要根据维度id查询出具体的数据,K-V型数据库比较适合存储维度数据,根据K获取V效率较高,KV数据库包括Redis和Hbase,Redis对于长期存储压力比较大,最终选择Hbase存储维度数据。

4 DWD层数据准备实现思路

  • 功能1:环境搭建。
  • 功能2:计算用户行为日志DWD层。
  • 功能3:计算业务数据DWD层。

二 环境搭建

1 创建maven工程

创建maven工程,gmall2022-realtime。

2 修改配置文件

(1)添加依赖

<properties><java.version>1.8</java.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target><flink.version>1.12.0</flink.version><scala.version>2.12</scala.version><hadoop.version>3.1.3</hadoop.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency><!--如果保存检查点到hdfs上,需要引入此依赖--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,这里使用log4j作为具体的日志实现--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

(2) 添加配置文件

在resources目录下创建log4j.properties配置文件

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

3 创建如下包结构

目录 作用
app 产生各层数据的flink任务
bean 数据对象
common 公共常量
utils 工具类

三 准备用户行为日志-DWD层

前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类,页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。

页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流。

1 主要任务

  • 识别新老用户:本身客户端业务有新老用户的标识,但是不够准确,需要用实时计算再次确认(不涉及业务操作,只是单纯的做个状态确认)。
  • 利用侧输出流实现数据拆分:根据日志数据内容,将日志数据分为3类, 页面日志、启动日志和曝光日志。页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流。
  • 将不同流的数据推送下游的kafka的不同Topic中。

整体流程如下图:

2 分区、分组和分流

三者之间的关系和区别如下图:

3 代码实现

(1)接收Kafka数据,并进行转换

a 封装操作Kafka的工具类,并提供获取kafka消费者的方法(读)

/*** 操作kafka工具类*/
public class MyKafkaUtil {private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";// 获取kafka的消费者public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);// 定义消费者组props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),props);}
}

b Flink调用工具类读取数据的主程序

/*** 对日志数据进行分流操作*  启动、曝光、页面*    启动日志放到启动侧输出流中*    曝光日志放到曝光侧输出流中*    页面日志放到主流中*  将不同流的数据写回到kafka的dwd主题中*/
public class BaseLogApp {public static void main(String[] args) throws Exception{// TODO 1 基本环境准备// 流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(4);// TODO 2 检查点相关设置// 开启检查点// 每5S中开启一次检查点,检查点模式为EXACTLY_ONCEenv.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);//   设置检查点超时时间env.getCheckpointConfig().setCheckpointTimeout(60000L);//   设置重启策略// 重启三次,每次间隔3s钟env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));//   设置job取消后,检查点是否保留env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//   设置状态后端 -- 基于内存 or 文件系统 or RocksDB//   内存:状态存在TaskManager内存中,检查点存在JobManager内存中//   文件系统:状态存在TaskManager内存中,检查点存在指定的文件系统路径中//   RocksDB:看做和Redis类似的数据库,状态存在TaskManager内存中,检查点存在JobManager内存和本地磁盘上//   hadoop中nm的地址env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));//   指定操作HDFS的用户System.setProperty("HADOOP_USER_NAME","hzy");// TODO 3 从kafka读取数据// 声明消费的主题和消费者组String topic = "ods_base_log";String groupId = "base_log_app_group";// 获取kafka消费者FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);// 读取数据,封装为流DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);// TODO 4 对读取的数据进行结构的转换 jsonStr -> jsonObj
//        // 匿名内部类实现
//        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(
//                new MapFunction<String, JSONObject>() {//                    @Override
//                    public JSONObject map(String jsonStr) throws Exception {//                        return JSON.parseObject(jsonStr);
//                    }
//                }
//        );
//        // lambda表达式实现
//        kafkaDS.map(
//                jsonStr -> JSON.parse(jsonStr)
//        );// 方法的默认调用,注意导入的是alibaba JSON包SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);jsonObjDS.print(">>>");// TODO 5 修复新老访客状态// TODO 6 按照日志类型对日志进行分流// TODO 7 将不同流的数据写到kafka的dwd不同主题中env.execute();}
}

c 测试

# 启动zookeeper
# 启动kafka
# 启动采集服务
logger.sh start
# 启动nm以将检查点保存到hdfs上
start-dfs.sh
# 等待安全模式关闭,启动主程序,如果出现权限问题,可以将权限放开
hdfs dfs -chmod -R 777 /
# 或者增加以下代码到主程序中
System.setProperty("HADOOP_USER_NAME","hzy");
# 程序运行起来后,启动模拟生成日志数据jar包,在主程序中可以接收到数据

【实时数仓】DWD层需求分析及实现思路、idea环境搭建、实现DWD层处理用户行为日志的功能相关推荐

  1. 当 TiDB 与 Flink 相结合:高效、易用的实时数仓

    简介:利用实时数仓,企业可以实现实时 OLAP 分析.实时数据看板.实时业务监控.实时数据接口服务等用途.但想到实时数仓,很多人的第一印象就是架构复杂,难以操作与维护.而得益于新版 Flink 对 S ...

  2. 【统一数据开发平台】-OLAP分析平台和实时数仓实践和优化

    一.业务背景 BIGO 是一家面向海外的以短视频直播业务为主的公司, 目前公司的主要业务包括 BigoLive (全球直播服务),Likee (短视频创作分享平台),IMO (免费通信工具) 三部分, ...

  3. 各厂实时数仓案例大全

    目录 前言: 一.实时数仓建设目的 二.实时数仓建设方案 1. 滴滴顺风车实时数仓案例 2. 快手实时数仓场景化案例 3. 腾讯看点实时数仓案例 4. 有赞实时数仓案例 前言: 实时需求日趋迫切 目前 ...

  4. Flink SQL搭建实时数仓DWD层

    1.实时数仓DWD层 DWD是明细数据层,该层的表结构和粒度与原始表保持一致,不过需要对ODS层数据进行清洗.维度退化.脱敏等,最终得到的数据是干净的,完整的.一致的数据. (1)对用户行为数据解析. ...

  5. 02Flink实时数仓(尚硅谷)- DWD层数据准备

    文章目录 第1章 需求分析及实现思路 1.1 分层需求分析 1.2 每层的职能 1.3 DWD 层数据准备实现思路 第2章 功能 1:环境搭建 第3章 功能 2:准备用户行为日志 DWD 层 3.1 ...

  6. 数据仓库—stg层_数据仓库之Hive快速入门 - 离线实时数仓架构

    数据仓库VS数据库 数据仓库的定义: 数据仓库是将多个数据源的数据经过ETL(Extract(抽取).Transform(转换).Load(加载))理之后,按照一定的主题集成起来提供决策支持和联机分析 ...

  7. 【Flink实时数仓】数据仓库项目实战 《四》日志数据分流 【DWD】

    文章目录 [Flink实时数仓]数据仓库项目实战 <四>日志数据分流-流量域 [DWD] 1.流量域未经加工的事务事实表 1.1主要任务 1.1.1数据清洗(ETL) 1.1.2新老访客状 ...

  8. [电商实时数仓] 数据仓库建模过程分析

    文章目录 1.数据仓库概述 1.1 数据仓库概念 1.2 数据仓库核心架构 2.数据仓库建模概述 2.1 数据仓库建模的意义 2.2 数据仓库建模方法论 2.2.1 ER模型 2.2.2 维度模型 3 ...

  9. 美团买菜基于 Flink 的实时数仓建设

    摘要:本文整理自美团买菜实时数仓技术负责人严书,在 Flink Forward Asia 2022 实时湖仓专场的分享.本篇内容主要分为四个部分: 背景介绍 技术愿景和架构设计 典型场景.挑战与应对 ...

最新文章

  1. 【MM模块】Source Lists 货源清单
  2. UML学习总结(1)——UML学习入门
  3. 比较两个二维数组是否相等
  4. 3-5 获取命令行参数
  5. 分布式会话拦截器2 - 会话判断
  6. [Leedcode][JAVA][第4题][寻找两个正序数组中的中位数][二分查找][双指针]
  7. 【Hibernate框架开发之九】Hibernate 性能优化笔记!(遍历、一级/二级/查询/缓存/乐观悲观锁等优化算法)...
  8. python查询模块路径_Visual Studio 2017中的Python无法通过“搜索路径”查找模块
  9. 多项式拟合lm_R语言多项式回归
  10. 手把手教你使用Numpy、Matplotlib、Scipy等5个Python库
  11. Codeforces Round #342 (Div. 2)
  12. Linux 使用 shell 脚本处理字符串
  13. 新产品、新团队、新技术
  14. List中根据某个实体的属性去重
  15. android5开机动画耗时,Android 开机速度优化
  16. JS上传图片到七牛云
  17. 【python】自动发送微信消息或文件
  18. C++厘米和英寸的换算
  19. 【硬刚大数据】从零到大数据专家之Apache Doris篇
  20. cdr 2021 卸载

热门文章

  1. 美国宾州计算机学校,美国宾州有什么好大学
  2. 下载ERA5-Land数据及数据处理(每小时转日数据)
  3. 【混合云小知识】混合云应用场景包含哪些?
  4. mysql 覆盖写入_INSERT ON CONFLICT覆盖写入
  5. 招生考试之友2017文科理科
  6. java取当前周期、月初至月末、季度初至季度末日期
  7. MSP430晶振配置详解
  8. 搭建SpringBoot+Vue 项目 完整流程
  9. 强大的项目管理软件:OmniPlan Pro 4 mac中文版
  10. 单片机、芯片、arduino、树莓派、Jetson Nano、esp32、stm32