翻译 | 王柯凝 校对 | 邱从贤(山智)

自版本 Flink 1.5.0 以来,Apache Flink 提供了一种新的状态类型,称为广播状态(Broadcast State)。在本文中,将解释什么是广播状态,并通过示例演示如何将广播状态应用在评估基于事件流的动态模式的应用程序,并指导大家学习广播状态的处理步骤和相关源码,以便在今后的实践中能实现此类的应用。

什么是广播状态

广播状态可以用于通过一个特定的方式来组合并共同处理两个事件流。第一个流的事件被广播到另一个 operator 的所有并发实例,这些事件将被保存为状态。另一个流的事件不会被广播,而是发送给同一个 operator 的各个实例,并与广播流的事件一起处理。广播状态非常适合两个流中一个吞吐大,一个吞吐小,或者需要动态修改处理逻辑的情况。我们将使用后者的一个具体实例来解释广播状态,并在本文的其余部分里对详细的 API 加以说明。

使用广播状态的动态模型评估

假设电子商务类型的网站获取了所有用户的操作行为数据作为用户的操作流,网站的运营团队致力于分析用户的操作,来提高销售额,改善用户体验,并监测和预防恶意行为。网站期望实现一个流应用程序,用于检测用户事件流中的模式,但需要避免在每次模式有变化的时候还要修改和重新部署应用程序,因此我们使用另外一个特征流来读取、更新当前特征,接下来我们通过一个实例逐步阐述如何通过 Apache Flink 中的广播状态来完成相应工作。

实例的程序获取两个数据流,第一个流提供了网站上的用户操作行为数据,如上图左上方所示,一个用户的交互事件由操作的类型(用户登录、用户注销、添加到购物车或者完成付款等)和用户的 ID(按颜色编码的)组成。图中的用户操作事件流包含用户 1001 的“登出”操作,然后是用户 1003 的“支付完成”事件,以及用户 1002 的“添加到购物车”操作。

第二个流提供了应用程序要评估的用户操作模式,模式是由两个连续的操作组成的。在上图中,模式流包含了以下两种:

  • 模式1:用户登录并立即登出,并没有点击网站上其它的页面;

  • 模式2:用户将商品添加到购物车,然后登出,而并没有完成购买操作;

这样的模式有助于企业更好地分析用户行为、检测恶意行为和提高网站体验。例如,如果只是将商品添加到购物车里而没有完成后续的支付,那么网站可以采取合适的方法,更好地了解用户没有购买的原因,并采取一定的措施以提高网站的购买转化率(例如提供优惠券、免运费等)。

在上图右侧,显示了一个 operator 的三个并发实例,这些实例获取模式和用户操作行为的数据流,评估数据流上的模式,并向下游发出模式匹配事件。为了简便起见,我们的例子中的 operator 只对一个进行两次后续操作行为的模式进行评估。当从模式流中获取到新模式的时候,将替换当前活动的模式。原则上,该 operator 也可以实现评估更复杂的模式或多个模式,这些模式可以单独添加或是删除。

我们将描述负责模式匹配的程序如何处理用户的操作和模式流。

首先,向 operator 发送一个模式,该模式被广播给这个 operator 的三个并发实例,接着,每个并发实例将模式存储在广播状态中,由于广播状态只能使用广播数据来进行更新,因此所有并发实例的状态都应该是相同的。

接下来,第一个用户信息流会基于用户 ID 进行划分,并发送给 operator 的实例,分区会确保同一用户的所有操作都由同一并发实例处理。上图显示了在 operator 实例处理了第一个模式和前三个操作行为事件之后应用程序的状态。

当任务接收到新的用户操作数据时,它通过查看用户最新的和历史的操作记录来评估当前的活动模式。对于每个用户,operator 都在 keyed state 中存储用户的上一个操作。到目前为止,由于上图中的任务只为每个用户接收一个操作(我们刚刚启动了应用程序),因此不需要评估模式。最后,keyed state 中用户的上一个操作将更新为最新的操作,以便在同一用户的下一个操作行为到达时能够进行查找。

在前三个操作行为被处理了之后,下一个事件,即用户 1001 的注销操作,将被发送到处理用户 1001 的并发实例中。当并发实例接收到用户操作的数据时,它从广播状态和用户 1001 的上一个操作中查找当前的模式。由于这两个操作符合模式匹配,因此会往下游发送匹配事件。最后,该任务会通过使用最新的操作来覆盖前一个事件以更新其 keyed state。

当一个新模式进入了模式流,它会被广播给所有任务,并且每个并发实例通过使用新模式替换当前模式来更新其广播状态。

一旦广播状态更新为新模式,那么匹配逻辑将像以前一样继续执行,即用户操作行为事件按键(key)进行分区,并由负责的并发实例进行评估。

如何实现广播状态的应用程序?

到目前为止,我们在概念上讨论了应用程序,并解释了如何使用广播状态来评估事件流上的动态模式。接下来,我们将展示如何使用 Flink 的 DataStream API 和广播状态功能实现该实例的程序代码。

