1.概述

转载:https://blog.csdn.net/tzs_1041218129/article/details/108786597

假设有个需求需要实时计算商品的订单流失量,规则如下:

用户点击商品 A,但购买了同类商品 B,则商品 A 记为一次订单流失量;

点击商品 A 到购买同类商品 B 的有效时间窗口应该小于 12 个小时;

有效窗口内多次点击商品 A 视为一次订单流失。

第三条规则可以理解为数据流去重,我在上一节已经介绍过了。为了更加专注于计算商品的订单流失量,本篇文章不再关注数据去重。

看到这个需求,想到可以用上一节的 ProcessFunction 进行状态管理,比如说基于用户进行分流,然后每个用户维护一个状态和一个有效时间窗口,触发购买同类事件后进行数据统计,过了有效期后舍弃。

但是,有没有更优雅的一点方式呢?

答案是有的,我们可以使用 Flink 自带的 CEP 来实现。

下面先简单介绍下 FlinkCEP,然后给出代码实践。

1.FlinkCEP
1.1 什么是 CEP
CEP 全称为 Complex Event Process,是在 Flink 之上实现的复杂事件处理(CEP)库。它允许你在无界的事件流中检测事件模式,让你有机会掌握数据中重要的事项。

例如:“起床–>洗漱–>吃饭–>上班”这一系列串联起来的事件流形成的模式称为 CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。

再举几个经典例子:

异常检测:打车计费后 12 小时还未结束订单;用户短时间内连续完成多个订单;

实时营销:用户在不同平台进行比价;

数据监控:检测某些指标,比如订单流失量。

1.2 FlinkCEP 原理
FlinkCEP 内部是用 「NFA(非确定有限自动机)「来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为」起始状态」、「中间状态」、「最终状态」三种,边分为 「take」、「ignore」、「proceed」 三种。

「take」:必须存在一个条件判断,当到来的消息满足 take 边条件判断时,把这个消息放入结果集,将状态转移到下一状态。

「ignore」:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。

「proceed」:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。举个例子,当用户购买商品时,如果购买前有一个咨询客服的行为,需要把咨询客服行为和购买行为两个消息一起放到结果集中向下游输出;如果购买前没有咨询客服的行为,只需把购买行为放到结果集中向下游输出就可以了。也就是说,如果有咨询客服的行为,就存在咨询客服状态的上的消息保存,如果没有咨询客服的行为,就不存在咨询客服状态的上的消息保存,咨询客服状态是由一条 proceed 边和下游的购买状态相连。

当然,在我们的场景中不会涉及太多复杂的概念。

2.FlinkCEP 简单上手
本节内容引用参考 1,用于完成基本的概念讲解和 Demo 实现。

2.1 单个 Pattern
我们先从简单的内容入手。看看在单个Pattern下,Flink CEP是如何匹配的。

2.1.1 各个API的用法
在学习 Flink CEP 的过程中,很容易找到相似的博文,文章中使用表格列举出了各个 API 的作用。然而大家很容易发现,这东西太像正则表达式了(实际上底层匹配逻辑的实现方式应该也和正则表达式类似)。因此,结合正则表达式理解这些 API 显得十分快速,所以我自作主张,加上了功能相近的正则表达式。例如,我们要用 CEP 匹配字母 x:

2.1.2 仅使用 where 和 or 写一个程序
比如说,我们现在有一个简单的需求,对于输入的数据流中,匹配所有以 x 或 y 开头的数据:

