文章目录

  • 说明
  • Transformation 数据处理
    • SingleDataStream
      • Map
      • FlatMap
      • Filter
      • KeyBy
      • Reduce
      • Aggregations
    • MultiDataStream
      • Unio
      • Connect,CoMap,CoflatMap
      • split
      • Select
      • Iterate
    • 物理分区
      • 随机分区(Random Partitioning)
      • 平衡分区(Roundrobin Partitioning)
      • Rescaling partitioning
      • 广播操作
      • 自定义分区
  • 总结

说明

  • 本博客每周五更新一次。
  • 本博客主要讲解transformation部分功能说明,这部分主要是flink计算引擎的算子,基于这些算子上层有高度抽象的flink table和flink sql。

Transformation 数据处理

  • 将一个或多个DataStream生成新的DataStream的过程被称为Transformation。转换过程中,每种操作类型被定义为不同的Operator,Flink能将多个Transformation组合为一个DataFlow的拓扑。
  • 所以DataStream的转换操作可以分为SingleDataStream、MultiDataStream、物理分区三个类型。
    • SingleDataStream:单个DataStream的处理逻辑。
    • MultiDataStream:多个DataStream的处理逻辑。
    • 物理分区:对数据集中的并行度和数据分区调整转换的处理逻辑。

SingleDataStream

Map

  • 常用作对数据集内数据的清晰和转换。如将输入数据的每个数值全部加1,并将数据输出到下游。
val dataStream = evn.formElements(("a",3),("d",4),("c",4),("c",5),("a",5))
//方法一
val mapStream:DataStream[(String,Int)] = dataStream.map(t => (t._1,t._2+1))
//方法二
val mapStream:DataStream[(String,Int)] = dataStream.map( new MapFunction[(String,Int),(String, Int)]{override def map(t: (String,Int)): (String,Int) ={(t._1, t._2+1)}
})

FlatMap

  • 主要应用于处理输入一个元素转换为多个元素场景,如WordCount,将没行文本数据分割,生成单词序列。
val dataStream:DataStream[String] = environment.fromCollections()
val resultStream[String] =dataStream.flatMap{str => str.split(" ")}

Filter

  • 按条件对输入数据集进行筛选,输出符合条件的数据。
//通配符
val filter:DataStream[Int] = dataStream.filter{ _ %2 == 0}
//运算表达式
val filter:DataStream[Int] = dataStream.filter { x => x % 2 ==0}

KeyBy

  • 根据指定的key对输入的数据集执行Partition操作,将相同的key值的数据放置到相同的区域中。
  • 将下标为1相同的数据放到一个分区
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
//指定第一个字段为分区key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)

Reduce

  • 与MapReduce中reduce原理基本一致,将输入的KeyedStream通过传入用户自定义的ReduceFunction滚动进行数据聚合处理,定义的ReduceFunction必须满足运算结合律和交换律。
val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
//指定第一个字段为分区key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)
//实现一:滚动第二个字段进行reduce相加求和
val reduceStream = keyedStream.reduce{(t1,t2) => (t1._1, t1._2+t2._2)}
//实现二:实现ReduceFunction
val reduceStream1 = keyedStream.reduce(new ReduceFunction[(String, Int)] {override def reduce(t1: (String,Int), t2:(String,Int)):(String, int) = {(t1._1, t1._2+ t2._2)}
})
  • 运行结果为:(c,2)(c,7)(a,3)(d,4)(a,8),结果不是最后求和的值,是将每条记录累加后的结果输出。

Aggregations

  • DataStream提供的聚合算子,根据指定的字段进行聚合操作,滚动产生一系列数据聚合结果。实际是将Reduce算子中函数进行封装,封装的聚合操作有sum、min、minBy、max、maxBy等。这样就不需要用户自己定义Reduce函数。
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
//指定第一个字段为分区key
val keyedStream: KeyedStream[(String,Int),Tuple]=dataSteam.keyBy(0)
//对第二个字段进行sum统计
val sumStream: DataStream[(Int,Int)] = keyedStream.sum(1)
//输出统计结果
sumStream.print()
  • 聚合函数中传入参数必须数值型,否则会抛出异常。
