文章目录

  • 一 DWS层-商品主题计算
    • 1 把JSON字符串数据流转换为统一数据对象的数据流
      • (1)转换订单宽表流数据
      • (2)转换支付宽表流数据
    • 2 把统一的数据结构流合并为一个流
      • (1)代码
      • (2)测试
    • 3 设定事件时间与水位线
    • 4 分组、开窗、聚合
    • 5 补充商品维度信息
      • (1)关联商品维度
      • (2)关联SPU维度
      • (3)关联品类维度
      • (4)关联品牌维度
      • (5)测试
    • 6 写入ClickHouse
      • (1)在ClickHouse中创建商品主题宽表
      • (2)为主程序增加写入ClickHouse的Sink
      • (3)整体测试
  • 二 DWS层-地区主题表(FlinkSQL)
    • 1 需求分析与思路
    • 2 在pom.xml文件中添加FlinkSQL相关依赖
    • 3 创建ProvinceStatsSqlApp,定义Table流环境
    • 4 MyKafkaUtil增加一个DDL的方法
    • 5 把数据源定义为动态表并指定水位线
      • (1)指定`WATERMARK`
      • (2)系统内置函数
      • (3)给计算列起别名
      • (4)完整代码

一 DWS层-商品主题计算

1 把JSON字符串数据流转换为统一数据对象的数据流

(1)转换订单宽表流数据

// 4.6 转换订单宽表流数据
SingleOutputStreamOperator<ProductStats> orderWideStatsDS = orderWideStrDS.map(new MapFunction<String, ProductStats>() {@Overridepublic ProductStats map(String jsonStr) throws Exception {OrderWide orderWide = JSON.parseObject(jsonStr, OrderWide.class);ProductStats productStats = ProductStats.builder().sku_id(orderWide.getSku_id()).order_sku_num(orderWide.getSku_num()).order_amount(orderWide.getSplit_total_amount()).ts(DateTimeUtil.toTs(orderWide.getCreate_time())).orderIdSet(new HashSet(Collections.singleton(orderWide.getOrder_id()))).build();return productStats;}}
);

(2)转换支付宽表流数据

// 4.7 转换支付宽表流数据
SingleOutputStreamOperator<ProductStats> paymentWideStatsDS = paymentWideStrDS.map(new MapFunction<String, ProductStats>() {@Overridepublic ProductStats map(String jsonStr) throws Exception {PaymentWide paymentWide = JSON.parseObject(jsonStr, PaymentWide.class);ProductStats productStats = ProductStats.builder().sku_id(paymentWide.getSku_id()).payment_amount(paymentWide.getSplit_total_amount()).paidOrderIdSet(new HashSet(Collections.singleton(paymentWide.getOrder_id()))).ts(DateTimeUtil.toTs(paymentWide.getCallback_time())).build();return productStats;}}
);

2 把统一的数据结构流合并为一个流

(1)代码

// TODO 5 将不同流的数据通过union合并到一起
DataStream<ProductStats> unionDS = clickAndDisplayStatsDS.union(favorStatsDS,cartStatsDS,refundStatsDS,commentStatsDS,orderWideStatsDS,paymentWideStatsDS
);unionDS.print(">>>");

(2)测试

  • 启动ZK、Kafka、logger.sh、ClickHouse、Redis、HDFS、Hbase、Maxwell

    redis-server /home/hzy/redis2022.conf
    sudo systemctl start clickhouse-server
    
  • 运行BaseLogApp

  • 运行BaseDBApp

  • 运行OrderWideApp

  • 运行PaymentWideApp

  • 运行ProductsStatsApp

  • 运行rt_applog目录下的jar包(可以获取到曝光 display_ct和点击数据click_ct)

  • 运行rt_dblog目录下的jar包(可以获取到sku_id、cart_ct、favor_ct,下单数量,下单金额、orderIdSet等)

  • 查看控制台输出(如电脑性能不够,可以将日志和业务两条线分开测试)

3 设定事件时间与水位线

// TODO 6 指定watermark以及提取时间时间字段
SingleOutputStreamOperator<ProductStats> productStatsWithWatermarkDS = unionDS.assignTimestampsAndWatermarks(WatermarkStrategy.<ProductStats>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<ProductStats>() {@Overridepublic long extractTimestamp(ProductStats productStats, long recordTimestamp) {return productStats.getTs();}})
);