public class CepDemo {public static void main(String[] args) throws Exception {var environment = StreamExecutionEnvironment.getExecutionEnvironment();var stream = environment.setParallelism(1).addSource(new ReadLineSource("Data.txt"));// 使用 where 和 or 来定义两个需求;// 当然也可以放在一个 where 里。var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) {return s.startsWith("x");}}).or(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) throws Exception {return s.startsWith("y");}});// CEP.pattern 的第一个参数是数据流,第二个是规则;// 然后利用 select 方法抽取出匹配到的数据。// 这里用了 lambda 表达式CEP.pattern(stream, pattern).select((map ->Arrays.toString(map.get("start").toArray()))).addSink(new SinkFunction<>() {@Overridepublic void invoke(String value, Context context) {System.out.println(value);}});environment.execute();}
}

对于输入的数据流:

x1
z2
c3
y4

我们有输出:

读取:x1
[x1]
读取:z2
读取:c3
读取:y4
[y4]

可以看到,Flink CEP 可以根据输入的每一条数据进行匹配。单条数据可以是本文中的字符串,也可以是复杂的事件对象,当然也可以是字符。如果每一条数据都是一个字符,那 CEP 就和正则表达式十分相似了。

2.1.3 加上量词
接下来,还是在单个 Pattern 中,我们加上量词 API,研究研究 Flink CEP 是如何匹配多条数据的。从这里开始,事情和正则表达式有了一些差距。差距主要在结果的数量上。由于是流计算,因此在实际处理过程中,Flink 无法知道后续的数据,所以会输出所有匹配的结果。

例如,使用 timesOrMore() 函数,匹配以 a 开头的字符串出现 3 次及以上的情况,首先编写代码(其他代码与上方的例子完全一致,为节约篇幅不再列出,下同):

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) {return s.startsWith("a");}
}).timesOrMore(3);

随后在Data.txt中输入如下字符串序列:

a1
a2
a3
b1
a4

运行程序,输出如下结果:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a1, a2, a3, a4]
[a2, a3, a4]

下面分析一下执行流程。程序开始后,等待数据流入。当a1和a2输入后,由于暂时不满足条件,所以没有产生结果,只是将数据储存在状态中。a3到来后,第一次满足了匹配条件,因此程序输出结果 [a1, a2, a3]。随后,b1输入,不满足条件;接下来a4输入。此时,a1、a2和a3依旧储存在状态中,因此依然可以参与匹配。匹配可以产生多个结果,但是有两个原则:

必须严格按照数据流入的顺序;

产生的结果必须包含当前元素;

原则 1 很好理解,由于数据的流入是按照 a1 -> a2 -> a3 -> a4 的顺序,所以结果生成的序列也必须按照这个顺序,不能删减中间数据,更不能打乱顺序。因此, [a1, a2, a4] 和 [a3, a2, a4, a1] 这种结果是不可能生成的。原则 2 就更好理解了,数据是因为 a4 的流入才产生的,再考虑到我们设定的量词条件是“三个及以上”,因此产生的结果只可能是 [a2, a3, a4] 和 [a1, a2, a3, a4]。

同理,如果我们在 Data.txt 最后加入一行 a5,则程序输出结果如下:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a1, a2, a3, a4]
[a2, a3, a4]
读取:a5
[a1, a2, a3, a4, a5]
[a2, a3, a4, a5]
[a3, a4, a5]

按照这种思路,如果我们继续加上 a6、a7、a8、……、a100,那么每个数据产生的结果会越来越多,因为 Flink CEP 会把所有符合条件的数据储存在状态里。「这样下去不行的,要不然内存养不起它的」。因此,oneOrMore() 和 timesOrMore() 之类的函数后面,一般都要跟上 until() 函数,从而指定终止条件。

2.1.4 把量词换成 times()
如果使用和上面一样的数据,但是把量词换成 times(3),会产生什么样的结果?我们首先修改代码:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) {return s.startsWith("a");}
}).times(3);

由于固定了只匹配三个,再加上前文提到的两个原则的束缚,结果就很明显了:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
[a2, a3, a4]
读取:a5
[a3, a4, a5]

从 a1 到 b1 的逻辑完全相同,当读取到 a4 时,由于只匹配 3 个,同时结果必须包含 a4,因此产生的结果只能是 [a2, a3, a4] 。同理读取到 a5 后,由于结果必须包含 a5 且只匹配 3 个,所以结果只能是 [a3, a4, a5] 。这种情况下,过期的数据会被清理掉,妈妈再也不用担心我的内存不够用了。

除了固定参数,times() 函数还支持 times(from, to) 指定边界。这种情况下的匹配结果和上文类似,相信大家很容易就能推出来,在此我就不再赘述了。

