transformation是flink中stream的静态对象,通过组装包含sink和source的transformation根据定义的代码可以组成stream的静态拓扑图,如下所示:

*   Source              Source
*      +                   +
*      |                   |
*      v                   v
*  Rebalance          HashPartition
*      +                   +
*      |                   |
*      |                   |
*      +------>Union<------+
*                +
*                |
*                v
*              Split
*                +
*                |
*                v
*              Select
*                +
*                v
*               Map
*                +
*                |
*                v
*              Sink

其中Source和SInk分别为拓扑图中的起点和终点,其中的spilt,select则是中间具体对数据进行操作的transformation。

同时DataStream类也是一个完成的一个stream数据抽象,其成员如下:

protected final StreamExecutionEnvironment environment;protected final StreamTransformation<T> transformation;

其中envirorment是整个流的上下文,其中的以数组的形式依次保存着之前的所有transformation,而transformation则是当前strean在组装过程中,逻辑上最后一个transformation。

下面也是一个简单的stream拓扑图例子:

*  Source              Source
*    +                   +
*    |                   |
*    |                   |
*    +------->Map<-------+
*              +
*              |
*              v
*             Sink

其中的map也是一个具体的streamTransformation,由DataStream调用map()方法具体加入到stream操作中。

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),Utils.getCallLocationName(), true);return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

在此处,当前一个DataStream调用map()方法的时候,首先需要得到当前DataStream的treanformation中输出的数据类型,根据其数据类型作为新的transformation的输入数据类型。

并需要具体实现map逻辑的MapFunction作为参数,否则没有意义。

之后将这一数据类型作为参数的一部分,在根据参数中的MapFunction生成新的StreamMap,也就是动态操作符,通过transform()方法,得到新的DataStream。

@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {// read the output type of the input Transform to coax out errors about MissingTypeInfotransformation.getOutputType();OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,operatorName,operator,outTypeInfo,environment.getParallelism());@SuppressWarnings({ "unchecked", "rawtypes" })SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

在此处,由于是一个map操作,自然输入输出只需要一个,所以首先生成OneInputTransformation,这里的参数需要前一个transformation和当前的map操作和输入数据类型,以及当前上下文环境的并行度。

最后将生成的tramsform和上下文生成新的DataStream返回,作为新的DataStream进行下面的transform组装。

另一种例子是union操作,这一操作将会合并两个不同的DataStream。

@SafeVarargs
public final DataStream<T> union(DataStream<T>... streams) {List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();unionedTransforms.add(this.transformation);for (DataStream<T> newStream : streams) {if (!getType().equals(newStream.getType())) {throw new IllegalArgumentException("Cannot union streams of different types: "+ getType() + " and " + newStream.getType());}unionedTransforms.add(newStream.getTransformation());}return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
}
public UnionTransformation(List<StreamTransformation<T>> inputs) {super("Union", inputs.get(0).getOutputType(), inputs.get(0).getParallelism());for (StreamTransformation<T> input: inputs) {if (!input.getOutputType().equals(getOutputType())) {throw new UnsupportedOperationException("Type mismatch in input " + input);}}this.inputs = Lists.newArrayList(inputs);
}

此处会将所有需要合并的DataStream保存在数组当中,并生成一个新的UnionTransformation,其中将所有需要合并的DataStream数组参数作为例子保存在其中。新的UnionTransformation也将会作为新的DataStream的transform参数返回。

spilt和select同时使用可以达到分流的目的。

spilt()在被调用的时候需要一个OutputSelect并重写其select()方法,根据相应流入的数据返回不同的输出结果,而接下来再调用select()方法,并在参数中确定制定的返回结果,将会依照前者的输出结果定位到指定的流当中去。

flink Datastream组装相关推荐

  1. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

  2. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  3. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  4. Flink专题四:Flink DataStream 窗口介绍及使用

    由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...

  5. Apache Flink DataStream 编程全集

    概述 Flink是构建在数据流之上的有状态计算的流计算框架,通常被人们理解为是第三代大数据分析方案. 第一代 - Hadoop的MapReduce计算(静态).Storm流计算(2014.9) :两套 ...

  6. Flink DataStream iterate算子的简单使用

    Flink DataStream iterate算子的简单使用 由于DataStream程序可能永远不会完成,因此没有最大迭代次数.相反你需要指定流的哪个部分反馈到迭代,哪个部分使用split转换或转 ...

  7. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  8. Flink DataStream 编程入门

    流处理是 Flink 的核心,流处理的数据集用 DataStream 表示.数据流从可以从各种各样的数据源中创建(消息队列.Socket 和 文件等),经过 DataStream 的各种 transf ...

  9. [Flink]Flink DataStream window join 和interval join

    目录 window join interval join window join 窗口连接把两个流中相同窗口通过一个键值连接起来.然后,两边的元素被传递到用户定义的JoinFunction或FlatJ ...

最新文章

  1. [BuildRelease]build number / id
  2. 雷云3灯光配置文件_雷蛇的哪种键盘最适合入手?3款最佳雷蛇键盘推荐。
  3. Android graphic: bitmap and it's principle
  4. 分布式大型互联网企业架构
  5. 云主机前景几何,风萧萧兮!
  6. 【AI志愿超强攻略】中国高校人工智能专业最全院校排名课程对比
  7. 常见的80道面试算法题
  8. 西门子atch指令详解_西门子plc指令
  9. 微信小程序自定义tabbar 图标凸出效果
  10. 解决:windows电脑连接iphone手机热点,iphone锁屏后热点会自动断开
  11. 浅谈OCR之Onenote 2010
  12. 淘淘商城项目---8.5
  13. 室内定位算法_【好设计论文】基于行人航迹推算的室内定位算法研究
  14. 《Android移动应用基础教程》(Android Studio)(第二版)黑马程序员 课后习题答案
  15. iOS 指纹支付和面容支付
  16. leetcode——第993题——二叉树的堂兄弟节点
  17. Android7.0及以上打开相机闪退,startActivityForResult报错解决
  18. 360随身WIFI作USB无线网卡的做法
  19. MATLAB--数字图像处理 im2col()
  20. 计算机职业规划作文英语作文,大学生职业规划英语作文

热门文章

  1. 诗与远方:无题(六十八)
  2. Django提交表单报错:CSRF token missing or incorrect.
  3. Struts2中UI标签之表单标签介绍
  4. Drools集成SpringBootStarter
  5. Javascript——声明提升(函数、变量提升)
  6. html5 漂亮的左右布局_2020年庚子年风水布局,2020年家居风水布局汇总 | 影楼
  7. 广发银行创新“智慧金融”打造“智慧城市”
  8. NoClassDefFoundError 解决方法
  9. 2017年单多晶市场竞争核心分析
  10. Debian下PostgreSQL修改密码与配置详解