Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。读者可以使用Flink Scala Shell或者Intellij Idea来进行练习:

  • Flink Scala Shell:使用交互式编程环境学习和调试Flink
  • Flink 01 | 十分钟搭建第一个Flink应用和本地集群
  • Flink算子使用方法及实例演示:map、filter和flatMap
  • Flink算子使用方法及实例演示:keyBy、reduce和aggregations

很多情况下,我们需要对多个数据流进行整合处理,Flink为我们提供了多流转换算子,本文主要介绍多流转换。

union

在DataStream上使用union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。下图union对白色和深色两个数据流进行合并,生成一个数据流。

union示意图

假设股票价格数据流来自不同的交易所,我们将其合并成一个数据流:

val shenzhenStockStream: DataStream[StockPrice] = ...val hongkongStockStream: DataStream[StockPrice] = ...val shanghaiStockStream: DataStream[StockPrice] = ...val unionStockStream: DataStream[StockPrice] = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream)

connect

union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  1. connect只能连接两个数据流,union可以连接多个数据流。
  2. connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
  3. 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

connect经常被应用在对一个数据流使用另外一个流进行控制处理的场景上,如下图所示。控制流可以是阈值、规则、机器学习模型或其他参数。

对一个数据流进行控制处理

对于ConnectedStreams,我们需要重写CoMapFunction或CoFlatMapFunction。这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。在重写函数时,对于CoMapFunction,map1处理第一个流的数据,map2处理第二个流的数据;对于CoFlatMapFunction,flatMap1处理第一个流的数据,flatMap2处理第二个流的数据。Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序,即第一个数据流有数据到达时,map1或flatMap1会被调用,第二个数据流有数据到达时,map2或flatMap2会被调用。下面的代码对一个整数流和一个字符串流进行了connect操作。

val intStream: DataStream[Int] = senv.fromElements(1, 0, 9, 2, 3, 6)val stringStream: DataStream[String] = senv.fromElements("LOW", "HIGH", "LOW", "LOW")val connectedStream: ConnectedStreams[Int, String] = intStream.connect(stringStream)// CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出class MyCoMapFunction extends CoMapFunction[Int, String, String] {  override def map1(input1: Int): String = input1.toString  override def map2(input2: String): String = input2}val mapResult = connectedStream.map(new MyCoMapFunction)

我们知道,如果不对DataStream按照Key进行分组,数据是随机分配在各个TaskSlot上的,而绝大多数情况我们是要对某个Key进行分析和处理,Flink允许我们将connect和keyBy或broadcast结合起来使用。例如,我们将之前的股票价格数据流与一个媒体评价数据流结合起来,按照股票代号进行分组。

// 先将两个流connect,再进行keyByval keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream  .connect(mediaStatusStream)  .keyBy(0,0)// 先keyBy再connectval keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0).connect(mediaStatusStream.keyBy(0))

无论先keyBy还是先connect,我们都可以将含有相同Key的数据转发到下游同一个算子实例上。这种操作有点像SQL中的join操作。Flink也提供了join算子,join主要在时间窗口维度上,connect相比而言更广义一些,关于join的介绍将在后续文章中介绍。

下面的代码展示了如何将股票价格和媒体正负面评价结合起来,当媒体评价为正且股票价格大于阈值时,输出一个正面信号。完整代码在我的github上:https://github.com/luweizheng/flink-tutorials

