1.添加运行环境和设置时间语义

如果是迟到数据处理就只能在事件时间语义下使用,如果是一般数据使用侧输出流就看业务需求是按什么条件进行分流eg:如果按照数据中的温度进行划分高温流和低温流,可以直接使用处理时间语义(默认值,不需要设置)。如果业务需要数据中的下单时间进行相关统计,就需要开启事件时间

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置全局并行度
env.setParallelism(1);
//开启事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

2.事件时间语义下需要设置watermark

为流设置watermark,如果是乱序数据就需要使用BoundOutOfOrdernessTimestampExtractor,如果时间是严格递增,则使用AscendingTimestampExtractor
BoundOutOfOrdernessTimestampExtractor需要设置延迟时间,一般取逆序时间的最大值eg:01:00 02:01 01:50 01:20…当前最大逆序为2:01和1:20即41s

//为流设置watermark,如果是乱序数据就需要使用BoundOutOfOrdernessTimestampExtractor,如果时间是严格递增的就可以使用
//AscendingTimestampExtractor
DataStreamSource<String> inputStream = env.readTextFile("hello.csv");
DataStream<UserBehavior> mapStream = inputStream.map(line -> {String[] split = line.split(",");return new Person(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]),split[3], Long.valueOf(split[4]));}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Person>(Time.seconds(2L)) {//设置延迟时间为2s@Overridepublic long extractTimestamp(Person element) {return element.getTimestamp()*1000;//每个数据的实际事件时间,这里乘1000是因为原始数据使//用的时间戳是s,需要转换为毫秒}})

3.设置Output标签

OutputTag<Person> outputtag=new Output<>("hello"){};

4.开启侧输出流

  • 迟到数据可以使用特殊的侧输出流形式
//1).开启窗口
DataStream<Person> mainStream=mapStream.keyBy("id).timeWindow(Time.seconds(15))
//2).设置允许的延迟数据时间
.allowedLateness(Time.minutes(1))
//添加侧输出流标签
.sideOutputLateData(outputtag);

大招:所有想要使用侧输出流的情况都可以使用底层函数process

DataStream<String> mainStream=mapStream.keyBy("id).timeWindow(Time.seconds(15)).process(new MyProcessFunction());//通过process获取到的流数据的上下文设置侧输出流
public static class MyProcessFunction extends KeyedProcessFunction<String,Person,String>{@Overridepublic void processElement(Person value, Context ctx, Collector<String> out) throws Exception {if(value.getTimestamp<10000000){//假设我们将时间戳小于10000000作为分界,分到两个流中ctx.output(outputtag,value);}else{out.collect(value);}}
}

5.使用侧输出流

//简单应用-打印:
//获取侧输出流
DataStream<Person> sideOutput = mainStream.getSideOutput(outputtag);
//打印
sideOutput.print("sideoutputstream");
//主流打印
mainStream。print("mainstream");
//执行任务
env.execute();

侧输出流简单应用-打印的完整流程相关推荐

  1. java对接飞鹅云实现自定义订单自动打印(完整流程)

    记录一下,以为对接好最少要半天,没想到总的下来十几分钟就对接好了 准备: 1:首先你要有一台打印机器,这里就不多说了 2:注册飞鹅云账号,最好是企业认证一下,拿到下方的两个值后面有用 3:绑定打印机, ...

  2. 一个产品从0到1的完整流程

    文章目录 什么是需求 需求定义 满足需求的三种方式 需求的核心 需求收集 需求来源 内部 外部 需求收集步骤 需求分析与管理 需求分析 什么是需求分析 如何挖掘用户真实需求 需求管理 需求优先级 需求 ...

  3. 【几乎最全/全网最长的 2 万 字】前端工程化完整流程:从头搭到尾(vue3 + vite + qiankun + docker + tailwindcss + iview......)

    文章目录 一.完整构建流程 1.在指定目录下执行 pnpm init,初始化 package.json 2.执行 pnpm install vite -D,安装 vite. 3.package.jso ...

  4. windows驱动数字签名之WHQL完整流程 | WHQL认证环境部署以及HLK测试

    文章目录 第一部分:windows驱动数字签名之WHQL完整流程 一.驱动数字签名背景 二.驱动程序数字签名解决方案-WHQL认证 1. 对开发者的影响 2. WHQL认证主要作用 三.申请WHQL认 ...

  5. Flink——Side Output侧输出流

    主要内容: 结合应用场景,介绍Flink侧输出流的使用流程和原理 在处理数据的时候,有时候想对不同情况的数据进行不同的处理,那么就需要把数据流进行分流.可以在主数据流上产生出任意数量额外的侧输出流. ...

  6. 运动想象脑机接口中迁移学习的完整流程

    点击上面"脑机接口社区"关注我们 更多技术干货第一时间送达 脑机接口(Brain-Computer Interface, BCI)可以让用户使用脑电信号直接与计算机或其他外部设备进 ...

  7. 伍冬睿教授:脑机接口中迁移学习的完整流程

    大家好,今天Rose分享一篇关于脑机接口中的迁移学习的完整流程.本文由华中科技大学伍冬睿教授授权分享. 关于脑机接口中迁移学习方面的研究,伍教授团队做过大量的工作.之前社区分享过<脑机接口中的流 ...

  8. 【Windows 逆向】使用 CE 工具挖掘关键数据内存真实地址 ( 查找子弹数据的动态地址 | 查找子弹数据的静态地址 | 静态地址分析 | 完整流程 ) ★

    文章目录 前言 一.查找子弹数据临时内存地址 二.查找子弹数据的静态地址 1.调试内存地址 05A59544 获取基址 05A59478 2.通过搜索基址 05A59478 获取内存地址 0E1DC1 ...

  9. 【NLP】自然语言处理 完整流程

    自然语言处理 完整流程 第一步:获取语料 1.已有语料 2.网上下载.抓取语料 第二步:语料预处理 1.语料清洗 2.分词 3.词性标注 4.去停用词 三.特征工程 1.词袋模型(BoW) 2.词向量 ...

最新文章

  1. 他研究了5000家AI公司,说人工智能应用该这么做!
  2. jsp mysql增加_jsp+mysql实现增加,查看功能
  3. 百度和360的关键词提交查询
  4. 在 Red HatAS4下添加网卡驱动!!
  5. Ajax入门总结--jquery实现Ajax
  6. EF Core 3 的 40 个中断性变更
  7. 如何导出无水印_抖音视频怎么去水印 抖音怎么导出无水印视频
  8. uni保存canvas图片_小程序canvas【开箱即用】
  9. 基于visual Studio2013解决C语言竞赛题之1054抽牌游戏
  10. length()函数_掌握Kotlin中的标准库函数: run、with、let、also和apply(转)
  11. 基于51单片机的CC2541蓝牙透传模块的无线通信
  12. Air722UG_模块硬件设计手册_V1.1
  13. cox回归模型python实现_生存分析Cox回归模型(比例风险模型)的spss操作实例
  14. YGG 与 Thirdverse 达成合作,将《足球小将》IP 带入 Web3
  15. 7-6,输入厘米,输出英尺英寸
  16. 人立方 关系搜索 微软发布的人-关系搜索引擎
  17. Solar-Putty如何修改显示字体大小
  18. 初识大数据(一)什么是大数据
  19. 卷毛0基础学习Golang-基础-slice切片
  20. linux 蓝牙设备,Ubuntu8.04下蓝牙设备连接管理

热门文章

  1. cocos2d-x游戏实例(11)-触屏主角移动轨迹
  2. windbg调试堆破坏
  3. live555 源码分析:播放启动
  4. 聊一聊ThreadLocal
  5. LiveVideoStack 2020 年度盘点
  6. 保25ms争10ms——Zenlayer如何保持出海业务的超低延时
  7. 在线催稿:当一位高级视频算法工程师接受采访
  8. 音视频技术开发周刊 78期
  9. 容联CTO许志强:AI、5G让通讯更智能、更高效
  10. 突破领域边界,探索文创文保新趋势