一.FlinkCEP介绍

FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库. 它可以让你在无界流中检测出特定的数据,有机会掌握数据中重要的那部分。

是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。

  1. 目标:从有序的简单事件流中发现一些高阶特征
  2. 输入:一个或多个由简单事件构成的事件流
  3. 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
  4. 输出:满足规则的复杂事件

二.CEP开发步骤

1.导入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

2.基本使用

package com.atguigu.flink.java.chapter_9;

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class Flink01_CEP_BasicUse {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
            .readTextFile("input/sensor.txt")
            .map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new WaterSensor(split[0],
                                           Long.parseLong(split[1]) * 1000,
                                           Integer.parseInt(split[2]));
                }
            })
            .assignTimestampsAndWatermarks(WatermarkStrategy
                                               .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                               .withTimestampAssigner((element, recordTimestamp) -> element.getTs()));
        // 1. 模式
        Pattern<WaterSensor, WaterSensor> pattern = Pattern
            .<WaterSensor>begin("start")
            .where(new SimpleCondition<WaterSensor>() {
                @Override
                public boolean filter(WaterSensor value) throws Exception {
                    return "sensor_1".equals(value.getId());
                }
            });
        // 2. 在流上用模式
        PatternStream<WaterSensor> waterSensorPS = CEP.pattern(waterSensorStream, pattern);
        // 3. 取匹配到的
        waterSensorPS
            .select(new PatternSelectFunction<WaterSensor, String>() {
                @Override
                public String select(Map<String, List<WaterSensor>> pattern) throws Exception {
                    return pattern.toString();
                }
            })
            .print();

try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

sensor.txt数据:

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60

三.模式API

1.单个模式

单例模式

单例模式只接受一个事件. 默认情况模式都是单例模式.

前面的例子就是一个单例模式

循环模式

循环模式可以接受多个事件.  单例模式配合上量词就是循环模式.(非常类似我们熟悉的正则表达式)

  • 固定次数

// 1. 模式
Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    });

// 1.1 使用量 出现两次
Pattern<WaterSensor, WaterSensor> patternWithQuantifier = pattern.times(2);

  • 范围内的次数

// 1.1 使用量 [2,4]   2,3次或4
Pattern<WaterSensor, WaterSensor> patternWithQuantifier = pattern.times(2, 4);

  • 一次或多次

// 表示大于等于1

Pattern<WaterSensor, WaterSensor> patternWithQuantifier = pattern.oneOrMore();

  • 多次及多次以上

// 2次或2以上
Pattern<WaterSensor, WaterSensor> patternWithQuantifier = pattern.timesOrMore(2);

条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如前面用到的where就是一种条件

  • 迭代条件

这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new IterativeCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value, Context<WaterSensor> ctx) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    });

  • 简单条件

这种类型的条件扩展了前面提到的IterativeCondition类,它决定是否接受一个事件只取决于事件自身的属性。

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            System.out.println(value);
            return "sensor_1".equals(value.getId());
        }
    });

  • 组合条件

把多个条件结合起来使用. 这适用于任何条件,你可以通过依次调用where()来组合条件。 最终的结果是每个单一条件的结果的逻辑AND。

如果想使用OR来组合条件,你可以像下面这样使用or()方法。

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new IterativeCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value, Context<WaterSensor> ctx) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return value.getVc() > 30;
        }
    })
    .or(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return value.getTs() > 3000;
        }
    });

  • 停止条件

如果使用循环模式(oneOrMore, timesOrMore),在读无界流的时候,可以指定一个停止条件, 否则有可能会内存吃不消.

意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new IterativeCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value, Context<WaterSensor> ctx) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .timesOrMore(2)
    .until(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return value.getVc() >= 40;
        }
    });

2. 组合模式(模式序列)

把多个单个模式组合在一起就是组合模式.  组合模式由一个初始化模式(.begin(...))开头

严格连续

期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .next("end")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_2".equals(value.getId());
        }
    });

