Flink-CEP快速入门
更新时间:2022-09-12 10:58:28发布时间:2小时前朗读
文章目录
0. 简介 & 使用步骤
简介
使用步骤

  1. 模式API(Pattern API:匹配规则)
    单个模式
    量词
    条件
    限定子类型
    简单条件(SimpleCondition)
    迭代条件(IterativeCondition)
    组合条件
    终止条件
    模式操作列举
    组合模式
    连续性
    循环模式中的近邻条件
    模式组
    匹配后跳过策略

  2. 检测模式(检测满足规则的复杂事件)
    将模式应用到流上
    处理匹配事件
    匹配事件的选择提取(select)
    PatternSelectFunction
    PatternFlatSelectFunction
    匹配事件的通用处理(process)
    处理超时事件
    Maven

  3. 简介 & 使用步骤 简介
    所谓 CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)
    把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是“复杂事件”;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出
    使用步骤
    复杂事件处理(CEP)的流程可以分成三个步骤:
    定义一个匹配规则
    将匹配规则应用到事件流上,检测满足规则的复杂事件
    对检测到的复杂事件进行处理,得到结果进行输出
    // 实体类
    public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;

    public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
    this.userId = userId;
    this.ipAddress = ipAddress;
    this.eventType = eventType;
    this.timestamp = timestamp;
    }
    }

// CEP Demo
public class Demo003 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 获取登录事件流,并提取时间戳、生成水位线SingleOutputStreamOperator<loginevent> sourceData = env.fromElements(new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),new LoginEvent("user_2", "192.168.1.29", "success", 6000L),new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<loginevent>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<loginevent>() {<!-- -->@Overridepublic long extractTimestamp(LoginEvent loginEvent, long l) {<!-- -->return loginEvent.timestamp;}}));// 1. 定义一个匹配规则:定义 Pattern,连续的三个登录失败事件Pattern<loginevent, loginevent=""> pattern = Pattern.<loginevent>begin("first")  // 以第一个登录失败事件开始.where(new IterativeCondition<loginevent>() {<!-- -->@Overridepublic boolean filter(LoginEvent loginEvent, Context<loginevent> context) throws Exception {<!-- -->return "fail".equals(loginEvent.eventType);}}).next("second")  // 接着是第二个登录失败事件.where(new IterativeCondition<loginevent>() {<!-- -->@Overridepublic boolean filter(LoginEvent loginEvent, Context<loginevent> context) throws Exception {<!-- -->return "fail".equals(loginEvent.eventType);}}).next("third")  // 接着是第三个登录失败事件.where(new IterativeCondition<loginevent>() {<!-- -->@Overridepublic boolean filter(LoginEvent loginEvent, Context<loginevent> context) throws Exception {<!-- -->return "fail".equals(loginEvent.eventType);}});// 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStreamPatternStream<loginevent> cepPattern = CEP.pattern(sourceData.keyBy(loginEvent -> loginEvent.userId), pattern);// 3. 对检测到的复杂事件进行处理:将匹配到的复杂事件选择出来,然后包装成字符串SingleOutputStreamOperator<string> select = cepPattern.select(new PatternSelectFunction<loginevent, string="">() {<!-- -->@Overridepublic String select(Map<string, list<loginevent="">> map) throws Exception {<!-- -->LoginEvent first = map.get("first").get(0);LoginEvent second = map.get("second").get(0);LoginEvent third = map.get("third").get(0);return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;}});select.print();env.execute();
}

}
</string,></loginevent,></loginevent,>

  1. 模式API(Pattern API:匹配规则) 单个模式
    一个模式可以是一个单例或者循环模式。单例模式只接受一个事件,循环模式可以接受多个事件。 在模式匹配表达式中,模式"a b+ c? d"(或者"a",后面跟着一个或者多个"b",再往后可选择的跟着一个"c",最后跟着一个"d"), a,c?,和 d都是单例模式,b+是一个循环模式
    量词
    单个模式后面可以跟一个“量词”,用来指定循环的次数,单个模式可以包括“单例(singleton)模式”和“循环(looping)模式”,默认是“单例(singleton)模式”,当定义了量词之后,就变成了“循环模式”,可以匹配接收多个事件
    循环模式的方法:

.oneOrMore()
匹配事件出现一次或多次,假设 a 是一个个体模式,a.oneOrMore()表示可以匹配 1 个或多个 a 的事件组合。我们有时会用 a+来简单表示
.times(times)
匹配事件发生特定次数(times),例如 a.times(3)表示 aaa
.times(fromTimes,toTimes)
指定匹配事件出现的次数范围,最小次数为fromTimes,最大次数为toTimes。例如a.times(2, 4)可以匹配 aa,aaa 和 aaaa
.greedy()
只能用在循环模式后,使当前循环模式变得“贪心”(greedy),也就是总是尽可能多地去匹配。例如 a.times(2, 4).greedy(),如果出现了连续 4 个 a,那么会直接把 aaaa 检测出来进行处理,其他任意 2 个 a 是不算匹配事件的
.optional()
使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足
// 期望出现4次
start.times(4);

// 期望出现0或者4次
start.times(4).optional();

// 期望出现2、3或者4次
start.times(2, 4);

// 期望出现2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).greedy();

// 期望出现0、2、3或者4次
start.times(2, 4).optional();

// 期望出现0、2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).optional().greedy();

// 期望出现1到多次
start.oneOrMore();

// 期望出现1到多次,并且尽可能的重复次数多
start.oneOrMore().greedy();

// 期望出现0到多次
start.oneOrMore().optional();

// 期望出现0到多次,并且尽可能的重复次数多
start.oneOrMore().optional().greedy();

// 期望出现2到多次
start.timesOrMore(2);

// 期望出现2到多次,并且尽可能的重复次数多
start.timesOrMore(2).greedy();

// 期望出现0、2或多次
start.timesOrMore(2).optional();

// 期望出现0、2或多次,并且尽可能的重复次数多
start.timesOrMore(2).optional().greedy();
条件 限定子类型
调用.subtype()方法可以为当前模式增加子类型限制条件

// 这里 SubEvent 是流中数据类型 Event 的子类型。只有事件是 SubEvent 类型时,才可以满足当前模式 pattern 的匹配条件
pattern.subtype(SubEvent.class);
简单条件(SimpleCondition)
简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其实就是一个 filter 操作

start.where(new SimpleCondition() {
@Override
public boolean filter(MyEvent myEvent) throws Exception {
return … // 一些判断条件
}
})

迭代条件(IterativeCondition)
在实际应用中,我们可能需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事件来做判断的条件,就叫作“迭代条件”(Iterative Condition)

Pattern.begin(“first”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
if (!“event1001”.equals(myEvent.getEvent())) {
return false;
}

        // 根据上下文获取 之前的事件,获取之前满足条件的Iterable<myevent> myEventIterable = context.getEventsForPattern("first");// TODO 处理 之前的事件return ... // 一些判断条件}
});

组合条件
可以多个条件一起使用,当有多个判断逻辑的时候我们可能会用if-else的方式,但组合条件可以在 where()方法后继续接or()方法来组合使用

Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判断条件
}
}).or(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判断条件
}
});
</myevent,>
终止条件
终止条件的定义是通过调用模式对象的.until()方法来实现的