//统计计算指定key最小值
val minStream: DataStream[(Int,Int)] = keyedStream.min(1)
//统计计算指定key最大值
val maxStream: DataStream[(Int,Int)] = keyedStream.max(1)
//统计计算指定key最小值,返回最小值对应元素
val minByStream: DataStream[(Int,Int)] = keyedStream.minBy(1)
//统计计算指定key最大值,返回最大值对应元素
val maxByStream: DataStream[(Int,Int)] = keyedStream.maxBy(1)

MultiDataStream

Unio

  • 将两个或多个输入的数据集合并为一个数据集,需要保证输入待合并数据集和输出数据集格式一致。
//创建不同数据集
val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2: DataStream [(String ,Int)]= env.fromElements(("d",1),("s",2),("a",4),("e",5),("a",6))
val dataStream3: DataStream [(String ,Int)]= env.fromElements(("a",2),("d",1),("s",2),("c",3),("b",1))
//合并两个数据集
val unionStream = dataStream1.union(dataStream2)
//合并多个数据集
val allUnionStream = dataStream1.union(dataStream2,dataStream3)

Connect,CoMap,CoflatMap

  • 该算子为了合并两种或多种不同类型的数据集,合并后会保留原始数据集的数类型。连接操作允许共享状态数据,也就是说在多个数据集之间可以操作和查看对方数据集的状态。
  • 实例:dataStream1数据集为(String,Int)元祖类型,dataStream2数据集为Int类型,通过connect连接将两种类型数据结合在一起,形成格式为ConnectedStream是的数据集,其内部数据为[(String,Int),Int]的混合数据类型,保留两个数据集的数据类型。
val dataStream1: DataStream [(String ,Int)]= env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2: DataStream [Int]= env.fromElements(1,2,4,5,6)
//连接两个数据集
val connectedStream :ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2)
  • 注意:ConnectedStreams类型的数据集不能进行类似Print()操作,需转换为DataStream类型数据集。

  • ConnectedStreams提供map()和flatMap()需要定义CoMapFunction或CoFlatMapFunction分别处理输入的DataStream数据集,或直接传入MapFunction来分别处理两个数据集。

  • map()实例如下:

val resultStream = connectedStream.map(new CoMapFunction[(String,Int),Int,(Int, String)]{//定义第一个数据集函数处理逻辑,输入值为第一个DataStreamoverride def map1(in1: (String,Int)): (Int ,String) = {(int1._2 , in1._1)}//定义第二个数据集函数处理逻辑override def amp2(in2: Int):(Int,String) = {(int2,"default")}})
  • 以上实例中,两个函数会多线程交替执行产生结果,最后根据定义生成目标数据集。

  • flatMap()方法中指定CoFlatMapFunction。两个函数共享number变量,代码如下:

val resultStream2 = connectedStream.flatMap(new CoFlatMapFunction[(String,Int), Int ,(String ,Int , Int)]{//定义共享变量var number=0//定义第一个数据集处理函数override def flatMap1(in1:(String ,Int ), collector : Collector[(String,Int ,Int)]): Unit = {collector.collect((in1._1,in1._2,number))}//定义第二个数据集处理函数override def flatMap2(in2: Int, collector : Collector[(String , Int ,Int)]):Unit = {number=in2}
})
  • 如果想通过指定的条件对两个数据集进行关联,可以借助keyBy韩硕或broadcast广播变量实现。keyBy会将相同key的数据路由在同一个Operator中。broadcast会在执行计算逻辑前,将DataStream2数据集广播到所有并行计算的Operator中,再根据条件对数据集进行关联。这两种方式本质是分布式join算子的基本实现方式。
//通过keyby函数根据指定的key连接两个数据集
val keyedConnect: ConnectedStreams[(String ,Int ), Int] = dataStream1.connect(dataStream2).keyBy(1,0)
//通过broadcast关联两个数据集
val broadcastConnect: BroadcastConnectedStream [(String, Int), Int] = dataStream1.connect(dataStream2.broadcast())

split

  • 将一个DataStream数据集按条件进行拆分,形成两个数据集的过程,union的逆向操作。实例:如调用split函数,指定条件判断,根据第二个字段的奇偶性将数据集标记出来,偶数标记为event,奇数标记为odd,再通过集合将标记返回,最终生成SplitStream数据集。
//创建数据集
val DataStream1: DataStream[(String, Int)] = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
//合并连个DataStream数据集
val splitedStream : SplitStream[(String,Int)] = dataStream1.split(t => if(t._2 % 2 ==0 ) Seq("even") else Seq("odd"))

Select

  • split函数只是标记数据,没有拆分数据,因此需要select函数根据标记将数据切分为不同数据集。
//筛选出偶数数据集
val evenStream: DataStream[(String,Int)] = splitedStream.select("even")
//筛选出奇数数据集
val oddStream: DataStream[(String,Int)] = splitedStream.select("odd")
//筛选出偶数和奇数数据集
val allStream: DataStream[(String,Int)] = splitedStream.select("even","odd")

Iterate

  • Iterate适合于迭代计算,通过每一次的迭代计算,并将计算结果反馈到下一次迭代计算中。
//创建数据集,map处理为对数据分区根据默认并行度进行平衡
val DataStream = env.fromElements(3,1,2,1,5).map{ t:Int => t}val iterated = dataStream.iterate((input: ConnectedStreams[Int , String]) => {//定义两个map处理数据集,第一个map反馈操作,第二个map将数据输出到下游val head= input.map(i => (i+1).toString, s => s) (head.filter( _ == "2"), head.filter (_ != "2"))
},1000)  //超过1000ms没有数据接入终止迭代

物理分区

  • 根据指定的分区策略将数据重新分发到不同节点的Task实例上执行,以此优化DataStream自身API对数据的分区控制。

随机分区(Random Partitioning)

  • 随机将数据集中数据分配到下游算子的每个分区中,优点数据相对均衡,缺点失去原有数据的分区结构
val shuffleStream=dataStream.shuffle

平衡分区(Roundrobin Partitioning)

  • 循环将数据集中数据进行重分区,能尽可能保证每个分区的数据平衡,可有效解决数据集的倾斜问题。
val shuffleStream= dataStream.rebalance();

Rescaling partitioning

  • 一种通过循环方式进行数据重平衡的分区策略,与Roundrobin Partitioning不同,它仅会对上下游继承的算子数据进行重新平衡,具体主要根据上下游算子的并行度决定。如上游算子的并发度为2,下游算子的并发度为4,上游算子中第一个分区数据按照同等比例将数据路由在下游的固定两个分区中,另一个分区也是一样。
//通过调用DataStream API中rescale()方法实现Rescaling Partitioning操作
val shuffleStream = dataStream.rescale();

广播操作

  • 将输入的数据集复制到下游算子的并行的Tasks实例中,下游算子Tasks可直接从本地内存中获取广播数据集,不再依赖网络传输。
  • 这种分区策略适合于小集群,如大数据集关联小数据集时,可通过广播方式将小数据分发到算子的分区中。
//通过DataStream API的broadcast() 方法实现广播分区
val shuffleStream= dataStream.broadcast()

自定义分区

  • 实现自定义分区器,调用DataStream API上的partitionCustom()方法将创建的分区器应用到数据集上。
  • 如下,自定义分区器实现将字段中包含flink关键字的数据放在partition为0的分区中,其余数据执行随机分区策略,其中num Partitions是从系统中获取的并行度参数。
Object customPartitioner extends Partitioner[String]{//获取随机数生成器val r=scala.util.Randomoverride def partition(key: String, numPartitions: Int): Int ={//定义分区策略,key中如果包含a则放入0分区中,其他情况则根据Partitions num随机分区if(key.contains("flink")) 0 else r.nextInt(numPartitions)}
}
  • 完成自定义分区器,调用DataStream API的partitionCustom应用分区器,第二个参数指定分区器使用到的字段,对于Tuple类型数据,分区字段可以通过字段名称指定,其他类型数据集则通过位置索引指定。
//通过数据集字段名称指定分区字段
dataStream.partitionCustom(customPartitioner,"filed_name");
//通过数据集字段索引指定分区字段
dataStream.partitionCustom(customPartitioner,0)

总结

  • Transformation部分算子功能以接口方式编写具体功能,java代码使用lambda表达式可简化代码,但必须通过returns()指定返回数据结构,个人建议跟喜欢非lambda方式实现。
  • 明天五一,节日快乐。

flink(三):数据处理Transformation相关推荐

  1. flink大数据处理流式计算详解

    flink大数据处理 文章目录 flink大数据处理 二.WebUI可视化界面(测试用) 三.Flink部署 3.1 JobManager 3.2 TaskManager 3.3 并行度的调整配置 3 ...

  2. flink 三种时间机制_Flink1.10入门:时间机制简介

    一.概述 上篇文章介绍了Window窗口机制的相关知识,这里我们介绍下Flink的另外一个核心概念"Event Time机制",本篇文章只介绍相关概念不讲实战,实战会结合Windo ...

  3. Flink实时数据处理实践经验(Flink去重、维表关联、定时器、双流join)

    Flink实时数据处理实践经验 文章目录 Flink实时数据处理实践经验 1. 数据输入与预处理 2. 实时数据处理 3. 实时数仓架构 4. 优化方案 Java.大数据开发学习要点(持续更新中-) ...

  4. Flink 流数据处理

    序言 基于官网教程整理的一个教程.基于Flink1.12.0版本. 目前该版本的Flink支持的source与sink如下所示 参考资料: https://ci.apache.org/projects ...

  5. flink 三种时间机制_Flink的时间与watermarks详解

    当我们在使用Flink的时候,避免不了要和时间(time).水位线(watermarks)打交道,理解这些概念是开发分布式流处理应用的基础.那么Flink支持哪些时间语义?Flink是如何处理乱序事件 ...

  6. flink 三种时间机制_Flink时间系列:Event Time下如何处理迟到数据

    Event Time语义下我们使用Watermark来判断数据是否迟到.一个迟到元素是指元素到达窗口算子时,该元素本该被分配到某个窗口,但由于延迟,窗口已经触发计算.目前Flink有三种处理迟到数据的 ...

  7. flink 三种时间机制_360深度实践:Flink 与 Storm 协议级对比

    本文从数据传输和数据可靠性的角度出发,对比测试了 Storm 与 Flink 在流处理上的性能,并对测试结果进行分析,给出在使用 Flink 时提高性能的建议. Apache Storm.Apache ...

  8. flink 三种时间机制_Flink的时间类型和watermark机制

    一FlinkTime类型 有3类时间,分别是数据本身的产生时间.进入Flink系统的时间和被处理的时间,在Flink系统中的数据可以有三种时间属性: Event Time 是每条数据在其生产设备上发生 ...

  9. Flink学习笔记02:Flink三种运行模式

    文章目录 一.Local 模式 (一)简单说明 (二)案例演示 1.准备测试数据集 2.Flink Shell运行词频统计 二.Standalone 模式 (一)简单说明 (二)案例演示 三.Flin ...

  10. Grafana面板(panel):数据处理(Transformation)---一个奇怪的功能!

    文章目录 Transformation 概念 多个transformation的执行顺序 transformation类型 Add field from calculation Concatenate ...

最新文章

  1. string数组怎么定义
  2. HDU_Virtual Friends (并查集)
  3. Asp.net mvc中的Ajax处理
  4. 如何基于 Kubernetes 构建完整的 DevOps 流水线
  5. 网络——连接到server
  6. 数据拆分_数据拆分,偏方请拿好
  7. CentOS(八)--crontab命令的使用方法
  8. jfinal html5,Jfinal框架整合webSocket技术功能实现
  9. Sky公司的爱丁堡数据中心的能源效率纳入欧盟能效标准
  10. Volatile的实现原理(看这篇就够了)
  11. 使用yarn dev报错 vue-cli-service外部命令
  12. 基于 Retinex 的几种图像增强算法总结
  13. 简单WEB登录页面代码实现
  14. 困扰我两天的问题(nginx配置好ssl证书,https却不能访问)
  15. 极米H5搭载全新CCB流明,树电影色彩亮度新标杆
  16. 游戏出海Get,TikTok联手Zynga推出一款基于HTML5打造的手机游戏
  17. NBIOT BC26 opencpu物联网应用案例详细解析
  18. Android Studio如何连接手机设备
  19. None和nan、NaN、NAN
  20. cobol .cpy文件_Visual COBOL R3:“使传统的COBOL能够部署在JVM或.NET上”。

热门文章

  1. 国庆在家写了个简易版的在线简历网站
  2. 【离散数学】偏序集Hasse图的画法和重要元素
  3. html input 宽度自适应,CSS实现input宽度根据输入内容自适应
  4. 2019 / 3 /24 触摸屏键盘的功能实现
  5. Fibonacci法与黄金分割法
  6. LinuxCNC虚拟机环境搭建
  7. 集成学习【三】:Bagging结合神经网络及代码实现
  8. Unity(四十五):光照烘焙
  9. 斗鱼已公开的运维技术和架构分析
  10. Rapid SCADA中文使用说明书(一)