实例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("XXX");DataStream<RowData> stream = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).build();stream.print();
env.execute("IcebergRead");

流程详解

首先看build()方法:

public DataStream<RowData> build() {Preconditions.checkNotNull(this.env, "StreamExecutionEnvironment should not be null");FlinkInputFormat format = this.buildFormat();ScanContext context = this.contextBuilder.build();TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));if (!context.isStreaming()) {int parallelism = this.inferParallelism(format, context);if (this.env.getMaxParallelism() > 0) {parallelism = Math.min(parallelism, this.env.getMaxParallelism());}return this.env.createInput(format, typeInfo).setParallelism(parallelism);} else {StreamingMonitorFunction function = new StreamingMonitorFunction(this.tableLoader, context);String monitorFunctionName = String.format("Iceberg table (%s) monitor", this.table);String readerOperatorName = String.format("Iceberg table (%s) reader", this.table);return this.env.addSource(function, monitorFunctionName).transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));}
}

该方法主要做了两件事情:

  • buildFormat()方法,利用tableLoader加载对应的table,然后通过这个table获取到对应的Schema、FileIO、EncryptionManager;再加上contextBuilder.build()方法构建出的ScanContext对象,一起组装成了负责辅助DataSource读取数据、分发数据的InputFormat;然后分两种情况,批读取和流读取

  • 在流式读取情况下,将StreamingMonitorFunction和StreamingReaderOperator算子注册到env上

StreamingMonitorFunction:不停的扫描iceberg表看是否有新的snapshot生成,如果有则生成CombinedScanTask发向下游。
StreamingReaderOperator:一旦收到source发来的split,会将其放到一个队列中,然后通过一个MailboxExecutor线程处理,这种结构可以将读取数据和处理checkpoint barriers功能分离,避免潜在的背压。

StreamingMonitorFunction

继承关系:

它实现了CheckpointedFunction接口,所以能够保证在source端的一致性;
另外,因为它并没有实现ParallelSourceFunction接口,所以它注定只能有一个并行度。这里的目的是确保在只有一个线程去监控Iceberg表和分发任务,多线程只会发生数据错乱。
run()方法流程:

public void run(SourceFunction.SourceContext<FlinkInputSplit> ctx) throws Exception {this.sourceContext = ctx;while(this.isRunning) {this.monitorAndForwardSplits();Thread.sleep(this.scanContext.monitorInterval().toMillis());}}

monitorAndForwardSplits()方法,获取表当前最新的快照snapshotId,如果记录了lastSnapshotId,那就生成lastSnapshotId到snapshotId之间的增量文件的FlinkInputSplit对象:

void monitorAndForwardSplits() {this.table.refresh();Snapshot snapshot = this.table.currentSnapshot();if (snapshot != null && snapshot.snapshotId() != this.lastSnapshotId) {long snapshotId = snapshot.snapshotId();ScanContext newScanContext;if (this.lastSnapshotId == -1L) {newScanContext = this.scanContext.copyWithSnapshotId(snapshotId);} else {snapshotId = this.toSnapshotIdInclusive(this.lastSnapshotId, snapshotId, this.scanContext.maxPlanningSnapshotCount());newScanContext = this.scanContext.copyWithAppendsBetween(this.lastSnapshotId, snapshotId);}LOG.debug("Start discovering splits from {} (exclusive) to {} (inclusive)", this.lastSnapshotId, snapshotId);long start = System.currentTimeMillis();FlinkInputSplit[] splits = FlinkSplitPlanner.planInputSplits(this.table, newScanContext, this.workerPool);LOG.debug("Discovered {} splits, time elapsed {}ms", splits.length, System.currentTimeMillis() - start);start = System.currentTimeMillis();synchronized(this.sourceContext.getCheckpointLock()) {FlinkInputSplit[] var9 = splits;int var10 = splits.length;int var11 = 0;while(true) {if (var11 >= var10) {this.lastSnapshotId = snapshotId;break;}FlinkInputSplit split = var9[var11];this.sourceContext.collect(split);++var11;}}LOG.debug("Forwarded {} splits, time elapsed {}ms", splits.length, System.currentTimeMillis() - start);}}