⚠️终止条件只与oneOrMore()或者oneOrMore().optional()结合使用

Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判断条件
}
}).oneOrMore()
.until(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判断条件
}
});
</myevent,>
模式操作列举
模式操作 描述
where(condition) 为当前模式定义一个条件。为了匹配这个模式,一个事件必须满足某些条件。 多个连续的where()语句取与组成判断条件:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 一些判断条件 } });
or(condition) 增加一个新的判断,和当前的判断取或。一个事件只要满足至少一个判断条件就匹配到模式:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 一些判断条件 } }).or(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 替代条件 } });
until(condition) 为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。只适用于和oneOrMore()同时使用。NOTE: 在基于事件的条件中,它可用于清理对应模式的状态。java pattern.oneOrMore().until(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 替代条件 } });
subtype(subClass) 为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式:java pattern.subtype(SubEvent.class);
oneOrMore() 指定模式期望匹配到的事件至少出现一次。.默认(在子事件间)使用松散的内部连续性。 NOTE: 推荐使用until()或者within()来清理状态。java pattern.oneOrMore();
timesOrMore(#times) 指定模式期望匹配到的事件至少出现**#times次。.默认(在子事件间)使用松散的内部连续性。 java pattern.timesOrMore(2);
times(#ofTimes) 指定模式期望匹配到的事件正好出现的次数。默认(在子事件间)使用松散的内部连续性。 java pattern.times(2);
times(#fromTimes, #toTimes) 指定模式期望匹配到的事件出现次数在
#fromTimes和#toTimes**之间。默认(在子事件间)使用松散的内部连续性。 java pattern.times(2, 4);
optional() 指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。java pattern.oneOrMore().optional();
greedy() 指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。java pattern.oneOrMore().greedy();
组合模式 连续性
将多个个体模式组合起来的完整模式,就叫作“组合模式”

FlinkCEP支持事件之间如下形式的连续策略:

严格连续: next()期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
松散连续: followedBy()忽略匹配的事件之间的不匹配的事件。
不确定的松散连续: followedByAny()更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。
notNext():如果不想后面直接连着一个特定事件
notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方
// 严格连续
Pattern<event, ?=“”> strict = start.next(“middle”).where(…);

// 松散连续
Pattern<event, ?=“”> relaxed = start.followedBy(“middle”).where(…);

// 不确定的松散连续
Pattern<event, ?=“”> nonDetermin = start.followedByAny(“middle”).where(…);

// 严格连续的NOT模式
Pattern<event, ?=“”> strictNot = start.notNext(“not”).where(…);

// 松散连续的NOT模式
Pattern<event, ?=“”> relaxedNot = start.notFollowedBy(“not”).where(…);
</event,></event,></event,></event,></event,>
within()方法:指定一个模式应该在一定时间内发生

// 在十秒钟内,从 event1001 开始到 event1004 结束才算
Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1001”.equals(myEvent.getEvent());
}
})
.followedBy(“second”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1004”.equals(myEvent.getEvent());
}
})
.within(Time.seconds(10L));
</myevent,>
循环模式中的近邻条件
oneOrMore()、times()等循环模式的默认是松散连续,也就是followedBy()模式

.consecutive():在oneOrMore()、times()等循环模式后面跟上consecutive()表示严格连续(next())

// 1. 定义 Pattern,登录失败事件,循环检测 3 次
Pattern<loginevent, loginevent=“”> pattern = Pattern
.begin(“fails”)
.where(new SimpleCondition() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals(“fail”);
}
}).times(3).consecutive();
</loginevent,>
.allowCombinations():在oneOrMore()、times()等循环模式后面跟上allowCombinations()表示不确定的松散连续(followedByAny())

模式组
也可以定义一个模式序列作为begin,followedBy,followedByAny和next的条件。这个模式序列在逻辑上会被当作匹配的条件, 并且返回一个GroupPattern,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。

Pattern<event, ?=“”> start = Pattern.begin(
Pattern.begin(“start”).where(…).followedBy(“start_middle”).where(…)
);

// 严格连续
Pattern<event, ?=“”> strict = start.next(
Pattern.begin(“next_start”).where(…).followedBy(“next_middle”).where(…)
).times(3);

// 松散连续
Pattern<event, ?=“”> relaxed = start.followedBy(
Pattern.begin(“followedby_start”).where(…).followedBy(“followedby_middle”).where(…)
).oneOrMore();

