作者:

卢圣刚,核桃编程数据架构师,拥有多年的大数据开发和架构经验。曾担任易观数据挖掘工程师,熊猫TV大数据架构师。


核桃编程简介

核桃编程成立于2017年8月9日,作为少儿编程教育行业的领导者,始终秉持“让每个孩子爱学习、会学习,让优质的教育触手可及”的使命,致力于以科技手段促进编程教育,凭借首创的AI人机双师教学模式与十级进阶课程体系,实现规模化因材施教,“启发中国孩子的学习力”。截止2019年8月,核桃编程已经成为付费学员规模最大的少儿编程教育机构,帮助超过65万名孩子收获学习兴趣,锻炼编程技能,养成良好思维习惯,学员复购率超91%,学员完课率高达98%,在线原创作品1873万份。

1.业务现状

1.1 业务需求

  • 业务上固定时间开课,在开课时间内,班主任需要实时/准实时地知道学生的学习情况

  • 数据统计维度一般都是按班级,学期汇总,时间范围可能是几个月,甚至一年

  • 业务变化快,需要及时响应业务变化带来的指标逻辑变更

1.2 数据源

数据源

备注

Nginx accesslog

用户行为日志

Mysql binlog

通过阿里云DTS工具同步到Kafka

Kafka

业务方采集日志写入到Kafka集群

1.3 架构改造前方案

现有指标都是将Kafka/Mysql等的数据写入HDFS,使用Hive离线批处理,每10分钟执行一次,循环统计历史累计指标,再定时把数据同步到Mysql,提供给数据后台查询。如下图所示:

1.4 遇到的问题

随着计算的数据量越来越大,逐渐不能满足业务的更新频率要求。

  • 使用Apache Sqoop做全量数据同步,会对业务Mysql库/HDFS造成压力。

  • 使用Apache Sqoop做增量同步,一般只能使用某个时间字段(例如update time)来同步新修改的数据。这样在做分区表时,需要比较复杂的离线合并。

  • 随着数据越来越大,同步以及处理时间会越来越长,满足不了业务实时性需求。

2.实时数仓方案调研

离线的同步方案已经不能满足业务需求,计划迁移到实时方案上来,并做了一些调研。

2.1 迁移流式计算的问题

开发周期长

现有离线任务基本都是动辄几百行SQL,逻辑复杂,把所有逻辑迁移到流式计算,开发难度和改造成本都比较大。
例如离线增量同步,需要先同步全量base数据

sqoop import  \
--hive-import \
--hive-overwrite \
--connect jdbc:mysql://<mysqlurl>  \
--table <mysqltable> \
--hive-table <table_base> \
--hive-partition-key <parcolumn> \
--hive-partition-value <par1>

再消费增量binlog数据,流式写入到hive外部表,最后将两个表合并

insert overwrite table <result_storage_table>select
<col1>,<col2>,<colN>from(select
row_number() over(partition by t.<primary_key_column>order by record_id
desc, after_flag desc) as row_number, record_id, operation_flag, after_flag,
<col1>, <col2>, <colN>from(select
incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>,
incr.<col2>,incr.<colN>from
<table_log> incrwhere
utc_timestamp< <timestamp>union all select 0
as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>,
base.<col2>,base.<colN>from
<table_base> base) t) gtwhere record_num=1and
after_flag='Y'

而应用Delta Lake只需要一个streaming sql即可实现实时增量同步。

CREATE SCAN <SCAN_TABLE> on <STREAM> using
stream;
CREATE STREAM job
OPTIONS(
checkpointLocation='/cdc',
triggerInterval=30000
)
MERGE INTO <CDC_TABLE> as target
USING (SELECTfrom_unixtime(<col2>,'yyyyMMdd') as
par_date,<col1>FROM(SELECTrecordId,recordType,CAST(before.id as
LONG) as before_id,CAST(after.id as
LONG) as id,after.<col1>,after.ctime,dense_rank() OVER
(PARTITION BY c
oalesce(before.id,after.id) ORDER BY recordId DESC) as rankFROM (SELECTrecordId,recordType,from_json(CAST(beforeImages as STRING), 'id STRING, <col1>
<coltype1>,ctime string') as before,from_json(CAST(afterImages as STRING), 'id STRING, <col1>
<coltype1>,ctime string') as afterFROM (select
from_avro(value) as (recordID, source, dbTable, recordType, recordTimestamp,
extraTags, fields, beforeImages, afterImages) from <SCAN_TABLE>) binlog WHERE
recordType != 'INIT') binlog_wo_init) binlog_extract
WHERE rank=1
) as source
ON target.id = source.before_id
WHEN MATCHED AND source.recordType='UPDATE' THEN
UPDATE SET *
WHEN MATCHED AND source.recordType='DELETE' THEN
DELETE
WHEN NOT MATCHED AND (source.recordType='INSERT' OR
source.recordType='UPDATE') THEN
INSERT *;

