背景

日志系统接入的日志种类多、格式复杂多样,主流的有以下几种日志:

  • filebeat采集到的文本日志,格式多样
  • winbeat采集到的操作系统日志
  • 设备上报到logstash的syslog日志
  • 接入到kafka的业务日志

以上通过各种渠道接入的日志,存在2个主要的问题:

  • 格式不统一、不规范、标准化不够
  • 如何从各类日志中提取出用户关心的指标,挖掘更多的业务价值

为了解决上面2个问题,我们基于flink和drools规则引擎做了实时的日志处理服务。

系统架构

架构比较简单,架构图如下:


各类日志都是通过kafka汇总,做日志中转。

flink消费kafka的数据,同时通过API调用拉取drools规则引擎,对日志做解析处理后,将解析后的数据存储到Elasticsearch中,用于日志的搜索和分析等业务。

为了监控日志解析的实时状态,flink会将日志处理的统计数据,如每分钟处理的日志量,每种日志从各个机器IP来的日志量写到Redis中,用于监控统计。

模块介绍

系统项目命名为eagle。

  • eagle-api:基于springboot,作为drools规则引擎的写入和读取API服务。

  • eagle-common:通用类模块。

  • eagle-log:基于flink的日志处理服务。

重点讲一下eagle-log:

对接kafka、ES和Redis

对接kafka和ES都比较简单,用的官方的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch6),详见代码。

对接Redis,最开始用的是org.apache.bahir提供的redis connector,后来发现灵活度不够,就使用了Jedis。

在将统计数据写入redis的时候,最开始用的keyby分组后缓存了分组数据,在sink中做统计处理后写入,参考代码如下:

String name = "redis-agg-log";        DataStream>> keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex())                .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))                .process(new ProcessWindowFunction>, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable iterable, Collector>> collector) {                        ArrayList logs = Lists.newArrayList(iterable);if (logs.size() > 0) {                            collector.collect(new Tuple2(s, logs));                        }                    }                }).setParallelism(redisSinkParallelism).name(name).uid(name);

后来发现这样做对内存消耗比较大,其实不需要缓存整个分组的原始数据,只需要一个统计数据就OK了,优化后:

String name = "redis-agg-log";        DataStream keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex())                .timeWindow(Time.seconds(windowTime))                .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))                .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())                .setParallelism(redisSinkParallelism).name(name).uid(name);

这里使用了flink的聚合函数和Accumulator,通过flink的agg操作做统计,减轻了内存消耗的压力。

使用broadcast广播drools规则引擎

1、drools规则流通过broadcast map state广播出去。

2、kafka的数据流connect规则流处理日志。

//广播规则流env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)                .broadcast(ruleStateDescriptor);

//kafka数据流FlinkKafkaConsumer010 source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);//数据流connect规则流处理日志BroadcastConnectedStream connectedStreams = dataSource.connect(ruleSource);connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);

具体细节参考开源代码。

小结

本系统提供了一个基于flink的实时数据处理参考,对接了kafka、redis和elasticsearch,通过可配置的drools规则引擎,将数据处理逻辑配置化和动态化。

对于处理后的数据,也可以对接到其他sink,为其他各类业务平台提供数据的解析、清洗和标准化服务。

项目地址:

https://github.com/luxiaoxun/eagle

作者:阿凡卢

出处:http://www.cnblogs.com/luxiaoxun/

