1.概述

我们是用processfunction实现的cep动态更新,然后看到这个是原生api感觉有趣,研究一下

原文:https://mp.weixin.qq.com/s/mh–wQvAWQq2tDPKq0-m8Q

规则引擎通常对我们的理解就是用来做模式匹配的,在数据流里面检测满足规则要求的数据。有人会问为什么需要规则动态变更呢?直接修改了规则把服务重启一下不就可以了吗,这个当然是不行的,规则引擎里面通常会维护很多不同的规则,例如在监控告警的场景下,如果每个人修改一下自己的监控阈值,就重启一下服务,必然会影响其他人的使用,因此需要线上满足规则动态变更加载。本篇基于Flink-Cep 来实现规则动态变更加载,同时参考了Flink中文社区刘博老师的分享(https://developer.aliyun.com/article/738454),在这个分享里面是针对在处理流中每一个Key使用不同的规则,本篇的讲解将不区分key的规则。
实现分析
•外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
•动态更新:需要提供定时去检测规则是否变更
•历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更那么这些State也就是无用的了,需要清理掉
•易容的API: 不同的业务开发人员可能会有自己的规则管理、定时策略等,那么需要对外提供易用的API

实现步骤
用户API定义: InjectionPatternFunction 用于获取、定义用户的规则
package org.apache.flink.cep.functions;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.cep.pattern.Pattern;

import java.io.Serializable;

/**

  • @param
    /
    public interface InjectionPatternFunction extends Function, Serializable {
    /
    *

    • 你可能有一些初始化的工作
      */
      public void init() throws Exception;

    /**

    • 获取新的pattern
    • @return
      */
      public Pattern<T,T> inject() throws Exception;

    /**

    • 一个扫描周期:ms
    • @return
      */
      public long getPeriod() throws Exception;

    /**

    • 规则是否发生变更
    • @return
      */
      public boolean isChanged() throws Exception;
      }
      那么如何将这个API暴露出去呢?正常情况的使用是:

CEP.pattern(dataStream,pattern)
希望以同样的方式暴露:

CEP.injectionPattern(dataStream,new YourInjectionPatternFunction)
就需要在CEP-Lib里面进行改造:

package org.apache.flink.cep
//CEP 里面增加方法
public static PatternStream injectionPattern(
DataStream input,
InjectionPatternFunction injectionPatternFunction){
return new PatternStream<>(input,injectionPatternFunction); //在PatternStream 里面增加对应的构造函数
}
同样需要在PatternStreamBuilder.build 进行改造:

CepOperator<IN, K, OUT> operator=null;
if(injectionPatternFunction==null){
final NFACompiler.NFAFactory nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
operator = new CepOperator<>(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
pattern.getAfterMatchSkipStrategy(),
processFunction,
lateDataOutputTag);
}else{
operator = new CepOperator<>(
inputSerializer,
isProcessingTime,
injectionPatternFunction, // 将InjectionPatternFunction 传给了CepOperator
comparator,
null,
processFunction,
lateDataOutputTag,null);
}
加载Pattern
上述步骤已经将InjectionPatternFunction 加载到CepOperator 中,接下来就需要从InjectionPatternFunction中获取Pattern并且构造NFA
if(injectionPatternFunction!=null){
injectionPatternFunction.init();
Pattern pattern=injectionPatternFunction.inject();
afterMatchSkipStrategy=pattern.getAfterMatchSkipStrategy();
boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
long period=injectionPatternFunction.getPeriod();
// 注册了一个定时检测规则是否变更的定时器
if(period>0){
getProcessingTimeService().registerTimer(timerService.currentProcessingTime()+period,this::onProcessingTime);
}
}
nfa = nfaFactory.createNFA();
nfa.open(cepRuntimeContext, new Configuration());
上面注册的定时器需要实现ProcessingTimeCallback 接口的onProcessingTime 方法

@Override public void onProcessingTime(long timestamp) throws Exception {

 //先检查是否变更if(injectionPatternFunction.isChanged()){//重新注入Pattern pattern=injectionPatternFunction.inject();afterMatchSkipStrategy=pattern.getAfterMatchSkipStrategy();boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());refreshVersion.incrementAndGet();}//重新注册if(injectionPatternFunction.getPeriod()>0){

getProcessingTimeService().registerTimer(timerService.currentProcessingTime()+injectionPatternFunction.getPeriod(),this::onProcessingTime);
}
}
至此已经完成了动态加载与定时检测,接下来需要实现状态的清理动作。