数据恢复困难

对离线任务来说数据恢复只需要重新执行任务就行。

但对流式计算,当数据异常,或者逻辑变更,需要重新跑全量数据的时候,只能离线补历史数据,再union实时数据。因为Kafka不可能存所有历史数据,而且从头消费追数据时间也会很久。

而为了满足快速恢复的需求,所有指标都需要从一开始准备离线和实时两套代码,类似Lambda架构。

数据验证困难

Kafka在大数据架构中一般充当消息队列的角色,数据保存周期较短。全量历史数据,会消费Kafka写到HDFS。如果一个指标计算了一个月,发现计算结果有异常,很难追溯是当时Kafka数据有问题,还是计算逻辑有问题。HDFS数据虽然可以用来排查,但是HDFS里的数据和当时Kafka的数据是否一致,是不能保证的。

2.2 希望满足的功能

正因为迁移流式作业会有一些迁移成本和问题,所以对实时计算方案提出了一些功能要求。

开发灵活

互联网公司业务发展速度快,人力资源比较紧张,需要更低成本更快捷的开发新指标,满足业务敏捷性的要求。

重跑历史数据方便

业务指标的定义经常发生变更,一旦变更,或者有新的数据指标就需要从最早开始消费。但是历史数据通常非常多,而且一般实时数据源Kafka也不可能存历史所有数据。

数据异常时容易排查问题

以离线数仓为例,几百行的SQL,可以分段执行,来逐步排查。Flink可以埋metrics获取中间过程。

3.基于Delta Lake实时数仓方案

3.1 Delta Lake

Delta Lake是美国Databricks开源的数据湖技术,基于Apache Parquet丰富了数据管理功能,如元数据管理/事务/数据更新/数据版本回溯等。使用Delta Lake可以很方便的将流处理和批处理串联起来,快速构建Near-RealTime的Data Pipeline.

目前阿里巴巴E-MapReduce(简称“EMR”)团队对Delta Lake做了很多功能和性能上的优化,并和Spark做了深度集成,主要以下方面,更多信息详见EMR官方文档,https://help.aliyun.com/document_detail/148369.html

  • SparkSQL支持Update/Delete/Merge Into/Optimize/Vacuum等语法来操作Delta Lake

  • 自研SparkStreaming SQL,支持Delta Lake的相关DML操作

  • Hive&Presto On Delta Lake

  • Delta Lake On OSS(阿里云对象存储)

  • Delta Lake事务冲突检测优化

  • DataSkipping & Zorder性能优化

3.2 SparkStreaming SQL

阿里巴巴EMR团队在StructStreaming基础上自研了SparkStreaming SQL,用户可以很方便的使用SQL来写流式作业的逻辑,大大降低了开发门槛, 详见 SparkStreaming SQL官方文档,https://help.aliyun.com/document_detail/124684.html

  • 批流统一引擎
    可以复用底层SparkSQL/SparkCore的优化

  • 丰富的SQL支持

    CREATE TABLE / CREATE SCAN / CREAT STREAM / CTAS

INSERT INTO / MERGE INTO
          SELECT / WHERE/ GROUP BY / JOIN / UNION ALL

  • 丰富的UDF支持
    Hive UDF / 窗口函数

  • 丰富的数据源支持
    Delta/Kudu/Druid/HBase/MySQL/Redis/SLS/Datahub/TableStore,并且支持Kafka的Exactly Once

github: https://github.com/aliyun/aliyun-emapreduce-sdk

  • Delta Lake深度集成
    结合Delta Lake的使用场景,新增了一些功能的支持(比如流式写动态分区表)

3.3 实时数仓方案

架构方案

基于Delta Lake+SparkStreaming SQL可以快速构建实时数仓的pipeline,如下所示:

  • ODS层
    ODS的数据主要是实时埋点数据,CDC中的binlog日志等

  • DIM维表

  • DW层
    DW层主要是一部分轻度汇总数据,例如用户维度的课程,作业等信息。

    主要复用的是DW层数据,因此针对每一个指标,需要综合考虑是否聚合,聚合到哪一个维度,是否关联维表。

    DW层分为两种

    a.业务简单,基本不会变化。直接写入Kafka。

    b.业务逻辑复杂,数据可能<频繁>变化,写入Delta Lake。实践上看,直接写入Kafka是最容易的方案,但是灵活性很低,历史数据无法追溯,也无法修改。DW层通过引入Delta Lake,可以实现流批统一数据源,历史分区数据恢复等功能。

  • DM层

    DM层就是最后的报表展示指标了,可以将DW层delta表做为数据源,再次汇总后sink到展示用的DataBase。

