Operators transform one or more DataStreams into a new DataStream.

Operators操作转换一个或多个DataStream到一个新的DataStream 。

filter function

Scala

object DataStreamTransformationApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentfilterFunction(env)env.execute("DataStreamTransformationApp")}def filterFunction(env: StreamExecutionEnvironment): Unit = {val data=env.addSource(new CustomNonParallelSourceFunction)data.map(x=>{println("received:" + x)x}).filter(_%2 == 0).print().setParallelism(1)}}

数据源选择之前的任意一个数据源即可。

这里的map中没有做任何实质性的操作,filter中将所有的数都对2取模操作,打印结果如下:

received:1
received:2
2
received:3
received:4
4
received:5
received:6
6
received:7
received:8
8

说明map中得到的所有的数据,而在filter中进行了过滤操作。

Java

    public static void filterFunction(StreamExecutionEnvironment env) {DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());data.setParallelism(1).map(new MapFunction<Long, Long>() {@Overridepublic Long map(Long value) throws Exception {System.out.println("received:"+value);return value;}}).filter(new FilterFunction<Long>() {@Overridepublic boolean filter(Long value) throws Exception {return value % 2==0;}}).print().setParallelism(1);}

需要先使用data.setParallelism(1)然后再进行map操作,否则会输出多次。因为我们用的是JavaCustomParallelSourceFunction(),而当我们使用JavaCustomNonParallelSourceFunction时,默认就是并行度1,可以不用设置。

Union Function

Scala

  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//    filterFunction(env)unionFunction(env)env.execute("DataStreamTransformationApp")}def unionFunction(env: StreamExecutionEnvironment): Unit = {val data01 = env.addSource(new CustomNonParallelSourceFunction)val data02 = env.addSource(new CustomNonParallelSourceFunction)data01.union(data02).print().setParallelism(1)}

Union操作将两个数据集综合起来,可以一同处理,上面打印输出如下:

1
1
2
2
3
3
4
4

Java

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        filterFunction(environment);unionFunction(environment);environment.execute("JavaDataStreamTransformationApp");}public static void unionFunction(StreamExecutionEnvironment env) {DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());data1.union(data2).print().setParallelism(1);}

Split  Select  Function

Scala

split可以将一个流拆成多个流,select可以从多个流中进行选择处理的流。

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomNonParallelSourceFunction)val split = data.split(new OutputSelector[Long] {override def select(value: Long): lang.Iterable[String] = {val list = new util.ArrayList[String]()if (value % 2 == 0) {list.add("even")} else {list.add("odd")}list}})split.select("odd","even").print().setParallelism(1)}

可以根据选择的名称来处理数据。

Java

public static void splitSelectFunction(StreamExecutionEnvironment env) {DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());SplitStream<Long> split = data.split(new OutputSelector<Long>() {@Overridepublic Iterable<String> select(Long value) {List<String> output = new ArrayList<>();if (value % 2 == 0) {output.add("odd");} else {output.add("even");}return output;}});split.select("odd").print().setParallelism(1);}

Apache Flink 零基础入门(十六)Flink DataStream transformation相关推荐

  1. Apache Flink 零基础入门(六)Flink核心概念

    Flink程序是在分布式集合上实现转换的常规程序(例如filtering, mapping, updating state, joining, grouping, defining windows, ...

  2. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  3. Apache Flink 零基础入门(二十)Flink部署与作业的提交

    之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...

  4. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

  5. Apache Flink 零基础入门(三)编写最简单的helloWorld

    实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...

  6. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  7. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

  8. Apache Flink 零基础入门(十九)Flink windows和Time操作

    Time类型 在Flink中常用的Time类型: 处理时间 摄取时间 事件时间 处理时间 是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间.例如一些算子操作时间,在服务器上面的时间. ...

  9. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

最新文章

  1. 作为程序员,你是否曾经想过写一本书?
  2. 【小米3使用经验】小米3联通版 miui7.2.11稳定版刷机
  3. 做计算机的小卫士教案,小卫士在行动小班教案
  4. 【小点点】上架了他们的官方Windows 8应用
  5. 苹果cms_影视双端源码_支持在线切换前端主题+安装教程文档
  6. HTML5: 两个viewport的故事(第一部分)
  7. MRC522(2):超简易门禁
  8. 08-Oracle基本概念
  9. 20191010:希尔排序代码详解
  10. AWS知识图谱大赛 -- 负面新闻影响股票基金预测系统架构设计文档
  11. 使用urllib,re,queue,threading,bs4,requests多线程队列爬取图片到本地保存
  12. 在linux下安装mplayer和解码器
  13. KT148A电子语音芯片ic方案适用的场景以及常见产品类型
  14. 日本被动元件是怎样称霸全球的
  15. MyEclipse 2014 破解失败,cracker.jar文件打开闪退
  16. 哪款蓝牙耳机戴着舒服?佩戴舒适度高的四款蓝牙耳机推荐
  17. 为你留存最美好的岁月——汉印CP4000L体验评测
  18. 导航定位用户向服务器发送位置请求6,跨平台的地理位置定位方法、平台及定位接入服务器...
  19. 运维体系框架标准化模型简介
  20. 计算机90学时培训小结,90学时培训总结

热门文章

  1. C#.NET编程----Spring.NET NHibernate整合
  2. linux中tcp连接内核参数调优somaxconn
  3. spyder编辑器报ModuleNotFoundError: No module named ‘pymongo‘,明明已经安装上了pymongo扩展
  4. linux国内计算机系统,计算机系统进化论 | Linux 中国
  5. sap 一代增强_在SAP标准实施中不起眼的“小”功能,居然融了3个亿
  6. 跨域 Cookie 实现单点登录
  7. golang插入字符串_golang 几种字符串的连接方式
  8. python异常处理关键字_Python中的异常处理
  9. msql安装(zip)
  10. ucla研究生计算机科学,详解UCLA研究生录取数据,达到什么标准才能稳被录?