4 分组、开窗、聚合

// TODO 7 分组 -- 按照商品id分组
KeyedStream<ProductStats, Long> keyedDS = productStatsWithWatermarkDS.keyBy(ProductStats::getSku_id);// TODO 8 开窗
WindowedStream<ProductStats, Long, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.seconds(10)));// TODO 9 聚合计算
SingleOutputStreamOperator<ProductStats> reduceDS = windowDS.reduce(new ReduceFunction<ProductStats>() {@Overridepublic ProductStats reduce(ProductStats stats1, ProductStats stats2) throws Exception {stats1.setDisplay_ct(stats1.getDisplay_ct() + stats2.getDisplay_ct());stats1.setClick_ct(stats1.getClick_ct() + stats2.getClick_ct());stats1.setCart_ct(stats1.getCart_ct() + stats2.getCart_ct());stats1.setFavor_ct(stats1.getFavor_ct() + stats2.getFavor_ct());stats1.setOrder_amount(stats1.getOrder_amount().add(stats2.getOrder_amount()));stats1.getOrderIdSet().addAll(stats2.getOrderIdSet());stats1.setOrder_ct(stats1.getOrderIdSet().size() + 0L);stats1.setOrder_sku_num(stats1.getOrder_sku_num() + stats2.getOrder_sku_num());stats1.setPayment_amount(stats1.getPayment_amount().add(stats2.getPayment_amount()));stats1.getRefundOrderIdSet().addAll(stats2.getRefundOrderIdSet());stats1.setRefund_order_ct(stats1.getRefundOrderIdSet().size() + 0L);stats1.setRefund_amount(stats1.getRefund_amount().add(stats2.getRefund_amount()));stats1.getPaidOrderIdSet().addAll(stats2.getPaidOrderIdSet());stats1.setPaid_order_ct(stats1.getPaidOrderIdSet().size() + 0L);stats1.setComment_ct(stats1.getComment_ct() + stats2.getComment_ct());stats1.setGood_comment_ct(stats1.getGood_comment_ct() + stats2.getGood_comment_ct());return stats1;}},new ProcessWindowFunction<ProductStats, ProductStats, Long, TimeWindow>() {@Overridepublic void process(Long aLong, Context context, Iterable<ProductStats> elements, Collector<ProductStats> out) throws Exception {for (ProductStats productStats : elements) {productStats.setStt(DateTimeUtil.toYMDHMS(new Date(context.window().getStart())));productStats.setEdt(DateTimeUtil.toYMDHMS(new Date(context.window().getEnd())));productStats.setTs(new Date().getTime());out.collect(productStats);}}}
);

5 补充商品维度信息

因为除了下单操作之外,其它操作,只获取到了商品的id,其它维度信息是没有的。

(1)关联商品维度

SingleOutputStreamOperator<ProductStats> productStatsWithSkuDS = AsyncDataStream.unorderedWait(reduceDS,new DimAsyncFunction<ProductStats>("DIM_SKU_INFO") {@Overridepublic void join(ProductStats productStats, JSONObject dimJsonObj) throws Exception {productStats.setSku_name(dimJsonObj.getString("SKU_NAME"));productStats.setSku_price(dimJsonObj.getBigDecimal("PRICE"));productStats.setCategory3_id(dimJsonObj.getLong("CATEGORY3_ID"));productStats.setSpu_id(dimJsonObj.getLong("SPU_ID"));productStats.setTm_id(dimJsonObj.getLong("TM_ID"));}@Overridepublic String getKey(ProductStats productStats) {return productStats.getSku_id().toString();}},60, TimeUnit.SECONDS
);

(2)关联SPU维度

SingleOutputStreamOperator<ProductStats> productStatsWithSpuDS =AsyncDataStream.unorderedWait(productStatsWithSkuDS,new DimAsyncFunction<ProductStats>("DIM_SPU_INFO") {@Overridepublic void join(ProductStats productStats, JSONObject jsonObject) throws Exception {productStats.setSpu_name(jsonObject.getString("SPU_NAME"));}@Overridepublic String getKey(ProductStats productStats) {return String.valueOf(productStats.getSpu_id());}}, 60, TimeUnit.SECONDS);

