背景

特步集团有限公司是中国领先的体育用品企业之一,主要从事运动鞋、服装及配饰的设计、开发、制造和销售。 为了提高特步零售 BI 主题数据分析的准确性和时效性,降低对 SAP HANA 平台的依赖,2020 年 11 月特步集团首次引入了 Apache Doris 进行数据仓库搭建试点。在经历实时日报(移动端)和《特步全网零售战绩》大屏两个小项目的成功后,于 2021 年 3 月开始逐步启动特步儿童 BI、特步电商 BI、双十一大屏、特步新品牌 BI 等多个项目,经过一年的努力,初步完成了基于 Apache Doris 的零售数据仓库搭建和上线运行。 在这些项目实践过程中,我们遇到了很多困难,也解决了很多问题,在这里总结出来分享给大家。

总体架构

在特步零售数据仓库的项目中,我们大胆的抛弃了传统的 Hive 离线数据处理模式,一步从 SAP HANA 进化到基于 MPP 架构的单一大数据平台架构。项目基于 Apache Doris 集群完成接口数据的接入、仓库层的建模和加工、集市层的建模以及 BI 报表的即时查询。 先展开说明一下这样设计的原因。在前期的项目经历中,我们既有过基于 Hive+Greenplum 搭建卡宾零售 BI 项目的经验,也有基于 Greenplum+MySQL 搭建斐乐 BI 项目的经验,还有基于 Hive+Doris 的安踏户外 BI 项目经验,得到的结论有: ① MPP 架构开发效率高,查询和跑批速度远高于 Hive 数仓; ② MPP 架构支持有限的 Delete 和 Update ,开发的灵活度更高; ③项目交付两套环境,运维难度很大; ④ Doris 在架构设计上比 Greenplum 更为领先,对 OLAP 支持更好,查询性能也更高。 基于以上原因,加上 Hive 数据处理和查询的时效性无法满足业务需求,所以我们坚定的选择选择了 Apache Doris 作为特步零售数据仓库的唯一大数据平台。 确定项目选型以后,我们讨论了数据仓库的分层设计。项目最先启动的是特步儿童 BI 项目,考虑到系统业务数据存在多个来源,复杂的业务指标口径以及来源相同的不同品牌需要进行拆分,我们在数据仓库层采用了 DWD、DWB、DWS 三层加工。 图 1 - 特步儿童 BI 项目系统分层

数据仓库分层逻辑如下:

DWD 模型层: 关联维度数据的加工和明细数据的简单整理,包括头行表结构的合并、商品拆箱处理、命名统一、数据粒度统一等。 DWD 层的销售、库存明细数据均按照系统加工,每个系统的加工逻辑创建一张视图,结果对应一张物理表。 DWD 层大部分采用 Duplicate Key 模型,部分能明确主键的采用 Unique Key 模型。

DWB 基础层:保留共性维度,汇总数据到业务日期、店铺、分公司、SKC 粒度。销售模块 DWB 层合并了不同系统来源的电商数据、线下销售数据、库存明细数据,还关联了维度信息,增加数据过滤条件,加工了分公司维度,确保 DWS 层可以直接使用。DWB 层较多采用 Duplicate Key 模型,便于按照 Key 删除数据,也有部分采用 Aggregate Key 模型。

DWS 汇总层:将 DWB 层加工结果宽表化,并按照业务需求,加工本日、本月、本周、本年、昨日、上月、上周、上年及每个标签对应的同期数据。DWS 层较多采用 Duplicate Key 模型,也有部分采用 Aggregate Key 模型。DWS 层完成了指标的汇总和维度的拓展,为报表提供了统一的数据来源。

ETL 实践

本次项目采用了自研的一站式 DataOps 大 数据管理平台 ,完成系统数据的抽取、加载和转换,以及定时任务的执行等。 在数据分层标准 之下 ,关于 ETL 实践,我们主要完成了一些内容: 01  批量数据导入

批量数据导入我们采用的是目前最主流的开源组件 DataX。自研的 DataOps 大数据管理平台在开源 DataX 的基础上做了很多封装,我们只需要创建数据同步任务,选择数据来源和数据目标表,即可自动生成字段映射和 DataX 规范的 Json 配置文件。

