主要内容:

  • 结合应用场景,介绍Flink侧输出流的使用流程和原理

在处理数据的时候,有时候想对不同情况的数据进行不同的处理,那么就需要把数据流进行分流。可以在主数据流上产生出任意数量额外的侧输出流。

1 场景

某公司使用埋点组件收集到了埋点数据,并实时写入了Kafka。其中,埋点数据共分为三类:Web端埋点数据、移动端埋点数据和CS端埋点数据。现在需要从Kafka读取埋点数据,并分别对三端数据做不同的处理逻辑:

2 Side Output

当然使用 filter 对主数据流进行过滤,也能满足上述场景,但每次筛选过滤都要保留整个流,然后通过遍历整个流来获取相应的数据,显然很浪费性能。假如能够在一个流里面就进行多次输出就好了,恰好 Flink 的 Side Output 提供了这样的功能。Flink的Side Output侧输出流的作用在于将主数据分割成多个不同的侧输出流。侧输出结果流的数据类型不需要与主数据流的类型一致,不同侧输出流的类型也可以不同。

在上述场景中,可以使用Flink此功能:将Kafka的埋点数据进行分类,分为web端、mobile端和CS端三类,然后再对每类埋点数据进行相应的处理。

3 处理流程

3.1 定义OutputTag

在使用侧输出的时候需要先定义一个OutputTag,来标识 Side Output,代表这个 Tag 是要收集哪种类型的数据。这里定义了三个 OutputTag:

  • webTerminal:web端埋点数据
  • mobileTerminal:移动端埋点数据
  • csTerminal:CS端埋点数据
lazy val webTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("Web端埋点数据")
lazy val mobileTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("移动端埋点数据")
lazy val csTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("CS端埋点数据")

3.2 使用特定的处理函数

要使用侧输出,在处理数据的时候除了要定义相应类型的OutputTag外,还要使用特定的函数,主要是有四个:

  • ProcessFunction
  • CoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

该场景里选择使用ProcessFunction函数。该函数继承于RichFunction,使用时必须要重写processElement方法。自定义ProcessFunction函数:

class MdSplitProcessFunction extends ProcessFunction[MdMsg, MdMsg] {override def processElement(value: MdMsg, ctx: ProcessFunction[MdMsg, MdMsg]#Context, out: Collector[MdMsg]): Unit = {// webif (value.mdType == "web") {ctx.output(webTerminal, value)// mobile} else if (value.mdType == "mobile") {ctx.output(mobileTerminal, value)// cs} else if (value.mdType == "cs") {ctx.output(csTerminal, value)// others} else {out.collect(value)}}}

3.3 对每个侧输出流做处理

为每种类型的侧输出流添加处理逻辑,直接调用getSideOutput函数:(这里的处理逻辑进展示直接打印)

// Web端埋点数据流
outputStream.getSideOutput(webTerminal).print("web")
// Mobile端埋点数据流
outputStream.getSideOutput(mobileTerminal).print("mobile")
// CS端埋点数据流
outputStream.getSideOutput(csTerminal).print("cs")

4 代码

4.1 代码

package org.ourhome.streamapiimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector/*** @Author Do* @Date 2020/5/3 20:35*/
object SideOutputTest2 {lazy val webTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("Web端埋点数据")lazy val mobileTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("移动端埋点数据")lazy val csTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("CS端埋点数据")def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.setParallelism(1)val socketData: DataStream[String] = env.socketTextStream("localhost", 9999)socketData.print("input data")val outputStream: DataStream[MdMsg] = socketData.map(line => {val str: Array[String] = line.split(",")MdMsg(str(0), str(1), str(2).toLong)}).process(new MdSplitProcessFunction)// Web端埋点数据流处理逻辑outputStream.getSideOutput(webTerminal).print("web")// Mobile端埋点数据流处理逻辑outputStream.getSideOutput(mobileTerminal).print("mobile")// CS端埋点数据流处理逻辑outputStream.getSideOutput(csTerminal).print("cs")env.execute()}case class MdMsg(mdType:String, url:String, Time:Long)class MdSplitProcessFunction extends ProcessFunction[MdMsg, MdMsg] {override def processElement(value: MdMsg, ctx: ProcessFunction[MdMsg, MdMsg]#Context, out: Collector[MdMsg]): Unit = {// webif (value.mdType == "web") {ctx.output(webTerminal, value)// mobile} else if (value.mdType == "mobile") {ctx.output(mobileTerminal, value)// cs} else if (value.mdType == "cs") {ctx.output(csTerminal, value)// others} else {out.collect(value)}}}}

4.2 测试

输入1:

web,http://www.web1.com,1587787201000
mobile,http://www.mobile1.com,1587787202000
cs,http://www.cs1.com,1587787203000

输出1:

input data> web,http://www.web1.com,1587787201000
web> MdMsg(web,http://www.web1.com,1587787201000)input data> mobile,http://www.mobile1.com,1587787202000
mobile> MdMsg(mobile,http://www.mobile1.com,1587787202000)input data> cs,http://www.cs1.com,1587787203000
cs> MdMsg(cs,http://www.cs1.com,1587787203000)

可见:Flink将接收到的数据,分别分到了web埋点数据流、移动端埋点数据流和CS端埋点数据流中,并分别打印。

输入2:

other,http://www.other.com,1587787201000

输出2:

input data> other,http://www.other.com,1587787201000

可见:Flink将接收到的数据发到了常规数据流中,并打印。

Flink——Side Output侧输出流相关推荐

  1. 大数据(9e)Flink侧输出流

    文章目录 概述 环境 OutputTag介绍 实现分流 处理迟到数据 处理关窗之后到达的数据 概述 窗口允许迟到的数据,但仍有数据在关窗后到达 Flink提供了侧输出流(sideOutput)来处理关 ...

  2. flink 处理迟到数据(Trigger、设置水位线延迟时间、允许窗口处理迟到数据、将迟到数据放入侧输出流、代码示例、迟到数据触发窗口计算重复结果处理)

    文章目录 前言 1.Trigger 2.处理迟到数据 2.1 设置水位线延迟时间 2.2 允许窗口处理迟到数据 2.3 将迟到数据放入侧输出流 3.实操 3.1 代码示例 3.2 中间遇到的异常 3. ...

  3. Flink之乱序处理,时间语义,WaterMark,允许迟到数据,侧输出流

    一.理解Flink的乱序问题 理解Flink的乱序问题,的先理解Flink的时间语义. Flink有3中时间语义:Event Time:事件创建的时间Ingestion Time:数据进入Flink的 ...

  4. Flink 侧输出流使用

    什么是Flink 的侧输出 flink处理数据流时,经常会遇到这样的情况:处理一个数据源时,往往需要将该源中的不同类型的数据做分割(分流)处理,假如使用 filter算子对数据源进行筛选分割的话,势必 ...

  5. 侧输出流简单应用-打印的完整流程

    1.添加运行环境和设置时间语义 如果是迟到数据处理就只能在事件时间语义下使用,如果是一般数据使用侧输出流就看业务需求是按什么条件进行分流eg:如果按照数据中的温度进行划分高温流和低温流,可以直接使用处 ...

  6. 60-140-044-使用-DataSink-使用OutputTag进行Side Output(侧输出)

    1.世界 2.概述 ​ 除了从DataStream操作的结果中获取主数据流之外,你还可以产生任意数量额外的侧输出结果流.侧输出结果流的数据类型不需要与主数据流的类型一致,不同侧输出流的类型也可以不同. ...

  7. 【基础】Flink -- Time and Window

    Flink -- Time and Window Flink 时间语义 水位线 Watermark 水位线的概念 有序流中的水位线 乱序流中的水位线 水位线的特性 水位线的基本使用 水位线生成策略 内 ...

  8. 进阶大数据架构师学习路线

    ![在这里插入图片描述](https://img-blog.csdnimg.cn/25b820fe1d054f53bab70310694faffe.jpeg#pic_center 文末有惊喜 大数据架 ...

  9. 致迷茫的程序员一封信——我的程序生涯

    0.开头 大家好,我是罗鹏程,一个很老套的开头,哈哈哈. 这封信姗姗来迟,与其说是一封信,不如说是来听听我的故事.从2020开始,收到过很多网友的问题,职业的选择,是做大数据还是做java:选择了大数 ...

最新文章

  1. 不能ssh连接ubuntu linux 服务器 secureCRT不能ssh连接服务器 不能远程ssh连接虚拟机的ubuntu linux...
  2. python【力扣LeetCode算法题库】13- 罗马数字转整数
  3. php 单例类 mysql pdo_PHP实战:PHP基于单例模式编写PDO类的方法
  4. Python:两个队列实现栈,两个栈实现队列
  5. 计算机工具软件应用考试,《计算机常用工具软件》期中考试题
  6. 杀毒软件全免费遭厂家“抵制”
  7. 计算机管理用户和组无法访问,同一工作组无法访问如何解决【详解】
  8. 前端学习(1161):箭头函数02
  9. 程序员面试金典——17.6最小调整有序
  10. 【深入理解计算机系统-第二版】3.55习题
  11. SQL Server事务、视图和索引
  12. 程序员求职之道(《程序员面试笔试宝典》)之程序设计基础(static的使用)?...
  13. 常见的html内lian联元素,CSS基础:块元素、内联元素、内联块元素
  14. 基于springboot人事管理系统设计与实现
  15. Word表格转到Excel中
  16. SaaS已死。下一个。
  17. GAC注册/卸载 dll
  18. 阿里云天池大数据:【入门】精灵宝可梦数据集分析
  19. 经典前缀和+差分问题之小明的彩灯(c++)
  20. 什么是计算机嵌套分类汇总,excel2010嵌套分类汇总的教程

热门文章

  1. 获取系统文件文件夹图标之计算机图标(我的电脑图标)、驱动器图标文件图标等
  2. 2020秋招小记 0
  3. 树莓派-Linux常用终端命令nano和vi编辑器的使用(3)
  4. javascript SetTimeout 延时循环
  5. Java单点登录技术选型与对比 kisso, sa-token
  6. JDK8之Optional类
  7. 【DB2】LISTAGG函数中元素的去重(DISTINCT)
  8. 姚明的“内蒙古时间”:现身2019年WCBA全明星周末赛现场
  9. 经典编译器组成(前端+优化器+后端)以及LLVM和Clang简介
  10. mysql 数据库 死锁_Mysql数据库出现死锁的情况(一)