目录

十二:Flink CEP

12.1 基本概念

12.1.1 CEP 是什么

12.1.2 模式(Pattern)

12.1.3 应用场景

12.2 快速上手

12.2.1 需要引入的依赖

12.2.2 一个简单实例

12.3 模式 API(Pattern API)

12.3.1 个体模式

12.3.2 组合模式

12.3.3 模式组

12.3.4 匹配后跳过策略


十二:Flink CEP

在 Flink 的学习过程中,从基本原理和核心层 DataStream API 到底层的处理函数、再到应 用层的 Table API 和 SQL,我们已经掌握了 Flink 编程的各种手段,可以应对实际应用开发的 各种需求了。

在大数据分析领域,一大类需求就是诸如 PV、UV 这样的统计指标,我们往往可以直接 写 SQL 搞定;对于比较复杂的业务逻辑,SQL 中可能没有对应功能的内置函数,那么我们也可以使用 DataStream API,利用状态编程来进行实现。

不过在实际应用中,还有一类需求是要检测以特定顺序先后发生的一组事件,进行统计或做报警提示,这就比较麻烦了。例如,网站 做用户管理,可能需要检测“连续登录失败”事件的发生,这是个组合事件,其实就是“登录 失败”和“登录失败”的组合;电商网站可能需要检测用户“下单支付”行为,这也是组合事 件,“下单”事件之后一段时间内又会有“支付”事件到来,还包括了时间上的限制

类似的多个事件的组合,我们把它叫作“复杂事件”。对于复杂时间的处理,由于涉及到 事件的严格顺序,有时还有时间约束,我们很难直接用 SQL 或者 DataStream API 来完成。于 是只好放大招——派底层的处理函数(process function)上阵了。处理函数确实可以搞定这些需求,不过对于非常复杂的组合事件,我们可能需要设置很多状态、定时器,并在代码中定义 各种条件分支(if-else)逻辑来处理,复杂度会非常高,很可能会使代码失去可读性。怎样处 理这类复杂事件呢?Flink 为我们提供了专门用于处理复杂事件的库——CEP,可以让我们更 加轻松地解决这类棘手的问题。这在企业的实时风险控制中有非常重要的作用。

12.1 基本概念

12.1.1 CEP 是什么

所谓 CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP, 就是 Flink 实现的一个用于复杂事件处理的库(library)

那到底什么是“复杂事件处理”呢?就是可以在事件流里,检测到特定的事件组合并进行处理,比如说“连续登录失败”,或者“订单支付超时”等等。

具体的处理过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就 是“复杂事件”;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行 输出。

总结起来,复杂事件处理(CEP)的流程可以分成三个步骤:

(1)定义一个匹配规则

(2)将匹配规则应用到事件流上,检测满足规则的复杂事件

(3)对检测到的复杂事件进行处理,得到结果进行输出

如图所示,输入是不同形状的事件流,我们可以定义一个匹配规则:在圆形后面紧跟着三角形。那么将这个规则应用到输入流上,就可以检测到三组匹配的复杂事件。它们构成 了一个新的“复杂事件流”,流中的数据就变成了一组一组的复杂事件,每个数据都包含了一 个圆形和一个三角形。接下来,我们就可以针对检测到的复杂事件,处理之后输出一个提示或 报警信息了。

所以,CEP 是针对流处理而言的,分析的是低延迟、频繁产生的事件流

它的主要目的, 就是在无界流中检测出特定的数据组合,让我们有机会掌握数据中重要的高阶特征。

12.1.2 模式(Pattern)

CEP 的第一步所定义的匹配规则,我们可以把它叫作“模式”(Pattern)。模式的定义主要就是两部分内容:

⚫ 每个简单事件的特征

⚫ 简单事件之间的组合关系

当然,我们也可以进一步扩展模式的功能。比如,匹配检测的时间限制;每个简单事件是否可以重复出现;对于事件可重复出现的模式,遇到一个匹配后是否跳过后面的匹配;等等。

所谓“事件之间的组合关系”,一般就是定义“谁后面接着是谁”,也就是事件发生的顺序。 我们把它叫作“近邻关系”。可以定义严格的近邻关系,也就是两个事件之前不能有任何其他 事件;也可以定义宽松的近邻关系,即只要前后顺序正确即可,中间可以有其他事件。另外, 还可以反向定义,也就是“谁后面不能跟着谁”。 CEP 做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件;模式还可能有时间的限制,如果在设定时间范围内没有 满足匹配条件,就会导致模式匹配超时(timeout)。 Flink CEP 为我们提供了丰富的 API,可以实现上面关于模式的所有功能,这套 API 就叫 作“模式 API”(Pattern API)。

12.1.3 应用场景

CEP 主要用于实时流数据的分析处理。CEP 可以帮助在复杂的、看似不相关的事件流中找出那些有意义的事件组合,进而可以接近实时地进行分析判断、输出通知信息或报警。这在 企业项目的风控管理、用户画像和运维监控中,都有非常重要的应用。