图 2 - 启数道平台 在项目初期,Doris 未发布 DataX 插件,仅通过原始的 JDBC 插入数据达不到性能要求。产品团队开发了 DataX 加速功能,先将对应数据抽取到本地文件,然后通过 Stream  Load 方式加载入库,可以极大的提升数据抽取速度。数据读取到本地文件取决于网络宽带和本地读写性能,数据加载达到了 2000 千万数据 12.2G 仅需 5 分钟的效果。 图 3 - DataX 加速 此外,DataX 数据同步还支持读取自定义 SQL 的方式,通过自定义 SQL 可以处理 SQL SERVER 这种数据库比较难解决的字符转换问题和偶尔出现的乱码字符问题。

批量数据同步还支持增量模式,通过抽取最近 7 天的数据,配合 Doris 的主键模型,可以轻松解决大部分业务场景下的增量数据抽取。

02  实时数据接入 在实时数据接入方面,由于接入的实时数据都来自于阿里云的 DRDS,所以我们采用的是 Canal+Kafka+Routine Load 模式。详细的配置就不展开了,环境搭建完成以后,只需要取 Canal 里面配置拦截策略,将表对应的流数据映射成 Kafka Topic,然后去 Doris 创建 R outine L oa d 就 OK 了,这里举一个 R outine L oad 的案例。

ALTER TABLE DS_ORDER_INFO ENABLE FEATURE "BATCH_DELETE";CREATE ROUTINE LOAD t02_e3_zy.ds_order_info ON DS_ORDER_INFOWITH MERGECOLUMNS(order_id, order_sn, deal_code, ori_deal_code, ori_order_sn,crt_time = now(), cdc_op),DELETE ON cdc_op="DELETE"    PROPERTIES    (    "desired_concurrent_number"="1",    "max_batch_interval" = "20",    "max_batch_rows" = "200000",    "max_batch_size" = "104857600",    "strict_mode" = "false",    "strip_outer_array" = "true",    "format" = "json",    "json_root" = "$.data",    "jsonpaths" =   "[\"$.order_id\",\"$.order_sn\",\"$.deal_code\",\"$.ori_deal_code\",\"$.ori_order_sn\",\"$.type\" ]"     )FROM KAFKA    (    "kafka_broker_list" = "192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092",    "kafka_topic" = "e3_order_info",    "kafka_partitions" = "0",    "kafka_offsets" = "OFFSET_BEGINNING",    "property.group.id" = "ds_order_info",    "property.client.id" = "doris");

03  数据加工 本次项目的数据加工我们是通过 Doris 提供的视图来完成的。利用 Doris 优秀的索引能力,加上完善的 SQL 语法支持,即使再复杂的逻辑,也可以通过视图来实现。用视图加工数据,减少了 代码发布的流程,避免了编译错误的 问题 比 Hive 的脚本式开发更加高效。
在完成模型设计以后,我们会确定模型表的命名、Key 类型等信息。完成表的创建以后,我们会创建表名 +"_v" 的视图,用于处理该表数据的逻辑加工。在大多数情况下,我们都是先清空目标表,然后从视图读取数据写入目标表的,所以我们的调度任务都比较简单,例如:

truncate table xtep_dw.dim_shop_info;insert into xtep_dw.dim_shop_infoselect * from xtep_dw.dim_shop_info_v;

对于数据量特别大的读写任务,则需要分步写入。例如:

truncate table xtep_dw.dwd_god_allocation_detail_drp;insert into xtep_dw.dwd_god_allocation_detail_drpselect * from xtep_dw.dwd_god_allocation_detail_drp_v where UPDATE_DATE BETWEEN '2020-01-01' and '2020-06-30'; insert into xtep_dw.dwd_god_allocation_detail_drpselect * from xtep_dw.dwd_god_allocation_detail_drp_v where UPDATE_DATE BETWEEN '2020-07-01' and '2020-12-31'; insert into xtep_dw.dwd_god_allocation_detail_drpselect * from xtep_dw.dwd_god_allocation_detail_drp_v where UPDATE_DATE BETWEEN '2021-01-01' and '2021-06-30'; insert into xtep_dw.dwd_god_allocation_detail_drpselect * from xtep_dw.dwd_god_allocation_detail_drp_v where UPDATE_DATE BETWEEN '2021-07-01' and '2021-12-31'; insert into xtep_dw.dwd_god_allocation_detail_drpselect * from xtep_dw.dwd_god_allocation_detail_drp_v where UPDATE_DATE BETWEEN '2022-01-01' and '2022-06-30'; insert into xtep_dw.dwd_god_allocation_detail_drpselect * from xtep_dw.dwd_god_allocation_detail_drp_v where UPDATE_DATE BETWEEN '2022-07-01' and '2022-12-31';

