Building the DWD Layer of a Real-Time Data Warehouse with Flink SQL
1. The DWD Layer of a Real-Time Data Warehouse
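DWD (Data Warehouse Detail) is the detail data layer. Its table structure and granularity stay the same as those of the source tables, but the ODS-layer data has to be cleaned, have its dimensions degenerated, and be masked (desensitized), so that the resulting data is clean, complete, and consistent. The main steps are: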
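(1) Parse the user behavior data.
(2) Filter out null values in the core data.
(3) Remodel the business data with a dimensional model, i.e., degenerate dimensions into the fact tables.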
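The cleaning steps above can be illustrated without Flink. The following is a minimal plain-Java sketch (Java 16+ for records) of null filtering and masking on records shaped like the `ods_cars_log` table defined later. The choice of `id` and `carCode` as the "core" fields, the plate-masking rule, and all sample values are hypothetical; dimension degeneration is handled by the lookup join in the Flink SQL job.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

final class DwdCleaningSketch {

    // Mirrors the columns of the ods_cars_log source table.
    record CarLog(String id, String opTime, Short ctype, String carCode, Long cId) {}

    // Null filtering: keep only records whose core fields are present.
    // Treating id and carCode as the "core" fields is an assumption of this sketch.
    static boolean hasCoreFields(CarLog log) {
        return log.id() != null && !log.id().isBlank()
                && log.carCode() != null && !log.carCode().isBlank();
    }

    // Masking (desensitization): hide the middle of the plate number.
    // The "keep the first 3 and last 2 characters" rule is hypothetical.
    static String maskPlate(String plate) {
        if (plate.length() <= 5) {
            return "*".repeat(plate.length());
        }
        return plate.substring(0, 3)
                + "*".repeat(plate.length() - 5)
                + plate.substring(plate.length() - 2);
    }

    // Drop null or incomplete records, then mask the plate number of the rest.
    static List<CarLog> clean(List<CarLog> raw) {
        return raw.stream()
                .filter(Objects::nonNull)
                .filter(DwdCleaningSketch::hasCoreFields)
                .map(l -> new CarLog(l.id(), l.opTime(), l.ctype(), maskPlate(l.carCode()), l.cId()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample records; the second and third lack a core field.
        List<CarLog> raw = List.of(
                new CarLog("e1", "2022-07-15 11:59:55.443", (short) 2, "ABCDEFGH", 10000095L),
                new CarLog(null, "2022-07-15 11:59:56.443", (short) 1, "BCDEFGHI", 10000311L),
                new CarLog("e3", "2022-07-15 11:59:57.443", (short) 1, null, 10000526L));
        // Only e1 survives, with carCode masked to ABC***GH.
        clean(raw).forEach(System.out::println);
    }
}
```

In the Flink SQL job itself, the equivalent null filter would be a `WHERE id IS NOT NULL AND carCode IS NOT NULL` clause on the query that reads `ods_cars_log`.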
2. Dimensional Modeling for Vehicle Trips
3. Building the DWD Layer with Flink SQL
package com.bigdata.warehouse.dwd;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// Only needed if the checkpoint / state-backend lines below are uncommented:
// import org.apache.flink.streaming.api.CheckpointingMode;
// import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; // flink-statebackend-rocksdb dependency
// import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;

public class DwdCarsLog {
    public static void main(String[] args) {
        //1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the parallelism
        //senv.setParallelism(1);
        //Enable checkpointing for fault tolerance
        //senv.enableCheckpointing(60000);
        //senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //senv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        //senv.getCheckpointConfig().setCheckpointTimeout(10000);
        //senv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //Configure the state backend
        //(1) Use RocksDB
        //senv.setStateBackend(new EmbeddedRocksDBStateBackend());
        //(2) Configure the checkpoint storage
        //senv.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://mycluster/flink/checkpoints"));

        //2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

        //3. Register the vehicle entry/exit fact table (Kafka topic ods_cars_log);
        //   proc_time is the processing-time attribute used by the lookup join in step 5
        tEnv.executeSql("CREATE TABLE ods_cars_log (" +
                " id STRING," +
                " opTime STRING," +
                " ctype SMALLINT," +
                " carCode STRING," +
                " cId BIGINT," +
                " proc_time as PROCTIME() "+
                ") WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'ods_cars_log'," +
                " 'properties.bootstrap.servers' = 'hadoop1:9092'," +
                " 'properties.group.id' = 'ods_cars_log'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'json'" +
                ")");

        //4. Register the vehicle dimension table (MySQL table dim_base_cars, read via JDBC)
        tEnv.executeSql("CREATE TABLE dim_base_cars ( " +
                " id INT, " +
                " owerId INT, " +
                " carCode STRING, " +
                " carColor STRING, " +
                " type TINYINT, " +
                " remark STRING, " +
                " PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8', " +
                " 'table-name' = 'dim_base_cars', " +
                " 'username' = 'hive', " +
                " 'password' = 'hive' " +
                ")");

        //5. Join the fact table with the dimension table (processing-time lookup join on carCode)
        //   to get the vehicle entry/exit detail rows
        Table resultTable = tEnv.sqlQuery("select " +
                "cl.id, " +
                "c.owerId, " +
                "cl.opTime, " +
                "cl.cId, " +
                "cl.carCode, " +
                "cl.ctype " +
                "from ods_cars_log cl " +
                "left join dim_base_cars for system_time as of cl.proc_time as c " +
                "on cl.carCode=c.carCode");
        tEnv.createTemporaryView("resultTable", resultTable);

        //6. Create the dwd_cars_log sink table (upsert-kafka, keyed by id)
        tEnv.executeSql("CREATE TABLE dwd_cars_log ( " +
                " id STRING, " +
                " owerId INT, " +
                " opTime STRING, " +
                " cId BIGINT, " +
                " carCode STRING, " +
                " ctype SMALLINT, " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'upsert-kafka', " +
                " 'topic' = 'dwd_cars_log', " +
                " 'properties.bootstrap.servers' = 'hadoop1:9092', " +
                " 'key.format' = 'json', " +
                " 'value.format' = 'json' " +
                ")");

        //7. Write the join result into dwd_cars_log. executeSql submits the job asynchronously;
        //   when testing locally in the IDE, call .await() on the returned TableResult
        //   so that main() does not return before the job runs
        tEnv.executeSql("insert into dwd_cars_log select * from resultTable");
    }
}
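Step 5 is a processing-time lookup join: for every `ods_cars_log` row, Flink queries `dim_base_cars` by `carCode` at the moment the row is processed (`for system_time as of cl.proc_time`), and the `left join` keeps rows that find no car, with `owerId` set to null. The plain-Java sketch below mimics only these semantics, with a `HashMap` standing in for the MySQL table and made-up sample values; it is not how the JDBC connector is implemented (the connector queries the database per lookup key, optionally through a lookup cache). It also assumes `carCode` is unique in the dimension table; if it is not, the real join emits one output row per matching car.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LookupJoinSketch {

    record CarLog(String id, String opTime, Short ctype, String carCode, Long cId) {}
    record DimCar(Integer id, Integer owerId, String carCode) {}
    record DwdCarLog(String id, Integer owerId, String opTime, Long cId, String carCode, Short ctype) {}

    // In-memory stand-in for dim_base_cars, indexed by the join key carCode
    // (assumes carCode is unique in the dimension table).
    private final Map<String, DimCar> dimByCarCode = new HashMap<>();

    // Simulates an insert/update of a row in the MySQL dimension table.
    void upsertDim(DimCar car) {
        dimByCarCode.put(car.carCode(), car);
    }

    // Called once per fact row when it is processed: the lookup sees the
    // dimension table as it is at that moment (processing time).
    DwdCarLog join(CarLog log) {
        DimCar car = dimByCarCode.get(log.carCode());
        // LEFT JOIN: an unmatched fact row is kept, with owerId = null.
        Integer owerId = (car == null) ? null : car.owerId();
        return new DwdCarLog(log.id(), owerId, log.opTime(), log.cId(), log.carCode(), log.ctype());
    }

    public static void main(String[] args) {
        LookupJoinSketch job = new LookupJoinSketch();
        job.upsertDim(new DimCar(1, 101, "CAR-001"));

        List<DwdCarLog> out = new ArrayList<>();
        out.add(job.join(new CarLog("e1", "2022-07-15 11:59:55.443", (short) 2, "CAR-001", 1001L)));
        out.add(job.join(new CarLog("e2", "2022-07-15 11:59:56.443", (short) 1, "CAR-002", 1002L)));

        // A dimension row added later only affects rows processed afterwards;
        // e2 keeps owerId = null.
        job.upsertDim(new DimCar(2, 102, "CAR-002"));
        out.add(job.join(new CarLog("e3", "2022-07-15 11:59:57.443", (short) 1, "CAR-002", 1002L)));

        out.forEach(System.out::println);
    }
}
```

The design consequence is that rows already written to `dwd_cars_log` are not corrected when `dim_base_cars` changes later; only rows processed after the change see the new dimension values.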
4. Creating the DWD-Layer Topic in Kafka
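#Create the Kafka topic (--bootstrap-server needs Kafka 2.2+; older versions use --zookeeper localhost:2181, which Kafka 3.0 removed)
#--replication-factor 3 requires at least 3 brokers in the cluster
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic dwd_cars_log --replication-factor 3 --partitions 1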
5. Checking the DWD-Layer Results
#Consume the Kafka topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dwd_cars_log --from-beginning
If the console prints records like the following, the DWD layer has been built successfully.
{"id":"3bfe7e59-4771-4aa8-ab90-80c98010c4ea","owerId":10022759,"opTime":"2022-07-15 11:59:55.443","cId":10000095,"carCode":"青I·PY2MR","ctype":2}
{"id":"36208b62-739b-4eea-abf4-9f26b85b85d1","owerId":10075672,"opTime":"2022-07-15 11:59:56.443","cId":10000311,"carCode":"渝Z·C0AFY","ctype":1}
{"id":"2a5df539-4668-4a42-8013-978b82b3c318","owerId":10126156,"opTime":"2022-07-15 11:59:57.443","cId":10000526,"carCode":"晋B·1RPVV","ctype":1}
{"id":"2bd0ce39-1c39-4db5-9376-68e297fda4b0","owerId":10206773,"opTime":"2022-07-15 11:59:58.443","cId":10000843,"carCode":"冀D·FX3IJ","ctype":2}
{"id":"2959544d-53f9-43e4-9101-96629fecdcc6","owerId":10153485,"opTime":"2022-07-15 11:59:59.443","cId":10000631,"carCode":"晋D·8OWIR","ctype":2}
{"id":"2fd665f9-ea27-44fd-a8cd-1f204ab2d5fc","owerId":10152560,"opTime":"2022-07-15 12:00:00.099","cId":10000627,"carCode":"贵A·MVO77","ctype":2}
{"id":"3c283bc5-5616-43cf-87b2-c94396ced64f","owerId":10103872,"opTime":"2022-07-15 12:00:01.037","cId":10000425,"carCode":"辽L·3C5DU","ctype":1}
{"id":"3634862d-c824-4829-a017-0082b7514471","owerId":10234908,"opTime":"2022-07-15 12:00:02.376","cId":10000961,"carCode":"沪T·QNNXP","ctype":1}
{"id":"2b4a4d0f-4441-4e75-8437-008dfea5c03c","owerId":10228881,"opTime":"2022-07-15 12:00:03.33","cId":10000938,"carCode":"闽E·GZKRQ","ctype":2}
{"id":"2ce336bc-2b31-4089-ae85-a76921c6a306","owerId":10144509,"opTime":"2022-07-15 12:00:04.819","cId":10000596,"carCode
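Because `dwd_cars_log` is written with the `upsert-kafka` connector, each Kafka record carries the primary key (`id`, JSON-encoded through `key.format`) as its message key and the row shown above as its value; the console consumer prints only values by default, and adding `--property print.key=true` also prints the keys. A later record with the same key supersedes the earlier one, and a record with a null value (a tombstone) marks a deletion. The minimal plain-Java sketch below shows how a downstream reader might fold such a stream into the latest row per key; plain strings stand in for the deserialized key and value, and no Kafka client is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UpsertMaterializeSketch {

    // Latest value per primary key, kept in first-seen key order.
    private final Map<String, String> latestById = new LinkedHashMap<>();

    // Apply one upsert-kafka record:
    // non-null value = insert or update, null value = delete (tombstone).
    void apply(String key, String value) {
        if (value == null) {
            latestById.remove(key);
        } else {
            latestById.put(key, value);
        }
    }

    String get(String key) {
        return latestById.get(key);
    }

    int size() {
        return latestById.size();
    }

    public static void main(String[] args) {
        UpsertMaterializeSketch view = new UpsertMaterializeSketch();
        // Keys and values are simplified, hypothetical JSON strings.
        view.apply("{\"id\":\"e1\"}", "{\"id\":\"e1\",\"ctype\":1}");
        view.apply("{\"id\":\"e1\"}", "{\"id\":\"e1\",\"ctype\":2}"); // same key: replaces the earlier value
        view.apply("{\"id\":\"e2\"}", "{\"id\":\"e2\",\"ctype\":1}");
        view.apply("{\"id\":\"e2\"}", null);                          // tombstone: removes e2
        System.out.println(view.size() + " " + view.get("{\"id\":\"e1\"}"));
        // prints: 1 {"id":"e1","ctype":2}
    }
}
```

For consumers that materialize by key like this, replaying `ods_cars_log` from `earliest-offset` does not leave duplicate rows: reprocessed events carry the same `id` and simply overwrite the earlier value.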