备注:
EMR团队提供了流式Merge Into功能,可以通过写SparkStreaming SQL的方式来做CDC回放binlog到Delta表。
详见CDC同步文档,https://help.aliyun.com/document_detail/148382.html

问题的优化

在使用Delta Lake的过程中,我们也发现了一些问题,详细的解决方案和建议如下:

小文件多

CDC流式Merge回放binlog的过程中,会不断产生小文件,需要对小文件进行一些处理,EMR提供了一些优化方案

  • 新增串行auto compaction的功能
    在CDC流式作业运行过程中,根据一定的策略对小文件进行合并compact操作

  • 使用Adaptive Execution
    打开自适应执行开关,可以有效减少Merge过程产生的小文件,如单个batch从100个小文件减少到1~2个文件。

Compact冲突问题

如果不使用串行Compact功能,需要定期手工对Delta表进行Compact合并小文件,但是经常碰到Compact在事务提交的时候和CDC流作业事务提交产生冲突,是的CDC流或者Compact失败,这块也提供了一些优化以及建议:

  • 优化Delta内核冲突机制,使得CDC流能够稳定运行,不会因为Compact挂掉

  • 使用分区表,批量对分区进行Compact,减少冲突概率

  • 在数据库表update/delete操作很少的时候进行Compact(可以使用EMR工作流调度)

  • 使用EMR工作流中的作业重试功能,当遇到Compact事务提交失败时进行重试

架构方案进一步说明

• 为什么不直接从ODS计算

以核桃的到课指标为例,数据源是Kafka的埋点topic,需要计算的指标有个人维度到课数据,学期维度,班级维度,学期维度,市场渠道维度。
每个维度都需要消费所有的埋点数据,从中挑出到课相关的事件。并且每个维度的计算程序都需要查询HBase/Mysql关联相关的学期,班级,unit等维表。
一旦有整体逻辑的调整,例如过滤测试班数据,不可能从ods层就把数据过滤掉(这样从底层就开始丢失数据,后期无法追查),那么所有程序都需要重新调整,添加这个过滤逻辑。

• 怎么恢复数据

理想情况是,实时与离线使用同一套SQL,同一套计算逻辑,同一个数据源,这样随时可以用离线脚本重跑历史数据。但是现实是没有哪个框架支持。所谓流批一体,都是在引擎层面,例如Spark的streaming和SQL都是batch的方式,流只是更小的批。而Flink则希望用流的方式去处理批数据,批只是有边界的流。针对高阶的SQL API,流批都有很大的区别。基于Delta Lake的分区表,将dw层的实时数据按时间分区,这样可以随时用离线作业恢复历史分区的数据。而DW之上的汇总因为数据量相对较小,恢复之后可以用流作业从头消费。

4. 业务效果

Delta Lake实时数仓在核桃编程部分数据仓库生产环境上线后,部分业务统计指标已基于新架构产出,指标更新延迟从几十分钟,提升到1分钟以内。班主任可以更快获取学生的学习状态,及时跟进学习进度,从而显著提升了教学质量。
在CDC应用后,数据同步延迟从半小时提升到30秒,同时解决了Sqoop高并发同步时对业务数据库的影响。数据分析人员Ad-Hoc查询时,可以获取实时的业务数据,明显提升了数据分析效果,并且可以更及时的指导业务发展。

5. 后续计划

根据目前的业务应用效果,后续大数据团队会继续梳理业务范围所有实时指标,进一步优化实时数仓各层的结构,推进全面应用基于Delta Lake的实时数仓建设。
基于Delta Lake模式执行、时间旅行等特性,进一步推进机器学习场景下对Delta的应用,构造更可靠、易扩展的Data Pipeline。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

阅读原文