// 不确定松散连续
Pattern<event, ?=“”> nonDetermin = start.followedByAny(
Pattern.begin(“followedbyany_start”).where(…).followedBy(“followedbyany_middle”).where(…)
).optional();
</event,></event,></event,></event,>
模式操作 描述
begin(#name) 定义一个开始的模式:java Pattern start = Pattern.begin(“start”);
begin(#pattern_sequence) 定义一个开始的模式:java Pattern start = Pattern.begin( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
next(#name) 增加一个新的模式。匹配的事件必须是直接跟在前面匹配到的事件后面(严格连续):java Pattern next = start.next(“middle”);
next(#pattern_sequence) 增加一个新的模式。匹配的事件序列必须是直接跟在前面匹配到的事件后面(严格连续):java Pattern next = start.next( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
followedBy(#name) 增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间(松散连续):java Pattern followedBy = start.followedBy(“middle”);
followedBy(#pattern_sequence) 增加一个新的模式。可以有其他事件出现在匹配的事件序列和之前匹配到的事件中间(松散连续):java Pattern followedBy = start.followedBy( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
followedByAny(#name) 增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间, 每个可选的匹配事件都会作为可选的匹配结果输出(不确定的松散连续):java Pattern followedByAny = start.followedByAny(“middle”);
followedByAny(#pattern_sequence) 增加一个新的模式。可以有其他事件出现在匹配的事件序列和之前匹配到的事件中间, 每个可选的匹配事件序列都会作为可选的匹配结果输出(不确定的松散连续):java Pattern followedByAny = start.followedByAny( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
notNext() 增加一个新的否定模式。匹配的(否定)事件必须直接跟在前面匹配到的事件之后(严格连续)来丢弃这些部分匹配:java Pattern notNext = start.notNext(“not”);
notFollowedBy() 增加一个新的否定模式。即使有其他事件在匹配的(否定)事件和之前匹配的事件之间发生, 部分匹配的事件序列也会被丢弃(松散连续):java Pattern notFollowedBy = start.notFollowedBy(“not”);
within(time) 定义匹配模式的事件序列出现的最大时间间隔。如果未完成的事件序列超过了这个事件,就会被丢弃:java pattern.within(Time.seconds(10));
匹配后跳过策略
对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:

NO_SKIP: 不跳过
SKIP_TO_NEXT: 跳至下一个
SKIP_PAST_LAST_EVENT: 跳过所有子匹配
SKIP_TO_FIRST: 跳至第一个
SKIP_TO_LAST: 跳至最后一个
例如,给定一个模式b+ c和一个数据流b1 b2 b3 c,不同跳过策略之间的不同如下:

跳过策略 结果 描述
NO_SKIP b1 b2 b3 c
b2 b3 c
b3 c 找到匹配b1 b2 b3 c之后,不会丢弃任何结果。
SKIP_TO_NEXT b1 b2 b3 c
b2 b3 c
b3 c 找到匹配b1 b2 b3 c之后,不会丢弃任何结果,因为没有以b1开始的其他匹配。
SKIP_PAST_LAST_EVENT b1 b2 b3 c 找到匹配b1 b2 b3 c之后,会丢弃其他所有的部分匹配。
SKIP_TO_FIRST[b] b1 b2 b3 c
b2 b3 c
b3 c 找到匹配b1 b2 b3 c之后,会尝试丢弃所有在b1之前开始的部分匹配,但没有这样的匹配,所以没有任何匹配被丢弃。
SKIP_TO_LAST[b] b1 b2 b3 c
b3 c 找到匹配b1 b2 b3 c之后,会尝试丢弃所有在b3之前开始的部分匹配,有一个这样的b2 b3 c被丢弃。
方法 描述
AfterMatchSkipStrategy.noSkip() 创建NO_SKIP策略
AfterMatchSkipStrategy.skipToNext() 创建SKIP_TO_NEXT策略
AfterMatchSkipStrategy.skipPastLastEvent() 创建SKIP_PAST_LAST_EVENT策略
AfterMatchSkipStrategy.skipToFirst(patternName) 创建引用模式名称为patternName的SKIP_TO_FIRST策略
AfterMatchSkipStrategy.skipToLast(patternName) 创建引用模式名称为patternName的SKIP_TO_LAST策略
skipToNext

// 配置跳过策略:skipToNext模式
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToNext();
// 将跳过策略加入到模式中
Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”, skipStrategy)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判断条件
}
});
</myevent,>
skipToFirst(patternName)

// 配置跳过策略:skipToFirst模式,参数传模式名称
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst(“first”);
Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”, skipStrategy)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1001”.equals(myEvent.getEvent());
}
}).oneOrMore()
.followedBy(“second”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1003”.equals(myEvent.getEvent());
}
})
.followedBy(“thrid”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1004”.equals(myEvent.getEvent());
}
});
</myevent,>
2. 检测模式(检测满足规则的复杂事件) 将模式应用到流上
调用 CEP 类的静态方法.pattern(),将数据流(DataStream)和模式(Pattern)作为两个参数传入
DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了
DataStream inputStream = …
Pattern<event, ?=“”> pattern = …
PatternStream patternStream = CEP.pattern(inputStream, pattern);
</event,>
处理匹配事件 匹配事件的选择提取(select) PatternSelectFunction
处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)