2.1.5 使用严格模式
大家也许注意到,上文的 Data.txt 中,一直有一个讨厌的 b1。由于不满足我们的基本匹配条件,b1 直接被我们的程序忽略掉了。这是因为 Flink CEP 默认采用了不严格的匹配模式,而在某些情况下,这种数据是不能忽略的,这时候就可以使用 consecutive() 函数,指定严格的匹配模式。修改代码如下:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) {return s.startsWith("a");}
}).times(3).consecutive();

运行程序,产生如下结果:

读取:a1
读取:a2
读取:a3
[a1, a2, a3]
读取:b1
读取:a4
读取:a5

此时,由于 a1、a2、a3 是紧密相连的,因此被成功匹配。而 a2、a3、a4 和 a3、a4、a5 中间由于多了一个 b1,在严格模式下不能被匹配。可以看出,严格模式下的匹配策略更像正则表达式。

2.2 多个 Pattern
一般而言,需要使用 CEP 的任务都得依靠多个 Pattern 才能解决。此时,可以使用 followedBy()、next() 等函数创建一个新的 Pattern,并按照不同的逻辑将新 Pattern 和前一个 Pattern 连接起来。

2.2.1 使用 followedBy() 创建一个新的 Pattern
我们再来看一下如何处理多个 Pattern,比如说我们需要匹配“包含 2-3 个 a 开头的字符串,同时包含 1-2 个 b 开头的字符串”的输入数据。

// 我们用 times(2,3) 来控制匹配 2-3 次;
// followBy 用于控制两个具有顺序的关系的 Pattern。
var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) {return s.startsWith("a");}
}).times(2, 3).followedBy("middle").where(new IterativeCondition<String>() {@Overridepublic boolean filter(String s, Context<String> context) throws Exception {return s.startsWith("b");}
}).times(1, 2);CEP.pattern(stream, pattern).select(map -> {// 把匹配的结果装进 list 中。var list = map.get("start");list.addAll(map.get("middle"));return Arrays.toString(list.toArray());
}).addSink(new SinkFunction<>() {@Overridepublic void invoke(String value, Context context) {System.out.println(value);}
});

这里我们使用了 followedBy () 函数,该函数创建了一个名为 “middle” 的新 Pattern,新 Pattern 中包含了指向原 Pattern 的引用。同样发生变化的是 select 函数中的 lambda 表达式。在表达式中,我们除了获取名为 “start” 的 Pattern 中的数据,还获取了名为 “middle” 的 Pattern 的数据,并将他们拼在一起。这与正则表达式中的子表达式特别类似,实际上,我们可以将每个 Pattern 近似看作一个子表达式,在读取结果的时候,使用 Pattern 的名字,从 map 中提取出结果。

数据的输入为:

a1
a2
a3
b1
a4
a5
b2

数据输出为:

读取:a1
读取:a2
读取:a3
读取:b1
[a1, a2, a3, b1]
[a1, a2, b1]
[a2, a3, b1]
读取:a4
读取:a5
读取:b2
[a1, a2, a3, b1, b2]
[a1, a2, b1, b2]
[a2, a3, a4, b2]
[a2, a3, b1, b2]
[a3, a4, a5, b2]
[a3, a4, b2]
[a4, a5, b2]

一下子产生了这么多数据,我一开始还是很懵的。接下来我们逐步分析下:

a1, a2 依次读入,不满足整体条件,但是满足 “start” 条件,且产生了 [a1, a2] 这一中间结果,存在状态中;

a3 读入,不满足整体条件,但是满足 “start” 条件,且产生了 [a2, a3] 和 [a1, a2, a3] 两个结果;

b1 读入,满足 “middle” 条件,产生 [b1] 中间结果。此时整体条件满足,因此和上述中间结果组合输出 [a1, a2, a3, b1] 、 [a1, a2, b1] 和 [a2, a3, b1] ;

a4 读入,继续满足 “start” 条件,产生 [a2, a3, a4] 和 [a3, a4];两个结果,但是由于这两个结果是在 b1 读入之后产生的,因此这两个结果不能和 [b1] 进行组合;