核桃编程Delta Lake实时数仓应用实践相关推荐

  1. 美团点评基于 Flink 的实时数仓平台实践

    摘要:数据仓库的建设是"数据智能"必不可少的一环,也是大规模数据应用中必然面临的挑战,而 Flink 实时数仓在数据链路中扮演着极为重要的角色.本文中,美团点评高级技术专家鲁昊为大 ...

  2. Flink 与 TiDB 联合发布实时数仓最佳实践白皮书

    简介:点击链接,动动手指获取白皮书-另外,实时数仓 Meetup 议题征集中! GitHub 地址 https://github.com/apache/flink 欢迎大家给 Flink 点赞送 st ...

  3. 腾讯云原生实时数仓建设实践

    腾讯云原生实时数仓建设实践 实时数仓面临的挑战 实时数仓被广泛应用于腾讯各大业务,涉及的平台众多,从统计信息中可以看出,集群规模庞大,数据量极大. 复杂的使用场景和超大的数据量,导致我们在实时数仓的建 ...

  4. 58同城宝实时数仓建设实践

    背景 作为国内领先的覆盖生活全领域的服务平台,58同城业务覆盖招聘.房产.汽车.金融等生活领域的各个方面.58同城宝是针对生活服务信息做广告推广的平台,依托58同城海量的商户和每天更新的生活数据,58 ...

  5. 美团点评基于 Flink 的实时数仓建设实践

    来自:美团技术团队 引言 近些年,企业对数据服务实时化服务需求日益增多.本文整理了常见实时数据组件的性能特点和适用场景,介绍了美团如何通过 Flink 引擎构建实时数据仓库,从而提供高效.稳健的实时数 ...

  6. 网易云音乐基于 Flink + Kafka 的实时数仓建设实践

    简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍 Flink + Kafka 在网易云音乐的应用实战: 背景.Flink + Kafka 平台化设计.Kafka 在 ...

  7. 滴滴基于 Flink 的实时数仓建设实践

    简介:随着滴滴业务的高速发展,业务对于数据时效性的需求越来越高,而伴随着实时技术的不断发展和成熟,滴滴也对实时建设做了大量的尝试和实践.本文主要以顺风车这个业务为引子,从引擎侧.平台侧和业务侧各个不同 ...

  8. flink 写kafka_网易云音乐基于 Flink + Kafka 的实时数仓建设实践

    简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍 Flink + Kafka 在网易云音乐的应用实战: 背景 Flink + Kafka 平台化设计 Kafka 在 ...

  9. 机器学习从入门到精通50讲(四)-实时数仓应用实践案例

    前言   随着实时技术的不断发展和商家实时应用场景的不断丰富,有赞在实时数仓建设方面做了大量的尝试和实践.本文主要分享有赞在建设实时数仓过程中所沉淀的经验,内容包括以下五个部分: 建设背景 应用场景 ...

  10. 从 Storm 迁移到 Flink,美团外卖实时数仓建设实践

    简介: 本文主要介绍一种通用的实时数仓构建的方法与实践.实时数仓以端到端低延迟.SQL 标准化.快速响应变化.数据统一为目标. 作者:朱良 本文主要介绍一种通用的实时数仓构建的方法与实践.实时数仓以端 ...

最新文章

  1. 阿里云服务器怎么安装docker
  2. 窗口的z-order是什么?PyQt5
  3. 1.13.、1.14.Flink 支持的DataType和序列化、Flink Broadcast Accumulators Counters Distributed Cache
  4. input输入框获取焦点时,光标置于最右
  5. 关于JPQL UPDATE 语句的 一点体会
  6. 升级PowerShell至4.0版本
  7. 9600波特率每秒传送多少字节_arduino传送字符串json到python解析为字典数据
  8. java 程序硬盘,用Java得到硬盘空间
  9. 金字塔测试早已过时!
  10. python属于汇编语言还是高级语言_计算机语言Python解释器
  11. 微信公众号获得城市及街道位置信息
  12. java有什么岗位_java开发有哪些岗位?相关岗位及工作职责
  13. finecms aip.php漏洞,finecms V5 会员头像任意文件上传漏洞 附修复代码
  14. matlab怎么启动一个图形窗,Matlab在一个图形窗口里画多个图形的操作教程
  15. 基于C4D的3d设计
  16. 【4. 扫描节点】 分布式漏洞扫描系统设计与实现
  17. 程序员的工匠精神何在?何来?
  18. 批量保存拼多多批发商城商品主图及视频
  19. 飞凌嵌入式i.MX8MM在智慧医疗麻醉系统中的应用方案解析
  20. 优思学院|六西格玛管理的核心理念是什么?

热门文章

  1. 酷播云视频列表功能教程,简单实现视频列表连播
  2. JAVA中rpm什么意思,RPM常用命令介绍
  3. pngimg 可以商用吗_免费商用无版权素材 免费图库 抠图 PNG 插画素材
  4. ubuntu系统打开.chm文件方式
  5. linux 压力测试pps,开发一款集群ddos防火墙压力测试
  6. linux系统挂载光盘镜像ISO的方法
  7. 吴伯凡-认知方法论-为什么说“盲维”是认知升级的重要概念
  8. 3dmax中如何解决镜头穿透模型?
  9. 苹果电脑怎么断网?你会断网吗? Mac电脑断网方法
  10. SQL中模式的定义和删除