注意: 

notNext  如果不想后面直接连着一个特定事件

松散连续

忽略匹配的事件之间的不匹配的事件。

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .followedBy("end")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_2".equals(value.getId());
        }
    });

注意:

notFollowBy 如果不想一个特定事件发生在两个事件之间的任何地方。(notFollowBy不能位于事件的最后)

非确定的松散连续

更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配

当且仅当数据为a,c,b,b时,对于followedBy模式而言命中的为{a,b},对于followedByAny而言会有两次命中{a,b},{a,b}

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .followedByAny("end")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_2".equals(value.getId());
        }
    });

3 .模式知识补充

循环模式的连续性

前面的连续性也可以运用在单个循环模式中. 连续性会被运用在被接受进入模式的事件之间。

  • 严格连续

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .times(2)
    .consecutive();

  • 松散连续

默认是松散连续

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .times(2);

  • 非确定的松散连续

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .times(2)
    .allowCombinations();

循环模式的贪婪性

在组合模式情况下, 对次数的处理尽快能获取最多个的那个次数, 就是贪婪!当一个事件同时满足两个模式的时候起作用.

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    }).times(2, 3).greedy()
    .next("end")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return value.getVc() == 30;
        }
    });

数据:

sensor_1,1,10
sensor_1,2,20
sensor_1,3,30
sensor_2,4,30
sensor_1,4,40
sensor_2,5,50
sensor_2,6,60

结果:

{start=[WaterSensor(id=sensor_1, ts=1, vc=10), WaterSensor(id=sensor_1, ts=2, vc=20), WaterSensor(id=sensor_1, ts=3, vc=30)], end=[WaterSensor(id=sensor_2, ts=4, vc=30)]}

{start=[WaterSensor(id=sensor_1, ts=2, vc=20), WaterSensor(id=sensor_1, ts=3, vc=30)], end=[WaterSensor(id=sensor_2, ts=4, vc=30)]}

分析:

sensor_1,3,30  在匹配的的时候, 既能匹配第一个模式也可以匹配的第二个模式, 由于第一个模式使用量词则使用greedy的时候会优先匹配第一个模式, 因为要尽可能多的次数

注意:

  1. 一般贪婪比非贪婪结果要少!
  2. 模式组不能设置为greedy

模式可选性

可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    }).times(2).optional()  // 0次或2
    .next("end")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_2".equals(value.getId());
        }
    });

说明:

start模式可能会没有!

8.4.4 模式组

在前面的代码中次数只能用在某个模式上, 比如: .begin(...).where(...).next(...).where(...).times(2)  这里的次数只会用在next这个模式上, 而不会用在begin模式上.

如果需要用在多个模式上,可以使用模式组!

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .begin(Pattern
               .<WaterSensor>begin("start")
               .where(new SimpleCondition<WaterSensor>() {
                   @Override
                   public boolean filter(WaterSensor value) throws Exception {
                       return "sensor_1".equals(value.getId());
                   }
               })
               .next("next")
               .where(new SimpleCondition<WaterSensor>() {
                   @Override
                   public boolean filter(WaterSensor value) throws Exception {
                       return "sensor_2".equals(value.getId());
                   }
               }))
    .times(2);

结果:

{begin=[WaterSensor(id=sensor_1, ts=2000, vc=20), WaterSensor(id=sensor_1, ts=4000, vc=40)], next=[WaterSensor(id=sensor_2, ts=3000, vc=30), WaterSensor(id=sensor_2, ts=5000, vc=50)]}

8.4.5 超时数据

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。

Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    })
    .next("end")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            return "sensor_2".equals(value.getId());
        }
    })
    .within(Time.seconds(2));

数据:

sensor_1,1,10
sensor_2,4,30
sensor_1,4,40
sensor_2,5,50