a5 读入,继续满足 “start” 条件,产生 [a3, a4, a5] 和 [a4, a5] 两个中间结果,同理不能和 [b1] 进行组合;

b2 读入,继续满足 “middle” 条件,产生 [b1, b2] 和 [b2] 两个中间结果。这里开始比较复杂了,需要严格结合时间顺序来分析。由于 b1 是在 a4 之前读入的,因此包含 b1 的序列 [b1, b2] 只能与 [a1, a2] 、 [a2, a3] 和 [a1, a2, a3] 进行关联。而 [b2] 则可以与包含了 a4 或 a5 的 [a2, a3, a4] 、 [a3, a4]、 [a3, a4, a5] 和 [a4, a5] 四个序列关联,因此此时输出结果如下:

[a1, a2, a3, b1, b2]    // [a1, a2, a3] 和 [b1, b2] 关联
[a1, a2, b1, b2]        // [a1, a2] 和 [b1, b2] 关联
[a2, a3, a4, b2]        // [a2, a3, a4] 和 [b2] 关联
[a2, a3, b1, b2]        // [a2, a3] 和 [b1, b2] 关联
[a3, a4, a5, b2]        // [a3, a4, a5] 和 [b2] 关联
[a3, a4, b2]            // [a3, a4] 和 [b2] 关联
[a4, a5, b2]            // [a4, a5] 和 [b2] 关联

那么有一个问题,为什么 [b2] 不能与 [a1, a2] 、 [a2, a3] 和 [a1, a2, a3] 进行关联呢?还是要站在时间序列的角度进行解释。因为只有 b1 是跟随在这三个元素后面的,所以只有包含 b1 的两个序列([b1] 和 [b1, b2])可以和它们进行关联,这就是 followedBy 的含义。为了验证这一观点,我们在 Data.txt 最后加上一个 b3,在其他代码均不变的情况下,最后读入 b3 后,输出如下结果:

[a2, a3, a4, b2, b3]
[a3, a4, a5, b2, b3]
[a3, a4, b2, b3]
[a4, a5, b2, b3]

分析如下:当读入 b3 后,满足 “middle” 条件,生成 [b2, b3] 和 [b3]。其中,只有 [b2, b3] 包含了 b2,由于 b2 是距离 [a2, a3, a4] 、 [a3, a4]、 [a3, a4, a5] 和 [a4, a5] 四个序列最近的数据,因此只有 [b2, b3] 才能和上述四个序列关联。而 [b3] 由于不包含 b2,因此无法和它们关联。

2.2.2 将 followedBy() 换成 next()
可以将 next () 看作是加强版的 followedBy ()。在 followedBy 中,两个 Pattern 直接允许不紧密连接,例如上文中的 [a1, a2] 和 [b1] ,他们中间隔了一个 a3. 这种数据在 next () 中会被丢弃掉。使用上文同样的数据(不包括 b3),将代码中的 followedBy 换成 next,修改如下:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) {return s.startsWith("a");}
}).times(2, 3).next("middle").where(new IterativeCondition<String>() {@Overridepublic boolean filter(String s, Context<String> context) throws Exception {return s.startsWith("b");}
}).times(1, 2);

运行后,看到如下结果:

读取:a1
读取:a2
读取:a3
读取:b1
[a1, a2, a3, b1]
[a2, a3, b1]
读取:a4
读取:a5
读取:b2
[a1, a2, a3, b1, b2]
[a2, a3, b1, b2]
[a3, a4, a5, b2]
[a4, a5, b2]

和之前的结果进行分析,发现结果中的 [a1, a2, b1] 、 [a1, a2, b1, b2]、 [a2, a3, a4, b2] 和 [a3, a4, b2] 均被排除,因为他们相比原序列,分别缺少了 a3、a3、a5、a5。

2.2.3 greedy() 做了什么
关于 greedy () 的用法,可以说是十分令人迷惑的。我看了许多文章,对 greedy () 的描述几乎都是一笔带过。描述大多是 “尽可能多的匹配”,但是实际上,大多数情况下加不加 greedy () 几乎没有任何区别。「因为 greedy () 虽然被归为量词 API,但是它实际上是在多个 Pattern 中才能起作用的。」 为此,我找到了 greedy () 的实现逻辑,在 NFACompiler 类的 updateWithGreedyCondition 方法中,代码如下:

private void updateWithGreedyCondition(State<T> state,IterativeCondition<T> takeCondition) {for (StateTransition<T> stateTransition : state.getStateTransitions()) {stateTransition.setCondition(new RichAndCondition<>(stateTransition.getCondition(), new RichNotCondition<>(takeCondition)));}
}

阅读代码,发现该方法实际上添加了一个逻辑:「确认当前条件满足转换到下一个 state 所需的条件,且不满足当前 state 的条件」。意思就是,如果当前处于 Pattern1,但是出现了一条同时满足两个 Pattern1 和 Pattern2 条件的数据,在不加 greedy () 的情况下,会跳转到 Pattern2,但是如果加了 greedy (),则会留在 Pattern1。下面我们来验证一下,编写如下代码:

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) {return s.startsWith("a");}
}).times(2, 3).next("middle").where(new IterativeCondition<String>() {@Overridepublic boolean filter(String s, Context<String> context) throws Exception {return s.length() == 3;}
}).times(1, 2);

在这一代码中,如果一条数据为a开头,且长度为3,则同时满足“start”和“middle”。同时,为了方便区分数据到底属于哪个Pattern,我们在输出前加入分隔符:

CEP.pattern(stream, pattern).select(map -> {var list = map.get("start");list.add("|");list.addAll(map.get("middle"));return Arrays.toString(list.toArray());
}).addSink(new SinkFunction<>() {@Overridepublic void invoke(String value, Context context) {System.out.println(value);}
});

准备如下数据:

a
a1
a22
b33

在不加greedy()的情况下,运行结果如下:

读取:a
读取:a1
读取:a22
[a, a1, |, a22]
读取:b33
[a, a1, a22, |, b33]
[a, a1, |, a22, b33]
[a1, a22, |, b33]

观察结果,可知a22在两个Pattern中左右横跳,输出了所有可能的结果。接下来我们加上greedy():

var pattern = Pattern.<String>begin("start").where(new IterativeCondition<>() {@Overridepublic boolean filter(String s, Context<String> context) {return s.startsWith("a");}
}).times(2, 3).greedy().next("middle").where(new IterativeCondition<String>() {@Overridepublic boolean filter(String s, Context<String> context) throws Exception {return s.length() == 3;}
}).times(1, 2);

运行结果如下:

读取:a
读取:a1
读取:a22
读取:b33
[a, a1, a22, |, b33]
[a1, a22, |, b33]

此时,a22 被划到了 “start” 这一 Pattern 中。由此可见,greedy () 影响的是 “同时满足两个 Pattern 条件的数据的划分逻辑”,而且加了 greedy () 后,产生的结果会变少,并不是直观印象中的,产生尽可能多条的数据。

2.代码实践
简单看一下代码,主要以注释方式进行讲解

输入数据为:

952483,310884,4580532,pv,1511712000
952483,5119439,982926,pv,1511712000
952483,4484065,1320293,pv,1511712000
952483,5097906,149192,pv,1511712000
952483,2348702,3002561,pv,1511712000
952483,2157435,1013319,buy,1511712020
952483,1132597,4181361,pv,1511712020
952483,3505100,2465336,pv,1511712020
952483,3815446,2342116,pv,1511712030
952483,3815446,2442116,buy,1511712030

数据源代码为:

package com.aze.producer;import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.io.BufferedReader;
import java.io.FileReader;/*** @Author: aze* @Date: 2020-09-16 14:41*/
public class ReadLineSource  implements SourceFunction<String> {private String filePath;private boolean canceled = false;public ReadLineSource(String filePath){this.filePath = filePath;}@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {BufferedReader reader = new BufferedReader(new FileReader(filePath));while (!canceled && reader.ready()){String line = reader.readLine();sourceContext.collect(line);}}@Overridepublic void cancel() {canceled = true;}
}

主代码为