当然,视图开发的模式也是有一定的弊端的,比如不能做版本管理,也不便于备份。为此,我们承受了一次惨痛教训,在项目测试阶段,有同事在 dbwaver 客户端误操作 Drop  掉了 xtep_dw 数据库,导致我们花费了 3-4 天时间才恢复程序。 因此我们紧急开发了 Python 备份程序:

#!/usr/bin/python# -*- coding: UTF-8 -*-import pymysqlimport json import sys import osimport timeif sys.getdefaultencoding() != 'utf-8':    reload(sys)    sys.setdefaultencoding('utf-8')BACKUP_DIR='/data/cron_shell/database_backup'BD_LIST=['ods_add','ods_cdp','ods_drp','ods_e3_fx','ods_e3_jv','ods_e3_pld','ods_e3_rds1','ods_e3_rds2','ods_e3_zy','ods_hana','ods_rpa','ods_temp','ods_xgs','ods_xt1','t01_hana','xtep_cfg','xtep_dm','xtep_dw','xtep_fr','xtep_rpa'] if __name__ == "__main__":    basepath = os.path.join(BACKUP_DIR,time.strftime("%Y%m%d-%H%M%S", time.localtime()))     print basepath     if not os.path.exists(basepath):        # 如果不存在则创建目录         os.makedirs(basepath)     # 连接database    conn = pymysql.connect(        host='192.168.xx.xx',        port=9030,        user='root',        password='****',        database='information_schema',        charset='utf8')    # 得到一个可以执行SQL语句的光标对象    cursor = conn.cursor()  # 执行完毕返回的结果集默认以元组显示    for dbname in BD_LIST:        ##生成数据库的文件夹        dbpath = os.path.join(basepath ,dbname)        print(dbpath)        if not os.path.exists(dbpath):            # 如果不存在则创建目录             os.makedirs(dbpath)                 sql1 = "select TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE from information_schema.tables where TABLE_SCHEMA ='%s';"%(dbname)        print(sql1)        # 执行SQL语句        cursor.execute(sql1)          for row in cursor.fetchall():             tbname = row[1]            filepath = os.path.join(dbpath ,tbname + '.sql')            print(u'%s表结构将备份到路径:%s'%(tbname,filepath))            sql2 = 'show create table %s.%s'%(dbname,tbname)             print(sql2)            cursor.execute(sql2)            for row in cursor.fetchall():                 create_sql = row[1].encode('GB18030')                with open(filepath, 'w') as fp:                    fp.write(create_sql)     # 关闭光标对象    cursor.close()    # 关闭数据库连接    conn.close()

然后配合 Linux 的 Crontab 定时任务,每天三次备份代码。

#备份数据库,每天执行三次,8、12、18点各一次0 8,12,18 * * * python /data/cron_shell/backup_doris_schema.py >>/data/cron_shell/logs/backup_doris_schema.log

04  BI 查询 本次项目采用的前端工具是某国产 BI 软件和定制化开发的 E-Charts 大屏。该 BI 软件 是基于数据集为中心去构建报表,并且支持灵活的数据权限管理。 大多数据情况下我们基于SQL查询创建数据集,可以有效过滤数据。 本次项目基于该 BI 平台构建了 PC 端报表(通过 App 适配手机也可以直接访问),并且新增了自助分析报表,同步开发了几张 E-Charts 大屏,实现大屏、中屏、小屏的统一。 数据查询对 Doris 来说是很 Easy 的了,基本上建好表以后设置好索引,利用好 B itmap ,性能就不会差。 这里需要说明的上,在性能压力不大的情况下合理使用视图来关联多个结果集,可以减少跑批的任务和数据处理层级,有利于报表数据的快速刷新。 在这方面,我们也是尽可能减少 DWS 和 ADS 层的聚合模型,减少大数据量的读写,尽可能复用代码逻辑和模型表,减少跑批时间加强系统稳定性。