让我们从程序的输入数据开始。有两个数据流:操作行为流和模式流,在这一点上,我们并不关心数据流从何而来,这些流可以从 Apache Kafka、Kinesis 或任何其它系统中获取。

DataStream<Action> actions = ???DataStream<Pattern> patterns = ???

Action 和 Pattern 都是 POJO,每个都含有两个字段:

  • Action的字段:Long userId, String action

  • Pattern的字段:String firstAction, String secondAction

作为第一步,我们将 userId 作为操作行为流上的键:

KeyedStream<Action, Long> actionsByUser = actions  .keyBy((KeySelector<Action, Long>) action -> action.userId);

接下来,我们准备广播状态,广播状态通常表示为 MapState,这是 Flink 提供的最通用的状态接口类。

MapStateDescriptor<Void, Pattern> bcStateDescriptor =   new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));

由于这个应用程序一次只评估和存储一个 Pattern,所以我们将广播状态配置成具有键类型 Void 和值类型 Pattern 的 MapState。MapState 的键永远为 null。

BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);

以 MapStateDescriptor 为参数,调用模式流上的 Broadcast 转换操作,得到一个  BroadcastStream 对象 bcedPatterns。

DataStream>> matches = actionsByUser .connect(bcedPatterns) .process(new PatternEvaluator());

在获得了 keyedstreamactionsByUser 和广播流 bcedPatterns 之后,我们对两个流使用了 connect() 方法,并在连接的流上调用了 PatternEvaluator 类(见下面 PatternEvaluator 的代码)。PatternEvaluator 是实现 KeyedBroadcastProcessFunction 接口的自定义类。它调用了我们之前讨论过的模式匹配逻辑,并发出 Tuple2 的记录,其中包含用户 ID 和匹配的模式。

public static class PatternEvaluator    extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {   // handle for keyed state (per user)  ValueState prevActionState;  // broadcast state descriptor  MapStateDescriptor patternDesc;   @Override  public void open(Configuration conf) {    // initialize keyed state    prevActionState = getRuntimeContext().getState(      new ValueStateDescriptor<>("lastAction", Types.STRING));    patternDesc =       new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));  }  /**   * Called for each user action.   * Evaluates the current pattern against the previous and   * current action of the user.   */  @Override  public void processElement(     Action action,      ReadOnlyContext ctx,      Collector> out) throws Exception {   // get current pattern from broadcast state   Pattern pattern = ctx     .getBroadcastState(this.patternDesc)     // access MapState with null as VOID default value     .get(null);   // get previous action of current user from keyed state   String prevAction = prevActionState.value();   if (pattern != null && prevAction != null) {     // user had an action before, check if pattern matches     if (pattern.firstAction.equals(prevAction) &&          pattern.secondAction.equals(action.action)) {       // MATCH       out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));     }   }   // update keyed state and remember action for next pattern evaluation   prevActionState.update(action.action); } /**  * Called for each new pattern.  * Overwrites the current pattern with the new pattern.  */ @Override public void processBroadcastElement(     Pattern pattern,      Context ctx,      Collector> out) throws Exception {   // store the new pattern by updating the broadcast state   BroadcastState bcState = ctx.getBroadcastState(patternDesc);   // storing in MapState with null as VOID default value   bcState.put(null, pattern); }}

KeyedBroadcastProcessFunction 接口提供了三种方法来处理数据记录和发出的结果:

  • processBroadcastElement() 方法:每次收到广播流的记录时会调用。在 PatternEvaluator 类中,我们只需使用 null 键将接收到的 Pattern 记录放入广播状态中(记住,我们只在 MapState 中存储一个模式);

  • processElement() 方法:接受到用户行为流的每条消息时会调用,并能够对广播状态进行只读操作,以防止导致跨越类中多个并发实例的不同广播状态的修改。PatternEvaluator 类的 processElement() 方法从广播状态中获取当前模式,并从 keyed state 中获取用户的前一个操作。如果两者都存在,它会检查前一个和当前的操作行为是否与模式匹配,如果是这样,则会发出模式匹配记录。最后,它将 keyed state 更新为当前用户操作;

  • onTimer() 方法:当之前注册过的计时器触发时被调用。计时器可以在processElement 方法中定义,用于执行计算或是清除状态。为了保持代码的简洁性,我们没有在例子中实现这个方法,但当用户在某段时间内没有操作时,它可以用来删除最后一个操作,以避免由于非活动用户而导致状态增长;

你可能注意到了 KeyedBroadcastProcessFunction 类方法的上下文对象,提供了对其它功能的访问方法,例如:

  • 广播状态(读写或只读,取决于方法)

  • TimerService,允许访问记录的时间戳、当前的水印,并可以注册计时器

  • 当前键(仅在 processElement() 方法中可用)