⚫ 风险控制

设定一些行为模式,可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败、大量下单却不支付(刷单),就可以向用户发送通知信息,或是进行报警提示、由人工进一步判定用户是否有违规操作的嫌疑。这样就可以有效地控制用户个人和平台的风险。

⚫ 用户画像

利用 CEP 可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有特定行为习惯的一些用户,做出相应的用户画像。基于用户画像可以进行精准营销,即对行为 匹配预定义规则的用户实时发送相应的营销推广;这与目前很多企业所做的精准推荐原理是一 样的。

⚫ 运维监控

对于企业服务的运维管理,可以利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控模式。

CEP 的应用场景非常丰富。很多大数据框架,如 Spark、Samza、Beam 等都提供了不同的CEP 解决方案,但没有专门的库(library)。而 Flink 提供了专门的 CEP 库用于复杂事件处理, 可以说是目前 CEP 的最佳解决方案。

12.2 快速上手

12.2.1 需要引入的依赖

想要在代码中使用 Flink CEP,需要在项目的 pom 文件中添加相关依赖:

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.binary.version}</artifactId> <version>${flink.version}</version>
</dependency>

为了精简和避免依赖冲突,Flink 会保持尽量少的核心依赖。所以核心依赖中并不包括任何的连接器(conncetor)和库,这里的库就包括了 SQL、CEP 以及 ML 等等。所以如果想要 在 Flink 集群中提交运行 CEP 作业,应该向 Flink SQL 那样将依赖的 jar 包放在/lib 目录下

从这个角度来看,Flink CEP 和 Flink SQL 一样,都是最顶层的应用级 API。

12.2.2 一个简单实例

接下来我们考虑一个具体的需求:检测用户行为,如果连续三次登录失败,就输出报警信 息。很显然,这是一个复杂事件的检测处理,我们可以使用 Flink CEP 来实现。

我们首先定义数据的类型。这里的用户行为不再是之前的访问事件 Event 了,所以应该单独定义一个登录事件 POJO 类。具体实现如下:

public class LoginEvent { public String userId; public String ipAddress; public String eventType; public Long timestamp; public LoginEvent(String userId, String ipAddress, String eventType, Long
timestamp) { this.userId = userId; this.ipAddress = ipAddress; this.eventType = eventType; this.timestamp = timestamp; } public LoginEvent() {} @Override public String toString() { return "LoginEvent{" + "userId='" + userId + '\'' + ", ipAddress='" + ipAddress + '\'' + ", eventType='" + eventType + '\'' + ", timestamp=" + timestamp + '}'; }
} 

接下来就是业务逻辑的编写。Flink CEP 在代码中主要通过 Pattern API 来实现。之前我们 已经介绍过,CEP 的主要处理流程分为三步,对应到 Pattern API 中就是:

(1)定义一个模式(Pattern);

(2) 将Pattern应用到DataStream上,检测满足规则的复杂事件,得到一个PatternStream;

(3)对 PatternStream 进行转换处理,将检测到的复杂事件提取出来,包装成报警信息输出。

具体代码实现如下:

package com.atguigu.chapter12;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;public class LoginFailDetectExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KeyedStream<LoginEvent, String> stream = env.fromElements(new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),new LoginEvent("user_2", "192.168.1.29", "fail", 8000L),new LoginEvent("user_2", "192.168.1.29", "success", 6000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {@Overridepublic long extractTimestamp(LoginEvent loginEvent, long l) {return loginEvent.timestamp;}})).keyBy(data -> data.userId);//1.定义一个模式(Pattern):连续三次登录失败Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("first") //开始事件的名字(标签).where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.eventType.equals("fail");}}).next("second")   //第二次登录事件的名字(标签).where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.eventType.equals("fail");}}).next("third")  //第三次登录事件的名字(标签).where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.eventType.equals("fail");}});//2.将Pattern应用到DataStream上,检测满足规则的复杂事件,得到一个PatternStream;PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);//3.对 PatternStream 进行转换处理,将检测到的复杂事件提取出来,包装成报警信息输出。SingleOutputStreamOperator<String> warningStream = patternStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {//提取复杂事件中的三次登录失败时间LoginEvent firstFailEvent = map.get("first").get(0);LoginEvent secondFailEvent = map.get("second").get(0);LoginEvent thirdFailEvent = map.get("third").get(0);return firstFailEvent.userId + "连续三次登录失败!登录时间为:" +firstFailEvent.timestamp + "," +secondFailEvent.timestamp + "," +thirdFailEvent.timestamp + ".";}});//打印输出warningStream.print("warning");env.execute();}
}

12.3 模式 API(Pattern API)

