left join 后数据变多_Flink 双流 Join 的3种操作示例
- join()
- coGroup()
- intervalJoin()
本文举例说明它们的使用方法,顺便聊聊比较特殊的 interval join 的原理。
准备数据
从 Kafka 分别接入点击流和订单流,并转化为 POJO。
DataStream<String> clickSourceStream = env
.addSource(new FlinkKafkaConsumer011<>(
"ods_analytics_access_log",
new SimpleStringSchema(),
kafkaProps
).setStartFromLatest());
DataStream<String> orderSourceStream = env
.addSource(new FlinkKafkaConsumer011<>(
"ods_ms_order_done",
new SimpleStringSchema(),
kafkaProps
).setStartFromLatest());
DataStream clickRecordStream = clickSourceStream
.map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));
DataStream orderRecordStream = orderSourceStream
.map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));
join()
join() 算子提供的语义为"Window join",即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。以下示例以10秒滚动窗口,将两个流通过商品 ID 关联,取得订单流中的售价相关字段。
clickRecordStream
.join(orderRecordStream)
.where(record -> record.getMerchandiseId())
.equalTo(record -> record.getMerchandiseId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction() {
@Override
public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
return StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t');
}
})
.print().setParallelism(1);
简单易用。
coGroup()
只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。
clickRecordStream
.coGroup(orderRecordStream)
.where(record -> record.getMerchandiseId())
.equalTo(record -> record.getMerchandiseId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new CoGroupFunctionString, Long>>() {
@Override
public void coGroup(Iterable accessRecords, Iterable orderRecords, CollectorString, Long>> collector) throws Exception {
for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
boolean isMatched = false;
for (OrderDoneLogRecord orderRecord : orderRecords) {
// 右流中有对应的记录
collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
isMatched = true;
}
if (!isMatched) {
// 右流中没有对应的记录
collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
}
}
}
})
.print().setParallelism(1);
intervalJoin()
join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。
clickRecordStream
.keyBy(record -> record.getMerchandiseId())
.intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
.between(Time.seconds(-30), Time.seconds(30))
.process(new ProcessJoinFunctionString>() {
@Override
public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
collector.collect(StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t'));
}
})
.print().setParallelism(1);
由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。
interval join 的实现原理
以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。
public <OUT> SingleOutputStreamOperator<OUT> process(
ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
TypeInformation<OUT> outputType) {
Preconditions.checkNotNull(processJoinFunction);
Preconditions.checkNotNull(outputType);
final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
new IntervalJoinOperator<>(
lowerBound,
upperBound,
lowerBoundInclusive,
upperBoundInclusive,
left.getType().createSerializer(left.getExecutionConfig()),
right.getType().createSerializer(right.getExecutionConfig()),
cleanedUdf
);
return left
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
}
可见是先对两条流执行 connect() 和 keyBy() 操作,然后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中,会利用两个 MapState 分别缓存左流和右流的数据。
private transient MapState>> leftBuffer;
private transient MapState>> rightBuffer;
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
LEFT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
));
this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
RIGHT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
));
}
其中 Long 表示事件时间戳,List> 表示该时刻到来的数据记录。当左流和右流有数据到达时,会分别调用 processElement1() 和 processElement2() 方法,它们都调用了 processElement() 方法,代码如下。
@Override
public void processElement1(StreamRecord record) throws Exception {
processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}
@Override
public void processElement2(StreamRecord record) throws Exception {
processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}
@SuppressWarnings("unchecked")
private void processElement(
final StreamRecord record,
final MapState>> ourBuffer,
final MapState>> otherBuffer,
final long relativeLowerBound,
final long relativeUpperBound,
final boolean isLeft) throws Exception {
final THIS ourValue = record.getValue();
final long ourTimestamp = record.getTimestamp();
if (ourTimestamp == Long.MIN_VALUE) {
throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
"interval stream joins need to have timestamps meaningful timestamps.");
}
if (isLate(ourTimestamp)) {
return;
}
addToBuffer(ourBuffer, ourValue, ourTimestamp);
for (Map.Entry>> bucket: otherBuffer.entries()) {
final long timestamp = bucket.getKey();
if (timestamp < ourTimestamp + relativeLowerBound ||
timestamp > ourTimestamp + relativeUpperBound) {
continue;
}
for (BufferEntry entry: bucket.getValue()) {
if (isLeft) {
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
} else {
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
}
}
}
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft) {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
}
这段代码的思路是:
- 取得当前流 StreamRecord 的时间戳,调用 isLate() 方法判断它是否是迟到数据(即时间戳小于当前水印值),如是则丢弃。
- 调用 addToBuffer() 方法,将时间戳和数据一起插入当前流对应的 MapState。
- 遍历另外一个流的 MapState,如果数据满足前述的时间区间条件,则调用 collect() 方法将该条数据投递给用户定义的 ProcessJoinFunction 进行处理。collect() 方法的代码如下,注意结果对应的时间戳是左右流时间戳里较大的那个。
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
collector.setAbsoluteTimestamp(resultTimestamp);
context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
userFunction.processElement(left, right, context, collector);
}
- 调用 TimerService.registerEventTimeTimer() 注册时间戳为 timestamp + relativeUpperBound 的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。注意左右流的定时器所属的 namespace 是不同的,具体逻辑则位于 onEventTime() 方法中。
@Override
public void onEventTime(InternalTimer timer) throws Exception {
long timerTimestamp = timer.getTimestamp();
String namespace = timer.getNamespace();
logger.trace("onEventTime @ {}", timerTimestamp);
switch (namespace) {
case CLEANUP_NAMESPACE_LEFT: {
long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
logger.trace("Removing from left buffer @ {}", timestamp);
leftBuffer.remove(timestamp);
break;
}
case CLEANUP_NAMESPACE_RIGHT: {
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ {}", timestamp);
rightBuffer.remove(timestamp);
break;
}
default:
throw new RuntimeException("Invalid namespace " + namespace);
}
}
本文转载自简书,作者:LittleMagic
原文链接:https://www.jianshu.com/p/45ec888332df
Flink Forward Asia 2020 大会议程发布Flink Forward Asia 2020 在线峰会重磅开启!12月13-15日,全球 38+ 一线厂商,70+ 优质议题,与您探讨新型数字化技术下的未来趋势!大会议程已正式上线,点击文末「阅读原文」即可免费预约~(点击可了解更多大会详情)戳我预约!
left join 后数据变多_Flink 双流 Join 的3种操作示例相关推荐
- Flink 双流 Join 的3种操作示例
在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作.同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息.Flink DataStream API 为用 ...
- html更改灰色按钮可用,点击提交按钮后按钮变灰色不可用状态的三种方法
第一种方法:直接按钮中加入 当点击提交后,提交按钮变灰色不可用,这样可有效防止重复提交,本代码就是实现这样一个功能.从代码就可以看出,我们只需在提交按钮上加入这一句: οnclick="ja ...
- jquery按钮置灰_点击提交按钮后按钮变灰色不可用状态的三种方法
第一种方法:直接按钮中加入 当点击提交后,提交按钮变灰色不可用,这样可有效防止重复提交,本代码就是实现这样一个功能.从代码就可以看出,我们只需在提交按钮上加入这一句: οnclick="ja ...
- html表单按钮灰色,点击提交按钮后按钮变灰色不可用状态的三种方法
第一种方法:直接按钮中加入 当点击提交后,提交按钮变灰色不可用,这样可有效防止重复提交,本代码就是实现这样一个功能.从代码就可以看出,我们只需在提交按钮上加入这一句: οnclick="ja ...
- Angular使用Console.log()打印出来的数据没问题,点击详情后数据变了
我在一个界面添加数据使用updataEvent将对象返回给另一个界面后,在onUpData中处理时使用 this.xxxxx= d,直接将地址值给了变量,当这个方法结束后d被重置了,所以this.xx ...
- memcpy后数据不对_详解Redis 的 5 种基本数据结构:
来源:我没有三颗心脏 一.Redis 简介 "Redis is an open source (BSD licensed), in-memory data structure store, ...
- 面试官: Flink双流JOIN了解吗? 简单说说其实现原理
摘要:今天和大家聊聊Flink双流Join问题.这是一个高频面试点,也是工作中常遇到的一种真实场景. 本文分享自华为云社区<万字直通面试:Flink双流JOIN>,作者:大数据兵工厂 . ...
- Flink双流JOIN
1.引子 1.1 数据库SQL中的JOIN 我们先来看看数据库SQL中的JOIN操作.如下所示的订单查询SQL,通过将订单表的id和订单详情表order_id关联,获取所有订单下的商品信息. sele ...
- 5.1.4 SELECT+RIGHT JOIN读取数据
5.1.4 SELECT+RIGHT JOIN读取数据 语法结构: RIGHT JOIN:包含左右表所有的记录 RIGHT JOIN图示如下: 2021年9月10日 写于芜湖
最新文章
- idea怎么使用jacoco生成报告_Intellij IDEA解析jacoco结果文件的方法
- java maven项目构建ssh工程 父工程与子模块的拆分与聚合
- python 二叉树遍历
- Bootstrap3插件系列:bootstrap-select2
- 使用mysql_fetch_array()获取当前行数据
- sdut 图的深度遍历
- python使用sqlalchemy执行sql查询语句
- 关于jq easyui 刷新tabs的问题
- 优先级队列——实现二维数组排序
- [代码片断]SQL中解析XML数据
- 关于使用Kaptcha验证码框架遇到的问题
- json学习笔记,json与js对象格式的转换,js对象转json字符串,json格式转js对象
- 批量html源代码 翻译,一键实现网页中英文对照的黑科技翻译工具
- !!obj与JavaScript中!!的作用
- linux nfs不在同一个网络,NFS共享机制
- Android 接入穿山甲SDK之Banner广告
- mysql auto_increment 原理_mysql原理之Auto_increment
- 大二文本分词过滤分类实验总结
- 正则表达式在线测试工具
- Windows .bat 脚本简单用法介绍
热门文章
- 绘制商务感十足的折线图和面积图
- By.css 的级联读取
- SAP Spartacus travis ci-scripts 下面 e2e-cypress.sh 的实现分析
- SAP S/4HANA: 一条代码线,许多种选择
- 如何使用 SAP CDS view 中的 currency conversion 功能
- 如何配置 SAP BTP Integration Suite 测试帐号的环境
- SAP Spartacus B2B Org Unit List节点展开的递归逻辑实现
- SAP Spartacus B2B Org Unit树状结构的加载机制
- Java线程同步的一些例子
- SAP CRM呼叫中心里的事件注册机制