@【

1.概述

转载:Flink系列 13. 复杂事件处理 CEP
请到原文查看。。。

1. 什么是 CEP?

  • CEP 是 Flink 中实现的复杂事件处理库,(Complex Event Processing,CEP)是一种基于流处理的技术,CEP是Flink一个基于复杂事件监测处理的库
  • CEP通过一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。
  • CEP复杂事件处理DataStrem API 提供了 FlinkCEP 组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息,主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域

2. CEP 的特点

目标:从有序的简单事件流中发现一些高阶特征

输入:一个或多个简单事件构成的事件流

处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件

输出:满足规则的复杂事件

3. Pattern API

  • flinkCEP 提供了 Pattern API,定义用于对输入流数据进行复杂事件的规则,用来提取符合规则的事件结果

  • 处理事件的规则,被叫做“模式(Pattern)”

  • 包含四个步骤

    • 输入事件流的创建

    • Pattern 的定义

    • Pattern 应用在事件流上检测

    • 选取结果

// 定义一个 Pattern
val pattern = Pattern.begin[event]("start").where(_.getId == 42) .next("middle").subtype(classOf(SubEvent)).where(_.getTemp >= 10.0) .followedBy("end").where(_.getName == "end")// 将创建好的 Pattern 应用到输入事件流上
val patternStream = CEP.pattern(inputStream.pattern)// 获取事件序列,得到处理结果
val result:DataStream(alert) = patternStream.select(createAlert(_))

Pattern API 模式有几下几种分类

  • 个体模式(Individual Patterns)

  • 模式序列\组合模式(Combining Patterns )

  • 模式组(Groups of patterns)

    • 将一个模式序列作为条件,嵌到在个体模式里,成为一组模式

3.1 个体模式(Individual Patterns)

个体模式就是组成复杂规则的每个单独的模式定义

个体模式可以包括“单利模式 singleton” 和“循环模式 loogping”

单例模式只接收一个事件,而循环模式可以接收多个事件

start.times(3).where(_.behavior.startWith(“fav”))

3.1.1 模式中的量词(Quantifier)

可以在一个个体模式后追加量词,也就是指定循环次数

// 匹配出现4次
start.times(4)
// 匹配出现 2-4 次
start.times(2,4)
// 匹配出现0次或4次
start.times(4).optional
// 匹配出现 2-4 次,并且尽可能多的重复匹配
start.times(2,4).greedy
// 匹配出现 1 次或多次
start.oneOrMore
// 匹配出现 0 次、两次或多次
start.timesOrMore(2).optional.greedy

3.1.2 个体模式的条件

条件(Condition)

  • 每个模式都需要指定触发的条件,作为模式是否接受事件进入的判断依据
  • CEP 中的个体模式主要通过调用.where() .or() .until()来指定条件

3.1.3 模式条件的分类

  • 简单条件

  • 通过.where()方法对事件中的字段进行判断筛选,决定是否接受该事件

  • start.where(event=>event.getName.startWith(“foo”))

  • 组合条件

    • 将简单条件进行组合:.or() 方法表示逻辑相连,.where() 的直接组合就是 AND
    • pattern.where(event=>…).or(event=>…)
  • 终止条件

    • 如果使用了 oneOrMore 或者 oneOrMore.optional 建议使用.until() 作为终止条件,以便清理状态
  • 迭代条件(Iterative Condition)

    • 能够对模式之前所有接收的事件进行处理
    • 调用.where((value,ctx)=>{…}) 可以调用 ctx.getEventsForPattern(“name”)

3.2 模式序列/组合模式

  • 很多个体模式组合起来,就形成了整个的模式序列
  • 模式序列必须是以一个“初始模式”开始:val start = Pattern.begin(“start”)

3.2.1 严格近邻(Strict Contiguity)

  • 所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定
  • 例如对于模式 “a next b “ 事件序列 【a,c,b,d】则没有匹配

3.2.2 宽松近邻(Relaxed Contiguity)

  • 允许中间出现不匹配的时间,由.followBy()指定
  • 例如对于模式 “a followBy b “ 事件序列 【a,c,b,d】匹配为{a,b}

3.2.3 非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)

  • 进一步放宽条件,之前已经匹配过得事件也可以再次使用,由.followedByAny()指定
  • 例如对于模式 “a followBy b “ 事件序列 【a,c,b1,b2】匹配为{a,b1},{a,b2}

除了上述模式之外,还可以定义“不希望出现某种近邻关系”

  • .notNext : 不想让某个事件严格紧邻前一个事件发生
  • .notFollowedBy :不想让某个事件在两个事件之间发生

3.2.4 注意项

  • 所有模式序列必须以.begin()开始
  • 模式序列不能以.notFollowedBy()结束
  • “not”类型的模式不能被 optional所修饰
  • 此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效 next.within(Time.seconds(10))

3.3 模式的检测

指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配
调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream

val input:DataStream[Event] = ...
val pattern:Pattern[Event,_] = ...
val patternStream:PatternStream[Event] = CEP.pattern(input,pattern)

