1、什么是CEP?

  1. CEP即复杂事件处理(Complex Event Processing,CEP)。
  2. Flink CEP是在 Flink 中实现的复杂事件处理(CEP)库。
  3. CEP 允许在无休止的事件流中检测事件模式,让我们有机会掌握数据中重要的部分。
  4. 一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据 —— 满足规则的复杂事件。
  5. CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。
  6. CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。
  7. 功能点:
    • 输入的流数据,尽快产生结果
    • 在2个event流上,基于时间进行聚合类的计算
    • 提供实时/准实时的警告和通知
    • 在多样的数据源中产生关联并分析模式
    • 高吞吐、低延迟的处理

2、Flink CEP的特点

  • 目标:从有序的简单事件流中发现一些高阶特征。
  • 输入:一个或多个由简单事件构成的事件流。
  • 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件。
  • 输出:满足规则的复杂事件。

3、Flink CEP的组件


• Event Stream
• pattern定义
• pattern检测
• 生成Alert

4、模式API

1)、概念

  • 处理事件的规则,被叫做“模式”(Pattern)。
  • Flink CEP 提供了 Pattern API,用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列。

2)、分类

  • 个体模式(Individual Patterns)
    a) 组成复杂规则的每一个单独的模式定义,就是“个体模式”。

    b) 个体模式可以包括“单例(singleton)模式”和“循环(looping)模式”。单例模式只接收一个事件,而循环模式可以接收多个。
    c) 量词(Quantifier):可以在一个个体模式后追加量词,也就是指定循环次数

    d) 条件(Condition)

    • 每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。

    • CEP 中的个体模式主要通过调用 .where() .or() 和 .until() 来指定条件。

    • 条件的分类:

      • 简单条件(Simple Condition)

        • 通过 .where() 方法对事件中的字段进行判断筛选,决定是否接受该事件
      • 组合条件(Combining Condition)

        • 将简单条件进行合并;.or() 方法表示或逻辑相连,where 的直接组合就是 AND
      • 终止条件(Stop Condition)

        • 如果使用了 oneOrMore 或者 oneOrMore.optional,建议使用 .until() 作为终止条件,以便清理状态
      • 迭代条件(Iterative Condition)

        • 能够对模式之前所有接收的事件进行处理。
        • 调用 .where( (value, ctx) => {…} ),可以调用 ctx.getEventsForPattern(“name”)
  1. 组合模式(Combining Patterns,也叫模式序列)

    • 很多个体模式组合起来,就形成了整个的模式序列;模式序列必须以一个“初始模式”开始:
  2. 模式组(Groups of patterns)

    • 将一个模式序列作为条件嵌套在个体模式里,成为一组模式

5、模式序列

1)、严格紧邻(Strict Contiguity)

  • 所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由 .next() 指定。
  • 例如对于模式”a next b”,事件序列 [a, c, b1, b2] 没有匹配项。

2)、宽松近邻( Relaxed Contiguity )

  • 允许中间出现不匹配的事件,由 .followedBy() 指定,首中即停止。
  • 例如对于模式”a followedBy b”,事件序列 [a, c, b1, b2] 匹配为 {a, b1}。

3)、非确定性宽松近邻( Non-Deterministic Relaxed Contiguity )

  • 进一步放宽条件,之前已经匹配过的事件也可以再次使用,由 .followedByAny() 指定。
  • 例如对于模式”a followedByAny b”,事件序列 [a, c, b1, b2] 匹配为 {a, b1},{a, b2}。

4)、无近邻关系

  • .notNext() —— 不想让某个事件严格紧邻前一个事件发生。
  • .notFollowedBy() —— 不想让某个事件在两个事件之间发生。

注意:

  • 所有模式序列必须以 .begin() 开始。
  • 模式序列不能以 .notFollowedBy() 结束。
  • “not” 类型的模式不能被 optional 所修饰。
  • 此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效。

6、模式的检测

  • 指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。
  • 调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream。

