在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:

  • join()
  • coGroup()
  • intervalJoin()

本文举例说明它们的使用方法,顺便聊聊比较特殊的 interval join 的原理。

准备数据

从 Kafka 分别接入点击流和订单流,并转化为 POJO。

DataStream<String> clickSourceStream = env  .addSource(new FlinkKafkaConsumer011<>(    "ods_analytics_access_log",    new SimpleStringSchema(),    kafkaProps  ).setStartFromLatest());DataStream<String> orderSourceStream = env  .addSource(new FlinkKafkaConsumer011<>(    "ods_ms_order_done",    new SimpleStringSchema(),    kafkaProps  ).setStartFromLatest());DataStream clickRecordStream = clickSourceStream  .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));DataStream orderRecordStream = orderSourceStream  .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));

join()

join() 算子提供的语义为"Window join",即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。以下示例以10秒滚动窗口,将两个流通过商品 ID 关联,取得订单流中的售价相关字段。

clickRecordStream  .join(orderRecordStream)  .where(record -> record.getMerchandiseId())  .equalTo(record -> record.getMerchandiseId())  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))  .apply(new JoinFunction() {    @Override    public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {      return StringUtils.join(Arrays.asList(        accessRecord.getMerchandiseId(),        orderRecord.getPrice(),        orderRecord.getCouponMoney(),        orderRecord.getRebateAmount()      ), '\t');    }  })  .print().setParallelism(1);

简单易用。

coGroup()

只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。

clickRecordStream  .coGroup(orderRecordStream)  .where(record -> record.getMerchandiseId())  .equalTo(record -> record.getMerchandiseId())  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))  .apply(new CoGroupFunctionString, Long>>() {    @Override    public void coGroup(Iterable accessRecords, Iterable orderRecords, CollectorString, Long>> collector) throws Exception {      for (AnalyticsAccessLogRecord accessRecord : accessRecords) {        boolean isMatched = false;        for (OrderDoneLogRecord orderRecord : orderRecords) {          // 右流中有对应的记录          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));          isMatched = true;        }        if (!isMatched) {          // 右流中没有对应的记录          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));        }      }    }  })  .print().setParallelism(1);

intervalJoin()

join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:

right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。

clickRecordStream  .keyBy(record -> record.getMerchandiseId())  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))  .between(Time.seconds(-30), Time.seconds(30))  .process(new ProcessJoinFunctionString>() {    @Override    public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {      collector.collect(StringUtils.join(Arrays.asList(        accessRecord.getMerchandiseId(),        orderRecord.getPrice(),        orderRecord.getCouponMoney(),        orderRecord.getRebateAmount()      ), '\t'));    }  })  .print().setParallelism(1);

由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。

interval join 的实现原理

以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。

public <OUT> SingleOutputStreamOperator<OUT> process(        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,        TypeInformation<OUT> outputType) {    Preconditions.checkNotNull(processJoinFunction);    Preconditions.checkNotNull(outputType);    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =        new IntervalJoinOperator<>(            lowerBound,            upperBound,            lowerBoundInclusive,            upperBoundInclusive,            left.getType().createSerializer(left.getExecutionConfig()),            right.getType().createSerializer(right.getExecutionConfig()),            cleanedUdf        );    return left        .connect(right)        .keyBy(keySelector1, keySelector2)        .transform("Interval Join", outputType, operator);}

可见是先对两条流执行 connect() 和 keyBy() 操作,然后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中,会利用两个 MapState 分别缓存左流和右流的数据。

private transient MapState>> leftBuffer;private transient MapState>> rightBuffer;@Overridepublic void initializeState(StateInitializationContext context) throws Exception {    super.initializeState(context);    this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(        LEFT_BUFFER,        LongSerializer.INSTANCE,        new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))    ));    this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(        RIGHT_BUFFER,        LongSerializer.INSTANCE,        new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))    ));}