状态清理
状态清理一共分为两块: 匹配状态数据清理、定时器清理;
状态清理有两种方式:一种是对KeyedState 执行clear操作,就是每处理一个key时执行清理操作;另外一种方式是getKeyedStateBackend().applyToAllKeys 一次性清理所有的状态,这种方式可能会导致任务消费阻塞,因此使用第一种方式;
另外需要思考的一个问题是如何判断状态是否需要清理?这里可以使用版本比对的方式进行处理,每一次规则变更对应的version提升,然后在使用该version与数据的version进行比对处理。
定义几个状态变量:

/**
* 动态的pattern注入
*/
private InjectionPatternFunction injectionPatternFunction;

/***  表示的是一个version*/
ListState<Integer> refreshFlagState; //nfa 的version 需要持久化
private AtomicInteger refreshVersion;  //    刷新nfa的version
private ValueState<Integer> needRefresh; //  每一个key 对应一个versionprivate ListState<Long> registerTimeState;// 注册定时器存储的时间

在processElement里面执行状态清理动作:

if(injectionPatternFunction!=null){
int currVersion=needRefresh.value(); //当前key的版本
if(currVersion<refreshVersion.get()){ //版本不一致
//那么就开始执行清理动作 状态 与 定时器, 应该没有其他的了吧
computationStates.clear();
elementQueueState.clear();
partialMatches.releaseData();
//删除定时器相关的操作
Iterable registerTime=registerTimeState.get();
if(registerTime!=null){
Iterator registerTimeIter=registerTime.iterator();
while(registerTimeIter.hasNext()){
Long l=registerTimeIter.next();
timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE,l); //删除定时器
timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE,l);
registerTimeIter.remove(); //把状态清理一下
}
}

  needRefresh.update(refreshVersion.get()); //更新到当前的版本}}

在上面用到的registerTimeState 状态数据从哪里来的呢?比喻说我们需要做数据排序比较或者是事件时间的语义,通常需要先缓存数据,然后会做一个排序操作,最后做匹配,那么这个触发的就是由定时器来完成的。接下来看一下registerTimeState中数据来源入口:
//processElement中排序与事件时间处理逻辑中增加saveRegisterTime方法
//time 表示触发的时间
private void saveRegisterTime(long time) throws Exception {
if(injectionPatternFunction!=null){
registerTimeState.add(time);
}
}
同样在定时器触发,也需要将registerTimeState 中对应的时间移除掉。另外如果状态变更了但是还未来得及清理定时器,那么就有可能造成定时器触发,可以在onEventTime 或者onProcessingTime方法里面做一个前置的版本比对判断,如果version不一致就不做任何处理或者提前清理定时器的数据。
在上面自定义了一些状态,接下来看一下状态的初始化与保存操作:
//initializeState 方法
if(injectionPatternFunction!=null) {
/**
* 两个标识位状态
*/
refreshFlagState = context.getOperatorStateStore()
.getUnionListState(new ListStateDescriptor(“refreshFlagState”, Integer.class));
if (context.isRestored()) {
if (refreshFlagState.get().iterator().hasNext()) {
refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());
}
} else {
refreshVersion = new AtomicInteger(0);
}
needRefresh = context.getKeyedStateStore()
.getState(new ValueStateDescriptor(“needRefreshState”, Integer.class, 0));
registerTimeState = context.getKeyedStateStore()
.getListState(new ListStateDescriptor(“registerTimeState”, Long.class));
}
可以看出refreshFlagState 使用的是一个Union类型的Operator-State,这个思考题留给大家这个为什么要这样使用。对应这种类型state通常会在定义一下:

@Override public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
if(injectionPatternFunction!=null){
refreshFlagState.clear();
refreshFlagState.add(refreshVersion.get());
}
}
至此整个流程完成。

总结
本篇介绍cep如何实现动态规则加载,给出了大部分的关键实现代码,需要与前一篇给出的demo结合使用,对于不同Key的变更,需要定义与Key相关联的NFA,其他的处理逻辑大体相同,欢迎大家一起交流。

