侧输出流简单应用-打印的完整流程
1.添加运行环境和设置时间语义
如果是迟到数据处理就只能在事件时间语义下使用,如果是一般数据使用侧输出流就看业务需求是按什么条件进行分流eg:如果按照数据中的温度进行划分高温流和低温流,可以直接使用处理时间语义(默认值,不需要设置)。如果业务需要数据中的下单时间进行相关统计,就需要开启事件时间
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置全局并行度
env.setParallelism(1);
//开启事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2.事件时间语义下需要设置watermark
为流设置watermark,如果是乱序数据就需要使用BoundOutOfOrdernessTimestampExtractor,如果时间是严格递增,则使用AscendingTimestampExtractor
BoundOutOfOrdernessTimestampExtractor需要设置延迟时间,一般取逆序时间的最大值eg:01:00 02:01 01:50 01:20…当前最大逆序为2:01和1:20即41s
//为流设置watermark,如果是乱序数据就需要使用BoundOutOfOrdernessTimestampExtractor,如果时间是严格递增的就可以使用
//AscendingTimestampExtractor
DataStreamSource<String> inputStream = env.readTextFile("hello.csv");
DataStream<UserBehavior> mapStream = inputStream.map(line -> {String[] split = line.split(",");return new Person(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]),split[3], Long.valueOf(split[4]));}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Person>(Time.seconds(2L)) {//设置延迟时间为2s@Overridepublic long extractTimestamp(Person element) {return element.getTimestamp()*1000;//每个数据的实际事件时间,这里乘1000是因为原始数据使//用的时间戳是s,需要转换为毫秒}})
3.设置Output标签
OutputTag<Person> outputtag=new Output<>("hello"){};
4.开启侧输出流
- 迟到数据可以使用特殊的侧输出流形式
//1).开启窗口
DataStream<Person> mainStream=mapStream.keyBy("id).timeWindow(Time.seconds(15))
//2).设置允许的延迟数据时间
.allowedLateness(Time.minutes(1))
//添加侧输出流标签
.sideOutputLateData(outputtag);
大招:所有想要使用侧输出流的情况都可以使用底层函数process
DataStream<String> mainStream=mapStream.keyBy("id).timeWindow(Time.seconds(15)).process(new MyProcessFunction());//通过process获取到的流数据的上下文设置侧输出流
public static class MyProcessFunction extends KeyedProcessFunction<String,Person,String>{@Overridepublic void processElement(Person value, Context ctx, Collector<String> out) throws Exception {if(value.getTimestamp<10000000){//假设我们将时间戳小于10000000作为分界,分到两个流中ctx.output(outputtag,value);}else{out.collect(value);}}
}
5.使用侧输出流
//简单应用-打印:
//获取侧输出流
DataStream<Person> sideOutput = mainStream.getSideOutput(outputtag);
//打印
sideOutput.print("sideoutputstream");
//主流打印
mainStream。print("mainstream");
//执行任务
env.execute();
侧输出流简单应用-打印的完整流程相关推荐
- java对接飞鹅云实现自定义订单自动打印(完整流程)
记录一下,以为对接好最少要半天,没想到总的下来十几分钟就对接好了 准备: 1:首先你要有一台打印机器,这里就不多说了 2:注册飞鹅云账号,最好是企业认证一下,拿到下方的两个值后面有用 3:绑定打印机, ...
- 一个产品从0到1的完整流程
文章目录 什么是需求 需求定义 满足需求的三种方式 需求的核心 需求收集 需求来源 内部 外部 需求收集步骤 需求分析与管理 需求分析 什么是需求分析 如何挖掘用户真实需求 需求管理 需求优先级 需求 ...
- 【几乎最全/全网最长的 2 万 字】前端工程化完整流程:从头搭到尾(vue3 + vite + qiankun + docker + tailwindcss + iview......)
文章目录 一.完整构建流程 1.在指定目录下执行 pnpm init,初始化 package.json 2.执行 pnpm install vite -D,安装 vite. 3.package.jso ...
- windows驱动数字签名之WHQL完整流程 | WHQL认证环境部署以及HLK测试
文章目录 第一部分:windows驱动数字签名之WHQL完整流程 一.驱动数字签名背景 二.驱动程序数字签名解决方案-WHQL认证 1. 对开发者的影响 2. WHQL认证主要作用 三.申请WHQL认 ...
- Flink——Side Output侧输出流
主要内容: 结合应用场景,介绍Flink侧输出流的使用流程和原理 在处理数据的时候,有时候想对不同情况的数据进行不同的处理,那么就需要把数据流进行分流.可以在主数据流上产生出任意数量额外的侧输出流. ...
- 运动想象脑机接口中迁移学习的完整流程
点击上面"脑机接口社区"关注我们 更多技术干货第一时间送达 脑机接口(Brain-Computer Interface, BCI)可以让用户使用脑电信号直接与计算机或其他外部设备进 ...
- 伍冬睿教授:脑机接口中迁移学习的完整流程
大家好,今天Rose分享一篇关于脑机接口中的迁移学习的完整流程.本文由华中科技大学伍冬睿教授授权分享. 关于脑机接口中迁移学习方面的研究,伍教授团队做过大量的工作.之前社区分享过<脑机接口中的流 ...
- 【Windows 逆向】使用 CE 工具挖掘关键数据内存真实地址 ( 查找子弹数据的动态地址 | 查找子弹数据的静态地址 | 静态地址分析 | 完整流程 ) ★
文章目录 前言 一.查找子弹数据临时内存地址 二.查找子弹数据的静态地址 1.调试内存地址 05A59544 获取基址 05A59478 2.通过搜索基址 05A59478 获取内存地址 0E1DC1 ...
- 【NLP】自然语言处理 完整流程
自然语言处理 完整流程 第一步:获取语料 1.已有语料 2.网上下载.抓取语料 第二步:语料预处理 1.语料清洗 2.分词 3.词性标注 4.去停用词 三.特征工程 1.词袋模型(BoW) 2.词向量 ...
最新文章
- 他研究了5000家AI公司,说人工智能应用该这么做!
- jsp mysql增加_jsp+mysql实现增加,查看功能
- 百度和360的关键词提交查询
- 在 Red HatAS4下添加网卡驱动!!
- Ajax入门总结--jquery实现Ajax
- EF Core 3 的 40 个中断性变更
- 如何导出无水印_抖音视频怎么去水印 抖音怎么导出无水印视频
- uni保存canvas图片_小程序canvas【开箱即用】
- 基于visual Studio2013解决C语言竞赛题之1054抽牌游戏
- length()函数_掌握Kotlin中的标准库函数: run、with、let、also和apply(转)
- 基于51单片机的CC2541蓝牙透传模块的无线通信
- Air722UG_模块硬件设计手册_V1.1
- cox回归模型python实现_生存分析Cox回归模型(比例风险模型)的spss操作实例
- YGG 与 Thirdverse 达成合作,将《足球小将》IP 带入 Web3
- 7-6,输入厘米,输出英尺英寸
- 人立方 关系搜索 微软发布的人-关系搜索引擎
- Solar-Putty如何修改显示字体大小
- 初识大数据(一)什么是大数据
- 卷毛0基础学习Golang-基础-slice切片
- linux 蓝牙设备,Ubuntu8.04下蓝牙设备连接管理