Operators(运算符)

    • DataStream Transformations (数据流的转换)
    • DataStream → DataStream
    • DataStream* → DataStream
    • DataStream,DataStream → ConnectedStreams(连接流)
    • ConnectedStreams → DataStream
    • DataStream → SplitStream(分流)
    • SplitStream → DataStream
    • DataStream → KeyedStream
    • KeyedStream → DataStream
  • Physical partitioning (物理分区)
    • **Rebalancing (Round-robin partitioning):** 重新平衡
    • **Random partitioning** 随机划分
    • **Rescaling** 改变大小
    • **Broadcasting** 广播
    • **Custom partitioning** 自定义分区
  • Task chaining and resource groups(任务链接和资源组)

DataStream Transformations (数据流的转换)

DataStream → DataStream

Map
Takes one element and produces one element. A map function that doubles the values of the input stream:
获取一个元素并生成一个元素。一个映射函数,使输入流的值加倍:

dataStream.map { x => x * 2 }

FlatMap
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences towords:
获取一个元素并生成零个、一个或多个元素。一个降维功能,将句子分割成单词:

dataStream.flatMap { str => str.split(" ") }

Filter
Evaluates a boolean function for each element and retains those for which the function returns true. A filterthat filters out zero values:
对每个元素求布尔函数的值,并保留函数返回true的元素。过滤掉零值的过滤器:

dataStream.filter { _ != 0 }

DataStream* → DataStream

Union
Union of two or more data streams creating a new stream containing all the elements from all the streams.Note: If you union a data stream with itself you will get each element twice in the resulting stream.
两个或多个数据流的并集,创建包含来自所有流的所有元素的新流。注意:如果您将一个数据流与它自己相结合,您将得到结果流中的每个元素两次。

dataStream.union(otherStream1, otherStream2, …)

DataStream,DataStream → ConnectedStreams(连接流)

connect
“Connects” two data streams retaining their types, allowing for shared state between the two streams
“连接”两个数据流,保持它们的类型,允许在两个流之间共享状态

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)

ConnectedStreams → DataStream

CoMap, CoFlatMap
Similar to map and flatMap on a connected data stream
类似于连接数据流上的map和 flatMap

connectedStreams.map((_ : Int) => true,(_ : String) => false
)
connectedStreams.flatMap((_ : Int) => true,(_ : String) => false
)
 val env = StreamExecutionEnvironment.getExecutionEnvironmentval text1 = env.socketTextStream("CentOS", 9999)val text2 = env.socketTextStream("CentOS", 8888)text1.connect(text2).flatMap((line:String)=>line.split("\\s+"),(line:String)=>line.split("\\s+")).map((_,1)).keyBy(0).sum(1).print("总数")env.execute("Stream WordCount")

DataStream → SplitStream(分流)

Split
Split the stream into two or more streams according to some criterion.
根据某些标准将流分成两个或多个流。

val split = someDataStream.split((num: Int) =>(num % 2) match {case 0 => List("even")case 1 => List("odd")}
)

SplitStream → DataStream

Select
Select one or more streams from a split stream.
从拆分流中选择一个或多个流。

val even = split.select("even")
val odd = split.select("odd")
val all = split.select("even","odd")
val env = StreamExecutionEnvironment.getExecutionEnvironmentval text1 = env.socketTextStream("CentOS", 9999)var splitStream= text1.split(line=> {if(line.contains("error")){List("error")} else{List("info")}})splitStream.select("error").printToErr("错误")splitStream.select("info").print("信息")splitStream.select("error","info").print("All")env.execute("Stream WordCount")