(3)关联品类维度

SingleOutputStreamOperator<ProductStats> productStatsWithCategory3DS =AsyncDataStream.unorderedWait(productStatsWithSpuDS,new DimAsyncFunction<ProductStats>("DIM_BASE_CATEGORY3") {@Overridepublic void join(ProductStats productStats, JSONObject jsonObject) throws Exception {productStats.setCategory3_name(jsonObject.getString("NAME"));}@Overridepublic String getKey(ProductStats productStats) {return String.valueOf(productStats.getCategory3_id());}}, 60, TimeUnit.SECONDS);

(4)关联品牌维度

SingleOutputStreamOperator<ProductStats> productStatsWithTmDS =AsyncDataStream.unorderedWait(productStatsWithCategory3DS,new DimAsyncFunction<ProductStats>("DIM_BASE_TRADEMARK") {@Overridepublic void join(ProductStats productStats, JSONObject jsonObject) throws Exception {productStats.setTm_name(jsonObject.getString("TM_NAME"));}@Overridepublic String getKey(ProductStats productStats) {return String.valueOf(productStats.getTm_id());}}, 60, TimeUnit.SECONDS);productStatsWithTmDS.print(">>>");

(5)测试

  • 运行rt_applog目录下的jar包。

  • 运行rt_dblog目录下的jar包,执行两次以改变水位线,触发窗口提交操作。

6 写入ClickHouse

(1)在ClickHouse中创建商品主题宽表

create table product_stats_2022 (stt DateTime,edt DateTime,sku_id  UInt64,sku_name String,sku_price Decimal64(2),spu_id UInt64,spu_name String ,tm_id UInt64,tm_name String,category3_id UInt64,category3_name String ,display_ct UInt64,click_ct UInt64,favor_ct UInt64,cart_ct UInt64,order_sku_num UInt64,order_amount Decimal64(2),order_ct UInt64 ,payment_amount Decimal64(2),paid_order_ct UInt64,refund_order_ct UInt64,refund_amount Decimal64(2),comment_ct UInt64,good_comment_ct UInt64 ,ts UInt64
)engine =ReplacingMergeTree( ts)partition by  toYYYYMMDD(stt)order by   (stt,edt,sku_id );

(2)为主程序增加写入ClickHouse的Sink

// TODO 11 将结果写入到ClickHouse
productStatsWithTmDS.addSink(ClickhouseUtil.getJdbcSink("insert into product_stats_2022 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
);

(3)整体测试

  • 启动ZK、Kafka、logger.sh、ClickHouse、Redis、HDFS、Hbase、Maxwell
  • 运行BaseLogApp
  • 运行BaseDBApp
  • 运行OrderWideApp
  • 运行PaymentWideApp
  • 运行ProductsStatsApp
  • 运行rt_applog目录下的jar包
  • 运行rt_dblog目录下的jar包
  • 查看控制台输出
  • 查看ClickHouse中products_stats_2022表数据

注意:一定要匹配两个数据生成模拟器的日期,否则窗口无法匹配上。

二 DWS层-地区主题表(FlinkSQL)

统计主题 需求指标 输出方式 计算来源 来源层级
地区 pv 多维分析 page_log直接可求 dwd
uv 多维分析 需要用page_log过滤去重 dwm
下单(单数,金额) 可视化大屏 订单宽表 dwm

地区主题主要是反映各个地区的销售情况。从业务逻辑上地区主题比起商品更加简单,业务逻辑也没有什么特别的就是做一次轻度聚合然后保存,所以使用flinkSQL,来完成该业务。

1 需求分析与思路

  • 定义Table流环境
  • 把数据源定义为动态表
  • 通过SQL查询出结果表
  • 把结果表转换为数据流
  • 把数据流写入目标数据库

如果是Flink官方支持的数据库,也可以直接把目标数据表定义为动态表,用insert into 写入。由于ClickHouse目前官方没有支持的jdbc连接器(目前支持Mysql、 PostgreSQL、Derby)。也可以制作自定义sink,实现官方不支持的连接器。但是比较繁琐。

2 在pom.xml文件中添加FlinkSQL相关依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.version}</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.version}</artifactId><version>${flink.version}</version>
</dependency>