此处核心:

  • 构造出从startSnapshotId到snapshotId之间的增量FlinkInputSplit(FlinkSplitPlanner.planInputSplits为核心内容,流程二详细讲解)
  • 将FlinkInputSplit分配给下游进一步的处理

StreamingReaderOperator

继承关系:

一些参数:

private final MailboxExecutor executor;
private FlinkInputFormat format;
private transient SourceFunction.SourceContext<RowData> sourceContext;
private transient ListState<FlinkInputSplit> inputSplitsState;
private transient Queue<FlinkInputSplit> splits;
private transient SplitState currentSplitState;

其中:

  • executor是暴露出来的一个执行器,这个线程同时处理用户操作和checkpoint动作,我们一次只预定一个InputSplit去读取,因此当新的checkpoint到达是能被触发而不是被InputSplit读取操作阻塞。
  • inputSplitsState为存储FlinkInputSplit的状态变量,即需要被读取的FlinkInputSplit,会在checkpoint持久化。
  • splits为当前周期需要读取的FlinkInputSplit,会在initializeState从inputSplitsState读出来。
  • currentSplitState表示当前的读取状态。

处理数据流程:

public void processElement(StreamRecord<FlinkInputSplit> element) {this.splits.add((FlinkInputSplit)element.getValue());this.enqueueProcessSplits();
}

将接收到的数据加入splits然后调用enqueueProcessSplits方法

private void enqueueProcessSplits() {if (this.currentSplitState == StreamingReaderOperator.SplitState.IDLE && !this.splits.isEmpty()) {this.currentSplitState = StreamingReaderOperator.SplitState.RUNNING;this.executor.execute(this::processSplits, this.getClass().getSimpleName());}}

在executor中异步的执行了如下操作:

  1. 从列表头中取出一个FlinkInputSplit对象,调用FlinkInputFormat.open()
  2. 轮询调用FlinkInputFormat.nextRecord()获取RowData数据对象,并交给了flink的SourceContext,至此数据真正的进入了流
    一直循环1-2这个过程,直到队列为空。
private void processSplits() throws IOException {FlinkInputSplit split = (FlinkInputSplit)this.splits.poll();if (split == null) {this.currentSplitState = StreamingReaderOperator.SplitState.IDLE;} else {this.format.open(split);try {RowData nextElement = null;while(!this.format.reachedEnd()) {nextElement = this.format.nextRecord(nextElement);this.sourceContext.collect(nextElement);}} finally {this.currentSplitState = StreamingReaderOperator.SplitState.IDLE;this.format.close();}this.enqueueProcessSplits();}
}

StreamingReaderOperator中有一个成员变量为FlinkInputFormat format,FlinkInputFormat继承自flink中的RichInputFormat,RichInputFormat继承自InputFormat,InputFormat为读取数据时候的一个抽象类,一些数据的读取数据的相关类都基于它实现。
format的open()方法会去构建一个DataIterator对象,DataIterator对应一个CombinedScanTask的数据读取的迭代器:

public void open(FlinkInputSplit split) {this.iterator = new DataIterator(this.rowDataReader, split.getTask(), this.io, this.encryption);
}

nextRecord()方法获取下一个元素:

public RowData nextRecord(RowData reuse) {++this.currentReadCount;return (RowData)this.iterator.next();
}

进入DataIterator.next():

public T next() {this.updateCurrentIterator();++this.recordOffset;return this.currentIterator.next();
}private void updateCurrentIterator() {try {while(!this.currentIterator.hasNext() && this.tasks.hasNext()) {this.currentIterator.close();this.currentIterator = this.openTaskIterator((FileScanTask)this.tasks.next());++this.fileOffset;this.recordOffset = 0L;}} catch (IOException var2) {throw new UncheckedIOException(var2);}
}private CloseableIterator<T> openTaskIterator(FileScanTask scanTask) {return this.fileScanTaskReader.open(scanTask, this.inputFilesDecryptor);
}

updateCurrentIterator()函数轮询了CombinedScanTask中的Collection files(),针对每个FileScanTask执行了FileScanTaskReader的fileScanTaskReader.open(scanTask, inputFilesDecryptor),通过FileScanTask任务读取了RowData对象,读取底层文件,包括PARQUET、AVRO、ORC三种文件格式的读取。