7、匹配事件的提取

  • 创建 PatternStream 之后,就可以应用 select 或者 flatselect 方法,从检测到的事件序列中提取事件了。
  • select() 方法需要输入一个 select function 作为参数,每个成功匹配的事件序列都会调用它。
  • select() 以一个 Map[String,Iterable [IN]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所有接收到的事件的 Iterable 类型。

8、超时事件的提取

  • 当一个模式通过 within 关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;
  • 为了能够处理这些超时的部分匹配,select 和 flatSelect API 调用允许指定超时处理程序。
  • 超时处理程序会接收到目前为止由模式匹配到的所有事件,由一个 OutputTag 定义接收到的超时事件序列。
  • 通过within方法,我们的parttern规则限定在一定的窗口范围内。当有超过窗口时间后还到达的event,我们可以通过在select或flatSelect中,实现PatternTimeoutFunction/PatternFlatTimeoutFunction来处理这种情况。

模拟CEP超时事件的提取——Scala版本

package com.atguigu.flink.day7import java.utilimport org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collectorobject OrderTimeoutDetect {case class OrderEvent(orderId:String,eventType:String,eventTime:Long)def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val timeOutTag: OutputTag[String] = OutputTag[String]("timeoutTag")val orderStream: KeyedStream[OrderEvent, String] = env.fromElements(OrderEvent("order_1", "create", 2000L),OrderEvent("order_2", "create", 3000L),OrderEvent("order_1", "pay", 4000L)).assignAscendingTimestamps(_.eventTime).keyBy(r => r.orderId)val pattern: Pattern[OrderEvent, OrderEvent] = Pattern.begin[OrderEvent]("create").where(_.eventType.equals("create")).next("pay").where(_.eventType.equals("pay")).within(Time.seconds(5))val patternedStream: PatternStream[OrderEvent] = CEP.pattern(orderStream, pattern)val selectFunc = (map:scala.collection.Map[String,Iterable[OrderEvent]],out:Collector[String])=>{val create: OrderEvent = map("create").iterator.next()out.collect("order id "+create.orderId+"is payed!")}val timeoutFunc = (map:scala.collection.Map[String,Iterable[OrderEvent]],ts:Long,out:Collector[String])=>{val create: OrderEvent = map("create").iterator.next()out.collect("order id " + create.orderId + " is not payed! and timeout ts is " +ts)}//todo 方法一:val selectStream: DataStream[String] = patternedStream.flatSelect(timeOutTag)(timeoutFunc)(selectFunc)selectStream.print()selectStream.getSideOutput(timeOutTag).print()//todo 方法二:val selectStream1: DataStream[String] = patternedStream.flatSelect(timeOutTag, new MyPFTimeoutFunc, new MyPFSelectFunc)selectStream1.print()selectStream1.getSideOutput(timeOutTag).print()env.execute()}class MyPFTimeoutFunc extends PatternFlatTimeoutFunction[OrderEvent,String] {override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimestamp: Long, out: Collector[String]): Unit = {val create: OrderEvent = pattern.get("create").iterator.next()out.collect("order id " + create.orderId + " is not payed! and timeout ts is " +timeoutTimestamp)}}class MyPFSelectFunc extends PatternFlatSelectFunction[OrderEvent,String] {override def flatSelect(pattern: util.Map[String, util.List[OrderEvent]], out: Collector[String]): Unit = {val create: OrderEvent = pattern.get("create").iterator().next()out.collect("order id "+create.orderId+"is payed!")}}
}

模拟CEP超时事件的提取——JAVA版本

POJO类:

package com.atguigu.flink.day7;public class OrderEvent {public String orderId;public String eventType;public Long eventTime;public OrderEvent() {}public OrderEvent(String orderId, String eventType, Long eventTime) {this.orderId = orderId;this.eventType = eventType;this.eventTime = eventTime;}
}

核心类:

package com.atguigu.flink.day7;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;public class OrderTimeoutDetect {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 定义侧输出标签OutputTag<String> outputTag = new OutputTag<String>("outputTag"){};KeyedStream<OrderEvent, String> keyedStream = env.fromElements(new OrderEvent("order_1", "create", 2000L),new OrderEvent("order_2", "create", 3000L),new OrderEvent("order_1", "pay", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {@Overridepublic long extractTimestamp(OrderEvent element, long recordTimestamp) {return element.eventTime;}})).keyBy(r -> r.orderId);Pattern<OrderEvent, OrderEvent> pattern = Pattern.<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() {@Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("create");}}).next("pay").where(new SimpleCondition<OrderEvent>() {@Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("pay");}}).within(Time.seconds(5));PatternStream<OrderEvent> patternedStream = CEP.pattern(keyedStream, pattern);SingleOutputStreamOperator<String> singleOutputStreamOperator = patternedStream.flatSelect(outputTag, new timeoutFunc(), new selectFunc());singleOutputStreamOperator.print();singleOutputStreamOperator.getSideOutput(outputTag).print(">>>>");env.execute();}private static class timeoutFunc implements PatternFlatTimeoutFunction<OrderEvent, String> {@Overridepublic void timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {OrderEvent create = pattern.get("create").iterator().next();out.collect("order id " + create.orderId + " is not payed! and timeout ts is " + timeoutTimestamp);}}private static class selectFunc implements PatternFlatSelectFunction<OrderEvent,String> {@Overridepublic void flatSelect(Map<String, List<OrderEvent>> pattern, Collector<String> out) throws Exception {OrderEvent create = pattern.get("create").iterator().next();out.collect("order id " + create.orderId + " is payed!");}}
}