Flink CEP 的核心是复杂事件的模式匹配。Flink CEP 库中提供了 Pattern 类,基于它可以 调用一系列方法来定义匹配模式,这就是所谓的模式 API(Pattern API)。Pattern API 可以让我们定义各种复杂的事件组合规则,用于从事件流中提取复杂事件。

12.3.1 个体模式

在 12.1.2 小节中我们已经知道,模式(Pattern)其实就是将一组简单事件组合成复杂事件 的“匹配规则”。由于流中事件的匹配是有先后顺序的,因此一个匹配规则就可以表达成先后发 生的一个个简单事件,按顺序串联组合在一起。

这里的每一个简单事件并不是任意选取的,也需要有一定的条件规则;所以我们就把每个简单事件的匹配规则,叫作“个体模式”(Individual Pattern)。

1. 基本形式

在 12.2.2 小节中,每一个登录失败事件的选取规则,就都是一个个体模式。比如:

.<LoginEvent>begin("first") // 以第一个登录失败事件开始 .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return loginEvent.eventType.equals("fail"); } })

或者后面的:

.next("second") // 接着是第二个登录失败事件 .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return loginEvent.eventType.equals("fail"); } })

这些都是个体模式。个体模式一般都会匹配接收一个事件。

每个个体模式都以一个“连接词”开始定义的,比如 begin、next 等等,这是 Pattern 对象 的一个方法(begin 是 Pattern 类的静态方法),返回的还是一个 Pattern。这些“连接词”方法 有一个 String 类型参数,这就是当前个体模式唯一的名字,比如这里的“first”、“second”。在 之后检测到匹配事件时,就会以这个名字来指代匹配事件。

个体模式需要一个“过滤条件”,用来指定具体的匹配规则。这个条件一般是通过调 用.where()方法来实现的,具体的过滤逻辑则通过传入的 SimpleCondition 内的.filter()方法来定义。

另外,个体模式可以匹配接收一个事件,也可以接收多个事件。这听起来有点奇怪,一个单独的匹配规则可能匹配到多个事件吗?这是可能的,我们可以给个体模式增加一个“量词” (quantifier),就能够让它进行循环匹配,接收多个事件。

2. 量词(Quantifiers )

个体模式后面可以跟一个“量词”,用来指定循环的次数。从这个角度分类,个体模式可 以包括“单例(singleton)模式”和“循环(looping)模式”。默认情况下,个体模式是单例模式,匹配接收一个事件;当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。

在循环模式中,对同样特征的事件可以匹配多次。比如我们定义个体模式为“匹配形状为 三角形的事件”,再让它循环多次,就变成了“匹配连续多个三角形的事件”。注意这里的“连续”,只要保证前后顺序即可,中间可以有其他事件,所以是“宽松近邻”关系。

在 Flink CEP 中,可以使用不同的方法指定循环模式,主要有:

⚫ .oneOrMore()

匹配事件出现一次或多次,假设 a 是一个个体模式,a.oneOrMore()表示可以匹配 1 个或多 个 a 的事件组合。我们有时会用 a+来简单表示。

⚫ .times(times)

匹配事件发生特定次数(times),例如 a.times(3)表示 aaa;

⚫ .times(fromTimes,toTimes)

指定匹配事件出现的次数范围,最小次数为fromTimes,最大次数为toTimes。例如a.times(2, 4)可以匹配 aa,aaa 和 aaaa。

⚫ .greedy()---贪婪匹配

只能用在循环模式后,使当前循环模式变得“贪心”(greedy),也就是总是尽可能多地去 匹配。例如 a.times(2, 4).greedy(),如果出现了连续 4 个 a,那么会直接把 aaaa 检测出来进行处理,其他任意 2 个 a 是不算匹配事件的。

⚫ .optional()

使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。

对于一个个体模式 pattern 来说,后面所有可以添加的量词如下:

// 匹配事件出现 4 次
pattern.times(4); // 匹配事件出现 4 次,或者不出现
pattern.times(4).optional(); // 匹配事件出现 2, 3 或者 4 次
pattern.times(2, 4); // 匹配事件出现 2, 3 或者 4 次,并且尽可能多地匹配
pattern.times(2, 4).greedy(); // 匹配事件出现 2, 3, 4 次,或者不出现
pattern.times(2, 4).optional(); // 匹配事件出现 2, 3, 4 次,或者不出现;并且尽可能多地匹配
pattern.times(2, 4).optional().greedy(); // 匹配事件出现 1 次或多次
pattern.oneOrMore(); // 匹配事件出现 1 次或多次,并且尽可能多地匹配
pattern.oneOrMore().greedy(); // 匹配事件出现 1 次或多次,或者不出现
pattern.oneOrMore().optional(); // 匹配事件出现 1 次或多次,或者不出现;并且尽可能多地匹配
pattern.oneOrMore().optional().greedy(); // 匹配事件出现 2 次或多次
pattern.timesOrMore(2); // 匹配事件出现 2 次或多次,并且尽可能多地匹配
pattern.timesOrMore(2).greedy(); // 匹配事件出现 2 次或多次,或者不出现
pattern.timesOrMore(2).optional() // 匹配事件出现 2 次或多次,或者不出现;并且尽可能多地匹配
pattern.timesOrMore(2).optional().greedy();