实时需求响应

在 实时的需求方面,我们分别尝试了 Lambda 架构和 Kappa 架构,最后走出来项目特色的第三条线路——相同的视图逻辑,用不同的调度任务刷新不同范围的数据,实现流批代码复用。 项目早期,我们是按照 Lambda 架构构建的任务,系统数据分为批处理和流处理两条线路,随着批处理的稳定,流处理数据的不准确性就逐步暴露。究其原因,业务系统存在数据物理删除和更新的情况,双流 Join 之后的数据准确性得不到保障。再有就是同时维护两套代码,实时逻辑的更新滞后,变更逻辑的代价也比较大。 项目后期,基于业务要求我们也尝试了把所有的零售逻辑搬迁到流处理平台,以实现流批一体,但是发现无法处理报表上常规要求本同期对比、维度数据变更和复杂条件过滤,导致搬迁工作半途而废。 最后我们结合项目的实际情况,采用批处理和微批处理结合的方式,一套代码,两种跑批模式。T+1 的链路执行最近 6 个月或者全量数据的刷新,微批处理流程刷新当日数据或者本周数据,实现数据的快速迭代更新。以 DWB 层为例:

--批处理任务(每日夜间执行一次)truncate table xtep_dw.dwb_ret_sales;insert into xtep_dw.dwb_ret_sales select * from xtep_dw.dwb_ret_sales_v;--微批任务(白天8-24点每20分钟执行一次)delete from xtep_dw.dwb_ret_sales where report_date='${curdate}';insert into xtep_dw.dwb_ret_sales select * from xtep_dw.dwb_ret_sales_vwhere report_date='${curdate}';

而 DWS 和 ADS 层的情况更为复杂,由于跑批频率太高,为了避免出现用户查看报表时刚好数据被删除的情况,我们采用分区替换的方式来实现数据的无缝切换。

--批处理任务(每日夜间执行一次)truncate table xtep_dw.dws_ret_sales_xt_swap;insert into xtep_dw.dws_ret_sales_xt_swapselect * from xtep_dw.dws_ret_sales_xt_vwhere date_tag in ('本日','本周','本月','本年');insert into xtep_dw.dws_ret_sales_xt_swapselect * from xtep_dw.dws_ret_sales_xt_vwhere date_tag in ('昨日','上周','上月','上年');ALTER TABLE xtep_dw.dws_ret_sales_xt REPLACE WITH TABLE dws_ret_sales_xt_swap;--微批任务(白天8-24点每20分钟执行一次)truncate table xtep_dw.dws_ret_sales_xt_swap; insert into xtep_dw.dws_ret_sales_xt_swapselect * from xtep_dw.dws_ret_sales_xt_v where date_tag in ('本日','本周');ALTER TABLE xtep_dw.dws_ret_sales_xt ADD TEMPORARY PARTITION tp_curr1 VALUES IN ('本日','本周');insert into xtep_dw.dws_ret_sales_xt TEMPORARY PARTITION (tp_curr1) SELECT * from xtep_dw.dws_ret_sales_xt_swap;ALTER TABLE xtep_dw.dws_ret_sales_xt REPLACE PARTITION (p_curr1) WITH TEMPORARY PARTITION (tp_curr1);

弱弱的说一 下,Doris 的分区替换还需要再完善一下,希望可以支持类似于 ClickHouse 的语法,即:ALTER TABLE table2 REPLACE PARTITION partition_expr FROM table1; 同时,由于我们设计了良好的分层架构,对于实时性要求特别高的数据,例如双十一大屏,我们可以直接从 ODS 层汇总数据到报表层,可以实现秒级的实时查询;对于实时性较高的业务,例如移动端实时日报,我们从 DWD 或者 DWB 往上汇总数据,可以实现分钟级的实时; 对于普 通的自助分析或者固定报表,则按照灵活的频率更 新数据,兼顾了二者的时效性和准确性。

其他经验