Pattern.begin(“first”).where(…);

// 处理匹配事件
cepPattern.select(new PatternSelectFunction<myevent, string=“”>() {
@Override
public String select(Map<string, list<myevent=“”>> map) throws Exception {
// first 是 Pattern 的 name 字符串
List first = map.get(“first”);
return … // 处理匹配事件逻辑
}
});
</string,></myevent,>
PatternFlatSelectFunction
.flatSelect(),传入的参数是一个PatternFlatSelectFunction。这是 PatternSelectFunction 的“扁平化”版本;内部需要实现一个 flatSelect()方法,

它与之前 select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数 collector,通过调用 collector.collet()方法就可以实现多次发送输出数据了

cepPattern.flatSelect(new PatternFlatSelectFunction<myevent, string=“”>() {
@Override
public void flatSelect(Map<string, list<myevent=“”>> map, Collector collector) throws Exception {
// 处理匹配事件逻辑
}
});
</string,></myevent,>
匹配事件的通用处理(process)
自 1.8 版本之后,Flink CEP 引入了对于匹配事件的通用检测处理方式,那就是直接调用PatternStream 的.process()方法,传入一个 PatternProcessFunction。这看起来就像是我们熟悉的处理函数(process function),它也可以访问一个上下文(Context),进行更多的操作。

PatternProcessFunction 功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。事实上,PatternSelectFunction 和 PatternFlatSelectFunction在 CEP 内部执行时也会被转换成 PatternProcessFunction

Context context:上下文

collector.collect():调用此方法实现发送输出数据

cepPattern.process(new PatternProcessFunction<myevent, string=“”>() {
@Override
public void processMatch(Map<string, list<myevent=“”>> map, Context context, Collector collector) throws Exception {
// 处理匹配事件逻辑
}
});
</string,></myevent,>
处理超时事件
在 Flink CEP 中 , 提 供 了 一 个 专 门 捕 捉 超 时 的 部 分 匹 配 事 件 的 接 口 , 叫 作TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。所以这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行

PatternStream cepPattern = CEP.pattern(myEventData.keyBy(myEvent -> myEvent.getUserId()), pattern);
// 测流
OutputTag outputTag = new OutputTag(“time_out”){};
// 超时数据处理
SingleOutputStreamOperator processData = cepPattern.process(new MyPatternProcessFunction());

// 数据处理,处理匹配成功数据,处理超时数据
public static class MyPatternProcessFunction extends PatternProcessFunction<myevent, string=“”>
implements TimedOutPartialMatchHandler {

@Override
public void processMatch(Map<string, list<myevent="">> map, Context context, Collector<string> collector) throws Exception {<!-- -->// 匹配成功逻辑处理
}@Override
public void processTimedOutMatch(Map<string, list<myevent="">> map, Context context) throws Exception {<!-- -->// 超时逻辑处理,将数据写入到测输出流中OutputTag<string> outputTag = new OutputTag<string>("time_out"){<!-- -->};String str = ...  // 逻辑处理context.output(outputTag, str);
}

}
</string,></string,></myevent,>
Maven

<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>

org.apache.flink flink-cep_${scala.binary.version} ${flink.version} org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version}

文章转自:Flink-CEP快速入门_Java-答学网

作者:答学网,转载请注明原文链接:http://www.dxzl8.com/

