Flink 之CEP介绍及应用
1、什么是CEP?
- CEP即复杂事件处理(Complex Event Processing,CEP)。
- Flink CEP是在 Flink 中实现的复杂事件处理(CEP)库。
- CEP 允许在无休止的事件流中检测事件模式,让我们有机会掌握数据中重要的部分。
- 一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据 —— 满足规则的复杂事件。
- CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。
- CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。
- 功能点:
• 输入的流数据,尽快产生结果
• 在2个event流上,基于时间进行聚合类的计算
• 提供实时/准实时的警告和通知
• 在多样的数据源中产生关联并分析模式
• 高吞吐、低延迟的处理
2、Flink CEP的特点
- 目标:从有序的简单事件流中发现一些高阶特征。
- 输入:一个或多个由简单事件构成的事件流。
- 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件。
- 输出:满足规则的复杂事件。
3、Flink CEP的组件
• Event Stream
• pattern定义
• pattern检测
• 生成Alert
4、模式API
1)、概念
- 处理事件的规则,被叫做“模式”(Pattern)。
- Flink CEP 提供了 Pattern API,用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列。
2)、分类
个体模式(Individual Patterns)
a) 组成复杂规则的每一个单独的模式定义,就是“个体模式”。
b) 个体模式可以包括“单例(singleton)模式”和“循环(looping)模式”。单例模式只接收一个事件,而循环模式可以接收多个。
c) 量词(Quantifier):可以在一个个体模式后追加量词,也就是指定循环次数
d) 条件(Condition)
每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。
CEP 中的个体模式主要通过调用 .where() .or() 和 .until() 来指定条件。
条件的分类:
简单条件(Simple Condition)
- 通过 .where() 方法对事件中的字段进行判断筛选,决定是否接受该事件
- 通过 .where() 方法对事件中的字段进行判断筛选,决定是否接受该事件
组合条件(Combining Condition)
- 将简单条件进行合并;.or() 方法表示或逻辑相连,where 的直接组合就是 AND
- 将简单条件进行合并;.or() 方法表示或逻辑相连,where 的直接组合就是 AND
终止条件(Stop Condition)
- 如果使用了 oneOrMore 或者 oneOrMore.optional,建议使用 .until() 作为终止条件,以便清理状态
迭代条件(Iterative Condition)
- 能够对模式之前所有接收的事件进行处理。
- 调用 .where( (value, ctx) => {…} ),可以调用 ctx.getEventsForPattern(“name”)
组合模式(Combining Patterns,也叫模式序列)
- 很多个体模式组合起来,就形成了整个的模式序列;模式序列必须以一个“初始模式”开始:
- 很多个体模式组合起来,就形成了整个的模式序列;模式序列必须以一个“初始模式”开始:
模式组(Groups of patterns)
- 将一个模式序列作为条件嵌套在个体模式里,成为一组模式
5、模式序列
1)、严格紧邻(Strict Contiguity)
- 所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由 .next() 指定。
- 例如对于模式”a next b”,事件序列 [a, c, b1, b2] 没有匹配项。
2)、宽松近邻( Relaxed Contiguity )
- 允许中间出现不匹配的事件,由 .followedBy() 指定,首中即停止。
- 例如对于模式”a followedBy b”,事件序列 [a, c, b1, b2] 匹配为 {a, b1}。
3)、非确定性宽松近邻( Non-Deterministic Relaxed Contiguity )
- 进一步放宽条件,之前已经匹配过的事件也可以再次使用,由 .followedByAny() 指定。
- 例如对于模式”a followedByAny b”,事件序列 [a, c, b1, b2] 匹配为 {a, b1},{a, b2}。
4)、无近邻关系
- .notNext() —— 不想让某个事件严格紧邻前一个事件发生。
- .notFollowedBy() —— 不想让某个事件在两个事件之间发生。
注意:
- 所有模式序列必须以 .begin() 开始。
- 模式序列不能以 .notFollowedBy() 结束。
- “not” 类型的模式不能被 optional 所修饰。
- 此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效。
6、模式的检测
- 指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。
- 调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream。
7、匹配事件的提取
- 创建 PatternStream 之后,就可以应用 select 或者 flatselect 方法,从检测到的事件序列中提取事件了。
- select() 方法需要输入一个 select function 作为参数,每个成功匹配的事件序列都会调用它。
- select() 以一个 Map[String,Iterable [IN]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所有接收到的事件的 Iterable 类型。
8、超时事件的提取
- 当一个模式通过 within 关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;
- 为了能够处理这些超时的部分匹配,select 和 flatSelect API 调用允许指定超时处理程序。
- 超时处理程序会接收到目前为止由模式匹配到的所有事件,由一个 OutputTag 定义接收到的超时事件序列。
- 通过within方法,我们的parttern规则限定在一定的窗口范围内。当有超过窗口时间后还到达的event,我们可以通过在select或flatSelect中,实现PatternTimeoutFunction/PatternFlatTimeoutFunction来处理这种情况。
模拟CEP超时事件的提取——Scala版本
package com.atguigu.flink.day7import java.utilimport org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collectorobject OrderTimeoutDetect {case class OrderEvent(orderId:String,eventType:String,eventTime:Long)def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val timeOutTag: OutputTag[String] = OutputTag[String]("timeoutTag")val orderStream: KeyedStream[OrderEvent, String] = env.fromElements(OrderEvent("order_1", "create", 2000L),OrderEvent("order_2", "create", 3000L),OrderEvent("order_1", "pay", 4000L)).assignAscendingTimestamps(_.eventTime).keyBy(r => r.orderId)val pattern: Pattern[OrderEvent, OrderEvent] = Pattern.begin[OrderEvent]("create").where(_.eventType.equals("create")).next("pay").where(_.eventType.equals("pay")).within(Time.seconds(5))val patternedStream: PatternStream[OrderEvent] = CEP.pattern(orderStream, pattern)val selectFunc = (map:scala.collection.Map[String,Iterable[OrderEvent]],out:Collector[String])=>{val create: OrderEvent = map("create").iterator.next()out.collect("order id "+create.orderId+"is payed!")}val timeoutFunc = (map:scala.collection.Map[String,Iterable[OrderEvent]],ts:Long,out:Collector[String])=>{val create: OrderEvent = map("create").iterator.next()out.collect("order id " + create.orderId + " is not payed! and timeout ts is " +ts)}//todo 方法一:val selectStream: DataStream[String] = patternedStream.flatSelect(timeOutTag)(timeoutFunc)(selectFunc)selectStream.print()selectStream.getSideOutput(timeOutTag).print()//todo 方法二:val selectStream1: DataStream[String] = patternedStream.flatSelect(timeOutTag, new MyPFTimeoutFunc, new MyPFSelectFunc)selectStream1.print()selectStream1.getSideOutput(timeOutTag).print()env.execute()}class MyPFTimeoutFunc extends PatternFlatTimeoutFunction[OrderEvent,String] {override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimestamp: Long, out: Collector[String]): Unit = {val create: OrderEvent = pattern.get("create").iterator.next()out.collect("order id " + create.orderId + " is not payed! and timeout ts is " +timeoutTimestamp)}}class MyPFSelectFunc extends PatternFlatSelectFunction[OrderEvent,String] {override def flatSelect(pattern: util.Map[String, util.List[OrderEvent]], out: Collector[String]): Unit = {val create: OrderEvent = pattern.get("create").iterator().next()out.collect("order id "+create.orderId+"is payed!")}}
}
模拟CEP超时事件的提取——JAVA版本
POJO类:
package com.atguigu.flink.day7;public class OrderEvent {public String orderId;public String eventType;public Long eventTime;public OrderEvent() {}public OrderEvent(String orderId, String eventType, Long eventTime) {this.orderId = orderId;this.eventType = eventType;this.eventTime = eventTime;}
}
核心类:
package com.atguigu.flink.day7;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;public class OrderTimeoutDetect {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 定义侧输出标签OutputTag<String> outputTag = new OutputTag<String>("outputTag"){};KeyedStream<OrderEvent, String> keyedStream = env.fromElements(new OrderEvent("order_1", "create", 2000L),new OrderEvent("order_2", "create", 3000L),new OrderEvent("order_1", "pay", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {@Overridepublic long extractTimestamp(OrderEvent element, long recordTimestamp) {return element.eventTime;}})).keyBy(r -> r.orderId);Pattern<OrderEvent, OrderEvent> pattern = Pattern.<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() {@Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("create");}}).next("pay").where(new SimpleCondition<OrderEvent>() {@Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("pay");}}).within(Time.seconds(5));PatternStream<OrderEvent> patternedStream = CEP.pattern(keyedStream, pattern);SingleOutputStreamOperator<String> singleOutputStreamOperator = patternedStream.flatSelect(outputTag, new timeoutFunc(), new selectFunc());singleOutputStreamOperator.print();singleOutputStreamOperator.getSideOutput(outputTag).print(">>>>");env.execute();}private static class timeoutFunc implements PatternFlatTimeoutFunction<OrderEvent, String> {@Overridepublic void timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {OrderEvent create = pattern.get("create").iterator().next();out.collect("order id " + create.orderId + " is not payed! and timeout ts is " + timeoutTimestamp);}}private static class selectFunc implements PatternFlatSelectFunction<OrderEvent,String> {@Overridepublic void flatSelect(Map<String, List<OrderEvent>> pattern, Collector<String> out) throws Exception {OrderEvent create = pattern.get("create").iterator().next();out.collect("order id " + create.orderId + " is payed!");}}
}
Flink 之CEP介绍及应用相关推荐
- 网络安全公司奇安信集团是如何基于 Flink 构建 CEP 引擎实时检测网络攻击【未来不可忽视的网络安全】
摘要: 奇安信集团作为一家网络安全公司是如何基于 Flink 构建 CEP 引擎实时检测网络攻击?其中面临的挑战以及宝贵的实践经验有哪些?本文主要内容分为以下四个方面: 背景及现状 技术架构 产品及运 ...
- Flink框架的介绍和实现原理(一)
一.Flink是什么 Apache Flik 是一个面向分布式数据流处理和批量数据处理的开源计算平台,提供支持流处理和批处理两种类型应用的功能. 二.Flink特点 现在的开源方案,会把流处理和批处理 ...
- 实时计算 Flink 版总体介绍
简介:实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache F ...
- 【flink】RocksDB介绍以及Flink对RocksDB的支持
1.概述 转载:「Flink」RocksDB介绍以及Flink对RocksDB的支持 2.RocksDB简介 RocksDB是基于C++语言编写的嵌入式KV存储引擎,它不是一个分布式的DB,而是一个高 ...
- [Flink]Flink实时框架介绍
目录 架构 应用 流 状态 时间 分层API 运维 架构 Flink是一个分布式数据流处理引擎,用于处理带状态的有边界或无边界数据流.可以部署在通用的分布式集群上,实现海量数据在内存上快速计算. 无边 ...
- Flink DataStream API 介绍
Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...
- Flink专题四:Flink DataStream 窗口介绍及使用
由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...
- Flink大数据实时计算系列-Flink的state介绍、Flink丰富的状态访问、Flink状态的分类
Flink大数据实时计算系列-Flink的state介绍.Flink丰富的状态访问.Flink状态的分类 目录 Flink的state介绍 Flink丰富的状态访问 Flink状态的分类 Flink参 ...
- Flink CEP 介绍及其使用场景
CEP 是什么? CEP 的英文全称是 Complex Event Processing,翻译成中文为复杂事件处理.它可以用于处理实时数据并在事件流到达时从事件流中提取信息,并根据定义的规则来判断事件 ...
最新文章
- MATLAB修改工作环境
- Excel批量转csv格式
- 【Lucene3.6.2入门系列】第10节_Tika
- 惠普服务器bios查看硬件属性,查看硬件信息
- 两条水位线的业务需求分析-Interval JOIN方案(转载+自己分析整理)
- 互联网日报 | 3月22日 星期一 | 苹果iMac Pro全球下架;知乎更新上市招股书;字节跳动成立朝夕光年奇想基金...
- powerpc和arm_为什么我喜欢ARM和PowerPC
- Matlab求解规划问题之 fgoalattain函数
- 如何成为更优秀的工程师?
- 川大教师发自白书:一所高校就是一座衙门
- pdf 转化为jpg python 批量转化
- DNW刷机210步骤和常见问题
- php后台视频教程,php实战开发电商后台视频教程 共6章
- 箱线图(Boxplot)也称箱须图(Box-whisker Plot)
- 7. R语言【独立性检验】:卡方独立性检验、Fisher精确检验 、Cochran-Mantel-Haenszel检验
- B2B2C简易流程图
- 怎么提高mysql多表查询效率_MySQL创建index提高多表查询效率
- 如何讲好一个故事?--白岩松西湖大学讲座
- Libgdx slg游戏进程记录
- 用SVG技术实现动态图形输出的嵌入式Web服务
热门文章
- oracle like 使用,Oracle数据库like和not like使用
- mysql的like字段名_MySQL LIKE 字句使用详情
- hadoop启动dfs报错: ERROR: Cannot set priority of namenode process 496
- 【vue】在vue项目中按顺序动态24个英文字母选项:A B C D E F......
- c++类成员函数指针
- SmartFoxServer如何使用文档和示例
- 利用DNS隧道免费上网
- 菜鸟之学习51单片机(五)蜂鸣器继电器的实现
- ubuntu 学习资料整理
- java和C语言是什么关系,哪一种好?