flink CEP
Apache Flink提供FlinkCEP库,该库提供用于执行复杂事件处理的API。该库由以下核心组件组成:
事件流
模式定义
模式检测
警报生成

FlinkCEP在Flink的名为DataStream的流API上工作。程序员需要从事件流中定义要检测的模式,然后Flink的CEP引擎检测该模式并采取适当的操作,例如生成警报。
为了开始,我们需要添加以下Maven依赖项:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep- scala_2.10 -->
<dependency><groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>1.1.4</version></dependency>

事件流
CEP的一个非常重要的组成部分是它的输入事件流。在前面的章节中,我们已经看到了数据流API的详细信息。现在让我们使用这些知识来实现CEP。我们需要做的第一件事就是为事件定义JavaPOJO。假设我们需要监视温度传感器事件流。
首先定义一个抽象类,然后扩展这个类。

注意
在定义事件POJO时,我们需要确保实现hashCode ( )
和equals ()方法,因为在比较事件时,compile将使用它们。
下面的代码片段演示了这一点。首先,我们编写一个抽象类,如下所示:

package com.demo.chapter05;public abstract class MonitoringEvent {private String machineName;public String getMachineName() {return machineName;}public void setMachineName(String machineName) {this.machineName = machineName;}@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + ((machineName == null) ? 0 :machineName.hashCode());return result;}@Overridepublic boolean equals(Object obj) {if (this == obj) return true;if (obj == null)return false;if (getClass() != obj.getClass())return false;MonitoringEvent other = (MonitoringEvent) obj;if (machineName == null) {if (other.machineName != null) return false;} else if (!machineName.equals(other.machineName)) return false;return true;}public MonitoringEvent(String machineName) {super();this.machineName = machineName;}
}

然后,我们为实际温度事件创建POJO :

package com.demo.chapter05;public class TemperatureEvent extends MonitoringEvent {public TemperatureEvent(String machineName) {super(machineName);}private double temperature;public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}@Overridepublic int hashCode() {final int prime = 31;int result = super.hashCode();long temp;temp = Double.doubleToLongBits(temperature);result = prime * result + (int) (temp ^ (temp >>> 32));return result;}@Overridepublic boolean equals(Object obj) {if (this == obj) return true;if (!super.equals(obj)) return false;if (getClass() != obj.getClass()) return false;TemperatureEvent other = (TemperatureEvent) obj;if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))return false;return true;}public TemperatureEvent(String machineName, double temperature) {super(machineName);this.temperature = temperature;}@Overridepublic String toString() {return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName()=" + getMachineName()+ "]";}
}

现在,我们可以如下定义事件源:在Java中:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),
new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz",
22.3), new TemperatureEvent("xyz", 22.1),
new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz",
22.7),
new TemperatureEvent("xyz", 27.0));

In Scala:

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TemperatureEvent> inputEventStream=env.fromElements(new TemperatureEvent("xyz",22.0),new TemperatureEvent("xyz",20.1),new TemperatureEvent("xyz",21.1),new TemperatureEvent("xyz",22.2),new TemperatureEvent("xyz",22.1),new TemperatureEvent("xyz",22.3),new TemperatureEvent("xyz",22.1),new TemperatureEvent("xyz",22.4),new TemperatureEvent("xyz",22.7),new TemperatureEvent("xyz",27.0));

模式API
模式API允许您非常容易地定义复杂的事件模式。每个模式由多个状态组成。要从一个状态到另一个状态,通常我们需要定义条件。条件可以是连续性或过滤掉事件。

让我们尝试详细了解每个模式操作。

Begin开始

初始状态可以定义如下:

In Java:

Pattern<Event, ?> start = Pattern.begin(“start”);

In Scala:

val start : Pattern[Event, _] = Pattern.begin(“start”)

Filter过滤器

我们还可以指定初始状态的筛选条件:
In Java:

start.where(new FilterFunction<Event>() { @Overridepublic boolean filter(Event value) {return ... // condition
}
});

In Scala:

start.where(event => … /* condition */)

Subtype图表类型

我们还可以使用subtype ( )方法根据事件的子类型筛选出事件:
In Java:

start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() { @Override
public boolean filter(SubEvent value) {return ... // condition }
});

In Scala:

start.subtype(classOf[SubEvent]).where(subEvent => … /* condition */)

OR

模式API还允许我们一起定义多个条件。我们可以使用OR和AND运算符。

In Java:

pattern.where(new FilterFunction<Event>() { @Overridepublic boolean filter(Event value) {return ... // condition
}
}).or(new FilterFunction<Event>() { @Overridepublic boolean filter(Event value) {return ... // or condition
}
});

In Scala:

pattern.where(event => … /* condition /).or(event => … / or condition */)

Continuity连续性

如前所述,我们并不总是需要过滤掉事件。总有一些模式需要连续性而不是过滤器。
连续性可以分为两种类型-严格连续性和非严格连续性。

Strict continuity严格连续性

严格的连续性需要两个事件才能直接成功,这意味着两个事件之间不应有其他事件。此模式可由next ()定义。

In Java:

Pattern<Event, ?> strictNext = start.next(“middle”);

In Scala:

val strictNext: Pattern[Event, _] = start.next(“middle”)

Non-strict continuity

非严格连续性可以表述为其他事件之间允许的特定
两个事件。此模式可由followedBy ()定义。
In Java:

Pattern<Event, ?> nonStrictNext = start.followedBy(“middle”);

In Scala:

val nonStrictNext : Pattern[Event, _] = start.followedBy(“middle”)

Within

patternAPI还允许我们根据时间间隔进行模式匹配。我们可以如下定义基于时间的时间约束。
In Java:

next.within(Time.seconds(30));

In Scala:

next.within(Time.seconds(10))

Detecting patterns检测模式

要针对事件流检测模式,我们需要通过模式运行该流。pattern ( )返回patternstream。
下面的代码片段展示了如何检测模式。首先,定义模式以检查温度值在10秒钟内是否大于26.0度。

In Java:

Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent> begin("first").subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent value) throws Exception {if (value.getTemperature() >= 26.0){return true;}return false;}}).within(Time.seconds(10));PatternStream<TemperatureEvent> pts = CEP.pattern(inputEventStream,warningPattern);

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val input = // data
val pattern: Pattern[TempEvent, _] = Pattern.begin("start").where(event => event.temp >= 26.0)
val patternStream: PatternStream[TempEvent] = CEP.pattern(input, pattern)

Selecting from patterns从模式中选择

一旦模式流可用,我们需要从中选择模式,然后基于它采取适当的操作。我们可以使用select或flatselect方法从阵列中选择数据。

Select挑选

select方法需要PatternSelectionFunction实现。它具有为每个事件序列调用的select方法。select方法接收匹配事件的字符串/事件对的映射。字符串由状态的名称定义。select方法只返回一个结果。
为了收集结果,我们需要定义输出POJO。在我们的示例中,假设我们需要生成警报作为输出。然后,我们需要定义POJO,如下所示:

package com.demo.chapter05;
public class Alert {private String message;public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public Alert(String message) {super();this.message = message;}@Overridepublic String toString() {return "Alert{" +"message='" + message + '\'' +'}';}@Overridepublic boolean equals(Object obj) {if (this == obj) return true;if (obj == null)return false;if (getClass() != obj.getClass())return false;Alert other = (Alert) obj;if (message == null) {if (other.message != null) return false;} else if (!message.equals(other.message)) return false;return true;}@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + ((message == null) ? 0 : message.hashCode());return result;}
}

接下来我们定义select函数。
In Java:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {@Overridepublic OUT select(Map<String, IN> pattern) {IN startEvent = pattern.get("start");IN endEvent = pattern.get("end");return new OUT(startEvent, endEvent);}
}

In Scala:

def selectFn(pattern :mutable.Map[String, IN]):OUT={val startEvent=pattern.get("start").getval endEvent=pattern.get("end").get OUT(startEvent,endEvent)
}

flatSelect

flatselect方法与select方法类似。两者之间的唯一区别是flatselect可以返回任意数量的结果。flatselect方法具有用于输出元素的附加收集器参数。
下面的示例演示如何使用flatselect方法。
In Java:

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {@Overridepublic void select(Map<String, IN> pattern, Collector<OUT> collector) {IN startEvent = pattern.get("start");IN endEvent = pattern.get("end");for (int i = 0; i < startEvent.getValue(); i++) {collector.collect(new OUT(startEvent, endEvent));}}
}

In Scala:

def flatSelectFn(pattern: mutable.Map[String, IN], collector: Collector[OUT] ) = {val startEvent = pattern.get ("start").get val endEvent = pattern.get ("end").getfor (i <- 0 to startEvent.getValue) {collector.collect (OUT (startEvent, endEvent) )}
}