在项目过程中,我们还遇到一些其它问题,这里简单总结一下。
01 Doris BE 内存溢出
查询任务耗用的内存过大,导致 Doris BE 挂掉的情况,我们也出现过。我们采取的方法是所有表都创建 3 副本,然后给 D oris 进程配置 Supervisord 自启动进程,失败的任务通过调度平台的重试功能,一般都可以在 3 次重试机会以内跑过。 02  SQL 任务超时 批处理过程中确实会有一些复杂的任务或者写入数据太多的任务会超时,除了调大 timeout 参数(目前设置为 10 分钟)以外,我们还把任务做了切分。前面的调度任务案例已经可以看到,有些写入的 SQL 我们是按照分区字段或者日期区间来分批计算和写入的。 03 删除语句不支持表达式 ‍ 删除语句不支持表达式,我认为是 Doris 后续需要优化的一个功能点。在 Doris 无法实现的情况下,我们通过改造调度平台的参数功能,先计算好参数值,然后传入变量的方式实现了动态条件删除数据。前文的调度任务代码也有案例。 04  Drop 表闪回 误删除重要的表是数据仓库开发过程中比较常见的情况,表结构我们可以通过 Python 做好备份,但是表数据实在没有更好的办法。这里 Doris 提供了一个很好的功能——Recover 功能,推荐给大家。误删除的表在 1 天以内可以支持闪回。

结束语

目前 Apache Doris 在特步集团的应用已经得到了用户的认可,今年 2 月底又对 Doris 集群进行了硬件升级,接下来会基于现有的接口数据拓展到特步品牌 BI 应用,并且迁移更多的 HANA 数仓应用到 Doris 平台。随着应用的深入,我们需要加强对 Doris 集群、Routine Load 和 Flink 任务的监控,及时发出异常预警,缩短故障恢复时间。同时,随着 向量化引擎的逐步成熟和查询优化器的进一步完善,我们需要调整一些 SQL 写法,降低批处理对系统资源的占用,让集群更好的同时服务批处理和查询需求。当然,也期待社区在资源隔 离方面可以有更进一步的完善。 最后提一个重要的产品优化方向,希望社区予以考虑: 为了可以更好的用 Doris 替代 Hive 数仓,希望社区可以考虑开发存储过程功能。 最后, 感谢 Apache Doris 社区给予的支持,也感谢百度开源了这么优秀的产品!同时要感谢信息部领导林总和曾经理给予的大力支持与包容。 祝愿 Apache Doris 社区发展越 来越好!

- 作者介绍 -

杨宏武
特步零售数据仓库项目架构师,Apache Doris 活跃 Contributor,负责公司产品研发和 Apache Doris 应用方向的落地和推广。

王春波
特步零售数据仓库项目技术经理 ,《高效使用Greenplum:入门、进阶与数据中台》的作者,“数据中台研习社”号主,零售数仓项目实施专家。

欢迎关注 Apache Doris(incubating)官方公众号

【精彩文章】

应用实践 | Apache Doris 在小米集团的运维实践
应用实践|知乎用户画像与实时数据的架构与实践
应用实践|百度爱番番实时 CDP 建设实践

相关链接:

Apache Doris官方网站:

http://doris.incubator.apache.org

Apache Doris Githu b:

https://github.com/apache/incubator-doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

本文分享自微信公众号 - ApacheDoris(gh_80d448709a68)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“ OSC源创计划 ”,欢迎正在阅读的你也加入,一起分享。