package com.aze.consumer;import lombok.val;
import com.aze.producer.ReadLineSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;/*** 订单流失率** @Author: aze* @Date: 2020-09-23 14:45*/
public class OrderLostCEP {public static void main(String[] args) throws Exception {val env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);val dataStream = env.addSource(new ReadLineSource("src/main/resources/data.txt"));// 先配置一下事件时间// 然后利用 uid 和商品类别进行分组(商品类别的第一个字母代表一级类别)val keyStream = dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(30)).withTimestampAssigner((SerializableTimestampAssigner<String>)(s, l) -> Long.parseLong(s.split(",")[4]) * 1000)).keyBy((KeySelector<String, String>) s ->s.split(",")[0] + "-" + s.split(",")[2].substring(0, 1));// 我们采用不丢弃的策略,主要逻辑在于,点击商品 A,而购买同类商品 B 和同类商品 C 算作两次订单流失val noSkip = AfterMatchSkipStrategy.noSkip();// 制定一个匹配规则;// 用 followedByAny 指定不确定的松散连续,读者可以试下其与 followedBy 的区别。val pattern = Pattern.<String>begin("start", noSkip).where(new IterativeCondition<String>() {@Overridepublic boolean filter(String s, Context<String> ctx) {return "pv".equals(s.split(",")[3]);}}).within(Time.minutes(10)).followedByAny("end").where(new IterativeCondition<String>() {@Overridepublic boolean filter(String s, Context<String> ctx) {return "buy".equals(s.split(",")[3]);}});// 经过 CEP 规则匹配后,抽取点击的事件流// 利用商品 id 进行分组,并利用 process 进行状态统计。val patStream = CEP.pattern(keyStream, pattern).select(map -> map.get("start").get(0)).keyBy((KeySelector<String, String>) s -> s.split(",")[1]).process(new KeyedProcessFunction<String, String, Object>() {private ValueState<Long> clickState;@Overridepublic void open(Configuration parameters) {clickState = getRuntimeContext().getState(new ValueStateDescriptor<>("OrderLost", Long.class));}@Overridepublic void processElement(String in, Context ctx, Collector<Object> out)throws Exception {Long clickValue = clickState.value();clickValue = clickValue == null ? 1L : ++clickValue;clickState.update(clickValue);out.collect("【" + in.split(",")[1] + "】OrderLost:" + clickValue);}});patStream.print();env.execute("test");}}

结果:

【3505100】OrderLost:1
【3815446】OrderLost:1
【4484065】OrderLost:1
【5097906】OrderLost:1

3.总结
本文主要介绍了如何使用 FlinkCEP,并给出诸多 Demo 进行学习。

但完成开头的需求是,我采用的是基于 uid 和商品类别进行分组,然后用 cep 去挖掘配对规则。当然也可以先基于 uid 进行分组,然后用 cep 挖掘配对模式 [点击商品、购买商品],然后利用 select 去过滤是否是同类商品。

最后留一个新需求:如果需要同时计算商品的下单量、CTR 该怎么操作?

4.参考
《探索如何使用Flink CEP》

《Apache Flink CEP 实战》

【Flink】基于 Flink CEP 实时计算商品订单流失量相关推荐

  1. 用户行为分析大数据系统(实时统计每个分类被点击的次数,实时计算商品销售额,统计网站PV、UV )

    Spark Streaming实战对论坛网站动态行为pv,uv,注册人数,跳出率的多维度分析_小强签名设计 的博客-CSDN博客_spark streaming uv 实时统计每天pv,uv的spar ...

  2. 实时计算-多级订单金额,及下级人数

    1 系统概述 人物关系为代理模式,一级代理包含二级代理,二级代理包含三级代理. 需求为实时计算每个用户的订单金额,并取出金额的TOP100. 并实时计算当天下级人数. 1.1 指标使用方式 单用户订单 ...

  3. 基于Java+Springboot+vue网上商品订单转手系统设计和实现

    博主介绍:✌全网粉丝30W+,csdn特邀作者.博客专家.CSDN新星计划导师.java领域优质创作者,博客之星.掘金/华为云/阿里云/InfoQ等平台优质作者.专注于Java技术领域和毕业项目实战✌