  • 一种将函数应用于每个已注册键的 keyed state 的方法(仅在 processBroadcastElement() 方法中可用) KeyedBroadcastProcessFunction 类与其它任何 ProcessFunction 类一样,完全可以调用 Flink 的状态和时间功能,因此可以用于实现复杂的程序逻辑。广播状态被设计成了多功能,能够适应不同的场景和用例,虽然我们只讨论了一个比较简单的应用程序,但是你可以通过多个方式使用广播状态来实现应用的需求。

结论

在本文中,我们通过学习一个应用程序的实例,来解释 Apache Flink 的广播状态是什么,以及如何应用它来评估事件流上的动态模式,除此之外本文还讨论了广播状态的 API,并展示了相关源代码。


Via:https://flink.apache.org/2019/06/26/broadcast-state.html

作者 :Fabian Hueske

活动推荐

持续学习、和同行交流的机会来啦,由贾扬清助阵,阿里云计算平台事业部、天池平台、intel 联合举办的首届 Apache Flink 极客挑战赛重磅来袭!

聚焦机器学习与计算性能两大时下热门领域,参与比赛,让自己成为技术多面手,还有机会赢得 10W 奖金。大赛详情请点击下方图片?

(点击图片即可查看大赛详细信息)你也「在看」吗?

as点击发送广播_Apache Flink 中广播状态的实用指南相关推荐

  1. Flink中的状态管理

    1 Flink中的状态   当数据流中的许多操作只查看一个每次事件(如事件解析器),一些操作会跨多个事件的信息(如窗口操作).这些操作称为有状态.状态由一个任务维护,并且用来计算某个结果的所有数据,都 ...

  2. 详解Flink中的状态管理

    流式计算分为无状态和有状态两种情况.无状态的计算观察每个独立事件,并根据最后一个事件输出结果.例如:流处理应用程序从传感器接收温度读数,并在温度超过90度时发出警告.有状态的计算则会基于多个事件输出结 ...

  3. flink checkpoint 恢复_Apache Flink 管理大型状态之增量 Checkpoint 详解

    邱从贤(山智),Apache Flink Contributor,中南大学硕士,2018 年加入阿里巴巴计算平台事业部,专注于 Flink 核心引擎开发,主要从事 Flink  State&C ...

  4. Flink中的状态与容错

    1.概述 Flink支持有状态计算,根据支持得不同状态类型,分别有Keyed State和Operator State.针对状态数据得持久化,Flink提供了Checkpoint机制处理:针对状态数据 ...

  5. linux chroot_Linux中chroot命令的实用指南

    linux chroot Sometimes, you may need to isolate a process from other processes running on your syste ...

  6. c ++向量库_C ++中的2D向量–实用指南2D向量

    c ++向量库 Also referred to as vector of vectors, 2D vectors in C++ form the basis of creating matrices ...

  7. 简短加密_神经网络训练中回调的简短实用指南

    简短加密 Callbacks are an important part of neural network training. These are actions that can be perfo ...

  8. flink的广播、累加、缓存

    flink的广播.累加器.分布式缓存 Flink的广播变量 Flink支持广播.可以将数据广播到TaskManager上,数据存储到内存中.数据存储在内存中,这样可以减缓大量的 shuwle操作:比如 ...

  9. Flink之State状态编程

    Flink中的状态State State分类 State[ValueState.ReadOnlyBroadcastState.MapState.AppendingState] AppendingSta ...

最新文章

  1. 边缘计算Edage Computing
  2. numpy中amin()方法中维度axis=0 1 2 的理解
  3. C# 操作Excel数据透视表
  4. Django静态文件一瞥
  5. 【AI不惑境】模型压缩中知识蒸馏技术原理及其发展现状和展望
  6. 如何在Elasticsearch中进行深分页
  7. sql server相关的命令行
  8. python将学生信息保存到文件中_Python statsmodels OLS:如何将学习的模型保存到文件中...
  9. 如何使用基于范围的for()循环与std :: map?
  10. OSG/osgEarth相关功能函数汇总
  11. 项目背景一般写什么_项目申报整体框架规划思路,提高你的书写逻辑!
  12. 视频教程-使用 Pandas 与 Matplotlib 分析科比职业生涯数据-Python
  13. 统考计算机应用基础ex,EXCEL操作题
  14. 组学生信| Front Immunol |基于血清蛋白质组早期诊断标志筛选的简单套路
  15. python海龟动画小猫边走边换造型的类
  16. VC++ 查看系统进程,获取进程关联的DLL列表
  17. 正则匹配大于等于号与indexof结合
  18. java中多线程介绍
  19. 计科生毕业一年,做了什么?
  20. VLAN与PVLAN的区别

热门文章

  1. IDEA中debug
  2. MQTT连接阿里云IoT(四)
  3. SPI通信总线-51
  4. python的scrapy框架的安装_Python3环境安装Scrapy爬虫框架过程及常见错误
  5. 线程安全、守护线程、join()
  6. 51单片机之串口通讯应用实例(逻辑分析仪调试)
  7. java.library.path hadoop_关于java:Hadoop“无法为您的平台加载本机Hadoop库”警告
  8. linux top交叉编译_Linux 系统下ARM Linux交叉编译环境crosstool工具
  9. java调用sap接口_(二)通过JAVA调用SAP接口 (增加一二级参数)
  10. Multi-Temporal SAR Data Large-Scale Crop Mapping Based on U-Net Model(利用U-net对多时相SAR影像获得作物图)...