应用实践 | 特步集团基于 Apache Doris 的零售数据仓库项目实践相关推荐

  1. 小米 A/B 实验场景基于 Apache Doris 的查询提速优化实践

    导读:Apache Doris 是小米集团内部应用最为广泛的 OLAP 引擎之一,本文主要从数据的角度分析 A/B 实验场景查询的性能现状,探讨基于 Apache Doris 的性能优化的解决方案.经 ...

  2. 应用实践 | 物易云通基于 Apache Doris 的实时数据仓库建设

    导读: 物易云通目前已成为国内产融供应链运营服务平台的领军企业之一,平台年交易额超过 200 亿元 , 随着公司业务的快速发展,对数据计算分析的时效要求也越来越高.经数据团队的调研对比,于 2021 ...

  3. 知乎基于 Apache Doris 的 DMP 平台架构建设实践

    1. DMP 业务背景 DMP 平台是大家老生常谈的话题.在早期广告系统出现之后就拥有了类似的 DMP 平台,比如:腾讯的广点通.阿里巴巴的达摩盘等都是业界做的比较好的 DMP 平台典型.而知乎搭建属 ...

  4. 知乎基于 Apache Doris 的 DMP 平台架构建设实践|万字长文详解

    导读:知乎基于业务需求搭建了 DMP 平台,本文详细的介绍了 DMP 的工作原理及架构演进过程,同时介绍了 Apache Doris 在 DMP 平台的应用实践,本文对大家了解 DMP 工作方式很有帮 ...

  5. 基于 Apache Doris 数仓在作业帮的实践

    1 背景 作业帮大数据团队主要负责建设公司级数仓,向公司各个重要产品线(拉新.教学.BI等)提供面向业务的数据信息,如到课时长.答题情况等.在过去半年多时间内,我们基于Apache Doris,构建了 ...

  6. 数仓体系效率全面提升!同程数科基于 Apache Doris 的数据仓库建设

    应用实践 | 数仓体系效率全面提升!同程数科基于 Apache Doris 的数据仓库建设 导读:同程数科成立于 2015 年,是同程集团旗下的旅游产业金融服务平台.2020 年,同程数科基于 Apa ...

  7. 如何基于 Apache Doris 与 Apache Flink 快速构建极速易用的实时数仓

    随着大数据应用的不断深入,企业不再满足离线数据加工计算的时效,实时数据需求已成为数据应用新常态.伴随着实时分析需求的不断膨胀,传统的数据架构面临的成本高.实时性无法保证.组件繁冗.运维难度高等问题日益 ...

  8. DockOne微信分享( 九十):猎豹移动基于CoreOS在AWS上的项目实践

    本文讲的是DockOne微信分享( 九十):猎豹移动基于CoreOS在AWS上的项目实践[编者的话]本次分享介绍基于AWS的EC2服务如何设计和搭建适合自己业务的架构方案实现全球多region部署,介 ...

  9. 光大银行刘淼:基于华为云GaussDB(DWS) 数据仓库创新实践

    摘要:面向未来数据平台3.0要做架构减法,平台由N->1,华为云GaussDB(DWS)未来作为数据仓库唯一平台,数据链路实现从数据湖直接到华为云GaussDB(DWS)数据仓库. 日前,华为举 ...

最新文章

  1. NAR:psRobot-植物小RNA分析系统
  2. UI组件之AdapterView及其子类(二)GridView网格视图的使用
  3. 阿里产品专家杨文韬:你想了解的1688都在这里
  4. 当代家长现状。。 | 今日最佳
  5. 支付宝第三方授权登陆
  6. 飞鸽传书 扩散全身的
  7. C++编程学到什么程度可以面试工作?
  8. Julia : where与类型限定
  9. centos下yum安装lamp
  10. 中国金融出版社出版的2016版《综合》
  11. JSON对象按照ASCII对key值排序
  12. PCL RANSAC点云配准
  13. css控制图片自适应大小
  14. win10声卡驱动问题:未检测到任何音频设备
  15. errorC1083 无法打开源文件 c1xx
  16. 对往届软件工程的思考——写在软件工程开课之际 by 姜健
  17. Eclipse启动时闪退问题解决方案
  18. 计算机专业学生进行职业决策,计算机专业学生职业生涯规划书.doc
  19. Deep Domain Confusion:Maximinzing for Domain Invariance阅读笔记
  20. 为Debian解决Mercury MW150US无线网卡驱动

热门文章

  1. OceanBase1.4安装教程
  2. 分享:微信砍价活动源码java
  3. 【清华大学-郑莉教授】C++语言程序设计 函数的参数函数的内联、重载和系统函数的调用
  4. grunt gulp npm yarn pnpm webpack
  5. Openstack技术\在Docker容器中部署MySQL,并通过外部mysql客户端操作MySQL Server
  6. Talent Show 分数规划加背包dp(还不太懂)
  7. HTML——基本标签和重要概念
  8. X64位内存注入DLL技术(可躲避检测DLL,破解盗用DLL)
  9. Visum中导入GIS地图
  10. 树莓派+python flask 调用天气api接口实现天气数据web