FlinkCEP的底层理论:NFA-b Automaton原理介绍
1. 基本概念
1.1. NFAb介绍
FlinkCEP是基于《Efficient Pattern Matching over Event Streams》这篇论文的思想实现的。
该论文提出了一种在事件流上进行高效模式匹配的方法,即带匹配缓存的非确定有限自动机【Non-deterministic finite automaton】,又称为NFAb自动机。
1.2. SASE+
SASE+语言:一个复杂的事件语言,它支持事件流上的Kleene闭包,并对该语言的可表达性提供了形式化的分析。
SASE+是一种专门用来描述CEP pattern的通用语言
1.3. CEP模式操作符
CEP模式操作符
【org.apache.flink.cep.operator.CepOperator】
- 针对键控输入流【keyed input stream】,对于每个键,CepOperator创建一个NFA和一个优先队列来缓冲顺序乱序的事件,这两种数据结构都使用托管键控状态进行存储。
- 针对于非键控输入流,CepOperator创建一个的全局NFA。
当事件被处理时,它更新NFA的内部状态机。
属于部分匹配序列的事件被保存在内部的SharedBuffer缓冲区【一个内存优化的数据结构】中。
当包含事件的所有匹配序列时碰到如下情况时,将删除缓冲区中的事件:
- emitted (success)
- discarded (patterns containing NOT)
- timed-out (windowed patterns)
2. NFAbAutomaton原理
2.1. NFAbAutomaton的定义与构造
在大学《形式语言与自动机》课程中的非确定有限状态机(NFA),用一句话概括就是:对于每个<状态,输入符号>二元组,其状态转移可以有多个,而不是确定的一个。
NFAbAutomaton的定义与普通NFA略有不同,为五元组:A = (Q, E, θ, q1, F)
说明如下:
- Q: 表示状态集合 a set of states
- E: 表示状态转移的有向边集合 a set of directed edges
- θ: 表示状态转移的公式集合,与E共同作用 a set of formulas
- q1: 表示边的初始状态;labelling those edges a start state
- F: 表示最终状态 a final state
下面通过实例来构造NFAbAutomaton。
先通过SASE+语言]定义如下的股票趋势事件模式:
PATTERN SEQ(Stock+ a[ ], Stock b)
WHERE skip_till_next_match(a[ ], b) {[symbol]and a[1].volume > 1000and a[i].price > avg(a[..i-1].price)and b.volume < 80%*a[a.LEN].volume }
WITHIN 1 hour
此事件模式以1小时作为时间窗口的长度,以“交易量大于1000”作为匹配序列的起始,且要求序列中股票的最近价格必须高于之前所有交易价格的均值,当检测到该股票的交易量下跌到最近一次交易量的80%以下时,则成功匹配整个事件模式。
根据此股票趋势事件模式,构造出NFAbAutomaton,如下图所示。这也是Flink CEP中NFACompiler组件需要做的事情。
匹配序列a[]的生成过程 就是构造符合谓词约束的事件的Kleene +闭包的过程
NFAbAutomaton的每个状态都有各自的匹配缓存,用于在运行时存储当前的匹配结果。
the start state, a[1], is where the matching process begins. It awaits input to start the Kleene plus and to select an event into the a[1] unit of the match buffer. At the next state a[i], it attempts to select another event into the a[i] (i > 1) unit of the buffer. The subsequent state b denotes that the matching process has fulfilled the Kleene plus (for a particular match) and is ready to process the next pattern component. The final state, F, represents the completion of the process, resulting in the creation of a pattern match.
- 初始状态a[1]是匹配过程开始的位置,即NFAbAutomaton的初始状态。它等待输入以启动Kleene plus并将一个事件【交易量大于1000的事件】选择到匹配缓冲区的a[1]单元中。
- 在下一个状态a[i]时,它尝试选择另一个事件【最近价格高于之前所有交易价格的均值】到缓冲区的a[i] (i > 1)单元中。a[i]是正在构造Kleene +的状态
- 随后的状态b表示匹配过程已经完成了Kleene +(对于特定的匹配)闭包,并准备处理下一个模式组件【交易量下跌到最近一次交易量的80%以下】。
- 最后的状态F表示流程的完成,从而创建模式匹配。
2.2. 状态转移语义
复杂事件模式的匹配过程:本质上是输入事件流驱动NFAbAutomaton进行状态转移的过程。
根据θ集合定义的条件,在有向边集合E上可以定义4种状态转移语义:
- begin:消费输入事件,存入缓存,并转移到下一个状态;
- take:消费输入事件,存入缓存,并保持当前状态;
- ignore:忽略输入事件,不存入缓存,并保持当前状态;
- proceed:感知输入事件,转移到下一个状态,同时保留该事件给下一个状态处理。
结合这4种状态转移语义,就可以读懂上图中的转移公式了。
FlinkCEP的StateTransitionAction定义中没有begin语义,仅有take、ignore和proceed语义,但是它和NFAbAutomaton是等价的
/** Set of actions when doing a state transition from a {@link State} to another. */
public enum StateTransitionAction {TAKE, // take the current event and assign it to the current stateIGNORE, // ignore the current eventPROCEED // do the state transition and keep the current event for further processing (epsilon transition)
}
2.3. 事件选择策略
事件选择策略:指选择符合条件的事件进入正闭包——即扩展匹配序列的方法。在时间窗口的限制之内,常用的有以下三种策略。
- Strict contiguity(严格连续):在最严格的事件选择策略中,两个选定的事件必须在输入流中是连续的。这种要求在正则表达式匹配字符串、DNA序列等时很常见。
- 严格按顺序选择所有符合条件的事件,途中不能出现不符合条件的事件
- 对应FlinkCEP API中的Pattern.next()/notNext();
- Partition contiguity:两个选定的事件不需要是连续的; 但是如果事件是根据一个条件在概念上划分的,那么在同一分区中,下一个相关事件必须与前一个相关事件相邻
- 例如示例中的[symbol],通常用于形成分区。然而如果事件模式的目的是检测价格上涨的总体趋势,尽管存在一些局部波动值,那么分区相邻可能不够灵活导致无法支持。
- 对应FlinkCEP在键控输入流【keyed input stream】上使用Strict contiguity
- skip till next match(宽松连续):进一步宽松,完全删除连续性要求: 所有不相关的事件将被跳过,直到读取下一个相关事件
- 按顺序选择所有符合条件的事件,而途中不符合条件的事件被忽略,
- 对应FlinkCEP API中的Pattern.followedBy()/notFollowedBy()。上述SASE+语言描述的pattern使用的就是这个策略;
- skip till any match(可变宽松连续):Finally, skip till any match relaxes the previous one by further allowing non-deterministic actions on relevant events
- 在skip till next match的基础上,还允许忽略一些符合条件的事件,以尽量延长匹配序列的长度,
- 对应FlinkCEP API中的Pattern.followedByAny()。
以skip till next match策略为例,给出如下的示例数据,可以产生3个匹配序列R1、R2、R3,如图所示。
2.3. 共享版本匹配缓存
仍然考虑上一节的图,回顾一下a[i]状态的take和proceed转移逻辑:
θ*a[i]_take = θa[i]_take ∧ a[i].time<a[1].time+1 hour
θ*a[i]_proceed = θb_begin ∨ (¬θ*a[i]_take ∧ ¬θ*a[i]_ignore)
可见,在e6到达NFA时,可以同时满足a[i]_take和a[i]_proceed的转移(这里正好体现出了NFA的不确定性),所以原本的一个序列会在此分裂成两个:其中一个(R1)终止匹配,另一个(R3)继续匹配。同理,当e3到达NFA时,同时满足a[1]_begin和a[i]_take的转移,所以又会出现一个序列R2。
由上可知,这些序列之间的重合是比较大的,如果都按原样存储在匹配缓存中,会造成比较大的膨胀。为了避免这个问题,论文中设计了一种科学的缓存结构,称为shared versioned match buffer,即“共享版本匹配缓存”,如下图所示:
其中图a、b、c是原始的R1、R2、R3缓存,图d则是整合在一起的共享版本缓存。
它会将所有序列的前向指针附加上一个版本号(采用杜威十进制法,点号分隔),并且遵循以下两个规则:
- 迁移到下一个状态时,版本号增加一位,如a[1]状态的版本号是1(为了符合习惯写作1.0),a[i]状态的版本号是1.0、1.1,b状态的版本号是1.0.0、1.1.0……以此类推;
- 当序列发生分裂时,处于当前状态的版本号位加1。例如e3事件产生了2.0版本,e6事件产生了1.1版本。
依照这种规则,就可以根据前向指针上版本号的递增规律和前缀来回溯出正确的序列了。
FlinkCEP中将此缓存设计为SharedBuffer类,但是版本的设计有些不同。
2.4. 计算状态
对于每一个序列,NFAbAutomaton还需要维护一些最基础的状态数据,以方便执行状态转移和匹配逻辑,论文中将其称为computation state,即计算状态。
基础的计算状态结构如下图所示,包含以下数据项:
- 当前的版本号;
- 当前的状态;
- 指向匹配缓存中最近一个事件的指针;
- 整个序列的起始时间;
- 其他必要的上下文数据存储。以股票趋势数据为例,会维护Kleene +闭包内的事件数、价格之和以及交易量等。
运行计算状态【Computation state of runs】如下:
Flink CEP框架用ComputationState类来维护计算状态,大体思路与论文相同。
FlinkCEP的底层理论:NFA-b Automaton原理介绍相关推荐
- quartz获取开始结束时间_quartz核心元素及底层原理介绍
quartz核心元素及底层原理介绍 Quartz的核心元素主要有Scheduler.Trigger.Job.JobDetail.其中 - Scheduler为调度器负责整个定时系统的调度,内部通过线程 ...
- 【基于物理的渲染(PBR)白皮书】(二) PBR核心理论与渲染光学原理总结
本文由@浅墨_毛星云 出品,首发于知乎专栏,转载请注明出处 文章链接: https://zhuanlan.zhihu.com/p/56967462 这是[基于物理的 ...
- Hadoop 底层原理介绍
1 概述 Apache Hadoop是一个软件框架,可在具有数千个节点和PB级数据的大型集群上进行分布式处理. Hadoop主要包含四个项目:Hadoop Common.Hadoop分布式文件系统(H ...
- APM(应用性能管理)与Dapper原理介绍
欢迎点击访问我的瞎几把整站点:复制未来 文章目录 APM(应用性能管理)与Dapper原理介绍 什么是APM APM介绍 APM三大特征 APM的发展历程 DevOps APM 的核心思想 为什么要使 ...
- heartbeat原理介绍
heartbeat原理介绍 HeartBeat运行于备用主机上的Heartbeat可以通过以太网连接检测主服务器的运行状态,一旦其无法检测到主服务器的"心跳"则自动接管主服务器的资 ...
- Springboot中的缓存Cache和CacheManager原理介绍
一.背景理解 什么是缓存,为什么要用缓存? 程序运行中,在内存保持一定时间不变的数据就是缓存.简单到写一个Map,里面放着一些key,value数据,就已经是个缓存了.所以缓存并不是什么高大上的技术, ...
- Express中间件工作原理介绍
Express中间件工作原理介绍 中间件是Express框架学习中最难的部分,同时也是最为核心的技术,我们的学习路线如下 1.什么是中间件 2.Express中间件的本质及工作原理 3.自定义解析po ...
- LVS原理介绍及安装过程
一.ARP技术概念介绍 为什么讲ARP技术,因为平常工作中有接触.还有就是LVS的dr模式是用到arp的技术和数据. 1.什么是ARP协议 ARP协议全程地址解析协议(AddressResolutio ...
- ethtool 原理介绍和解决网卡丢包排查思路(附ethtool源码下载)
Table of Contents 1. 了解接收数据包的流程 将网卡收到的数据包转移到主机内存(NIC 与驱动交互) 通知系统内核处理(驱动与 Linux 内核交互) 2. ifconfig 解释 ...
最新文章
- 万能系统卸载器免root_Linux umount命令:卸载文件系统
- wireshark 如何修改抓包时间日期显示格式?
- AngularJS之ng-class(十一)
- Linux 查看网段内所有IP
- Steady Cow Assignment
- C语言--学生管理系统--(完整代码)
- 华为云张昆:支持全场景全业务,GaussDB加速企业数字化转型
- 从零开始徒手撸一个vue的toast弹窗组件
- 细说CSS的transform
- 发布NGuestBook(一个基于.NET平台的分层架构留言本小系统)
- 【大数据部落】R语言实现:混合正态分布EM最大期望估计法
- ITIL学习笔记——核心流程之:配置管理
- 爆音(杂音)问题的推论与解决
- 〖Python零基础入门篇③〗- Pycharm编辑器不能复制粘贴怎么办?
- 基于okhttp3依赖和gson依赖的快递查询系统
- 微信小程序——天气查询
- 最主流的五个大数据处理框架的优势对比
- JavaScript数据结构——图的实现
- 图片镜像翻转 Java
- 论大数据时代下的海量数据存储技术
热门文章
- LeetCode 885 救生艇
- 看完20000条微博,捋一捋杜蕾斯的营销套路
- kettle日志解析_kettle之日志有关方面的总结
- php重载求圆锥体积,编写一函数文件,实现求一个圆锥体的体积。
- 使用Audacity软件对清浊音进行时频分析并描述其特点
- PATCHY-SAN - Learning Convolutional Neural Networks for Graphs ICML
- 计算机考研初试350分什么水平,考研分数,考研350分什么水平!
- [洛谷P1024]python一元三次方程求解
- 来,带你实现基于网络通信QQ聊天室-----QQ有这么强!!!
- android 禁用触摸屏,如何在Android手机中禁用触摸屏?