Iceberg 在基于 Flink 的流式数据入库场景中的应用
本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点。
应用场景
流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。
上述的应用场景通常有如下的痛点,需要整个流程不断的优化:
- 支持流式数据写入,并保证端到端的不重不丢(即 exactly-once);
- 尽量减少中间环节,能支持更实时(甚至是 T+0)的读取或导出,给下游提供更实时更准确的基础数据;
- 支持 ACID,避免脏读等错误发生;
- 支持修改已落地的数据,虽然大数据和数据湖长于处理静态的或者缓慢变化的数据,即读多写少的场景,但方便的修改功能可以提升用户体验,避免用户因为极少的修改,手动更换整个数据文件,甚至是重新导出;
- 支持修改表结构,如增加或者变更列;而且变更不要引起数据的重新组织。
引入 Iceberg 作为 Flink sink
为了解决上述痛点,我们引入了 Iceberg 作为数据落地的格式。Iceberg 支持 ACID 事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。
同时,为了支持流式数据的写入,我们引入 Flink 作为流式处理框架,并将 Iceberg 作为 Flink sink。
下文主要介绍 Flink Iceberg sink 的实现框架和要点。但在这之前,需要先介绍一些实现中用到的 Flink 基本概念。
Flink 基本概念
从 Flink 的角度如何理解"流"和"批"
Flink 使用 DataFrame API 来统一的处理流和批数据。
Stream, Transformation 和 Operator
一个 Flink 程序由 stream 和 transformation 组成:
- Stream: Transformation 之间的中间结果数据;
- Transformation:对(一个或多个)输入 stream 进行操作,输出(一个或多个)结果 stream。
当 Flink 程序执行时,其被映射成 Streaming Dataflow,由如下的部分组成:
- Source (operator):接收外部输入给 Flink;
- Transformation (operator):中间对 stream 做的任何操作;
- Sink (operator):Flink 输出给外部。
下图为 Flink 官网的示例,展示了一个以 Kafka 作为输入 Source,经过中间两个 transformation,最终通过 sink 输出到 Flink 之外的过程。
State, Checkpoint and Snapshot
Flink 依靠 checkpoint 和基于 snapshot 的恢复机制,保证程序 state 的一致性,实现容错。
Checkpoint 是对分布式的数据流,以及所有 operator 的 state,打 snapshot 的过程。
■ State
一个 operator 的 state,即它包含的所有用于恢复当前状态的信息,可分为两类:
- 系统 state:如 operator 中对数据的缓存。
- 用户自定义 state:和用户逻辑相关,可以利用 Flink 提供的 managed state,如 ValueState、ListState,来存储。
State 的存储位置,可以分为:
- Local:内存,或者本地磁盘
- State backend:远端的持久化存储,如 HDFS。
如下图所示:
■ Checkpoint
Flink 做 checkpoint 的过程如下:
- Checkpoint coordinator 首先发送 barrier 给 source。
- Source 做 snapshot,完成后向 coordinator 确认。
- Source 向下游发送 barrier。
- 下游 operator 收到所有上游的 barrier 后,做 snapshot,完成后向 coordinator 确认。
- 继续往下游发送 barrier,直到 sink。
- Sink 通知 coordinator 自己完成 checkpoint。
- Coordinator 确认本周期 snapshot 做完。
如下图所示:
■ Barrier
Barrier 是 Flink 做分布式 snapshot 的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。
由于每个 barrier 唯一对应 checkpoint id,所以数据流中的 record 实际被 barrier 分组,如下图所示,barrier n 和 barrier n-1 之间的 record,属于 checkpoint n。
Barrier 的作用是在分布式的数据流中,将 operator 的多个输入流按照 checkpoint对齐(align),如下图所示:
Flink Iceberg sink
了解了上述 Flink 的基本概念,这些概念又是如何被应用和映射到 Flink Iceberg sink 当中的呢?
总体框架
如图,Flink Iceberg sink 有两个主要模块和两个辅助模块组成:
实现要点
■ Writer
- 在当前的实现中,Java 的 Map 作为每条记录,输入给 writer。内部逻辑先将其转化为作为中间格式的 Avro IndexedRecord,而后通过 Iceberg 里的 Parquet 相关 API,累积的写入 DataFile。
- 使用 Avro 作为中间格式是一个临时方案,为简化适配,并最大限度的利用现有逻辑。但长期来看,使用中间格式会影响处理效率,社区也在试图通过 ISSUE-870 来去掉 Avro,进而使用 Iceberg 内建的数据类型作为输入,同时也需要加入一个到 Flink 内建数据类型的转换器。
- 在做 checkpoint 的过程中,发送 writer 自己的 barrier 到下游的 committer 之前,关闭单个 Parquet 文件,构建 DataFile,并发送 DataFile 的信息给下游。
■ Committer
- 全局唯一的 Committer 在收到上游所有 writer 的 barrier 以后,将收到的 DataFile 的信息填入 manifest file,并使用 ListState 把 manifest file 作为用户自定义的 state,保存于 snapshot 中。
- 当 checkpoint 完成以后,通过 merge append 将 manifest file 提交给 Iceberg。Iceberg 内部通过后续的一系列操作完成 commit。最终让新加入的数据对其他的读任务可见。
试用 Flink Iceberg sink
社区上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以试用的原型代码。下载该 patch 放入 master 分支,编译并构建即可。如下的程序展示了如何将该 sink 嵌入到 Flink 数据流中:
// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =new org.apache.hadoop.conf.Configuration();
hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,META_STORE_URIS);
hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_WAREHOUSE);Catalog icebergCatalog = new HiveCatalog(hadoopConf);// Create Iceberg table
Schema schema = new Schema(...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);// Obtain an execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Enable checkpointing
env.enableCheckpointing(...);// Add Source
DataStream<Map<String, Object>> dataStream =env.addSource(source, typeInformation);// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =new IcebergSinkAppender<Map<String, Object>>(conf, "test").withSerializer(MapAvroSerializer.getInstance()).withWriterParallelism(1);
appender.append(dataStream);// Trigger the execution
env.execute("Sink Test");
后续规划
Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作为中间格式;以及在各种失败的情况下是否仍能保证端到端的 exactly-once;按固定时长做 checkpoint,在高低峰时生成不同大小的 DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。
参考资料:
[1] Iceberg 官网:
https://iceberg.apache.org/
[2] Flink 1.10文 档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 设计文档:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容错机制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html
# 社区活动推荐 #
普惠全球开发者,这一次,格外与众不同!首个 Apache 顶级项目在线会议 Flink Forward 全球直播中文精华版来啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海内外一线厂商,经典 Flink 应用场景,最新功能、未来规划一览无余。点击下方链接可了解更多大会详情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx
原文链接
本文为云栖社区原创内容,未经允许不得转载。
Iceberg 在基于 Flink 的流式数据入库场景中的应用相关推荐
- 网易游戏基于 Flink 的流式 ETL 建设
简介:网易游戏流式 ETL 建设实践及调优经验分享- 网易游戏资深开发工程师林小铂为大家带来网易游戏基于 Flink 的流式 ETL 建设的介绍.内容包括: 业务背景 专用 ETL EntryX 通用 ...
- 使用 Flink Hudi 构建流式数据湖
简介: 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型的不断优化演进. 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增 ...
- 小白学习Flink系列--第二篇-01(流式数据概念)
导读 要想彻底理解Flink,就要了解流数据的前世今生,流数据的语义.特点,以及如何处理,以下文章就能很好的解释流数据的概念和模型,对了解Flink有很大的帮助 前言 今天流式数据处理在大数据领域是一 ...
- flink源码分析_Flink源码分析之深度解读流式数据写入hive
前言 前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下.以便朋友们对flink ...
- Apache Griffin+Flink+Kafka实现流式数据质量监控实战
点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...
- 流式数据、批式数据、实时数据、历史数据的区别
1.流式数据.批式数据.实时数据.历史数据的区别: 根据数据处理的时效性,大数据处理系统可分为批式(batch)大数据和流式(streaming)大数据两类. 其中,批式大数据又被称为历史大数据,流式 ...
- 流式数据架构理论 ◆ 基本概念
基本概念 流 流是一种为无界数据集设计的数据处理引擎,这种引擎具备以下特征: (1)具备强一致性,即支持 exactly-once 语义 (2)提供丰富的时间工具,如事件时间.处理时间.窗口 (3)保 ...
- Tech Talk 活动预告|构建流式数据湖,让实时数据“水到渠成”
从 TB 到 PB 到 EB...... 过去十年,数据量以惊人的速度增长. 据 IDC 发布<数据时代 2025>报告显示,2025 全球每年产生的数据将从 2018 年的 33ZB 增 ...
- 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi
文章目录 一.概述 二.Hudi 架构 三.Hudi的表格式 1)Copy on Write(写时复制) 2)Merge On Read(读时合并) 3)COW vs MOR 四.元数据表(Metad ...
最新文章
- BZOJ4668: 冷战 [并查集 按秩合并]
- 【Linux】Linux 文件中^M字符处理
- python释放变量内存_Python尚学堂高淇|1113引用的本质栈内存,堆内存,内存的示意图,标识符,变量的声明初始化,垃圾回收机制...
- JDK源码系列(3)-String
- SQL Server 全文索引创建
- 数据分析与挖掘建模实战003:单因子探索分析与可视化001数据案例介绍
- 关联容器——map、set
- .net安装_如何安装GWAS分析软件R包:GAPIT
- 服务提供者框架理解草图
- 图论+dp poj 1112 Team Them Up!
- Handler处理机制
- 137_STLink驱动安装以及调试器使用测试
- leetcode blind 75
- 云南国税网上办税无法打印发票的解决办法
- PHP设置header出现警告headers already sent by (output started at......
- JavaScript 手写函数柯里化 curry
- Python案例1—人民币与美元的汇率兑换V_8.0
- 教你如何安慰失戀人?
- 宽带拨号上网显示服务器失效,拨号上网失败 宽带连接错误651怎么办
- Data Import Handler - DIH相关命令
热门文章
- 在java语法中继承_java中的继承
- 包包的结构制图_15种常见领型的结构制图
- jupyternotebook虚拟环境无法连接服务_详解pycharm连接远程linux服务器的虚拟环境的方法_python...
- 【LeetCode笔记】198. 打家劫舍(Java、动态规划)
- 折线图 java_java报表折线图
- pep8 python 编码规范_如何用好python编码规范,写一手漂亮的代码
- sql 百分号_SQL思维导图和代码分享
- html调整文字位在基线显示,html – 将标题对齐到相同的基线,无论后续文字是什么?...
- python 爬取贝壳网小区名称_Python爬虫实战:爬取贝壳网二手房40000条数据
- android 高德地图 sh1,百度、高德地图获取发布版(Release)SHA1