正是因为个体模式可以通过量词定义为循环模式,一个模式能够匹配到多个事件,所以之前代码中事件的检测接收才会用 Map 中的一个列表(List)来保存。而之前代码中没有定义量 词,都是单例模式,所以只会匹配一个事件,每个 List 中也只有一个元素:

LoginEvent first = map.get("first").get(0);

3. 条件(Conditions)

对于每个个体模式,匹配事件的核心在于定义匹配条件,也就是选取事件的规则。Flink CEP 会按照这个规则对流中的事件进行筛选,判断是否接受当前的事件。

对于条件的定义,主要是通过调用 Pattern 对象的.where()方法来实现的,主要可以分为简单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用 Pattern 对象的.subtype()方法来限定匹配事件的子类型。接下来我们就分别进行介绍。

⚫ 限定子类型

调用.subtype()方法可以为当前模式增加子类型限制条件。例如:

pattern.subtype(SubEvent.class); 

这里 SubEvent 是流中数据类型 Event 的子类型。这时,只有当事件是 SubEvent 类型时, 才可以满足当前模式 pattern 的匹配条件。

⚫ 简单条件(Simple Conditions)

简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其 实就是一个 filter 操作

代码中我们为.where()方法传入一个 SimpleCondition 的实例作为参数。SimpleCondition 是 表示“简单条件”的抽象类,内部有一个.filter()方法,唯一的参数就是当前事件。所以它可以 当作 FilterFunction 来使用。

下面是一个具体示例:

pattern.where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) { return value.user.startsWith("A"); }
});

这里我们要求匹配事件的 user 属性以“A”开头。

⚫ 迭代条件(Iterative Conditions)

简单条件只能基于当前事件做判断,能够处理的逻辑比较有限。在实际应用中,我们可能需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事 件来做判断的条件,就叫作“迭代条件”(Iterative Condition)。

在 Flink CEP 中,提供了 IterativeCondition 抽象类。这其实是更加通用的条件表达,查看源码可以发现, .where()方法本身要求的参数类型就是 IterativeCondition;而之前的SimpleCondition 是它的一个子类。

在 IterativeCondition 中同样需要实现一个 filter()方法,不过与 SimpleCondition 中不同的 是,这个方法有两个参数:除了当前事件之外,还有一个上下文 Context。调用这个上下文 的.getEventsForPattern()方法传入一个模式名称,就可以拿到这个模式中已匹配到的所有数据了。

下面是一个具体示例:

middle.oneOrMore() .where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { // 事件中的 user 必须以 A 开头 if (!value.user.startsWith("A")) { return false; } int sum = value.amount; // 获取当前模式之前已经匹配的事件,求所有事件 amount 之和 for (Event event : ctx.getEventsForPattern("middle")) { sum += event.amount; } // 在总数量小于 100 时,当前事件满足匹配规则,可以匹配成功 return sum < 100; } }); 

上面代码中当前模式名称就叫作“middle”,这是一个循环模式,可以接受事件发生一次或多次。于是下面的迭代条件中,我们通过 ctx.getEventsForPattern("middle")获取当前模式已 经接受的事件,计算它们的数量(amount)之和;再加上当前事件中的数量,如果总和小于

100,就接受当前事件,否则就不匹配。当然,在迭代条件中我们也可以基于当前事件做出判 断,比如代码中要求 user 必须以 A 开头。最终我们的匹配规则就是:事件的 user 必须以 A 开 头;并且循环匹配的所有事件 amount 之和必须小于 100。这里的 Event 与之前定义的 POJO 不 同,增加了 amount 属性。

可以看到,迭代条件能够获取已经匹配的事件,如果自身又是循环模式(比如量词oneOrMore),那么两者结合就可以捕获自身之前接收的数据,据此来判断是否接受当前事件。 这个功能非常强大,我们可以由此实现更加复杂的需求,比如可以要求“只有大于之前数据的平均值,才接受当前事件”。

另外迭代条件中的上下文 Context 也可以获取到时间相关的信息,比如事件的时间戳和当前的处理时间(processing time)

⚫ 组合条件(Combining Conditions)

如果一个个体模式有多个限定条件,又该怎么定义呢?

最直接的想法是,可以在简单条件或者迭代条件的.filter()方法中,增加多个判断逻辑。可 以通过 if-else 的条件分支分别定义多个条件,也可以直接在 return 返回时给一个多条件的逻辑 组合(与、或、非)。不过这样会让代码变得臃肿,可读性降低。更好的方式是独立定义多个条件,然后在外部把它们连接起来,构成一个“组合条件”(Combining Condition)。

最简单的组合条件,就是.where()后面再接一个.where()。因为前面提到过,一个条件就像 是一个 filter 操作,所以每次调用.where()方法都相当于做了一次过滤,连续多次调用就表示多 重过滤,最终匹配的事件自然就会同时满足所有条件。这相当于就是多个条件的“逻辑与” (AND)。而多个条件的逻辑或(OR),则可以通过.where()后加一个.or()来实现。这里的.or()方法 与.where()一样,传入一个 IterativeCondition 作为参数,定义一个独立的条件;它和之前.where()定义的条件只要满足一个,当前事件就可以成功匹配。

当然,子类型限定条件(subtype)也可以和其他条件结合起来,成为组合条件,如下所示:

pattern.subtype(SubEvent.class)
.where(new SimpleCondition<SubEvent>() { @Override public boolean filter(SubEvent value) { return ... // some condition }
});

这里可以看到,SimpleCondition 的泛型参数也变成了 SubEvent,所以匹配出的事件就既 满足子类型限制,又符合过滤筛选的简单条件;这也是一个逻辑与的关系。

⚫ 终止条件(Stop Conditions)

对于循环模式而言,还可以指定一个“终止条件”(Stop Condition),表示遇到某个特定事件时当前模式就不再继续循环匹配了

终止条件的定义是通过调用模式对象的.until() 方法来实现的, 同样传入一 个IterativeCondition 作为参数。需要注意的是,终止条件只与 oneOrMore() 或者oneOrMore().optional()结合使用。因为在这种循环模式下,我们不知道后面还有没有事件可以 匹配,只好把之前匹配的事件作为状态缓存起来继续等待,这等待无穷无尽;如果一直等下去, 缓存的状态越来越多,最终会耗尽内存。所以这种循环模式必须有个终点,当.until()指定的条件满足时,循环终止,这样就可以清空状态释放内存了。

12.3.2 组合模式

有了定义好的个体模式,就可以尝试按一定的顺序把它们连接起来,定义一个完整的复杂事件匹配规则了。这种将多个个体模式组合起来的完整模式,就叫作“组合模式”(Combining Pattern),为了跟个体模式区分有时也叫作“模式序列”(Pattern Sequence)。

一个组合模式有以下形式:

Pattern<Event, ?> pattern = Pattern
.<Event>begin("start").where(...) .next("next").where(...) .followedBy("follow").where(...) ...

可以看到,组合模式确实就是一个“模式序列”,是用诸如 begin、next、followedBy 等表 示先后顺序的“连接词”将个体模式串连起来得到的。在这样的语法调用中,每个事件匹配的 条件是什么、各个事件之间谁先谁后、近邻关系如何都定义得一目了然。每一个“连接词”方 法调用之后,得到的都仍然是一个 Pattern 的对象;所以从 Java 对象的角度看,组合模式与个 体模式是一样的,都是 Pattern。

1. 初始模式(Initial Pattern)

所有的组合模式,都必须以一个“初始模式”开头;而初始模式必须通过调用 Pattern 的 静态方法.begin()来创建。如下所示:

Pattern<Event, ?> start = Pattern.<Event>begin("start"); 

这里我们调用 Pattern 的.begin()方法创建了一个初始模式。传入的 String 类型的参数就是模式的名称;而 begin 方法需要传入一个类型参数,这就是模式要检测流中事件的基本类型, 这里我们定义为 Event。调用的结果返回一个 Pattern 的对象实例。

Pattern 有两个泛型参数,第一个就是检测事件的基本类型 Event,跟 begin 指定的类型一致;第二个则是当前模式里事件的子类型,由子类型限制条件指定。我们这里用类型通配符(?)代替,就可以从上下文直接推断了。

2. 近邻条件(Contiguity Conditions)

在初始模式之后,我们就可以按照复杂事件的顺序追加模式,组合成模式序列了。模式之间的组合是通过一些“连接词”方法实现的,这些连接词指明了先后事件之间有着怎样的近邻 关系,这就是所谓的“近邻条件”(Contiguity Conditions,也叫“连续性条件”)。 Flink CEP 中提供了三种近邻关系:

⚫ 严格近邻(Strict Contiguity)

如图所示,匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。 代码中对应的就是 Pattern 的.next()方法,名称上就能看出来,“下一个”自然就是紧挨着的。

⚫ 宽松近邻(Relaxed Contiguity)

如图所示,宽松近邻只关心事件发生的顺序,而放宽了对匹配事件的“距离”要求, 也就是说两个匹配的事件之间可以有其他不匹配的事件出现。代码中对应.followedBy()方法, 很明显这表示“跟在后面”就可以,不需要紧紧相邻。

⚫ 非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)

比配结果:(非确定性宽松近邻》=宽松近邻)

这种近邻关系更加宽松。所谓“非确定性”是指可以重复使用之前已经匹配过的事件;这种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,所以匹配结果一般会比宽松近邻更多,如图所示。代码中对应.followedByAny()方法

从图中可以看到,我们定义的模式序列中有两个个体模式:一是“选择圆形事件”,一是“选 择三角形事件”;这时它们之间的近邻条件就会导致匹配出的复杂事件有所不同。很明显,严格近邻由于条件苛刻,匹配的事件最少;宽松近邻可以匹配不紧邻的事件,匹配结果会多一些; 而非确定性宽松近邻条件最为宽松,可以匹配到最多的复杂事件。

3. 其他限制条件

除了上面提到的 next()、followedBy()、followedByAny()可以分别表示三种近邻条件,我们还可以用否定的“连接词”来组合个体模式。主要包括:

⚫ .notNext()

表示前一个模式匹配到的事件后面不能紧跟着某种事件

⚫ .notFollowedBy()

表示前一个模式匹配到的事件后面,不会出现某种事件。这里需要注意,由于notFollowedBy()是没有严格限定的;流数据不停地到来,我们永远不能保证之后“不会出现某种事件”。所以一个模式序列不能以 notFollowedBy()结尾,这个限定条件主要用来表示“两个事件中间不会出现某种事件”。

另外,Flink CEP 中还可以为模式指定一个时间限制,这是通过调用.within()方法实现的。 方法传入一个时间参数,这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。一个模式序列中只能有一个时间限制,调 用.within()的位置不限;如果多次调用则会以最小的那个时间间隔为准

下面是模式序列中所有限制条件在代码中的定义:

// 严格近邻条件
Pattern<Event, ?> strict = start.next("middle").where(...); // 宽松近邻条件
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...); // 非确定性宽松近邻条件
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...); // 不能严格近邻条件
Pattern<Event, ?> strictNot = start.notNext("not").where(...); // 不能宽松近邻条件
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...); // 时间限制条件
middle.within(Time.seconds(10)); 