Flink 之CEP介绍及应用相关推荐

  1. 网络安全公司奇安信集团是如何基于 Flink 构建 CEP 引擎实时检测网络攻击【未来不可忽视的网络安全】

    摘要: 奇安信集团作为一家网络安全公司是如何基于 Flink 构建 CEP 引擎实时检测网络攻击?其中面临的挑战以及宝贵的实践经验有哪些?本文主要内容分为以下四个方面: 背景及现状 技术架构 产品及运 ...

  2. Flink框架的介绍和实现原理(一)

    一.Flink是什么 Apache Flik 是一个面向分布式数据流处理和批量数据处理的开源计算平台,提供支持流处理和批处理两种类型应用的功能. 二.Flink特点 现在的开源方案,会把流处理和批处理 ...

  3. 实时计算 Flink 版总体介绍

    简介:实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache F ...

  4. 【flink】RocksDB介绍以及Flink对RocksDB的支持

    1.概述 转载:「Flink」RocksDB介绍以及Flink对RocksDB的支持 2.RocksDB简介 RocksDB是基于C++语言编写的嵌入式KV存储引擎,它不是一个分布式的DB,而是一个高 ...

  5. [Flink]Flink实时框架介绍

    目录 架构 应用 流 状态 时间 分层API 运维 架构 Flink是一个分布式数据流处理引擎,用于处理带状态的有边界或无边界数据流.可以部署在通用的分布式集群上,实现海量数据在内存上快速计算. 无边 ...

  6. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  7. Flink专题四:Flink DataStream 窗口介绍及使用

    由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...

  8. Flink大数据实时计算系列-Flink的state介绍、Flink丰富的状态访问、Flink状态的分类

    Flink大数据实时计算系列-Flink的state介绍.Flink丰富的状态访问.Flink状态的分类 目录 Flink的state介绍 Flink丰富的状态访问 Flink状态的分类 Flink参 ...

  9. Flink CEP 介绍及其使用场景

    CEP 是什么? CEP 的英文全称是 Complex Event Processing,翻译成中文为复杂事件处理.它可以用于处理实时数据并在事件流到达时从事件流中提取信息,并根据定义的规则来判断事件 ...

最新文章

  1. MATLAB修改工作环境
  2. Excel批量转csv格式
  3. 【Lucene3.6.2入门系列】第10节_Tika
  4. 惠普服务器bios查看硬件属性,查看硬件信息
  5. 两条水位线的业务需求分析-Interval JOIN方案(转载+自己分析整理)
  6. 互联网日报 | 3月22日 星期一 | 苹果iMac Pro全球下架;知乎更新上市招股书;字节跳动成立朝夕光年奇想基金...
  7. powerpc和arm_为什么我喜欢ARM和PowerPC
  8. Matlab求解规划问题之 fgoalattain函数
  9. 如何成为更优秀的工程师?
  10. 川大教师发自白书:一所高校就是一座衙门
  11. pdf 转化为jpg python 批量转化
  12. DNW刷机210步骤和常见问题
  13. php后台视频教程,php实战开发电商后台视频教程 共6章
  14. 箱线图(Boxplot)也称箱须图(Box-whisker Plot)
  15. 7. R语言【独立性检验】:卡方独立性检验、Fisher精确检验 、Cochran-Mantel-Haenszel检验
  16. B2B2C简易流程图
  17. 怎么提高mysql多表查询效率_MySQL创建index提高多表查询效率
  18. 如何讲好一个故事?--白岩松西湖大学讲座
  19. Libgdx slg游戏进程记录
  20. 用SVG技术实现动态图形输出的嵌入式Web服务

热门文章

  1. oracle like 使用,Oracle数据库like和not like使用
  2. mysql的like字段名_MySQL LIKE 字句使用详情
  3. hadoop启动dfs报错: ERROR: Cannot set priority of namenode process 496
  4. 【vue】在vue项目中按顺序动态24个英文字母选项:A B C D E F......
  5. c++类成员函数指针
  6. SmartFoxServer如何使用文档和示例
  7. 利用DNS隧道免费上网
  8. 菜鸟之学习51单片机(五)蜂鸣器继电器的实现
  9. ubuntu 学习资料整理
  10. java和C语言是什么关系,哪一种好?