drools动态配置规则_基于 Flink 和 Drools 的实时日志处理相关推荐

  1. drools动态配置规则_关于规则引擎

    很早之前就知道Drools,这几天正好有个项目,里面用了大量的规则定义,就想是否能采用Drools来解决. 在github上分析了规则引擎项目,包括: Easy-rules https://githu ...

  2. drools动态配置规则_微服务实战系列(八)-网关springcloud gateway自定义规则

    1. 场景描述 先说明下项目中使用的网关是:springcloud gateway, 因需要给各个网关服务系统提供自定义配置路由规则,实时生效,不用重启网关(重启风险大),目前已实现:动态加载自定义路 ...

  3. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

  4. Flink从入门到精通100篇(四)-基于 Flink 和 Drools 的实时日志处理

    背景 日志系统接入的日志种类多.格式复杂多样,主流的有以下几种日志: filebeat采集到的文本日志,格式多样 winbeat采集到的操作系统日志 设备上报到logstash的syslog日志 接入 ...

  5. 网络安全公司奇安信集团是如何基于 Flink 构建 CEP 引擎实时检测网络攻击【未来不可忽视的网络安全】

    摘要: 奇安信集团作为一家网络安全公司是如何基于 Flink 构建 CEP 引擎实时检测网络攻击?其中面临的挑战以及宝贵的实践经验有哪些?本文主要内容分为以下四个方面: 背景及现状 技术架构 产品及运 ...

  6. 基于Flink的高可靠实时ETL系统

    GIAC(GLOBAL INTERNET ARCHITECTURE CONFERENCE)是长期关注互联网技术与架构的高可用架构技术社区和msup推出的,面向架构师.技术负责人及高端技术从业人员的年度 ...

  7. 基于 Flink 的超大规模在线实时反欺诈系统的建设与实践

    在大数据时代,金融科技公司通常借助消费数据来综合评估用户的信用和还款能力.这个过程中,某些中介机构会搜集大量的号并进行"养号"工作,即在一年周期里让这些号形成正常的消费.通讯记录, ...

  8. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  9. 基于Flink的超大规模在线实时反欺诈系统的建设与实践

    作者:关贺宇 在大数据时代,金融科技公司通常借助消费数据来综合评估用户的信用和还款能力.这个过程中,某些中介机构会搜集大量的号并进行"养号"工作,即在一年周期里让这些号形成正常的消 ...

最新文章

  1. Redis 笔记(04)— list类型(作为消息队列使用、在列表头部添加元素、尾部删除元素、查看列表长度、遍历指定列表区间元素、获取指定区间列表元素、阻塞式获取列表元素)
  2. docker安装Mysql5.7以及远程登陆链接配置
  3. 最容易看懂的汇编语言书籍:汇编语言 王爽
  4. 某大型国企技术平台建设
  5. linux使用crontab命令定时重启服务器
  6. android+note2+分辨率,Android自适应屏幕方向、大小和分辨率
  7. oracle 10g中没有refhost.xml,解决win7 安装oracle10g的问题
  8. window powershell 筛选
  9. 你所不知道的 CSS 阴影技巧与细节
  10. centos7学习笔记-安装配置apache
  11. 【spring】注解开发和spring整合junit
  12. BZOJ1001: [BeiJing2006]狼抓兔子 (最小割)
  13. 阿里云服务器防火墙的问题
  14. cad渐变线怎么画_花花绿绿的股票线是怎么画出来的?想怎么画就怎么画!
  15. oracle提取违反,ORA-01002: 提取违反顺序的问题分析
  16. java word模板poi生成文件_poi读写word模板 / java生成word文档
  17. 干货分享!华为模拟器Web配置防火墙
  18. 现在都在考华为认证,含金量高不高呢?该如何备考?
  19. AcWing Round #14
  20. Spring及Springboot IOC与AOP思考

热门文章

  1. ilitek win10 触摸屏驱动_想做多大尺寸触摸框找融创方圆定制触摸屏工厂
  2. java——Runtime
  3. 【PAT】B1055 集体照(25 分)
  4. Linux下从零开始部署和使用Jaeger
  5. 8.String、StringBuffer、enum枚举
  6. Discuz常见小问题-如何取消登陆发帖验证码
  7. 授人以鱼不如授人以渔,UCHome全面大解析培训活动【第三集】
  8. java 取Blob转为String
  9. java异常代码分析
  10. oracle如何创建视图