【flink】Flink-Cep实现规则动态更新相关推荐

  1. Flink-Cep实现规则动态更新

    Flink-Cep实现规则动态更新 规则引擎通常对我们的理解就是用来做模式匹配的,在数据流里面检测满足规则要求的数据.有人会问为什么需要规则动态变更呢?直接修改了规则把服务重启一下不就可以了吗,这个当 ...

  2. 【Spark】SparkStreaming-流处理-规则动态更新-解决方案

    SparkStreaming-流处理-规则动态更新-解决方案 image2017-10-27_11-10-53.png (1067×738)elasticsearch-headElasticsearc ...

  3. Spark/Flink广播实现作业配置动态更新

    点击上方"zhisheng",选择"设为星标" 后台回复"ffa"可以查看 Flink 资料 前言 在实时计算作业中,往往需要动态改变一些配 ...

  4. 网络安全公司奇安信集团是如何基于 Flink 构建 CEP 引擎实时检测网络攻击【未来不可忽视的网络安全】

    摘要: 奇安信集团作为一家网络安全公司是如何基于 Flink 构建 CEP 引擎实时检测网络攻击?其中面临的挑战以及宝贵的实践经验有哪些?本文主要内容分为以下四个方面: 背景及现状 技术架构 产品及运 ...

  5. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  6. Drools7 动态更新规则

    动态更新规则 上一章节讲述了 Drools7 和 Springboot2 集成使用,集成工作相对简单.可以快速开发.但是缺点也很明显,规则和配置文件绑定在项目中(耦合度太高).如果你不需要修改规则文件 ...

  7. Drools动态更新规则

    Drools动态规则加载 Drools简介 动态加载规则 1. 项目依赖 2. 实现 (1) 新增规则 (2) 删除规则 (3) 更新规则 Drools简介 简单说明一下规则动态插拔使用到的Drool ...

  8. [Flink]Flink实时框架介绍

    目录 架构 应用 流 状态 时间 分层API 运维 架构 Flink是一个分布式数据流处理引擎,用于处理带状态的有边界或无边界数据流.可以部署在通用的分布式集群上,实现海量数据在内存上快速计算. 无边 ...

  9. h5 数字变化_前端/h5 D3.js实现根据数据动态更新图形/类似进度实时变化效果

    最近接到一个需求,在满足规则下,实现类似这种展示效果,其实就是用图形反映数据(NK,一种干扰值) 运行后,它其实是不断在动的,每格都可能显示灰色或者彩色 这里一共是10个格子,每格代表一个范围边界,说 ...

最新文章

  1. squid启动失败的解决办法
  2. php实现人员权限管理(用户界面)
  3. centos7 python
  4. php中的自定义函数与c语言有什么区别,php与c语言的不同点是什么?
  5. Web Service 缓存
  6. 前端学习(2766):生命周期函数
  7. 工业智能相机与基于PC的机器视觉的区别比较
  8. End-to-end Recovery of Human Shape and Pose
  9. Jquery取form表单中的所有参数
  10. MacOS平台上编译 hadoop 3.1.2 源码
  11. centos rm -rf 恢复删除的文件
  12. c语言程序图片截取,C++实现屏幕截图功能
  13. 线性回归 T检验P值计算
  14. 计算机声卡和显卡驱动,声卡或显卡驱动怎么卸载干净?声卡或显卡常见驱动问题解决方案...
  15. win10设置计算机关机时间,最新版:如何在Win10计算机上设置计划的关机时间? Windows 10计算机设置定时关机命令...
  16. 洛谷P6158 封锁,平面图最小乘积最短路
  17. 蓝绿部署、AB测试、灰度发布、⾦丝雀发布、滚 动发布的概念与区别
  18. matlab p图,【MATLAB】P图神器,初露锋芒:第一周作业(剧透)
  19. C2039 Error: WriteHuge : is not a member of CFile
  20. vue药物管理系统nodejs

热门文章

  1. 永福股份遭宁德时代减持3% 后者产业链投资浮盈已达80亿
  2. 阿里巴巴美股股价大跌:创在美上市以来最大单日跌幅
  3. 罗永浩吐槽卖小米、苹果被骂,卖华为也被骂,李楠:警惕键盘侠
  4. 全球仅4人,刚毕业年薪201万元 !华为最高档“天才少年”回应...
  5. 抖音内测语音直播功能 支持8位观众同时在线聊天
  6. OPPO Reno3系列旗舰官宣:骁龙765G+正反双曲面设计
  7. 小米投资偏爱智能与芯片 雷军:有3家科创板上市
  8. “天天快报”涉黄被下架?官方回应:不是我 是山寨版APP
  9. 卢伟冰再怼荣耀9X 10W快充与五年前红米Note一代机型一致
  10. 网易传媒回应“变相裁员 ”说法:假消息,将提起诉讼