PrcoessFunction
⼀般来说,更多使⽤PrcoessFunctio完成流的分⽀

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject Operators {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile("hdfs://SparkTwo:9000/demo/words")val errorTag = new OutputTag[String]("a")val allTag = new OutputTag[String]("all")val unit = text.process(new ProcessFunction[String, String] {override def processElement(value: String,ctx: ProcessFunction[String, String]#Context,out: Collector[String]): Unit = {if (value.contains("a")) {ctx.output(errorTag, value)} else {out.collect(value)}ctx.output(allTag, value)}})unit.getSideOutput(errorTag).print("A=")unit.getSideOutput(allTag).print("ALL=")unit.print("正常=")env.execute("Stream WordCount")}
}

输出结果:

DataStream → KeyedStream

KeyBy
Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.
逻辑上将一个流划分为不相连的分区,每个分区包含相同键的元素。
在内部,这是通过哈希分区实现的。有关如何指定keys,请参阅keys。这个转换返回一个KeyedStream。

dataStream.keyBy("someKey") // 以字段 "someKey" 为键
dataStream.keyBy(0) // 由元组的第一个元素作为键

KeyedStream → DataStream

Reduce
A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
键控数据流上的“滚动”减少。将当前元素与最后一个减少的值组合在一起,并发出新的值。
A reduce function that creates a stream of partial sums:
创建部分流的和的reduce函数:

keyedStream.reduce(_ + _)

 lines.flatMap(_.split("\\s+")).map((_,1)).keyBy("_1").reduce((v1,v2)=>(v1._1,v1._2+v2._2)).print()

Fold
A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the lastfolded value and emits the new value.
具有初始值的键控数据流上的“滚动”折叠。将当前元素与上一个折叠值组合并发出新值。
A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence “start-1”, “start-1-2”,“start-1-2-3”, …
当作用于序列(1,2,3,4,5)时,发出序列“start-1”、“start-1-2”、“start-1-2”、“start-1- 3”、…

val result: DataStream[String] =
keyedStream.fold(“start”)((str, i) => { str + “-” + i })

 text.flatMap(_.split("\\s+")).map((_,1)).keyBy("_1").fold((null:String,0:Int))((z,v)=>(v._1,v._2+z._2)).print()

Aggregations
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns theminimum value, whereas minBy returns the element that has the minimum value in this field (same for
max and maxBy).
在键控数据流上滚动聚合。min和minBy之间的区别是,min返回最小值,而minBy返回该字段中最小值的元素(对于max和maxBy和它们类似)。

keyedStream.sum(0)
keyedStream.sum(“key”)
keyedStream.min(0)
keyedStream.min(“key”)
keyedStream.max(0)
keyedStream.max(“key”)
keyedStream.minBy(0)
keyedStream.minBy(“key”)
keyedStream.maxBy(0)
keyedStream.maxBy(“key”)

 val env = StreamExecutionEnvironment.getExecutionEnvironment//zhangsan 研发部 1000//lisi 研发部 5000//ww 销售部 9000val lines = env.socketTextStream("CentOS", 9999)lines.map(line=>line.split(" ")).map(ts=>Emp(ts(0),ts(1),ts(2).toDouble)).keyBy("dept").maxBy("salary")//Emp(lisi,研发部,5000.0).print()env.execute("Stream WordCount")

如果使⽤时max,则返回的是Emp(zhangsan,研发部,5000.0)

Physical partitioning (物理分区)

Flink还通过以下function对转换后的DataStream进⾏分区(如果需要)。

Rebalancing (Round-robin partitioning): 重新平衡

分区元素轮循,从⽽为每个分区创建相等的负载。在存在数据偏斜的情况下对性能优化有⽤

dataStream.rebalance()

Random partitioning 随机划分

根据均匀分布对元素进⾏随机划分。

dataStream.shuffle()

Rescaling 改变大小

和Roundrobin Partitioning⼀样,Rescaling Partitioning也是⼀种通过循环的⽅式进⾏数据重平衡的分区策略。但是不同的是,当使⽤Roundrobin Partitioning时,数据会全局性地通过⽹络介质传输到其他的节点完成数据的重新平衡,⽽Rescaling Partitioning仅仅会对上下游继承的算⼦数据进⾏重平衡,具体的分区主要根据上下游算⼦的并⾏度决定。例如上游算⼦的并发度为2,下游算⼦的并发度为4,就会发⽣上游算⼦中⼀个分区的数据按照同等⽐例将数据路由在下游的固定的两个分区中,另外⼀个分区同理路由到下游两个分区中。

dataStream.rescale()

Broadcasting 广播

Broadcasts elements to every partition.
向每个分区广播元素。

dataStream.broadcast

Custom partitioning 自定义分区

Selects a subset of fields from the tuples
从元组中选择字段的子集

dataStream.partitionCustom(partitioner, “someKey”)
dataStream.partitionCustom(partitioner, 0)

 val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.socketTextStream("CentOS", 9999).map((_,1)).partitionCustom(new Partitioner[String] {override def partition(key: String, numPartitions: Int): Int = {key.hashCode & Integer.MAX_VALUE % numPartitions}},_._1).print().setParallelism(4)println(env.getExecutionPlan)env.execute("Stream WordCount")

Task chaining and resource groups(任务链接和资源组)

对两个算⼦操作进⾏Chain(连接),意味着将这两个算⼦放置于⼀个线程中,这样是为了节省没必要的线程开销,提升性能。如果可能的话,默认情况下Flink会链接运算符。
用户是可以调⽤:

StreamExecutionEnvironment.disableOperatorChaining()

禁⽤chain⾏为,但是不推荐。
startNewChain

someStream.filter(…).map(…).startNewChain().map(…)
将第⼀个map算⼦和filter算⼦进⾏隔离

disableChaining

someStream.map(…).disableChaining()
所有操作符禁⽌和map操作符进⾏chain

slotSharingGroup
设置操作的slot共享组。 Flink会将具有相同slot共享组的operator放在同⼀个Task slot中,同时将没有slot共享组的operator保留在其他Task slot中。这可以⽤来隔离Task Slot。下游的操作符会⾃动继承上游资源组。默认情况下,所有的输⼊算⼦的资源组的名字是 default ,因此当⽤户不对程序进⾏资源划分的情况下,⼀个job所需的资源slot,就等于最⼤并⾏度的Task。

someStream.filter(…).slotSharingGroup(“name”)

Flink快速回忆之Operators(运算符)相关推荐

  1. Flink快速应用(批、流一体)简单实现

    Flink快速应用 Flink批处理:统计一个文件中各个单词出现的次数,把统计结果输出到文件. 新建maven项目 引入依赖 <dependency><groupId>org. ...

  2. 如何基于 Apache Doris 与 Apache Flink 快速构建极速易用的实时数仓

    随着大数据应用的不断深入,企业不再满足离线数据加工计算的时效,实时数据需求已成为数据应用新常态.伴随着实时分析需求的不断膨胀,传统的数据架构面临的成本高.实时性无法保证.组件繁冗.运维难度高等问题日益 ...

  3. SQL语句快速回忆——SQL基础知识点汇总

    文章目录 DDL 建表 约束 已经建了表要添加约束 取消主键PRIMARY约束 外键 FOREIGN KEY 建表时加外键(表2必须已经创建,并且建表时列名已经定义) 已有表添加外键约束 删除外键约束 ...

  4. 如何快速记忆C语言运算符?

    扫描二维码可进入本文的学习视频 学习第一门编程语言的时候,你很可能是读过一本书,输入了你不太懂的代码,然后试图弄懂它们的原理.我写的其他书大多是这个样子,这对初学者非常有效.初学的时候,对于有一些复杂 ...

  5. Flink学习笔记:Operators之CoGroup及Join操作

    本文为<Flink大数据项目实战>学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程: Flink大数据项目实战:http://t.cn/EJtKhaz ...

  6. 【C语言运算符大全】快速学会C语言运算符

    目    录(本篇字数:2120) 介绍 运算符分类 算术运算符 逻辑运算符 关系运算符 位运算符 赋值运算符 经典问题:a = 5,b = 8,如何交换两个变量的值? 介绍 说到运算符,从小我们就学 ...

  7. c++operators(运算符)

    C++运算符 运算符 赋值运算符(=) 算术运算符(+,-,*,/,%) 增量和减量(++,-) 关系运算符和比较运算符(==,!=,>,<,> =,<=) 逻辑运算符(!,& ...

  8. 如何快速记忆C语言运算符,C语言运算符快速记忆法

    =====运算符的优先级与结合律(从高到低排列)===== 优先级 名称 符号 结合性 1 数组下标 [] 左结合性 1 函数调用 () 左结合性 1 结构和联合的成员. -> 左结合性 1 自 ...

  9. 无框架,简单maven webapp骨架跑一个项目 (快速回忆基本使用及其流程用)

    文章目录 0.先建数据库 一.建立IDEA基本环境 1.1 新建module 1.2 选择maven->webapp骨架 1.3 改名称和包名 1.4 第一次要修改maven仓库配置 1.5 f ...

  10. Flink快速入门教程

    Apache Flink 是以高效.可扩展方式处理海量数据的大数据处理框架.本文介绍它的一些核心概念,以及标准数据转换Java版本api,这些API以流畅的方式可以很容易使用Flink的核心数据结构- ...

最新文章

  1. java获取手机号码归属地_手机号码归属地能否取消?这些热点问题,工信部回复了...
  2. centOs7上用yum安装软件报错
  3. 谈协同软件实施特点与过程
  4. PTGAN:针对行人重识别的生成对抗网络 | PaperDaily #36
  5. 210307共享内存的读写
  6. 《Java程序设计》 第五周学习总结
  7. Java中HashMap遍历的两种方式
  8. 小程序 const moment = require('moment')_C++大作业-XXX管理程序
  9. 我对Linux输入输出重定向的小结
  10. DEV控件:gridControl常用属性设置
  11. Maven生命周期——2
  12. PCL之鼠标拾取点云的三维坐标
  13. Kubernetes学习笔记(一):Kubernetes-1.7.x 创建TLS证书和秘钥
  14. VMware Workstation 10.0.7 安装
  15. 复旦-华盛顿大学EMBA科创的奥E丨《神奇的材料》与被塑造的我们
  16. Minecraft Mod 开发:目录
  17. html中input type什么意思,HTML中type是什么意思
  18. ANN to SNN
  19. jsp清真餐馆订餐系统
  20. 英文ppt怎么翻译成中文?教你几种ppt翻译方法

热门文章

  1. 微信小程序--瀑布流布局
  2. ACM-ICPC 2018 北京赛区网络预赛 B Tomb Raider(二进制枚举暴力)
  3. 面试被问到【未来3-5年的职业规划】,到底该怎么回答?
  4. 可靠性测试项目之可靠性试验
  5. 推荐一些国外前端的学习网站
  6. 张锋因何错过诺贝尔化学奖?
  7. 我爱 Ruby 的三十七个理由
  8. knn.predict()报错 Expected 2D array, got 1D array instead: array=[18 90]. Reshape your data either usi
  9. [原创]VC成功实现重启路由器(完整源码)
  10. 手机status500_HTTP Status 500 - 是什么意思