package com.flink.tutorials.demos.stockimport java.util.Calendarimport com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunctionimport org.apache.flink.streaming.api.functions.source.RichSourceFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContextimport org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collectorimport scala.util.Randomobject StockMediaConnectedDemo {  def main(args: Array[String]) {    // 设置执行环境    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    // 每5秒生成一个Watermark    env.getConfig.setAutoWatermarkInterval(5000L)    // 股票价格数据流    val stockPriceRawStream: DataStream[StockPrice] = env      // 该数据流由StockPriceSource类随机生成      .addSource(new StockPriceSource)      // 设置 Timestamp 和 Watermark      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)    val mediaStatusStream: DataStream[Media] = env      .addSource(new MediaSource)    // 先将两个流connect,再进行keyBy    val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream      .connect(mediaStatusStream)      .keyBy(0,0)    // 先keyBy再connect    val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0)      .connect(mediaStatusStream.keyBy(0))    val alert1 = keyByConnect1.flatMap(new AlertFlatMap).print()    val alerts2 = keyByConnect2.flatMap(new AlertFlatMap).print()    // 执行程序    env.execute("connect stock price with media status")  }  /** 媒体评价    *    * symbol 股票代号    * timestamp 时间戳    * status 评价 正面/一般/负面    */  case class Media(symbol: String, timestamp: Long, status: String)  class MediaSource extends RichSourceFunction[Media]{    var isRunning: Boolean = true    val rand = new Random()    var stockId = 0    override def run(srcCtx: SourceContext[Media]): Unit = {      while (isRunning) {        // 每次从列表中随机选择一只股票        stockId = rand.nextInt(5)        var status: String = "NORMAL"        if (rand.nextGaussian() > 0.9) {          status = "POSITIVE"        } else if (rand.nextGaussian() < 0.05) {          status = "NEGATIVE"        }        val curTime = Calendar.getInstance.getTimeInMillis        srcCtx.collect(Media(stockId.toString, curTime, status))        Thread.sleep(rand.nextInt(100))      }    }    override def cancel(): Unit = {      isRunning = false    }  }  case class Alert(symbol: String, timestamp: Long, alert: String)  class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] {    var priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d)    var mediaLevel: String = "NORMAL"    override def flatMap1(stock: StockPrice, collector: Collector[Alert]) : Unit = {      val stockId = stock.symbol.toInt      if ("POSITIVE".equals(mediaLevel) && stock.price > priceMaxThreshold(stockId)) {        collector.collect(Alert(stock.symbol, stock.timestamp, "POSITIVE"))      }    }    override def flatMap2(media: Media, collector: Collector[Alert]): Unit = {      mediaLevel = media.status    }  }}

c#中connect函数_Flink算子使用方法及实例演示:union和connect相关推荐

  1. matlab中fprintf函数的具体使用方法

    matlab中fprintf函数的具体使用方法实例如下: fprintf函数可以将数据按指定格式写入到文本文件中.其调用格式为: 数据的格式化输出:fprintf(fid, format, varia ...

  2. Oracle中wm_concat函数报错解决方法

    Oracle中wm_concat函数报错解决方法 参考文章: (1)Oracle中wm_concat函数报错解决方法 (2)https://www.cnblogs.com/52net/archive/ ...

  3. python中bool函数用法_在python中bool函数的取值方法

    bool是Boolean的缩写,只有真(True)和假(False)两种取值 bool函数只有一个参数,并根据这个参数的值返回真或者假. 1.当对数字使用bool函数时,0返回假(False),任何其 ...

  4. php fopen函数 返回值,php中fopen函数失败的解决方法

    php中fopen函数失败的解决方法 发布时间:2021-03-29 12:26:30 来源:亿速云 阅读:86 作者:小新 这篇文章将为大家详细讲解有关php中fopen函数失败的解决方法,小编觉得 ...

  5. python中def什么时候用_Python中定义函数def的使用方法

    Python中定义函数def的使用方法 发布时间:2020-12-15 09:26:24 来源:亿速云 阅读:71 作者:小新 这篇文章给大家分享的是有关Python中定义函数def的使用方法的内容. ...

  6. c语言isnumber函数的使用方法,Excel中isnumber函数功能的使用方法

    你还在为Excel中isnumber函数的使用方法而苦恼吗,今天,学习啦小编就教大家在Excel中isnumber函数功能的使用方法,让你告别Excel中isnumber函数的使用方法的烦恼. Exc ...

  7. surprise库中evaluate函数弃用解决方法

    surprise库中evaluate函数弃用解决方法 代码:在数据集上测试效果 evaluate(self.svd, data, measures=['RMSE', 'MAE']) 运行报错:Impo ...

  8. mysql数据库中count的作用_详解 MySQL中count函数的正确使用方法

    1. 描述 在MySQL中,当我们需要获取某张表中的总行数时,一般会选择使用下面的语句 select count(*) from table; 其实count函数中除了*还可以放其他参数,比如常数.主 ...

  9. laravel5.5 __construct函数 无法使用session() 解决方法和实例

    laravel5.5 __construct函数 无法使用session() 解决方法和实例 参考文章: (1)laravel5.5 __construct函数 无法使用session() 解决方法和 ...

最新文章

  1. docker如何push镜像到docker hub个人的仓库
  2. 3天我把DDD业务领域建模、数据库、聚合彻底撸干净了!
  3. python教程笔记(详细)
  4. ES6里的修饰器Decorator
  5. Docker学习总结(6)——通过 Docker 化一个博客网站来开启我们的 Docker 之旅
  6. 论文笔记:MobileFaceNet
  7. FPGA 状态机设计
  8. 构建之法阅读笔记之三
  9. 数据结构:实验四 图的遍历
  10. Cocos Creator 自制小工具-小游戏场景地图编辑器
  11. 「 数学模型 」“使用SPSS软件线性回归分析”实例
  12. 【小程序按钮控制视频播放暂停】
  13. 抖音怎么用计算机数字等于中文字,抖音数字表白公式怎么玩 抖音数字表白从1到100攻略...
  14. Asp.Net Core3.1-集成Hangfire
  15. es6之扩展运算符 Object.assign和 三个点(...)
  16. MFC(C++)实现斑马标签打印机打印
  17. 解决Ubuntu更新后无线网卡不能使用的问题
  18. 母函数 By Tanky Woo
  19. 2021-07-17【普及组】模拟赛C组
  20. R语言简单的频数统计图

热门文章

  1. 任何项目都适用的CMakeLists配置
  2. JQuery Datatables 样式Style
  3. windows下使用Caffe框架和matlab实现SRCNN官方代码的步骤
  4. windows安装双JDK并实现版本切换
  5. linux coreutils升级,Coreutils
  6. 如何更改电脑ip地址租期_局域网通过IP地址如何找到电脑的位置
  7. java中数组中添加新元素,求解!!!往ArrayList数组里添加元素,貌似没加进去。...
  8. 如何用java对接口发送请求_7. 用Java做接口测试1-发送HTTP请求和接收HTTP响应
  9. python在末尾加关闭程序_廖雪峰的Python教程教程-02
  10. ftp服务器复制粘贴文件夹,ftp服务器复制粘贴文件夹