  4. 腾讯基于 Flink 的实时流计算平台演进之路

    原文地址:https://www.infoq.cn/article/TjDeQDJQpKZ*NpG71pRW 大家好,我是来自腾讯大数据团队的杨华(vinoyang),很高兴能够参加这次北京的 QCo ...

  5. 基于实时计算Flink版的场景解决方案demo

    简介:通过两个demo分享技术实时计算flink版的解决方案 本文整理自阿里云智能行业解决方案专家GIN的直播分享 直播链接:https://developer.aliyun.com/learning ...

  6. Oceanus:基于Apache Flink的一站式实时计算平台

    Flink Forward是由Apache官方授权,用于介绍Flink社区的最新动态.发展计划以及Flink相关的生产实践经验的会议.2018年12月20日,Flink Forward首次来到中国举办 ...

  7. 如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了

    作者 | 潘国庆编辑 | Natalie AI 前线导读:Flink 已经渐渐成为实时计算引擎的首选之一,从简单的实时 ETL 到复杂的 CEP 场景,Flink 都能够很好地驾驭.本文整理自携程实时 ...

  8. Flink 零基础实战教程:如何计算实时热门商品

    在上一篇入门教程中,我们已经能够快速构建一个基础的 Flink 程序了.本文会一步步地带领你实现一个更复杂的 Flink 应用程序:实时热门商品.在开始本文前我们建议你先实践一遍上篇文章,因为本文会沿 ...

  9. 【硬刚大数据】Flink在实时在实时计算平台和实时数仓中的企业级应用小结

    欢迎关注博客主页:https://blog.csdn.net/u013411339 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发于 CSDN博客! 本文首发CSDN论坛,未经过官 ...

最新文章

  1. c语言Inqueue函数用法,C语言用两个栈实现队列(完整版)
  2. php56wmysql_centos6.5下使用yum完美搭建LNMP环境(php5.6)【Fizzday整理】
  3. 笔记-信息化与系统集成技术-国务院关于印发新一代人工智能发展规划的通知...
  4. Web 趋势榜:上周最有意思、最热门的 10 大 Web 项目 - 210625
  5. DCMTK:测试程序中定义的功能和类 ofmem.h(OF shared_ptr)
  6. OC类导入Swift工程演示
  7. 并发工具类【线程安全相关的类】
  8. [C++]在Visual Studio 2010中使用Google Test - 配置
  9. 迭代获取ViewState
  10. 《机器人爱好者(第2辑)》——部署机械手或末端执行器
  11. 定义Student类,该类中有Sting name和int age两个属性,该类实现Comparable接口,实现根据学生姓名和年龄排序,该类重写toString()输出学生的姓名和年龄。
  12. 《机器学习—李宏毅》HW1
  13. 计算机基础常用英语,计算机常用基础英语
  14. 《 ERP高级计划》书的解读之零物料约束和能力约束逻辑(蔡颖)(转)
  15. 实现表格内容第一行居中,其他行与第一行左对齐
  16. 为什么不想做产品经理
  17. Python大学计算机程序设计-通讯录管理系统
  18. 上传大文件解决方案方法
  19. 天池大赛——天猫用户复购预测
  20. 空气传导和骨传导耳机哪个好?这两种耳机有什么区别?

热门文章

  1. 工信部通报侵害用户权益APP:腾讯应用宝、小米应用商店等在列
  2. 特斯拉副总裁陶琳:Model Y 本月开始陆续交付
  3. 供应商禁止供货后,这个城市行动了:社区团购不得低价倾销、排挤对手
  4. iPhone 12 Pro拆解:韩国零部件占比最高 达26.8%
  5. 吊打奔驰宝马!这个又贵又丑的“玩具”,为何让男人集体高潮?
  6. 高通CEO:已向美国申请向华为出售芯片 但尚未有回应
  7. 滴滴出行:10月国内月活用户突破4亿
  8. 万元华为旗舰新机超21万人预约,网友:有钱人真的多!
  9. 陌陌直播公益课复课 带乡村孩子“打卡”丝绸之路
  10. 中国女排代言作业帮直播课,作业帮累计用户已超8亿