flink整合java,Flink使用SideOutPut替换Split实现分流
基于apache flink的流处理实时模型
44元
包邮
(需用券)
去购买 >
以前的数据分析项目(版本1.4.2),对从Kafka读取的原始数据流,调用split接口实现分流.
新项目决定使用Flink 1.7.2,使用split接口进行分流的时候,发现接口被标记为depracted(后续可能会被移除).
搜索相关文档,发现新版本Flink中推荐使用带外数据进行分流.
预先建立OutputTag实例(LogEntity是从kafka读取的日志实例类).
private static final OutputTag APP_LOG_TAG = new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));
private static final OutputTag ANALYZE_METRIC_TAG = new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));
对kafka读取的原始数据,通过process接口,打上相应标记.
private static SingleOutputStreamOperator sideOutStream(DataStream rawLogStream) {
return rawLogStream
.process(new ProcessFunction() {
@Override
public void processElement(LogEntity entity, Context ctx, Collector out) throws Exception {
// 根据日志等级,给对象打上不同的标记
if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {
ctx.output(ANALYZE_METRIC_TAG, entity);
} else {
ctx.output(APP_LOG_TAG, entity);
}
}
})
.name("RawLogEntitySplitStream");
}
// 调用函数,对原始数据流中的对象进行标记
SingleOutputStreamOperator sideOutLogStream = sideOutStream(rawLogStream);
// 根据标记,获取不同的数据流,以便后续进行进一步分析
DataStream appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);
DataStream rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);
通过以上步骤,就实现了数据流的切分.
PS:
如果您觉得我的文章对您有帮助,请关注我的微信公众号,谢谢!
原文链接:https://www.cnblogs.com/jason1990/p/11610130.html
java 11官方入门(第8版)教材
79.84元
包邮
(需用券)
去购买 >
flink整合java,Flink使用SideOutPut替换Split实现分流相关推荐
- flink的java api_Flink 流处理API之二
1.Transform 1.1 map val streamMap = stream.map { x => x * 2 } 1.2 flatmap flatMap的函数签名:def flatMa ...
- Flink整合kafka的两阶段提交结论
文章目录 1.Flink+kafka是如何实现exactly-once语义的 2.WC案例的如何做chekcpoint 3.源码分析 4.kafkaConsumer在与Flink整合的思考 4.1 k ...
- Cannot resolve method ‘getTableEnvironment(org.apache.flink.api.java.ExecutionEnvironment)‘
代码如下: public class UDTF {public static void main(String[] args) throws Exception{ExecutionEnvironmen ...
- Flink DataStream Split 实现分流
传送门:Flink 系统性学习笔记 在 Flink 1.12.0 版本中进行了删除 所谓分流,就是将一条数据流拆分成完全独立的两条.甚至多条流.也就是基于一个 DataStream 拆分成多个完全平等 ...
- Flink project java篇
Flink project java篇 pom.xml以及数据 attention scala mistake summary [summary most important!!!] stage01 ...
- beam+flink整合异常
信息: Remoting shut down. 十月 23, 2017 9:10:40 下午 org.apache.beam.runners.flink.FlinkRunner run 严重: Pip ...
- 《从0到1学习Flink》—— Apache Flink 介绍
前言 Flink 是一种流式计算框架,为什么我会接触到 Flink 呢?因为我目前在负责的是监控平台的告警部分,负责采集到的监控数据会直接往 kafka 里塞,然后告警这边需要从 kafka topi ...
- 【Flink】基于 Flink CEP 实时计算商品订单流失量
1.概述 转载:https://blog.csdn.net/tzs_1041218129/article/details/108786597 假设有个需求需要实时计算商品的订单流失量,规则如下: 用户 ...
- Flink(初识Flink,快速上手)
目录 初识Flink Flink设计理念 Flink的应用 Flink在企业中的应用 Flink的主要应用场景 流式数据处理的发展和演变 流处理和批处理 传统事务处理 有状态的流处理 Lambda 架 ...
最新文章
- php 删除子字符串函数,PHP删除字符串中的任何字符函数
- windows安装Matplotlib
- 使用malloc之前需要做什么准备工作。
- html输出text,为什么text();和html();输出结果不一样呢?
- SQL Server 2005 智能感知插件 - SQL Prompt 3.8.0.224
- 王成录华为鸿蒙系统,华为手机销量仍在增长!华为王成录:手机会是鸿蒙OS系统的中心...
- 360回归A股,周鸿祎来给BAT和小米添堵了
- 浅谈算法——莫比乌斯反演
- CSDN 缩进、目录、表格输入竖线或回车、字体及颜色设置
- CTF攻防世界刷题51-
- Metasploit之——社会工程学工具包
- Java 枚举类转换List
- Python——第一天的Suger Rush
- 采样频率在频谱分析中的理解
- 自然语言处理seq2seq模型实现人工智能对对联(基于TensorFlow框架)
- python简单爬虫程序分析_Python简单爬虫
- C++读入用逗号隔开的数据
- vue3中tsx的基本语法使用
- Python 实战之淘宝手机销售分析(数据清洗、可视化、数据建模、文本分析)
- NYoj 239 :月老的难题(二分图最大匹配)