复合事件处理(Complex Event Processing,CEP)是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。

我们从一个案例开始源码分析之路。

1.案例代码

Import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;
import java.util.Map;public class FlinkCepTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);//        数据源KeyedStream<Tuple3<String, Long, String>, String> source = env.fromElements(new Tuple3<String, Long, String>("1001", 1656914303000L, "success"), new Tuple3<String, Long, String>("1001", 1656914304000L, "fail"), new Tuple3<String, Long, String>("1001", 1656914305000L, "fail"), new Tuple3<String, Long, String>("1001", 1656914306000L, "success"), new Tuple3<String, Long, String>("1001", 1656914307000L, "fail"), new Tuple3<String, Long, String>("1001", 1656914308000L, "success"), new Tuple3<String, Long, String>("1001", 1656914309000L, "fail"), new Tuple3<String, Long, String>("1001", 1656914310000L, "success"), new Tuple3<String, Long, String>("1001", 1656914311000L, "fail"), new Tuple3<String, Long, String>("1001", 1656914312000L, "fail"), new Tuple3<String, Long, String>("1001", 1656914313000L, "success"), new Tuple3<String, Long, String>("1001", 1656914314000L, "end")).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((event, timestamp) ->{return event.f1;})).keyBy(e -> e.f0);Pattern<Tuple3<String, Long, String>,?> pattern = Pattern.<Tuple3<String, Long, String>>begin("begin").where(new Begincondition()).followedByAny("middle").where(new Middlecondition()).followedBy("end").where(new Endcondition());//TODO 内部构建 PatternStreamBuilder 并返回 PatternStreamPatternStream patternStream = CEP.pattern(source, pattern);patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>,Map>() {@Overridepublic Map select(Map map) throws Exception {return map;}}).print();env.execute("cep");}
}

2.源码分析

根据上述提供的案例接下来我们分几个模块进行源码解析:Pattern构建,内部包含 NFAFactory,Cepopertor等构建逻辑,数据处理,超时处理,获取数据。

//TODO 内部构建 PatternStreamBuilder 并返回 PatternStream
PatternStream patternStream = CEP.pattern(source, pattern);PatternStreamBuilder.forStreamAndPattern(inputStream, pattern);static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);}

PatternStream.select() 内部会调用 PatternStreamBuilder.build()

public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction,final TypeInformation<R> outTypeInfo) {return builder.build(outTypeInfo,builder.clean(patternProcessFunction));}

org.apache.flink.cep.PatternStreamBuilder#build

<OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,final PatternProcessFunction<IN, OUT> processFunction) {checkNotNull(outTypeInfo);checkNotNull(processFunction);//TODO 构造序列化器final TypeSerializer<IN> inputSerializer =inputStream.getType().createSerializer(inputStream.getExecutionConfig());final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;//TODO 判断是否是获取超时结果的 select/flatSelectfinal boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;//TODO 构建 NFAFactory 工厂类// 工厂对象还包含了用户所有的State集合final NFACompiler.NFAFactory<IN> nfaFactory =NFACompiler.compileFactory(pattern, timeoutHandling);//TODO 创建 CepOperatorfinal CepOperator<IN, K, OUT> operator =new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);final SingleOutputStreamOperator<OUT> patternStream;if (inputStream instanceof KeyedStream) {KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);} else {KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();patternStream =inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();}return patternStream;}

org.apache.flink.cep.nfa.compiler.NFACompiler#compileFactory 构建 NFACompiler.NFAFactory

org.apache.flink.cep.operator.CepOperator#open() 初始化逻辑

 @Overridepublic void open() throws Exception {super.open();//TODO 初始化 定时器服务timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);//TODO 创建NFA初始化了所有的顶点state和边transition// 这个时候的state集合已经初始化完成了nfa = nfaFactory.createNFA();//TODO 给判断逻辑设置 cep运行环境nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

org.apache.flink.cep.operator.CepOperator#processElement() 接收数据进行处理

 @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {//TODO 判断当前语义为 ProcessingTime 还是 EventTimeif (isProcessingTime) {if (comparator == null) {// there can be no out of order elements in processing timeNFAState nfaState = getNFAState();long timestamp = getProcessingTimeService().getCurrentProcessingTime();advanceTime(nfaState, timestamp);processEvent(nfaState, element.getValue(), timestamp);updateNFA(nfaState);} else {long currentTime = timerService.currentProcessingTime();bufferEvent(element.getValue(), currentTime);// register a timer for the next millisecond to sort and emit buffered datatimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1);}} else {//TODO 获取事件的时间long timestamp = element.getTimestamp();//TODO 获取数据IN value = element.getValue();// In event-time processing we assume correctness of the watermark.// Events with timestamp smaller than or equal with the last seen watermark are// considered late.// Late events are put in a dedicated side output, if the user has specified one.//TODO 在事件时间处理中,我们假设水印的正确性。// 时间戳小于或等于最后看到的水印的事件被认为是迟到的。// 如果用户指定了,则将延迟事件放在专用端输出中。if (timestamp > timerService.currentWatermark()) {// we have an event with a valid timestamp, so// we buffer it until we receive the proper watermark.//TODO 注册Watermark定时器saveRegisterWatermarkTimer();//TODO 缓存数据 key为事件时间 value为数据集合bufferEvent(value, timestamp);} else if (lateDataOutputTag != null) {output.collect(lateDataOutputTag, element);} else {numLateRecordsDropped.inc();}}}

org.apache.flink.cep.operator.CepOperator#onEventTime() 触发计算

 @Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {// 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists//       by feeding them in the NFA// 3) advance the time to the current watermark, so that expired patterns are discarded.// 4) update the stored state for the key, by only storing the new NFA and MapState iff they//        have state to be used later.// 5) update the last seen watermark.//TODO//  1)获取键的挂起元素队列和相应的NFA,//  2)按照事件时间顺序处理挂起的元素,如果存在自定义比较器,则在NFA中输入它们//  3)将时间提前到当前水印,丢弃过期的图案。//  4)更新密钥的存储状态,只存储新的 NFA 和 MapState,如果它们有状态要稍后使用。//  5)更新最后一次出现的水印。// STEP 1//TODO 获取优先队列的中的数据PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();//TODO 用于保存未匹配的完成的状态,和已匹配完成的状态,这里get为空时会初始化// 先遍历所有的找到其中为start的state作为下一个可匹配的状态NFAState nfaState = getNFAState();// STEP 2while (!sortedTimestamps.isEmpty()&& sortedTimestamps.peek() <= timerService.currentWatermark()) {long timestamp = sortedTimestamps.poll();//TODO 处理超时未匹配的数据advanceTime(nfaState, timestamp);try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {elements.forEachOrdered(event -> {try {//TODO 将数据排好序(事件时间)以后使用NFA真正的处理逻辑processEvent(nfaState, event, timestamp);} catch (Exception e) {throw new RuntimeException(e);}});}elementQueueState.remove(timestamp);}// STEP 3advanceTime(nfaState, timerService.currentWatermark());// STEP 4updateNFA(nfaState);if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {saveRegisterWatermarkTimer();}}

org.apache.flink.cep.operator.CepOperator#processEvent() 处理数据

/*** Process the given event by giving it to the NFA and outputting the produced set of matched* event sequences.** @param nfaState Our NFAState object* @param event The current event to be processed* @param timestamp The timestamp of the event*/private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {//TODO 得到匹配上规则的map,map中包含了这个正则的所有数据Collection<Map<String, List<IN>>> patterns =nfa.process( //TODO 真正的处理逻辑sharedBufferAccessor,nfaState,event,timestamp,afterMatchSkipStrategy,cepTimerService);//TODO 这个map包含了匹配上的一个正则,下面会调用用户的select或者flatselect方法,往下游发送processMatchedSequences(patterns, timestamp);}}

org.apache.flink.cep.nfa.NFA#doProcess 开始处理数据

===> 第一条数据为 (1001,1656914303000,success)

当前正在匹配的只有 begin 的state

经过 org.apache.flink.cep.nfa.NFA#computeNextStates 匹配之后

新的匹配中的state 变为两个 同时版本值会递增 同状态+1 下一个状态新增下一级

org.apache.flink.cep.operator.CepOperator#processMatchedSequences 判断是否匹配完成输出

===> 第二条数据为 (1001,1656914304000,fail)

经过 org.apache.flink.cep.nfa.NFA#computeNextStates 匹配之后

===> 第三数据为 (1001,1656914305000,fail)

经过 org.apache.flink.cep.nfa.NFA#computeNextStates 匹配之后

===> 第四数据为(1001,1656914306000,success)

经过 org.apache.flink.cep.nfa.NFA#computeNextStates 匹配之后

此时当前数据匹配到begin,startState版本+1,并且新增一个middle的状态

===> 第五数据为(1001,1656914307000,end)

经过 org.apache.flink.cep.nfa.NFA#computeNextStates 匹配

正在匹配中的状态变成3个,两个end的state变为完成状态。

判断匹配到state状态是否为Final状态,是添加到potentialMatches中。

之后遍历从sharedBufferAccessor中获取数据。

SharedBufferAccessor 简介

Flink CEP设计了一个带版本的共享缓冲区。它会给每一次匹配分配一个版本号并使用该版本号来标记在这次匹配中的所有指针。

   

做三个独立的缓冲区实现上是没有问题,但是我们发现缓冲区3状态stat1的堆栈和缓冲区1状态stat1的堆栈是一样的,我们完全没有必要分别占用内存。而且在实际的模式匹配场景下,每个缓冲区独立维护的堆栈中可能会有大量的数据重叠。随着流事件的不断流入,为每个匹配结果独立维护缓存区占用内存会越来越大。所以Flink CEP 提出了共享缓存区的概念(SharedBuffer),就是用一个共享的缓存区来表示上面三个缓存区。

org.apache.flink.cep.operator.CepOperator#processMatchedSequences 处理匹配完成的state

processMatchedSequences内部调用自定义的 select函数 输出满足模式的数据

3.参考链接

Flink源码解读系列 | Flink中的CEP复杂事件处理源码分析

Flink CEP-NFA详解_听挽风讲大数据的博客-CSDN博客_flink nfa

Flink Cep 源码分析相关推荐

  1. Flink Watermark 源码分析

    随着 flink 的快速发展与 API 的迭代导致新老版本差别巨大遂重拾 flink,在回顾到时间语义时对 watermark 有了不一样的理解. 一.如何生成 在 flink 1.12(第一次学习的 ...

  2. 从flink-example分析flink组件(3)WordCount 流式实战及源码分析

    前面介绍了批量处理的WorkCount是如何执行的 <从flink-example分析flink组件(1)WordCount batch实战及源码分析> <从flink-exampl ...

  3. Flink源码分析 - 源码构建

    本篇文章首发于头条号Flink源码分析 - 源码构建,欢迎关注我的头条号和微信公众号"大数据技术和人工智能"(微信搜索bigdata_ai_tech)获取更多干货,也欢迎关注我的C ...

  4. [源码分析] 从FlatMap用法到Flink的内部实现

    [源码分析] 从FlatMap用法到Flink的内部实现 0x00 摘要 本文将从FlatMap概念和如何使用开始入手,深入到Flink是如何实现FlatMap.希望能让大家对这个概念有更深入的理解. ...

  5. flink设置watermark以及事件时间字段源码分析

    flink设置watermark以及事件时间字段源码分析 背景 1.1.提取时间戳字段,用于事件时间语义处理数据 1.2.设置水位线(水印)watermark TimestampAssigner 核心 ...

  6. 【报错】flink源码分析: has no more allocated slots与思考

    文章目录 一. 任务描述与一句话 1. 任务描述 2. 一句话 二. 日志分析 1. 申请一个task manager 2. 大概3分钟后运行这个tm时,报资源找不到 三. 源码分析与报错机制定位 1 ...

  7. Flink ParameterTool fromArgs源码分析

    一.源码路径 java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java 二.源码 /** Licensed ...

  8. Alink漫谈(十六) :Word2Vec源码分析 之 建立霍夫曼树

    Alink漫谈(十六) :Word2Vec源码分析 之 建立霍夫曼树 文章目录 Alink漫谈(十六) :Word2Vec源码分析 之 建立霍夫曼树 0x00 摘要 0x01 背景概念 1.1 词向量 ...

  9. Flink Checkpoint源码浅析

    1. JobManager 端checkpoint调度 dispatcher分发任务后会启动相应的jobMaster, 在创建jobMaster 构建过程中会执行jobGraph -> exec ...

最新文章

  1. GBDT和GNN结合,结果怎么样?
  2. [Cocos2d-x For WP8]矩形碰撞检测
  3. Unknown CMake command add_compile_definitions
  4. docker分离部署lnmp
  5. NYOJ 420 P次方求和
  6. Java EE——SpringMVC框架学习
  7. 永远不会执行的cron表达式
  8. 用户和组相关的配置文件总结
  9. XidianOJ 1123 K=1 Problem of Orz Pandas
  10. vb6.0 定义一个公共类_纠正网上的错误:能不能自定义一个类叫java.lang.System/String?...
  11. spring boot (整合redis)
  12. Python爬虫开源项目代码分享,100个
  13. C51单片机实验——矩阵按键
  14. 克隆巴赫系数 Cronbach‘s alpha 及 R, Python 实现
  15. 一元二次方程的简单解法
  16. (HTML+CSS+JS)仿小米官网首页 含源码
  17. php语言中的符号,php语言中的面向对象
  18. proteus仿真micropython_【雕爷学编程】MicroPython动手做(07)——零基础学MaixPy之机器视觉...
  19. 实例分割向:Mask R-CNN
  20. 提示网站服务器403,如何解决电脑网页提示网站拒绝显示此网页和HTTP 403的问题...

热门文章

  1. 深度学习(神经网络)
  2. scanf函数输入数据
  3. 解决Latex中参考文献没有引用却依然在出现的问题
  4. linux定时对准时间,Linux 时间定时同步操作
  5. 为什么安装pycharm要配置jdk_JDK安装配置
  6. 20新闻与传播考研视频资料持续更新中
  7. 总有一天我会绽放属于自己的光彩
  8. 前端基础:了解计算机语言和编程语言
  9. 交换机Access、Hybrid和Trunk三种模式
  10. YOLOv5改进Neck结构|首发最新原创:改进设计 Eff-QAFPN 结构,具有量化感知神经网络设计的高效网络结构 ,该网络结构表现强势