文章目录

  • 概述
  • 环境
  • OutputTag介绍
    • 实现分流
    • 处理迟到数据
  • 处理关窗之后到达的数据

概述

窗口允许迟到的数据,但仍有数据在关窗后到达
Flink提供了侧输出流(sideOutput)来处理关窗之后到达的数据

环境

WIN10+IDEA+JDK1.8+FLINK1.14

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.14.6</flink.version><scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
</dependencies>

OutputTag介绍

OutputTag是一种命名标记,用于标记算子中的侧输出

实现分流

ctx.output:向由OutputTag标识的侧输出发出记录

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义输出标签OutputTag<Integer> o1 = new OutputTag<Integer>("除以3余1") {};OutputTag<Integer> o2 = new OutputTag<Integer>("除以3余2") {};//创建流SingleOutputStreamOperator<Integer> d = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);//处理SingleOutputStreamOperator<Integer> s = d.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) {//分流if (value % 3 == 2) {ctx.output(o2, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else if (value % 3 == 1) {ctx.output(o1, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else {out.collect(value);}}});//输出s.print("被3整除");s.getSideOutput(o1).print(o1.getId());s.getSideOutput(o2).print(o2.getId());//环境执行env.execute();}
}
测试结果
被3整除> 0
除以3余1> 1
除以3余2> 2
被3整除> 3
除以3余1> 4
除以3余2> 5
被3整除> 6
除以3余1> 7
除以3余2> 8
被3整除> 9

处理迟到数据

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag<String> outputTag = new OutputTag<String>("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator<String> d = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> ctx) {//发送水位线ctx.emitWatermark(new Watermark(1999L));//发送2条数据,其中1条迟到ctx.collectWithTimestamp("1998", 1998L);ctx.collectWithTimestamp("2000", 2000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator<String> s = d.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String value, Context ctx, Collector<String> out) {//获取水位线long watermark = ctx.timerService().currentWatermark();//判断是否迟到if (ctx.timestamp() > watermark) {//冇迟到out.collect(value);} else {//迟到:向outputTag发送数据ctx.output(outputTag, value);}}});//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
发送1999水位线,然后发送两条数据,测试结果如下
侧输出> 1998
主流输出> 2000

处理关窗之后到达的数据

开窗后.sideOutputLateData(outputTag)

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag<String> outputTag = new OutputTag<String>("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator<String> d = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> ctx) {ctx.collectWithTimestamp("a", 4000L);ctx.collectWithTimestamp("b", 5000L);ctx.emitWatermark(new Watermark(5999L)); //发送水位线,触发【3000~5999】的窗口关闭ctx.collectWithTimestamp("c", 5000L);ctx.collectWithTimestamp("d", 5000L);ctx.collectWithTimestamp("e", 6000L);ctx.collectWithTimestamp("f", 7000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator<String> s = d//事件时间滚动窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(3L)))//侧输出.sideOutputLateData(outputTag)//拼接字符串.reduce((a, b) -> a + "," + b);//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
中途发送水位线,触发关窗,测试结果如下
主流输出> a,b
侧输出> c
侧输出> d
主流输出> e,f

大数据(9e)Flink侧输出流相关推荐

  1. flink 处理迟到数据(Trigger、设置水位线延迟时间、允许窗口处理迟到数据、将迟到数据放入侧输出流、代码示例、迟到数据触发窗口计算重复结果处理)

    文章目录 前言 1.Trigger 2.处理迟到数据 2.1 设置水位线延迟时间 2.2 允许窗口处理迟到数据 2.3 将迟到数据放入侧输出流 3.实操 3.1 代码示例 3.2 中间遇到的异常 3. ...

  2. 大数据之flink教程

    第一章 Flink简介 1.1  初识Flink Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究 ...

  3. Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2

    七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...

  4. 手把手教你搭建实时大数据引擎FLINK

    手把手教你搭建实时大数据引擎FLINK 服务器规划 Standalone高可用HA模式 架构图 下载并上传tar包 具体安装步骤 yarm 集群环境搭建 服务器规划 服务器规划 服务名称 职能 zhe ...

  5. 深度解读!新一代大数据引擎Flink厉害在哪?(附实现原理细节)

    导语 | 大数据计算分为离线计算和实时计算,其中离线计算就是我们通常说的批计算,代表技术是Hadoop MapReduce.Hive等:实时计算也被称作流计算,代表技术是Storm.Spark Str ...

  6. Flink 侧输出流使用

    什么是Flink 的侧输出 flink处理数据流时,经常会遇到这样的情况:处理一个数据源时,往往需要将该源中的不同类型的数据做分割(分流)处理,假如使用 filter算子对数据源进行筛选分割的话,势必 ...

  7. 大数据之Flink流式计算引擎

    Flink DataFlow 数据的分类 有界数据 离线的计算 效率:硬件相同的情况下:时间 无界数据 实时的计算 效率:硬件环境相同的情况下,吞吐量:数据处理的数量.延迟:结果输出的时间-数据接收的 ...

  8. 实时数仓 大数据 Hadoop flink kafka

    ⼀.实时数仓建设背景 实时需求⽇趋迫切 ⽬前各⼤公司的产品需求和内部决策对于数据实时性的要求越来越迫切,需要实时数仓的能⼒来赋能.传统离 线数仓的数据时效性是 T+1,调度频率以天为单位,⽆法⽀撑实时 ...

  9. 大数据入门--Flink(四)状态管理与容错机制

    状态管理与容错机制 术语 状态管理 容错机制 状态一致性 检查点(checkpoint) 保存点(savepoint) 状态后端(state backend) 案例 术语 算子状态.键控状态.状态一致 ...

最新文章

  1. python_web框架
  2. SpringBoot 使用(三): 配置文件详解
  3. base64的c语言实现方法
  4. 配置maven mvn命令使用jdk 1.7编译
  5. 大文本存mysql怎么建索引_如何正确合理的建立MYSQL数据库索引
  6. Windows平台下GO语言编译器(GO-windows)
  7. @90后程序员,“颜值即正义”的现在,程序员应该如何更新穿搭?
  8. 数据结构-动态查找树表与平衡二叉树 红黑树简单介绍
  9. 在linux centos中加入中文输入法
  10. stardict词典全集
  11. java删除文件夹部分内容_java 删除文件夹中的所有内容而不删除文件夹本身
  12. win10c语言关机,win10电脑自动关机命令
  13. 夏普(SHARP) LS050T1SX01 液晶屏接口定义
  14. java音乐_用JavaJFugue进行音乐编程
  15. 台式计算机没办法连接wifi吗,台式机连接wifi不能用怎么办
  16. 期末复习(Day5)
  17. corosync/openais+pacemaker+drbd+web实现web服务高可用集群
  18. 哈夫曼算法以及求哈夫曼编码
  19. 电脑系统数据丢失了是什么原因?找回方法有哪些?
  20. 计算机管理老是自动打开,win10系统打开设备管理器后一直自动刷新的设置方案...

热门文章

  1. 百度技术中台质量部_测试开发(三面+hr)
  2. java 资源网站有哪些_可以收藏的几个java自学资源网站,说说我的观点
  3. ModbusTCP协议学习
  4. 用计算机弹音乐时间都去哪了,重生之音乐全才
  5. BZOJ4451 : [Cerc2015]Frightful Formula
  6. Web播放器AudioBox基于HTML5完成重构 支持云存储
  7. 文本相似性计算--MinHash和LSH算法
  8. 路飞阶段性测试-机试
  9. 踩坑系列]URLEncode 中对 空格的编码有 “+”和“%20”两种
  10. 【Linux】grep正则表达式详解