4. 循环模式中的近邻条件

之前我们讨论的都是模式序列中限制条件,主要用来指定前后发生的事件之间的近邻关系。 而循环模式虽说是个体模式,却也可以匹配多个事件;那这些事件之间自然也会有近邻关系的 讨论。

在循环模式中,近邻关系同样有三种:严格近邻、宽松近邻以及非确定性宽松近邻。对于 定义了量词(如 oneOrMore()、times())的循环模式,默认内部采用的是宽松近邻。也就是说, 当循环匹配多个事件时,它们中间是可以有其他不匹配事件的;相当于用单例模式分别定义、 再用 followedBy()连接起来。这就解释了在 12.2.2 小节的示例代码中,为什么我们检测连续三次登录失败用了三个单例模式来分别定义,而没有直接指定 times(3):因为我们需要三次登录失败必须是严格连续的,中间不能有登录成功的事件,而 times()默认是宽松近邻关系。

不过把多个同样的单例模式组合在一起,这种方式还是显得有些笨拙了。连续三次登录失败看起来不太复杂,那如果要检测连续 100 次登录失败呢?显然使用 times()是更明智的选择。 不过它默认匹配事件之间是宽松近邻关系,我们可以通过调用额外的方法来改变这一点。

⚫ .consecutive()

为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的。

也就是 说,一旦中间出现了不匹配的事件,当前循环检测就会终止。这起到的效果跟模式序列中的next()一样,需要与循环量词 times()、oneOrMore()配合使用。

于是,12.2.2 小节中检测连续三次登录失败的代码可以改成:

// 1. 定义 Pattern,登录失败事件,循环检测 3 次
Pattern<LoginEvent, LoginEvent> pattern = Pattern .<LoginEvent>begin("fails") .where(new SimpleCondition<LoginEvent>() { @Override public boolean filter(LoginEvent loginEvent) throws Exception { return loginEvent.eventType.equals("fail"); } }).times(3).consecutive(); 

⚫ .allowCombinations()

除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使用已经匹配的事件 。 这需要调用 .allowCombinations() 方法来实现,实现的效果与.followedByAny()相同。

12.3.3 模式组

一般来说,代码中定义的模式序列,就是我们在业务逻辑中匹配复杂事件的规则。不过在有些非常复杂的场景中,可能需要划分多个“阶段”,每个“阶段”又有一连串的匹配规则。为了 应对这样的需求,Flink CEP 允许我们以“嵌套”的方式来定义模式

之前在模式序列中,我们用 begin()、next()、followedBy()、followedByAny()这样的“连 接词”来组合个体模式,这些方法的参数就是一个个体模式的名称;而现在它们可以直接以一个模式序列作为参数,就将模式序列又一次连接组合起来了。这样得到的就是一个“模式组” (Groups of Patterns)。