3.3.1 匹配事件的提取

创建 PatternStream 之后,就可以应用 select 或者 flatselect方法,从检测到的事件序列中提取事件了

select()方法需要输入一个 select function作为参数,每个成功匹配的事件序列都会调用它

select() 以一个 Map[String,Iterable[N]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所接收到的事件的 Iterable 类型

def selectFn(pattern:Map[String,Iterable[In]]):OUT= {val startEvent = pattern.get("start").get.nextval endEvent = pattern.get("end").get.nextOUT(startEvent,endEvent)
}

3.3.2 超时事件提取

当一个模式通过 within关键字定义了检测窗口时间时,部分时间序列可能因为超过窗口长度而被丢弃,为了能够处理这部分超时的数据,select 和 flatSelect API 调用允许指定超时处理程序

超时处理程序会接收到目前为止模式匹配到的所有事件,由一个 OutputTag 定义接收到的额超时事件序列

val patternStream:PatternStream[Event] = CEP.pattern(input,pattern)
val outputTag = OutputTag[String]("side-output")
val result = patternStream.select(outputTag){(pattern:Map[String,Iterable[Event]],timestamp:Long)=>TimeoutEvent()
}{pattern:Map[String,Iterable[Event]] => complexEvent()
}
val timeoutResult:DataStream<TimeoutEvent> = result.getSideoutput(outputTag)

【flink】flink 复杂事件处理 CEP相关推荐

  1. Apache Flink 实战教程:CEP 实战(转载)

    文章目录 原文链接: 一:Flink CEP 概念以及使用场景 1.什么是 CEP 2.Flink CEP 应用场景 3.Flink CEP 原理 二:Flink CEP 程序开发 1.Flink C ...

  2. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  3. [Flink]Flink实时框架介绍

    目录 架构 应用 流 状态 时间 分层API 运维 架构 Flink是一个分布式数据流处理引擎,用于处理带状态的有边界或无边界数据流.可以部署在通用的分布式集群上,实现海量数据在内存上快速计算. 无边 ...

  4. 凌波微步Flink——Flink的技术逻辑与编程步骤剖析

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95459606 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  5. 凌波微步Flink——Flink API中的一些基础概念

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95355619 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  6. 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink——flink支持SQL,待看

    简介 大数据是收集.整理.处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称.虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性.规模,以及价值在最近几年才 ...

  7. flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍

    前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: 1Caused by: akka.pattern.AskTimeoutException: 2Ask timed out on [Ac ...

  8. 【Flink】Flink Flink 1.14 新特性预览

    1.概述 转载:Flink 1.14 新特性预览 简介: 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理,内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日 ...

  9. [Flink] Flink运行报错The number of requested virtual cores for application master

    文章目录 1.概述 2.环境如下 1.概述 运行一个flink任务,启动的时候报错 flink提交到yarn 环境报错 IllegalConfigurationException: The numbe ...

最新文章

  1. 某电视台晚会多机位特殊视频修复案例
  2. Linux平台Oracle多个实例启动说明
  3. Honey Tree(超好用的样板代码管理工具)
  4. Java基础 选择语句,循环结构数组
  5. 一个真实的案例———HPUX调整LUN大小识别更改
  6. Pentium的指令系统(3)——算术运算指令
  7. ~~Bellman-Ford算法
  8. Windows多线程应用程序的编译和链接
  9. 【CCCC】L3-018 森森美图 (30分),计算几何+判断三点共线+bfs最短路
  10. DevOps和持续交付
  11. 7.Zeng_Cache(1) --- 简介
  12. 2021年3月计算机语言排名,2021年3月编程语言排行榜:TOIBE将迎来重大改变,SQL如愿挤进前十...
  13. c语言pow函数原型_C语言pow函数问题
  14. Linux系统安装,教你安装一个属于自己的Linux系统
  15. 整数与浮点数比较-汇编码分析
  16. 【ARM自学笔记】ARM7时钟简述及配置
  17. luogu2161 SHOI2009 会场预约
  18. 移动应用开发——“音乐”小程序项目
  19. 草根方式学习java中的多线程
  20. 国际教育邮箱哪个好?企业邮箱oa系统的那个好?

热门文章

  1. 贾跃亭发文祝父亲节快乐:FF就像我的孩子
  2. 以拼多多为例,中国互联网企业在农业上都做了哪些努力?
  3. 每卖出一部新款iPhone SE,苹果就要赚1500元?
  4. 藏的太深!原来支付宝还有另一个“集福”活动
  5. 连亏172亿,割肉卖楼,年收3700亿、闻名全球的巨头,败退中国!
  6. 百度再回应“泼水门”:强烈谴责 肇事者已被公安机关带走
  7. 支付宝蚂蚁森林入选2019年世界环境日实践案例
  8. 华为:对学生定制机毫不知情 将会对虚假宣传追责
  9. 三星Galaxy Fold全球翻车后 推迟发售时间进一步改进
  10. 京东金融回应“白条漏洞”:在2017年已修复