Flink-CEP快速入门相关推荐

  1. Flink 笔记01:安装部署与快速入门

    stypora-copy-images-to: img typora-root-url: ./ Flink 笔记01:安装部署与快速入门 一.课程回安排与内容提纲 大数据技术框架转折点:2017年,天 ...

  2. Flink:从入门到放弃

    文章目录 前言 一.Flink简介 1. Flink组件栈 2. Flink基石 3. Fink的应用场景 3.1 Event-driven Applications[事件驱动] 3.2 Data A ...

  3. flink和kafka区别_Apache Flink和Kafka入门

    flink和kafka区别 介绍 Apache Flink是用于分布式流和批处理数据处理的开源平台. Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序. Flink应用程序通常使 ...

  4. Apache Flink和Kafka入门

    介绍 Apache Flink是用于分布式流和批处理数据处理的开源平台. Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序. Flink应用程序通常使用Apache Kafka进 ...

  5. 【Flink】基于 Flink CEP 实时计算商品订单流失量

    1.概述 转载:https://blog.csdn.net/tzs_1041218129/article/details/108786597 假设有个需求需要实时计算商品的订单流失量,规则如下: 用户 ...

  6. 从滴滴的Flink CEP引擎说起

    从滴滴的Flink CEP引擎说起 本文转载自 https://www.cnblogs.com/cx2016/p/11647110.html. CEP业务场景 复杂事件处理(Complex Event ...

  7. Flink(初识Flink,快速上手)

    目录 初识Flink Flink设计理念 Flink的应用 Flink在企业中的应用 Flink的主要应用场景 流式数据处理的发展和演变 流处理和批处理 传统事务处理 有状态的流处理 Lambda 架 ...

  8. ElasticSearch最新版快速入门详解

    写在前面:我是「且听风吟」,目前是某上市游戏公司的大数据开发工程师,热爱大数据开源技术,喜欢分享自己的所学所悟,现阶段正在从头梳理大数据体系的知识,以后将会把时间重点放在Spark和Flink上面. ...

  9. Flink CEP在哈啰出行的应用

    来源:ververica.cn 作者:刘博·哈啰出行 By 大数据技术与架构 场景描述:Flink CEP 是 Flink 的复杂处理库.它允许用户快速检测无尽数据流中的复杂模式.不过 Flink C ...

  10. 一文学会 Flink CEP(以直播平台监控用户弹幕为例)

    我们在看直播的时候,不管对于主播还是用户来说,非常重要的一项就是弹幕文化.为了增加直播趣味性和互动性, 各大网络直播平台纷纷采用弹窗弹幕作为用户实时交流的方式,内容丰富且形式多样的弹幕数据中隐含着复杂 ...

最新文章

  1. Intelij IDEA注册码生成代码
  2. C语言杂谈:指针与数组 (上) (转)
  3. 用CSS3让不知道宽高的元素居中
  4. 算法学习之快速排序的C语言实现
  5. 超级有趣的七个 404 错误页面设计
  6. 百练162:Post Office
  7. mysql auto_increment 原理_[Mysql]mysql原理之Auto_increment
  8. c iostream.源码_通达信《牛气冲天》指标,共振主升浪冲涨停,牛散经常用(附源码...
  9. 多表查询返回多个DataTable,合并到一个Table中.
  10. Java Secret:加载和卸载静态字段
  11. 漫话:如何给女朋友解释什么是适配器模式?
  12. ecshop活动页_ecshop 促销活动,如每人只限购1件
  13. 在Ubuntu 将PHP5升级到PHP7.0 PHP7.1
  14. python 通达信公式函数_通达信,文华财经,非常实用的主图均线变色指标
  15. 逃跑h5小游戏源码熊出没手机游戏
  16. Django使用manager.py 运行项目,或者uWSGI进行部署项目,使用Nginx进行负载均衡
  17. 听力1-10中的不熟悉的单词
  18. OpenGauss/MogDB调用C FUNCTION 范例
  19. Linux安装zabbix4
  20. html中把图片移动位置不变,css如何定位图片保持位置不变?

热门文章

  1. 傅里叶光学随机散斑原理 matlab仿真实现随机散斑
  2. BlueScreenView: 系统蓝屏分析工具
  3. SQL基本语法总结(含SQL代码)
  4. 物流前沿理论与方法1
  5. opencv无法打开源文件opencv2/opencv.hpp文件
  6. 傻瓜攻略(十九)——MATLAB实现SVM多分类
  7. 数据运营平台-数据采集
  8. 华为模拟eNSP器交换机简单开具以及基础命令
  9. 【C++软件开发】面试经典题目汇总
  10. 【数学模型】基于Matlab实现洪水调度运算