业务功能:根据告警规则,从告警流中将主告警和次告警进行关联。

pom maven配置:

    <drools.version>6.5.0.Final</drools.version><flink.version>1.10.0</flink.version><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.drools</groupId><artifactId>drools-core</artifactId><version>${drools.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.drools/drools-compiler --><dependency><groupId>org.drools</groupId><artifactId>drools-compiler</artifactId><version>${drools.version}</version></dependency><dependency><groupId>org.drools</groupId><artifactId>drools-decisiontables</artifactId><version>${drools.version}</version></dependency><dependency><groupId>org.drools</groupId><artifactId>drools-templates</artifactId><version>${drools.version}</version></dependency><dependency><groupId>org.kie</groupId><artifactId>kie-api</artifactId><version>${drools.version}</version></dependency>

1、普通的Java 的POJO(对应Drools中的fact 对象)

package www.lxk.com;import java.io.Serializable;
import java.util.Date;public class Alarm implements Serializable{private String alarmId;private Date eventTime;private String relatedType;public String getAlarmId() {return alarmId;}public void setAlarmId(String alarmId) {this.alarmId = alarmId;}public Date getEventTime() {return eventTime;}public void setEventTime(Date eventTime) {this.eventTime = eventTime;}public String getRelatedType() {return relatedType;}public void setRelatedType(String relatedType) {this.relatedType = relatedType;}@Overridepublic String toString() {return "Alarm{" +"alarmId='" + alarmId + '\'' +", eventTime=" + eventTime +", relatedType='" + relatedType + '\'' +'}';}
}

2、规则文件rules2.drl

package rulesimport com.lxk.oss.sink.Alarm
import java.util.ArrayListglobal java.util.List listdeclare Alarm@role(event)        //声明为事件@timestamp(eventTime)  //事件时间戳@expire(5S)      //事件过期时间10分钟,过期后不再匹配任何规则endrule "rule2"no-loop trueduration 3000when$pAlarm : Alarm( $qq: alarmId memberOf ["111"] ) from entry-point "Demo02"$sList:ArrayList(size >=10 ) from collect (Alarm(alarmId memberOf ["222","333"] )from entry-point "Demo02")then$pAlarm.setRelatedType("主告警");System.out.println("匹配到主告警" + $pAlarm.toString());System.out.println($qq);//drools.insert($pAlarm);$sList.forEach(x -> {((Alarm)x).setRelatedType("子告警");System.out.println("匹配到子告警"  + x.toString());});
end

3、处理类--规则的编译、收集和执行

package www.lxk.com;import com.lxk.aisware.oss.sink.Alarm;
import com.lxk.aisware.oss.sink.Rule;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;public class Demo02 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 非常关键,一定要设置启动检查点!!env.enableCheckpointing(5000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //精确一次env.setParallelism(1);//告警流DataStream<Alarm> dataStream = env.addSource(new MyAlarmSource());//规则流DataStream<Rule> ruleStream = env.addSource(new MyRuleSource());//dataStream.print();//ruleStream.print();dataStream.connect(ruleStream).flatMap(new RuleAlarmFunction()).print();env.execute("Drools Demo01");}//自定义类,继承SourceFunction,每条数据是一个规则对象(根据业务自定义)public static class MyAlarmSource implements SourceFunction<Alarm>{// 定义一个标识,表示数据源是否继续运行private Boolean running = true;private static AtomicInteger counter = new AtomicInteger(0);@Overridepublic void run(SourceContext<Alarm> sourceContext) throws Exception {String[] ids = {"111","222","333"};while (running){Alarm alarm = new Alarm();alarm.setAlarmId(ids[counter.getAndIncrement()%3]);alarm.setEventTime(new Date());Thread.sleep(500);sourceContext.collect(alarm);}}@Overridepublic void cancel() {running = false;}}//自定义类,继承SourceFunction,每条数据是一个规则对象(根据业务自定义)public static class MyRuleSource implements SourceFunction<Rule>{// 定义一个标识,表示数据源是否继续运行private boolean running = true;@Overridepublic void run(SourceContext<Rule> sourceContext) throws Exception {while (running){//counter = 1;Thread.sleep(500);Rule rule = new Rule();File file = new File("F:\\04_drools\\fm-rule\\src\\main\\resources\\rules\\rules2.drl");FileReader reader = new FileReader(file);BufferedReader br = new BufferedReader(reader);// 建立一个对象,它把文件内容转成计算机能读懂的语言String line;StringBuffer sb = new StringBuffer();//网友推荐更加简洁的写法while ((line = br.readLine()) != null) {// 一次读入一行数据sb.append(line).append("\n");}rule.setDrlStr(sb.toString());rule.setOperate(1);rule.setName("rule2");sourceContext.collect(rule);}}@Overridepublic void cancel() {this.running = false;}}}
package www.lxk.com;import com.lxk.aisware.oss.sink.Alarm;
import com.lxk.aisware.oss.sink.Rule;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.drools.core.impl.KnowledgeBaseImpl;
import org.kie.api.KieBaseConfiguration;
import org.kie.api.KieServices;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.EntryPoint;
import org.kie.internal.builder.KnowledgeBuilder;
import org.kie.internal.builder.KnowledgeBuilderFactory;
import org.kie.internal.io.ResourceFactory;
import org.kie.internal.utils.KieHelper;import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class RuleAlarmFunction extends RichCoFlatMapFunction<Alarm, Rule, Alarm> {private  KieHelper kieHelper;private  KnowledgeBaseImpl kieBase;private  KieSession kieSession;private  EntryPoint entryPoint;private Lock lock = new ReentrantLock();@Overridepublic void open(Configuration parameters) throws Exception {if(kieSession == null ){kieHelper = new KieHelper();//kieHelper.addContent(drlStr, ResourceType.DRL);KieBaseConfiguration config = KieServices.Factory.get().newKieBaseConfiguration();config.setOption( EventProcessingOption.STREAM );try{kieBase =(KnowledgeBaseImpl) kieHelper.build();kieSession = kieBase.newStatefulSession();}catch (Exception e){e.printStackTrace();}}}@Overridepublic void close() throws Exception {Thread.sleep(1000);kieSession.destroy();}@Overridepublic void flatMap1(Alarm alarm, Collector collector) throws Exception {lock.lock();try{entryPoint = kieSession.getEntryPoint("Demo02");if(entryPoint != null){entryPoint.insert(alarm);int fireNum = kieSession.fireAllRules();}}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}collector.collect(alarm);}@Overridepublic void flatMap2(Rule rule, Collector collector) throws Exception {if(rule.getOperate() == 0){lock.lock();try{kieBase.removeRule("rules", rule.getName());kieSession = kieBase.newKieSession();}finally {lock.unlock();}}else{// 规则名是ruleIdorg.kie.api.definition.rule.Rule ruleCache = kieBase.getRule("rules", rule.getName());// 规则如果已存在,不需要再次新增if(ruleCache !=null && rule.getName().equals(ruleCache.getName())){System.out.println("规则已经存在,ruleName="+ ruleCache.getName());}else {//重新添加规则KnowledgeBuilder kb = KnowledgeBuilderFactory.newKnowledgeBuilder();//装入规则,可以装入多个kb.add(ResourceFactory.newByteArrayResource(rule.getDrlStr().getBytes("utf-8")), ResourceType.DRL);kieBase.addKnowledgePackages(kb.getKnowledgePackages());kieSession = kieBase.newKieSession();}}}
}

4、运行测试

"C:\Program Files\Java\jdk1.8.0_201\bin\java" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA...Alarm{alarmId='111', eventTime=Sun Jun 07 17:01:17 CST 2020, relatedType='null'}
规则已经存在,ruleName=rule2
Alarm{alarmId='222', eventTime=Sun Jun 07 17:01:18 CST 2020, relatedType='null'}
规则已经存在,ruleName=rule2
匹配到主告警Alarm{alarmId='111', eventTime=Sun Jun 07 17:01:14 CST 2020, relatedType='主告警'}
111
匹配到子告警Alarm{alarmId='333', eventTime=Sun Jun 07 17:01:09 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='222', eventTime=Sun Jun 07 17:01:09 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='222', eventTime=Sun Jun 07 17:01:10 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='333', eventTime=Sun Jun 07 17:01:11 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='222', eventTime=Sun Jun 07 17:01:12 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='333', eventTime=Sun Jun 07 17:01:12 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='222', eventTime=Sun Jun 07 17:01:13 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='333', eventTime=Sun Jun 07 17:01:14 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='222', eventTime=Sun Jun 07 17:01:15 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='333', eventTime=Sun Jun 07 17:01:15 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='222', eventTime=Sun Jun 07 17:01:16 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='333', eventTime=Sun Jun 07 17:01:17 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='222', eventTime=Sun Jun 07 17:01:18 CST 2020, relatedType='子告警'}
匹配到子告警Alarm{alarmId='333', eventTime=Sun Jun 07 17:01:18 CST 2020, relatedType='子告警'}
匹配到主告警Alarm{alarmId='111', eventTime=Sun Jun 07 17:01:13 CST 2020, relatedType='主告警'}

Process finished with exit code 0

Flink整合Drools规则引擎相关推荐

  1. Spring Boot整合Drools规则引擎实例

    1.DRools介绍 官网:https://www.drools.org/ 规则引擎主要完成的就是将业务规则从代码中分离出来. DRools一款由JBoss组织提供的基于Java语言开发的开源规则引擎 ...

  2. SpringBoot2 整合 Drools规则引擎,实现高效的业务规则

    本文源码:GitHub·点这里 || GitEE·点这里 一.Drools引擎简介 1.基础简介 Drools是一个基于java的规则引擎,开源的,可以将复杂多变的规则从硬编码中解放出来,以规则脚本的 ...

  3. SpringBoot整合Drools规则引擎动态生成业务规则

    最近的项目中,使用的是flowable工作流来处理业务流程,但是在业务规则的配置中,是在代码中直接固定写死的,领导说这样不好,需要规则可以动态变化,可以通过页面去动态配置改变,所以就花了几天时间去研究 ...

  4. springboot2整合drools规则引擎(kie-spring+drools-core)

    What&Why Drools? Drools(JBoss Rules )的前身是Codehaus的一个开源项目叫Drools,后来纳入JBoss门下,更名为JBoss Rules,成为了JB ...

  5. 从零构建FLINK整合Drools动态规则实时运营系统(项目案例)第1篇(项目介绍篇)

    1 整体架构 前言 项目介绍在线视频: https://www.bilibili.com/video/BV1zv41157yY 本案例是一个专注于flink动态规则计算的项目,核心技术组件涉及flin ...

  6. 黑马Drools学习笔记(一)——Drools规则引擎示例概述以及SpringBoot整合示例

    文章目录 1. 问题引出 2. 规则引擎概述 2.1 什么是规则引擎 2.2 使用规则引擎的优势 2.3 规则引擎应用场景 2.4 Drools介绍 3. Drools入门案例 3.1 业务场景说明 ...

  7. 大数据风控项目实战 Drools规则引擎

    可以借鉴的干货 1,统一存储服务,包含:多种存储库连接封装和服务封装 在统一存储服务 2.获取配置的环境 类:EnvVariable 一.风控项目介绍 对一个复杂支付系统提供统一.全面.高效的风险控制 ...

  8. Drools 规则引擎

    官网:https://www.drools.org/ 累了听听歌:http://www.hy57.com/p/158102.html 1. 快速度入门 1. 导入依赖 <dependencies ...

  9. drools规则引擎耗费内存问题解决

    背景 公司使用drools规则引擎过程中,一个规则文件中差不多10个rule,每一次访问都需要耗费800M内存,导致频繁GC,同时classloader实例数每构建一次都会增加,导致nonHeap区内 ...

最新文章

  1. 【玩转数据】让您的PPT数据图表炫酷起来吧!
  2. win10 修改gitlab账号_玩转gitlab + jenkins
  3. 网络matlab程序_【Matlab】官网资源盘点
  4. Java LocalDate类| 带示例的compareTo()方法
  5. 网际控制报文协议icmp_网络中的ICMP(Internet控制消息协议)
  6. python加密敏感信息_仅需10行代码,使用python加密用户敏感数据
  7. vue中数组长度_如何在Vue.js中获取计算数组的长度
  8. myeclipse svn 删除文件或者文件夹
  9. 深扒洪恩教育招股书:研发含金量低,三年亏3亿,池宇峰再闯美股
  10. 计算机tpu定义,tpu材料
  11. 《JavaScript 设计模式核心原理与应用实践》
  12. 评测了10款画流程图软件,这4款最好用!(完全免费)
  13. 文本挖掘带你分析苏轼的一生
  14. 数学与泛型编程(7)置换算法
  15. CSS 中的层叠,层级关系
  16. C#获取当前桌面路径
  17. 深度学习实战(十一)——多标签分类(基于Keras)
  18. Json 读文件错误:Expecting property name enclosed
  19. 鸡啄米:C++编程入门系列之目录和总结
  20. c语言软件开发心得:

热门文章

  1. IPv4、IPv6地址、组播地址及子网子划分详解三可变长子网掩码
  2. 机器学习系列(5)_特征工程02特征提取
  3. 东南大学跟华中科技大学计算机学院,四大工学院:华科大读研比例最高;东南大学就业最好...
  4. 全能代码生成器,自动生成前后端代码、生成项目框架、生成JavaBean、生成数据库文档、自动化部署项目(TableGo v8.0.0)
  5. CNN中的卷积的意义
  6. Android简易画板:
  7. 培训班学python0基础靠谱吗_零基础学到什么程度可以找一份靠谱的Python工作?...
  8. Usenet:P2P下载的替代方法
  9. postman测试接口出现404
  10. bugku ctf web4 (看看源代码吧)