其中 Long 表示事件时间戳,List> 表示该时刻到来的数据记录。当左流和右流有数据到达时,会分别调用 processElement1() 和 processElement2() 方法,它们都调用了 processElement() 方法,代码如下。

@Overridepublic void processElement1(StreamRecord record) throws Exception {    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);}@Overridepublic void processElement2(StreamRecord record) throws Exception {    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);}@SuppressWarnings("unchecked")private void processElement(        final StreamRecord record,        final MapState>> ourBuffer,        final MapState>> otherBuffer,        final long relativeLowerBound,        final long relativeUpperBound,        final boolean isLeft) throws Exception {    final THIS ourValue = record.getValue();    final long ourTimestamp = record.getTimestamp();    if (ourTimestamp == Long.MIN_VALUE) {        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +                "interval stream joins need to have timestamps meaningful timestamps.");    }    if (isLate(ourTimestamp)) {        return;    }    addToBuffer(ourBuffer, ourValue, ourTimestamp);    for (Map.Entry>> bucket: otherBuffer.entries()) {        final long timestamp  = bucket.getKey();        if (timestamp < ourTimestamp + relativeLowerBound ||                timestamp > ourTimestamp + relativeUpperBound) {            continue;        }        for (BufferEntry entry: bucket.getValue()) {            if (isLeft) {                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);            } else {                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);            }        }    }    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;    if (isLeft) {        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);    } else {        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);    }}

这段代码的思路是:

  1. 取得当前流 StreamRecord 的时间戳,调用 isLate() 方法判断它是否是迟到数据(即时间戳小于当前水印值),如是则丢弃。
  2. 调用 addToBuffer() 方法,将时间戳和数据一起插入当前流对应的 MapState。
  3. 遍历另外一个流的 MapState,如果数据满足前述的时间区间条件,则调用 collect() 方法将该条数据投递给用户定义的 ProcessJoinFunction 进行处理。collect() 方法的代码如下,注意结果对应的时间戳是左右流时间戳里较大的那个。
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);    collector.setAbsoluteTimestamp(resultTimestamp);    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);    userFunction.processElement(left, right, context, collector);}
  1. 调用 TimerService.registerEventTimeTimer() 注册时间戳为 timestamp + relativeUpperBound 的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。注意左右流的定时器所属的 namespace 是不同的,具体逻辑则位于 onEventTime() 方法中。
@Overridepublic void onEventTime(InternalTimer timer) throws Exception {    long timerTimestamp = timer.getTimestamp();    String namespace = timer.getNamespace();    logger.trace("onEventTime @ {}", timerTimestamp);    switch (namespace) {        case CLEANUP_NAMESPACE_LEFT: {            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;            logger.trace("Removing from left buffer @ {}", timestamp);            leftBuffer.remove(timestamp);            break;        }        case CLEANUP_NAMESPACE_RIGHT: {            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;            logger.trace("Removing from right buffer @ {}", timestamp);            rightBuffer.remove(timestamp);            break;        }        default:            throw new RuntimeException("Invalid namespace " + namespace);    }}

本文转载自简书,作者:LittleMagic
原文链接:https://www.jianshu.com/p/45ec888332df


  Flink Forward Asia 2020  大会议程发布Flink Forward Asia 2020 在线峰会重磅开启!12月13-15日,全球 38+ 一线厂商,70+ 优质议题,与您探讨新型数字化技术下的未来趋势!大会议程已正式上线,点击文末「阅读原文」即可免费预约~(点击可了解更多大会详情)戳我预约!