在模式组中,每一个模式序列就被当作了某一阶段的匹配条件,返回的类型是一个GroupPattern。而 GroupPattern 本身是 Pattern 的子类;所以个体模式和组合模式能调用的方法, 比如 times()、oneOrMore()、optional()之类的量词,模式组一般也是可以用的。

具体在代码中的应用如下所示:

// 以模式序列作为初始模式
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start_start").where(...)
.followedBy("start_middle").where(...)
); // 在 start 后定义严格近邻的模式序列,并重复匹配两次
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...)
.followedBy("next_middle").where(...)
).times(2); // 在 start 后定义宽松近邻的模式序列,并重复匹配一次或多次
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...)
.followedBy("followedby_middle").where(...)
).oneOrMore(); //在 start 后定义非确定性宽松近邻的模式序列,可以匹配一次,也可以不匹配
Pattern<Event, ?> nonDeterminRelaxed = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...)
.followedBy("followedbyany_middle").where(...)
).optional();

12.3.4 匹配后跳过策略

在 Flink CEP 中,由于有循环模式和非确定性宽松近邻的存在,同一个事件有可能会重复利用,被分配到不同的匹配结果中。这样会导致匹配结果规模增大,有时会显得非常冗余。当然,非确定性宽松近邻条件,本来就是为了放宽限制、扩充匹配结果而设计的;我们主要是针对循环模式来考虑匹配结果的精简。

之前已经讲过,如果对循环模式增加了.greedy()的限制,那么就会“尽可能多地”匹配事件,这样就可以砍掉那些子集上的匹配了。不过这种方式还是略显简单粗暴,如果我们想要精确控制事件的匹配应该跳过哪些情况,那就需要制定另外的策略了。

在 Flink CEP 中,提供了模式的“匹配后跳过策略”(After Match Skip Strategy),专门用来精准控制循环模式的匹配结果。这个策略可以在 Pattern 的初始模式定义中,作为 begin()的 第二个参数传入

Pattern.begin("start", AfterMatchSkipStrategy.noSkip())
.where(...) ...

匹配后跳过策略 AfterMatchSkipStrategy 是一个抽象类,它有多个具体的实现,可以通过调用对应的静态方法来返回对应的策略实例。这里我们配置的是不做跳过处理,这也是默认策略。

下面我们举例来说明不同的跳过策略。例如我们要检测的复杂事件模式为:开始是用户名 为 a 的事件(简写为事件 a,下同),可以重复一次或多次;然后跟着一个用户名为 b 的事件,a 事件和 b 事件之间可以有其他事件(宽松近邻)。用简写形式可以直接写作:“a+ followedBy b”。在代码中定义 Pattern 如下:

Pattern.<Event>begin("a").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.user.equals("a"); }
}).oneOrMore()
.followedBy("b").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.user.equals("b"); }
}); 

我们如果输入事件序列“a a a b”——这里为了区分前后不同的 a 事件,可以记作“a1 a2 a3 b”——那么应该检测到 6 个匹配结果:(a1 a2 a3 b),(a1 a2 b),(a1 b),(a2 a3 b),(a2 b), (a3 b)。如果在初始模式的量词.oneOrMore()后加上.greedy()定义为贪心匹配,那么结果就是: (a1 a2 a3 b),(a2 a3 b),(a3 b),每个事件作为开头只会出现一次。

接下来我们讨论不同跳过策略对匹配结果的影响:

⚫ 不跳过(NO_SKIP)

代码调用 AfterMatchSkipStrategy.noSkip()。这是默认策略,所有可能的匹配都会输出。所以这里会输出完整的 6 个匹配。

⚫ 跳至下一个(SKIP_TO_NEXT)

代码调用 AfterMatchSkipStrategy.skipToNext()。找到一个 a1 开始的最大匹配之后,跳过a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。最 终得到(a1 a2 a3 b),(a2 a3 b),(a3 b)。可以看到,这种跳过策略跟使用.greedy()效果是相同的。

⚫ 跳过所有子匹配(SKIP_PAST_LAST_EVENT)

代码调用 AfterMatchSkipStrategy.skipPastLastEvent()。找到 a1 开始的匹配(a1a2a3b)之 后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到(a1a2a3b),这是最为精简的跳过策略。

⚫ 跳至第一个(SKIP_TO_FIRST[a])

代码调用 AfterMatchSkipStrategy.skipToFirst(“a”),这里传入一个参数,指明跳至哪个模式 的第一个匹配事件。找到 a1 开始的匹配(a1a2a3b)后,跳到以最开始一个 a(也就是 a1) 为开始的匹配,相当于只留下 a1 开始的匹配。最终得到(a1a2a3b),(a1a2b),(a1b)。

⚫ 跳至最后一个(SKIP_TO_LAST[a])

代码调用 AfterMatchSkipStrategy.skipToLast(“a”),同样传入一个参数,指明跳至哪个模式的最后一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。最终得到(a1a2a3b),(a3b)。