Iceberg源码学习:flink读iceberg流程一相关推荐

  1. 读文章笔记(三):从源码学习Transformer

    读文章笔记(三):从源码学习Transformer encoder分为两部分: decoder 公众号机器学习算法工程师 文章链接: https://mp.weixin.qq.com/s/0NajB_ ...

  2. 第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习

    第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第三课 k8s源码学习和二 ...

  3. Shiro源码学习之一

    一.最基本的使用 1.Maven依赖 <dependency><groupId>org.apache.shiro</groupId><artifactId&g ...

  4. Vuex源码学习(五)加工后的module

    没有看过moduleCollection那可不行!Vuex源码学习(四)module与moduleCollection 感谢提出代码块和截图建议的小伙伴 代码块和截图的区别: 代码块部分希望大家按照我 ...

  5. Nginx源码研究之nginx限流模块详解

    这篇文章主要介绍了Nginx源码研究之nginx限流模块详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 高并发系统有三把利器:缓存.降级和限流: 限流的目的是通过对并 ...

  6. ConcurrentHashMap源码学习

    ConcurrentHashMap源码学习 自从学习了AQS之后,想着重新读一下ConcurrentHashMap的源码来加深下理解,所以有了这篇文章,针对ConcurrentHashMap常用的方法 ...

  7. ASP.NET Core MVC 源码学习:MVC 启动流程详解

    前言 在 上一篇 文章中,我们学习了 ASP.NET Core MVC 的路由模块,那么在本篇文章中,主要是对 ASP.NET Core MVC 启动流程的一个学习. ASP.NET Core 是新一 ...

  8. MVC系列——MVC源码学习:打造自己的MVC框架(一:核心原理)(转)

    阅读目录 一.MVC原理解析 1.MVC原理 二.HttpHandler 1.HttpHandler.IHttpHandler.MvcHandler的说明 2.IHttpHandler解析 3.Mvc ...

  9. Promise源码学习(2)

    Promise源码学习(2) 本篇为上一篇源码学习(1)的补充,主要是来介绍Promise.all()和Promise.race()方法. 闲话少叙,进入正题 Promise.race() 首先来简单 ...

最新文章

  1. python话雷达图-使用Python绘制雷达图
  2. 【云计算 Hadoop】Hadoop 版本 生态圈 MapReduce模型
  3. 如何避开购买灯具的9大误区?
  4. ASP VBScript 函数速查表
  5. 盘点一下结构体标签在Go中的应用
  6. Redis Bitmap 位图
  7. python os write_Python 3:写入方法与os.write返回的字节数
  8. 什么是SQL Server事务日志中的虚拟日志文件?
  9. 2021-08-21
  10. 【写作技巧】毕业论文格式要求
  11. python数据挖掘实验报告_Python数据挖掘实践—决策树
  12. js数组常用方法复习
  13. DRM2.0 的身份认证过程
  14. cacti更改显示图像的title
  15. 叮咚,您有一封告白信件待查收(原生HTML+CSS+JS绘制表白信件,代码+链接+步骤详解)
  16. 第一章 JSON语法用法
  17. bios设置 hp z800_《惠普工作站设置BIOS从U盘装系统》
  18. 2D激光雷达和视觉相结合的SLAM概述
  19. 统一配置中心对比介绍
  20. 昨天写了一个小日历,日历显示一年的日历,暂时设定位2017年,分三个种代码...

热门文章

  1. 华为云从默默无闻到市场份额17.4%,稳居第二仅用了三年,凭什么?
  2. T61P 安装Win7 出现 Windows update 当前无法检查更新,因为未运行服务。您可能需要重新启动计算机
  3. 小伙完全不懂英文,竟成为编程高手,关键词汇就这100多个啊
  4. 史上最简单的SpringCloud教程 | 第七篇: 高可用的分布式配置中心(Spring Cloud Config)(Finchley版本)
  5. Java项目:医疗药品采购系统(java+SSM+JSP+jQuery+h-ui+mysql)
  6. 洪荒之力的英语翻译(mystical powers)
  7. 硅谷游记|机智云硅谷行第二季完美收官
  8. 内连接、外链接、自然连接、全连接
  9. 【MyBatis-Plus】MyBatis-Plus插件机制以及通用Service、新功能
  10. 预告 - 英诺森《2022-2023三位一体重资产企业物资仓储综合服务体系》白皮书解读会来袭