3 创建ProvinceStatsSqlApp,定义Table流环境

package com.hzy.gmall.realtime.app.dws;
/*** 地区主题统计 -- SQL*/
public class ProvinceStatsSqlApp {public static void main(String[] args) throws Exception {// TODO 1 环境准备// 1.1 流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1.2 表执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.3 设置并行度env.setParallelism(4);// TODO 2 检查点相关设置(略)env.execute();}
}

4 MyKafkaUtil增加一个DDL的方法

public static String getKafkaDDL(String topic,String groupId){String ddl = "'connector' = 'kafka'," +"  'topic' = '"+topic+"'," +"  'properties.bootstrap.servers' = '"+KAFKA_SERVER+"'," +"  'properties.group.id' = '"+groupId+"'," +"  'scan.startup.mode' = 'latest-offset'," +"  'format' = 'json'";return ddl;
}

5 把数据源定义为动态表并指定水位线

(1)指定WATERMARK

WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。相当于是API格式中的提取时间字段操作。

watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置的间隔发出。 若 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。

使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。

Flink 提供了几种常用的 watermark 策略。

  • 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column

    发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。

  • 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

    发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。相当于是API格式中的单调递增策略。

  • 有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

    发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一个 5 秒延迟的 watermark 策略。

其中WATERMARK FOR rowtime AS rowtime是把某个字段设定为EVENT_TIME。

详细说明。

(2)系统内置函数

将字符串转换为时间戳。

TO_TIMESTAMP(string1[, string2]) Converts date time string string1 with format string2 (by default: ‘yyyy-MM-dd HH:mm:ss’) under the session time zone (specified by TableConfig) to a timestamp.Only supported in blink planner.

详细说明。

(3)给计算列起别名

<computed_column_definition>:column_name AS computed_column_expression [COMMENT column_comment]

(4)完整代码

字段名要和kafka中的json属性完全一致。

// TODO 3 从指定的数据源(kafka)读取数据,转换为动态表,并指定水位线
String orderWideTopic = "dwm_order_wide";
String groupId = "province_stats";
tableEnv.executeSql("CREATE TABLE order_wide (" +" province_id BIGINT," +" province_name STRING," +" province_area_code STRING," +" province_iso_code STRING," +" province_3166_2_code STRING," +" order_id STRING," +" split_total_amount DOUBLE," +" create_time STRING," +" rowtime as TO_TIMESTAMP(create_time)," +" WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND" +" ) WITH (" + MyKafkaUtil.getKafkaDDL(orderWideTopic,groupId) +")");

【实时数仓】DWS层之商品主题计算、地区主题表(FlinkSQL)相关推荐

  1. Flink SQL搭建实时数仓DWD层

    1.实时数仓DWD层 DWD是明细数据层,该层的表结构和粒度与原始表保持一致,不过需要对ODS层数据进行清洗.维度退化.脱敏等,最终得到的数据是干净的,完整的.一致的数据. (1)对用户行为数据解析. ...

  2. 首汽约车驶向极速统一之路!出行平台如何基于StarRocks构建实时数仓?

    作者:王满,高级数据架构工程师 首汽约车(以下简称 "首约")是首汽集团为响应交通运输部号召,积极拥抱互联网,推动传统出租车行业转型升级,加强建设交通强国而打造的网约车出行平台. ...

  3. 美团点评基于 Flink 的实时数仓平台实践

    摘要:数据仓库的建设是"数据智能"必不可少的一环,也是大规模数据应用中必然面临的挑战,而 Flink 实时数仓在数据链路中扮演着极为重要的角色.本文中,美团点评高级技术专家鲁昊为大 ...

  4. 【实时数仓】DWS层访客主题计算(续)、商品主题计算

    文章目录 一 DWS层-访客主题计算 1 写入OLAP数据库 (1)增加ClickhouseUtil a JdbcSink.<T>sink( )的四个参数说明 b ClickhouseUt ...