Flink CEP编程相关推荐

  1. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  2. Flink CEP结合案例详解

    1.介绍 FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库.它允许您在无穷无尽的事件流中检测事件模式,使您有机会掌握数据中重要的内容.通常会用来做一些用户操作APP的日志风控策略等多种 ...

  3. Flink DataStream 编程入门

    流处理是 Flink 的核心,流处理的数据集用 DataStream 表示.数据流从可以从各种各样的数据源中创建(消息队列.Socket 和 文件等),经过 DataStream 的各种 transf ...

  4. Apache Flink CEP 实战

    本文根据Apache Flink 实战&进阶篇系列直播课程整理而成,由哈啰出行大数据实时平台资深开发刘博分享.通过一些简单的实际例子,从概念原理,到如何使用,再到功能的扩展,希望能够给打算使用 ...

  5. 【Flink】基于 Flink CEP 实时计算商品订单流失量

    1.概述 转载:https://blog.csdn.net/tzs_1041218129/article/details/108786597 假设有个需求需要实时计算商品的订单流失量,规则如下: 用户 ...

  6. 从滴滴的Flink CEP引擎说起

    从滴滴的Flink CEP引擎说起 本文转载自 https://www.cnblogs.com/cx2016/p/11647110.html. CEP业务场景 复杂事件处理(Complex Event ...

  7. Flink Cep 扩展 - 动态规则更新及Pattern间within()

    上一篇文章 <Flink Cep 源码分析>我们可以知道Flink cep中Pattern的创建,state的转换,以及匹配结果的数据.这一篇则对Flink cep的两个痛点进行扩展: 1 ...

  8. Flink CEP在哈啰出行的应用

    来源:ververica.cn 作者:刘博·哈啰出行 By 大数据技术与架构 场景描述:Flink CEP 是 Flink 的复杂处理库.它允许用户快速检测无尽数据流中的复杂模式.不过 Flink C ...

  9. Flink CEP 在抖音电商的业务实践

    摘要:本文整理自抖音电商实时数仓研发工程师张健,在 FFA 实时风控专场的分享.本篇内容主要分为四个部分: Flink CEP 简介 业务场景与挑战 解决方案实践 未来展望 Tips:点击「阅读原文」 ...

最新文章

  1. 使用时间超级长的充电宝是啥样的?
  2. 2014 UESTC Training for Data Structures B - 母仪天下
  3. QT的QElapsedTimer类的使用
  4. JavaScript高级程序设计-读书笔记(6)
  5. 山东大学 2020级数据库系统 实验五
  6. 使用python对url编码解码
  7. SharePoint 2013 Error - File names can't contain the following characters: ? # {} % ~ / \.
  8. matlab生成流程图,matlab做流程图
  9. 不同品牌路由器无线桥接的设置方法
  10. 往事如烟 - 老钟14
  11. Heat模板及简单应用
  12. VS2013,MFC,在视图类里添加鼠标左键响应函数OnLButtonDown
  13. 前端学习 Vue笔记 完整版
  14. 干5年外包,突然失业了。。。
  15. 2.天猫商品数据爬虫(已模拟登录)
  16. PB关于打印机纵向横向打印的设置
  17. DS18B20测量温度液晶1602显示
  18. 调试,一项磨炼人的活(一)---《调试九法》
  19. 天下武功唯快不破-实验吧
  20. 计算机excel函数基础知识,《计算机基础知识复习资料》Excel函数详解.doc

热门文章

  1. 菜鸟前路---c/c++小游戏
  2. 蚂蚁国际 一面、二面、HR面 已offer
  3. loader是什么?
  4. 介绍一些好的域名空间网站
  5. C# WinForm TreeView
  6. 哪些大学计算机考研最容易上岸,还不知道去哪上岸的考研人看过来~,不歧视本科的40所院校盘点!...
  7. 硬盘分区表的修复(Ubuntu安装盘的另类用法)
  8. event_log之am_pss
  9. PHP简单入门基础知识
  10. 照片宽高比怎么设置_自己动手搭建照片墙