1. Introduction

FlinkCEP is a Complex Event Processing (CEP) library built on top of Flink. It lets you detect event patterns in an unbounded stream of events, giving you a way to surface what matters in your data. Typical uses include risk-control rules over user app-operation logs and other complex event scenarios. Below, the requirement "alert when a user fails to log in 3 times in a row within 10 seconds" is used for a complete walkthrough.

1.1. Overview Diagram of the Requirement and Data

2. Official Example

The official code example is as follows:

DataStream<Event> input = ...;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getId() == 42;
        }
    })
    .next("middle")
    .subtype(SubEvent.class)
    .where(new SimpleCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent subEvent) {
            return subEvent.getVolume() >= 10.0;
        }
    })
    .followedBy("end")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("end");
        }
    });

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });

2.1. Official Example Summary

CEP programming steps:
a) Define the pattern sequence

Pattern.<Class>begin("patternName").API...

Custom pattern rules are almost always built following this template.

The chainable APIs that can follow are documented in the official docs:

Event Processing (CEP) | Apache Flink
b) Apply the pattern sequence to the stream

CEP.pattern(inputDataStream,pattern)

CEP.pattern() is the fixed entry point:

the first argument is the stream the pattern should be applied to;

the second argument is the custom pattern itself.
c) Extract and emit the matched data

The stream produced in b) is processed with the process API: extend PatternProcessFunction and override processMatch(Map<String, List<Event>> pattern, Context ctx, Collector<Alert> out):

The first parameter holds the matched data; each Map key is a "patternName" defined in step a), and its value is the list of events matched by that named pattern.

The second parameter is the Context, which gives access to time features and to side outputs (timed-out partial matches can be routed to a side output by additionally implementing TimedOutPartialMatchHandler).

The third parameter is the Collector used to emit the function's results downstream.

3. Detailed Requirement Walkthrough

The example below reads simulated user operation logs from a socket and runs CEP matching over them.

The following code flattens each incoming line into a JavaBean. This section proceeds in code fragments; the complete demo code is listed in a later section.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parallelism is set to 1 so that the watermark can advance and trigger the computation.
env.setParallelism(1);

DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream
        .flatMap(new MyFlatMapFunction())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<UserLoginLog>)
                                        (element, recordTimestamp) -> element.getLoginTime()));

3.1. Using begin.where.next.where.next

// Emit only when there are 3 consecutive login failures within 10 s (strict contiguity).
Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start")
        .where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        })
        .next("second")
        .where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        })
        .next("third")
        .where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        })
        .within(Time.seconds(10));

With this pattern, counting starts when a login failure is seen; a match is emitted only if the immediately following second and third events are also failures, with no other event in between.

// With the input below, the emitted loginIds are: 11111, 11112, 11113, 11116, 11117, 11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
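As a cross-check on the listed output, the strict-contiguity semantics can be simulated outside Flink with a plain sliding scan. This is a minimal sketch: the `Login` class and method names are illustrative (not part of the demo), and event-time/watermark delivery and after-match skip strategies are ignored; for these sample timestamps the scan reproduces the same first-event IDs that processMatch emits via start.get(0).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StrictContiguityCheck {

    /** Minimal stand-in for UserLoginLog: (loginId, loginTime, loginStatus). */
    static final class Login {
        final int loginId;
        final long loginTime;
        final int loginStatus; // 1 = failure, 0 = success

        Login(int loginId, long loginTime, int loginStatus) {
            this.loginId = loginId;
            this.loginTime = loginTime;
            this.loginStatus = loginStatus;
        }
    }

    /**
     * Mirrors begin.next.next.within(10s): returns the loginId of the first event
     * of every run of three adjacent failures whose first and last events are at
     * most windowMs apart.
     */
    static List<Integer> firstIdsOfStrictTriples(List<Login> logs, long windowMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i + 2 < logs.size(); i++) {
            Login a = logs.get(i);
            Login b = logs.get(i + 1);
            Login c = logs.get(i + 2);
            boolean allFailed = a.loginStatus == 1 && b.loginStatus == 1 && c.loginStatus == 1;
            if (allFailed && c.loginTime - a.loginTime <= windowMs) {
                out.add(a.loginId); // the demo's processMatch emits start.get(0)
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Login> logs = Arrays.asList(
                new Login(11111, 1645177352000L, 1),
                new Login(11112, 1645177353000L, 1),
                new Login(11113, 1645177354000L, 1),
                new Login(11116, 1645177355000L, 1),
                new Login(11117, 1645177356000L, 1),
                new Login(11118, 1645177357000L, 1),
                new Login(11119, 1645177358000L, 1),
                new Login(11120, 1645177359000L, 0),
                new Login(11121, 1645177360000L, 1),
                new Login(11122, 1645177361000L, 1),
                new Login(11123, 1645177362000L, 1));
        System.out.println(firstIdsOfStrictTriples(logs, 10_000));
        // prints [11111, 11112, 11113, 11116, 11117, 11121]
    }
}
```

The successful login 11120 breaks the run, which is why 11118 and 11119 never start a match here.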

3.1.1. Output Diagram

3.2. Using begin.times

// Emit when there are 3 login failures within 10 s (contiguity not enforced).
Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start")
        .where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        })
        .times(3)
        .within(Time.seconds(10));

Here counting again starts at a login failure, but contiguity is relaxed: as long as a second and a third failure occur within the 10-second window, the match is emitted. The failures do not have to be adjacent, because non-matching events in between are skipped.

// With the input below, the emitted loginIds are: 11111, 11112, 11113, 11116, 11117, 11118, 11119, 11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
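Relaxed contiguity only skips non-matching events, which in this demo are the successful logins. So the same outcome can be sketched by dropping the successes first and then scanning adjacent triples of the remaining failures (again a minimal illustration with a hypothetical `Login` class, ignoring watermarks and skip strategies):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RelaxedContiguityCheck {

    /** Minimal stand-in for UserLoginLog: (loginId, loginTime, loginStatus). */
    static final class Login {
        final int loginId;
        final long loginTime;
        final int loginStatus; // 1 = failure, 0 = success

        Login(int loginId, long loginTime, int loginStatus) {
            this.loginId = loginId;
            this.loginTime = loginTime;
            this.loginStatus = loginStatus;
        }
    }

    /**
     * Mirrors times(3).within(10s) without consecutive(): successes are skippable,
     * so remove them, then take every run of three adjacent remaining failures
     * whose first and last events are at most windowMs apart.
     */
    static List<Integer> firstIdsOfRelaxedTriples(List<Login> logs, long windowMs) {
        List<Login> failures = new ArrayList<>();
        for (Login l : logs) {
            if (l.loginStatus == 1) {
                failures.add(l); // drop successes: relaxed contiguity skips them
            }
        }
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i + 2 < failures.size(); i++) {
            Login first = failures.get(i);
            Login last = failures.get(i + 2);
            if (last.loginTime - first.loginTime <= windowMs) {
                out.add(first.loginId);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Login> logs = Arrays.asList(
                new Login(11111, 1645177352000L, 1),
                new Login(11112, 1645177353000L, 1),
                new Login(11113, 1645177354000L, 1),
                new Login(11116, 1645177355000L, 1),
                new Login(11117, 1645177356000L, 1),
                new Login(11118, 1645177357000L, 1),
                new Login(11119, 1645177358000L, 1),
                new Login(11120, 1645177359000L, 0),
                new Login(11121, 1645177360000L, 1),
                new Login(11122, 1645177361000L, 1),
                new Login(11123, 1645177362000L, 1));
        System.out.println(firstIdsOfRelaxedTriples(logs, 10_000));
        // prints [11111, 11112, 11113, 11116, 11117, 11118, 11119, 11121]
    }
}
```

Compared with the strict case, 11118 and 11119 now start matches because the success 11120 is simply skipped.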

3.2.1. Output Diagram

3.3. Using begin.times.consecutive

// Emit only when there are 3 consecutive login failures within 10 s;
// consecutive() enforces strict contiguity.
Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start")
        .where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        })
        .times(3)
        .consecutive()
        .within(Time.seconds(10));

Adding consecutive() on top of 3.2 enforces strict contiguity again, so the behavior becomes identical to 3.1.

// With the input below, the emitted loginIds are: 11111, 11112, 11113, 11116, 11117, 11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}

4. Complete Demo Code

4.1. The pom file

<properties>
    <flink.version>1.14.3</flink.version>
    <hadoop.version>2.7.5</hadoop.version>
    <scala.binary.version>2.11</scala.binary.version>
    <kafka.version>2.4.0</kafka.version>
    <redis.version>3.3.0</redis.version>
    <lombok.version>1.18.6</lombok.version>
    <fastjson.version>1.2.72</fastjson.version>
    <jdk.version>1.8</jdk.version>
</properties>
<dependencyManagement>
    <dependencies>
        <!-- hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.code.findbugs</groupId>
                    <artifactId>jsr305</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>force-shading</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- redis dependency -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>

4.2. The UserLoginLog class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
class UserLoginLog {
    /** Login id */
    private int loginId;
    /** Login time (epoch millis) */
    private long loginTime;
    /** Login status: 1 = failure, 0 = success */
    private int loginStatus;
    /** Login user name */
    private String userName;
}

4.3. The MyFlatMapFunction class

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

@Slf4j
public class MyFlatMapFunction implements FlatMapFunction<String, UserLoginLog> {

    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and
     * transforms it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out   The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *                   operation to fail and may trigger recovery.
     */
    @Override
    public void flatMap(String value, Collector<UserLoginLog> out) throws Exception {
        if (StringUtils.isNotBlank(value)) {
            UserLoginLog userLoginLog = JSONObject.parseObject(value, UserLoginLog.class);
            out.collect(userLoginLog);
        }
    }
}

4.4. The MyPatternProcessFunction class

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

@Slf4j
public class MyPatternProcessFunction extends PatternProcessFunction<UserLoginLog, UserLoginLog> {

    /**
     * Generates resulting elements given a map of detected pattern events. The events are
     * identified by their specified names.
     *
     * <p>{@link Context#timestamp()} in this case returns the time of the last element that was
     * assigned to the match, resulting in this partial match being finished.
     *
     * @param match map containing the found pattern. Events are identified by their names.
     * @param ctx   enables access to time features and emitting results through side outputs
     * @param out   Collector used to output the generated elements
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *                   operation to fail and may trigger recovery.
     */
    @Override
    public void processMatch(Map<String, List<UserLoginLog>> match, Context ctx,
                             Collector<UserLoginLog> out) throws Exception {
        List<UserLoginLog> start = match.get("start");
        out.collect(start.get(0));
    }
}

4.5. The main class

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

@Slf4j
public class CepLearning {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is set to 1 so that the watermark can advance and trigger the computation.
        env.setParallelism(1);

        DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream
                .flatMap(new MyFlatMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner(
                                        (SerializableTimestampAssigner<UserLoginLog>)
                                                (element, recordTimestamp) -> element.getLoginTime()));

        // Emit only when there are 3 consecutive login failures within 10 s (strict contiguity).
        Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start")
                .where(new SimpleCondition<UserLoginLog>() {
                    @Override
                    public boolean filter(UserLoginLog value) throws Exception {
                        return 1 == value.getLoginStatus();
                    }
                })
                .next("second")
                .where(new IterativeCondition<UserLoginLog>() {
                    @Override
                    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                        return 1 == value.getLoginStatus();
                    }
                })
                .next("third")
                .where(new SimpleCondition<UserLoginLog>() {
                    @Override
                    public boolean filter(UserLoginLog value) throws Exception {
                        return 1 == value.getLoginStatus();
                    }
                })
                .within(Time.seconds(10));

        // Emit when there are 3 login failures within 10 s (contiguity not enforced).
        Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start")
                .where(new IterativeCondition<UserLoginLog>() {
                    @Override
                    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                        return 1 == value.getLoginStatus();
                    }
                })
                .times(3)
                .within(Time.seconds(10));

        // Same as above, but consecutive() enforces strict contiguity again.
        Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start")
                .where(new IterativeCondition<UserLoginLog>() {
                    @Override
                    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                        return 1 == value.getLoginStatus();
                    }
                })
                .times(3)
                .consecutive()
                .within(Time.seconds(10));

        PatternStream<UserLoginLog> patternStream = CEP.pattern(dataStream, wherePatternOne);
        PatternStream<UserLoginLog> patternStream1 = CEP.pattern(dataStream, wherePatternTwo);
        PatternStream<UserLoginLog> patternStream2 = CEP.pattern(dataStream, wherePatternThree);

        SingleOutputStreamOperator<UserLoginLog> process = patternStream.process(new MyPatternProcessFunction());
        SingleOutputStreamOperator<UserLoginLog> process1 = patternStream1.process(new MyPatternProcessFunction());
        SingleOutputStreamOperator<UserLoginLog> process2 = patternStream2.process(new MyPatternProcessFunction());

        process.print("resultOutPut");
        process1.print("resultOutPutTwo");
        process2.print("resultOutPutThree");

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
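To run the demo end to end, the socket source must exist before the job starts, since the job connects to localhost:8888 (see env.socketTextStream above). A typical session looks like the following; it assumes `nc` (netcat) is installed, and the file name is arbitrary:

```shell
# Write a few sample log lines to a file (one JSON object per line).
cat <<'EOF' > sample_logins.txt
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
EOF
# In one terminal, serve the lines on port 8888, then start the Flink job in another:
# nc -lk 8888 < sample_logins.txt
```

Alternatively, run `nc -lk 8888` interactively and paste the log lines one at a time to watch matches appear as the watermark advances.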