left join 后数据变多_Flink 双流 Join 的3种操作示例相关推荐

  1. Flink 双流 Join 的3种操作示例

    在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作.同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息.Flink DataStream API 为用 ...

  2. html更改灰色按钮可用,点击提交按钮后按钮变灰色不可用状态的三种方法

    第一种方法:直接按钮中加入 当点击提交后,提交按钮变灰色不可用,这样可有效防止重复提交,本代码就是实现这样一个功能.从代码就可以看出,我们只需在提交按钮上加入这一句: οnclick="ja ...

  3. jquery按钮置灰_点击提交按钮后按钮变灰色不可用状态的三种方法

    第一种方法:直接按钮中加入 当点击提交后,提交按钮变灰色不可用,这样可有效防止重复提交,本代码就是实现这样一个功能.从代码就可以看出,我们只需在提交按钮上加入这一句: οnclick="ja ...

  4. html表单按钮灰色,点击提交按钮后按钮变灰色不可用状态的三种方法

    第一种方法:直接按钮中加入 当点击提交后,提交按钮变灰色不可用,这样可有效防止重复提交,本代码就是实现这样一个功能.从代码就可以看出,我们只需在提交按钮上加入这一句: οnclick="ja ...

  5. Angular使用Console.log()打印出来的数据没问题,点击详情后数据变了

    我在一个界面添加数据使用updataEvent将对象返回给另一个界面后,在onUpData中处理时使用 this.xxxxx= d,直接将地址值给了变量,当这个方法结束后d被重置了,所以this.xx ...

  6. memcpy后数据不对_详解Redis 的 5 种基本数据结构:

    来源:我没有三颗心脏 一.Redis 简介 "Redis is an open source (BSD licensed), in-memory data structure store, ...

  7. 面试官: Flink双流JOIN了解吗? 简单说说其实现原理

    摘要:今天和大家聊聊Flink双流Join问题.这是一个高频面试点,也是工作中常遇到的一种真实场景. 本文分享自华为云社区<万字直通面试:Flink双流JOIN>,作者:大数据兵工厂 . ...

  8. Flink双流JOIN

    1.引子 1.1 数据库SQL中的JOIN 我们先来看看数据库SQL中的JOIN操作.如下所示的订单查询SQL,通过将订单表的id和订单详情表order_id关联,获取所有订单下的商品信息. sele ...

  9. 5.1.4 SELECT+RIGHT JOIN读取数据

    5.1.4 SELECT+RIGHT JOIN读取数据 语法结构: RIGHT JOIN:包含左右表所有的记录 RIGHT JOIN图示如下: 2021年9月10日 写于芜湖

最新文章

  1. idea怎么使用jacoco生成报告_Intellij IDEA解析jacoco结果文件的方法
  2. java maven项目构建ssh工程 父工程与子模块的拆分与聚合
  3. python 二叉树遍历
  4. Bootstrap3插件系列:bootstrap-select2
  5. 使用mysql_fetch_array()获取当前行数据
  6. sdut 图的深度遍历
  7. python使用sqlalchemy执行sql查询语句
  8. 关于jq easyui 刷新tabs的问题
  9. 优先级队列——实现二维数组排序
  10. [代码片断]SQL中解析XML数据
  11. 关于使用Kaptcha验证码框架遇到的问题
  12. json学习笔记,json与js对象格式的转换,js对象转json字符串,json格式转js对象
  13. 批量html源代码 翻译,一键实现网页中英文对照的黑科技翻译工具
  14. !!obj与JavaScript中!!的作用
  15. linux nfs不在同一个网络,NFS共享机制
  16. Android 接入穿山甲SDK之Banner广告
  17. mysql auto_increment 原理_mysql原理之Auto_increment
  18. 大二文本分词过滤分类实验总结
  19. 正则表达式在线测试工具
  20. Windows .bat 脚本简单用法介绍

热门文章

  1. 绘制商务感十足的折线图和面积图
  2. By.css 的级联读取
  3. SAP Spartacus travis ci-scripts 下面 e2e-cypress.sh 的实现分析
  4. SAP S/4HANA: 一条代码线,许多种选择
  5. 如何使用 SAP CDS view 中的 currency conversion 功能
  6. 如何配置 SAP BTP Integration Suite 测试帐号的环境
  7. SAP Spartacus B2B Org Unit List节点展开的递归逻辑实现
  8. SAP Spartacus B2B Org Unit树状结构的加载机制
  9. Java线程同步的一些例子
  10. SAP CRM呼叫中心里的事件注册机制