Handling timed-out partial patterns处理超时的部分模式

有时,如果我们用时间限制模式,我们可能会错过某些事件。事件可能会因为超过长度而被丢弃。为了对超时事件采取操作,select和flatselect方法允许超时
处理程序。为每个超时事件模式调用此处理程序。
在这种情况下,select方法包含两个参数: PatternSelectFunction和patterntimeoutfunction。超时函数的返回类型可以与selectpattern函数不同。超时事件和select事件也包装在类中。右和左
下面的代码片段展示了我们在实践中的工作方式。

In Java:

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
new PatternSelectFunction<Event, ComplexEvent>() {...} );
DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
new PatternFlatSelectFunction<Event, ComplexEvent>() {...} );

In Scala, the select API:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.select{
(pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
}{pattern: mutable.Map[String, Event] => ComplexEvent() }

flat select API是与收集器一起调用的,因为它可以发出任意数量的事件:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect{(pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
out.collect(TimeoutEvent())
}{(pattern: mutable.Map[String, Event], out:
Collector[ComplexEvent]) =>
}

Use case - complex event processing on a temperature sensor温度传感器上的用例复杂事件处理

在前面的部分中,我们了解了FlinkCEP引擎提供的各种特性。现在是了解我们如何在实际解决方案中使用它的时候了。为此,假设我们在一家生产一些产品的机械公司工作。在产品工厂中,需要不断监视某些机器。工厂已经设置了传感器,这些传感器在给定的时间内持续发送机器的温度。
现在,我们将设置一个系统,该系统持续监视温度值,并在温度超过某个值时生成警报。
我们可以使用以下体系结构:

传感器 --> kafka --> flink Streams -->CEP 模式匹配 --> 消息

在这里,我们将使用Kafka从传感器收集事件。为了编写Java应用程序,我们首先需要创建Maven项目并添加以下依赖项:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep- scala_2.11 -->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink- streaming-java_2.11 -->
<dependency><groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink- streaming-scala_2.11 --><dependency><groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.1.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.11</artifactId> <version>1.1.4</version>
</dependency>

接下来,我们需要做以下事情来使用Kafka。
首先,我们需要定义一个自定义的Kafka反序列化程序。这将从Kafka主题中读取字节并将其转换为temperatureevent。下面是执行此操作的代码。

事件反序列化模式
EventDeserializationSchema.java:

package com.demo.chapter05;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> {public TypeInformation<TemperatureEvent> getProducedType() {return TypeExtractor.getForClass(TemperatureEvent.class);}public TemperatureEvent deserialize(byte[] arg0) throws IOException {String str = new String(arg0, StandardCharsets.UTF_8);String[] parts = str.split("=");return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));}public boolean isEndOfStream(TemperatureEvent arg0) {return false;}
}

接下来,我们在Kafka中创建了名为temperature的主题:

bin/kafka-topics --create --zookeeper localhost:2181 --replication-

factor 1 --partitions 1 --topic temperature

现在我们转到Java代码,它将在脱机流中侦听这些事件:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<TemperatureEvent> inputEventStream = env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("temperature", newEventDeserializationSchema(), properties));

接下来,我们将定义模式以检查温度是否大于26.0摄氏度
10秒内:

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("first").subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {private static final long serialVersionUID = 1L;public boolean filter(TemperatureEvent value) {if (value.getTemperature() >= 26.0) {return true;}return false;}}).within(Time.seconds(10));

接下来,将此模式与事件流匹配并选择事件。我们还会将警报消息添加到结果流中,如下所示:

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern).select(new PatternSelectFunction<TemperatureEvent, Alert>() {private static final long serialVersionUID = 1L;public Alert select(Map<String, TemperatureEvent> event) throws Exception {return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature()+ " on machine name:" + event.get("first").getMachineName());}});

为了了解生成了哪些警报,我们将打印结果:

patternStream.print();

我们执行流:

env.execute(“CEP on Temperature Sensor”);

现在我们都准备好执行应用程序了。当我们在Kafka主题中获得消息时,CEP将继续执行。
实际执行如下所示。下面是我们如何提供示例输入:

xyz=21.0

xyz=30.0

LogShaft=29.3

Boiler=23.1

Boiler=24.2

Boiler=27.0

Boiler=29.0

以下是示例输出的外观:

我们还可以配置邮件客户端,并使用一些外部web挂接发送电子邮件或messenger通知。