  5. 【实时数仓】DWS层的定位、DWS层之访客主题计算(PV、UV、跳出次数、计入页面数、连续访问时长)

    文章目录 一 DWS层与DWM层的设计 1 设计思路 2 需求梳理 3 DWS层定位 二 DWS层-访客主题计算 1 需求分析与思路 2 功能实现 (1)封装VisitorStatsApp,读取Kaf ...

  6. 数据仓库—stg层_数据仓库之Hive快速入门 - 离线实时数仓架构

    数据仓库VS数据库 数据仓库的定义: 数据仓库是将多个数据源的数据经过ETL(Extract(抽取).Transform(转换).Load(加载))理之后,按照一定的主题集成起来提供决策支持和联机分析 ...

  7. 【实时数仓】DWD层需求分析及实现思路、idea环境搭建、实现DWD层处理用户行为日志的功能

    文章目录 一 DWD层需求分析及实现思路 1 分层需求分析 2 每层的职能 3 DWD层职能详细介绍 (1)用户行为日志数据 (2)业务数据 4 DWD层数据准备实现思路 二 环境搭建 1 创建mav ...

  8. 华为云发布 GaussDB(DWS) 实时数仓,技术创新释放行业数据价值

    8 月 31 日,在华为云 TechWave 大数据专题日上,华为云发布了 GaussDB(DWS)实时数仓,工商银行.广东移动.清华大学等分享了大数据技术创新及应用实践. 围绕数据全生命周期提供整体 ...

  9. 数据查询和业务流分开_数据仓库介绍与实时数仓案例

    1.数据仓库简介 数据仓库是一个面向主题的(Subject Oriented).集成的(Integrate).相对稳定的(Non-Volatile).反映历史变化(Time Variant)的数据集合 ...

  10. 如果你也想做实时数仓…

    数据仓库也是公司数据发展到一定规模后必然会提供的一种基础服务,数据仓库的建设也是"数据智能"中必不可少的一环. 本文将从数据仓库的简介.经历了怎样的发展.如何建设.架构演变.应用案 ...

最新文章

  1. 这才是Matlab的正确打开方式!——Matlab矩阵、绘图、函数计算与数据读取
  2. centos mysql 设置_CentOS下MySql优化及安全设置centos
  3. 谈谈程序链接及分段那些事
  4. 100篇论文串讲对话系统前世今生
  5. php 实现 java com.sun.org.apache.xml.internal.security.utils.Base64 Byte数组加密
  6. java 支付类的接口,Java后端支付大杂烩之core.dao,service,web(重点是接口的设计)(二)...
  7. vue之自行实现派发与广播-dispatch与broadcast
  8. java项目中用了Disruptor之后,性能提升了10倍
  9. Android百分比布局支持和垂直TextView
  10. 【bzoj1999】[Noip2007]Core树网的核 树的直径+双指针法+单调队列
  11. asp和php关系,php和asp对象的等价关系_PHP教程
  12. windows 线程核心内容
  13. ap漫游测试软件,AC+AP方案选择,TP无缝漫游强过UBNT?胖AP如何实现802.11r?
  14. 数学小故事之 被柯西坑了的两个天才数学家——阿贝尔和伽罗瓦
  15. win2008服务器系统玩红警,Win10系统不能玩红警2的解决方法
  16. github Your account has been flagged.提示情况的解决办法
  17. 蒟蒻重返c++,学海拾贝
  18. 销售漏斗是什么?有什么作用!
  19. Google Earth Engine(GEE)——给GEE地图加入指北针
  20. mysql excel 同步数据_mysql导入excel数据

热门文章

  1. SQL查询某个年龄段的月平均收入的思考
  2. 基于稳态视觉诱发电位识别的虚拟家用电器脑机交互控制接口设计
  3. 【创意】9个令人尖叫的环保品牌创意
  4. FE-5000型铁电性能测试仪
  5. 应用层网关防火墙简介
  6. ipynb引用另一个ipynb
  7. uniapp阻止页面返回
  8. C++和NASM联合编译
  9. School StartsFirstProject~UnityVR(HTCVive设备开发)
  10. 烦死人了,全彩led显示屏在使用过程中总是跳闸,原因跟解决方案分享