Flink中的CEP(一)相关推荐

  1. Flink中的CEP(二)

    目录 12.4 模式的检测处理 12.4.1 将模式应用到流上 12.4.2 处理匹配事件 12.4.3 处理超时事件 12.4.4 处理迟到数据 12.5 CEP 的状态机实现 12.6 本章总结 ...

  2. 什么是cep算子_Flink中的CEP复杂事件处理 (源码分析)

    其实CEP复杂事件处理,简单来说你可以用通过类似正则表达式的方式去表示你的逻辑,表现能力非常的强,用过的人都知道 开篇先偷一张图,整体了解FlinkCEP中的  一种重要的图  NFA FlinkCE ...

  3. 【flink】flink 复杂事件处理 CEP

    @[ 1.概述 转载:Flink系列 13. 复杂事件处理 CEP 请到原文查看... 1. 什么是 CEP? CEP 是 Flink 中实现的复杂事件处理库,(Complex Event Proce ...

  4. Flink SQL篇,SQL实操、Flink Hive、CEP、CDC、GateWay

    Flink源码篇,作业提交流程.作业调度流程.作业内部转换流程图 Flink核心篇,四大基石.容错机制.广播.反压.序列化.内存管理.资源管理 Flink基础篇,基本概念.设计理念.架构模型.编程模型 ...

  5. Flink 使用之 CEP

    Flink 使用介绍相关文档目录 Flink 使用介绍相关文档目录 什么是CEP CEP的全称为Complex Event Processing,中文翻译为复杂事件处理.光看字面意思解释还是很难理解. ...

  6. Flink从入门到精通100篇(二十一)-万字长文详解 Flink 中的 CopyOnWriteStateTable

    前言 现如今想阅读 HashMap 源码实际上比较简单,因为网上一大堆博客去分析 HashMap 和 ConcurrentHashMap.本文详细分析 CopyOnWriteStateTable 源码 ...

  7. as点击发送广播_Apache Flink 中广播状态的实用指南

    翻译 | 王柯凝 校对 | 邱从贤(山智) 自版本 Flink 1.5.0 以来,Apache Flink 提供了一种新的状态类型,称为广播状态(Broadcast State).在本文中,将解释什么 ...

  8. 《从0到1学习Flink》—— 介绍Flink中的Stream Windows

    前言 目前有许多数据分析的场景从批处理到流处理的演变, 虽然可以将批处理作为流处理的特殊情况来处理,但是分析无穷集的流数据通常需要思维方式的转变并且具有其自己的术语(例如,"windowin ...

  9. flink中的WaterMark调研和具体实例

    一些基本概念介绍: Event Time 事件时间是每个事件在其生产设备上发生的时间 Ingestion Time 摄取时间是数据进入Flink的时间 Processing Time 处理时间是是指正 ...

最新文章

  1. SD--根据订单创建发票(相关的函数列表的介绍系列篇(3))
  2. Java 常用API的运用,效率及技巧
  3. 使用fcntl编写set_fl()函数和clr_fl()函数
  4. [转] PHP在不同页面之间传值的三种常见方式
  5. 读《三体Ⅱ · 黑暗森林》| 人能相互理解的前提是力量对等
  6. java笔记:自己动手写javaEE框架(七)--使用JSON和Ajax技术
  7. Caffe源码解析4: Data_layer
  8. 5 获取当前访问的控制名称_LabVIEW编程技巧:网络通信中如何获取计算机名称、IP地址等信息...
  9. html复选框全选按钮代码,全选复选框JavaScript编写小结(附代码)
  10. Mysql datadir change on ubuntu
  11. C++ this指针
  12. AsyncTask更新UI线程的基本原理
  13. 根据MAC地址修改固定IP(附带IPMAC扫描脚本)
  14. 【系统架构】小型电商网站的架构(一)
  15. 悬置线高通滤波器设计
  16. PyQt5之Drag拖曳功能
  17. matlab波形叠加,matlab程序两列波相向传播叠加波形图和动画.doc
  18. Ckeditor5 整合Ckfinder3 防出错实战教程(二)整合篇
  19. 解惑好文:移动端H5页面高清多屏适配方案
  20. html仿今日头条数据列表

热门文章

  1. 谷歌浏览器自带的翻译功能无法使用的解决办法
  2. 大数据和云计算技术周报(第81期)
  3. python画蛋糕祝福图片大全_蛋糕画画图片大全_简单的简笔画图片大全
  4. hdfs datanode 清除回收站的命令
  5. 利用DSF深度优先搜索来解容器倒水问题
  6. 在Word中设置页面B5的问题
  7. 面试时,被问到职业规划如何作答?
  8. 使用TDOA进行声源定位
  9. 阿里云oss文件服务器
  10. seq2seq发展介绍