注意
应用程序的代码可以在GitHub上找到:

https://github.com/deshpandetanmay/mastering-flink

flink CEP检测温度事件微型示例相关推荐

  1. 一文学会 Flink CEP(以直播平台监控用户弹幕为例)

    我们在看直播的时候,不管对于主播还是用户来说,非常重要的一项就是弹幕文化.为了增加直播趣味性和互动性, 各大网络直播平台纷纷采用弹窗弹幕作为用户实时交流的方式,内容丰富且形式多样的弹幕数据中隐含着复杂 ...

  2. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  3. Apache Flink CEP 实战

    本文根据Apache Flink 实战&进阶篇系列直播课程整理而成,由哈啰出行大数据实时平台资深开发刘博分享.通过一些简单的实际例子,从概念原理,到如何使用,再到功能的扩展,希望能够给打算使用 ...

  4. Flink: CEP详解

    本文根据 Apache Flink 系列直播课程整理而成,由哈啰出行大数据实时平台资深开发刘博分享.通过一些简单的实际例子,从概念原理,到如何使用,再到功能的扩展,希望能够给计划使用或者已经使用的同学 ...

  5. flink CEP之规则解释

    flink CEP表示复杂事件处理. CEP开发流程: 输入事件流的创建即DataStream pattern的定义即规则定义 通过pattern规则去事件流匹配 选取结果 pattern 1.规则定 ...

  6. 【Flink】基于 Flink CEP 实时计算商品订单流失量

    1.概述 转载:https://blog.csdn.net/tzs_1041218129/article/details/108786597 假设有个需求需要实时计算商品的订单流失量,规则如下: 用户 ...

  7. 从滴滴的Flink CEP引擎说起

    从滴滴的Flink CEP引擎说起 本文转载自 https://www.cnblogs.com/cx2016/p/11647110.html. CEP业务场景 复杂事件处理(Complex Event ...

  8. Flink CEP 在抖音电商的业务实践

    摘要:本文整理自抖音电商实时数仓研发工程师张健,在 FFA 实时风控专场的分享.本篇内容主要分为四个部分: Flink CEP 简介 业务场景与挑战 解决方案实践 未来展望 Tips:点击「阅读原文」 ...

  9. Flink CEP结合案例详解

    1.介绍 FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库.它允许您在无穷无尽的事件流中检测事件模式,使您有机会掌握数据中重要的内容.通常会用来做一些用户操作APP的日志风控策略等多种 ...

最新文章

  1. Simple Dynamic Strings(SDS)源码解析和使用说明二
  2. 惊闻VeryCD的电驴下载部分即将关闭
  3. 基于webpack的前端工程化开发解决方案探索(一):动态生成HTML
  4. python编程软件v-Python编程狮
  5. TextView显示颜色高亮的问题
  6. 微信无法连接到服务器(110087)),110087无法连接网络是什么意思
  7. React 的 Hello World
  8. AndroidStudio安卓原生开发_Activity的IntentFlag_的第一个值_FLAG_ACTIVITY_NEW_TASK的用法---Android原生开发工作笔记89
  9. Android开发笔记(六十九)JNI实战
  10. eclipse 返回上一个选项卡、注释及取消注释 、大写变小写、 光标跳到下一行快捷键
  11. .Net转Java自学之路—基础巩固篇三十(JDBC)
  12. LeetCode - Duplicate Emails
  13. IDEA远程调试Java代码
  14. 如何提取微信公众号内视频 (高清 无水印)
  15. FFMpeg的码率控制 - CBR or VBR
  16. B - Mountainous landscape Gym - 100543B(线段树+计算几何)
  17. LeetCode114--词典中最长的单词、最短补全词、宝石与石头
  18. 视频云服务四路玩家,谁是最大赢家
  19. 联想微型计算机c365,联想C365一体机如何用u盘重装系统
  20. 计算机二级 公共基础知识

热门文章

  1. 【proteus】proteus界面介绍
  2. dos攻击的服务器修复,主机被dos攻击怎么办
  3. DevExpress 16.2安装破解
  4. 计算机求锐角三角函数怎么求,求锐角三角函数值的常用方法
  5. 编译原理 —— 算符优先分析法
  6. 互联网行业哪个职位更有前途?
  7. QT综合大作业—— 多媒体应用程序设计
  8. QT 的 QSS 的基本概念
  9. 解决linux服务器中tomcat无法创建目录的问题
  10. 数据结构与算法心得笔记——零起点学习(一)