大数据系列教程(4)Flink 使用 DataStream API 进行欺诈检测
目录
- 使用 DataStream API 进行欺诈检测
- **版本1**
- 版本2
- 版本3
使用 DataStream API 进行欺诈检测
Apache Flink 提供了一个 DataStream API,用于构建健壮的、有状态的流应用程序。它提供对状态和时间的细粒度控制,从而允许实施高级事件驱动系统。
需求:
信用卡欺诈在数字时代日益受到关注。犯罪分子通过诈骗或侵入不安全的系统来窃取信用卡号码。被盗号码通过一次或多次小额购买进行测试,通常为一美元或更少。如果这行得通,他们就会进行更重大的购买,以获得可以出售或自己保留的物品。
在本教程中,您将构建一个欺诈检测系统,用于提醒可疑的信用卡交易。使用一组简单的规则,您将看到 Flink 如何让我们实现高级业务逻辑并实时行动。
- 采用JDK8、maven进行构建
$ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.14.4 -DgroupId=frauddetection -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false
用idea进行打包成jar,然后通过Flink 的Web UI来提交作业。
版本1
FraudDetectionJob.java
package spendreport;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;public class FraudDetectionJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//执行环境、创建源DataStream<Transaction> transactions = env.addSource(new TransactionSource()).name("transactions");//分区事件和检测欺诈DataStream<Alert> alerts = transactions.keyBy(Transaction::getAccountId).process(new FraudDetector()).name("fraud-detector");//输出结果alerts.addSink(new AlertSink()).name("send-alerts");env.execute("Fraud Detection");}
}
- 第一行设置您的
StreamExecutionEnvironment
. 执行环境是您为 Job 设置属性、创建源并最终触发 Job 执行的方式。 - 源将来自外部系统(例如 Apache Kafka、Rabbit MQ 或 Apache Pulsar)的数据提取到 Flink Jobs 中。本演练使用的源可生成无限的信用卡交易流供您处理。每笔交易都包含账户 ID (
accountId
)、交易发生时间的时间戳 (timestamp
) 和美元金额 (amount
)。 - 该
transactions
流包含来自大量用户的大量交易,因此需要由多个欺诈检测任务并行处理。由于欺诈发生在每个账户的基础上,您必须确保同一账户的所有交易都由欺诈检测器操作员的同一并行任务处理。 - 为确保同一物理任务处理特定键的所有记录,您可以使用
DataStream#keyBy
. 该process()
调用添加了一个运算符,该运算符将函数应用于流中的每个分区元素。通常说在keyBy
之后的运算符,在这种情况下FraudDetector
,是在键控上下文中执行的。 - 接收器将
DataStream
写入外部系统;例如 Apache Kafka、Cassandra 和 AWS Kinesis。AlertSink
使用日志级别INFO记录每条记录,而不是将其写入持久存储,因此您可以轻松查看Alert
结果。
FraudDetector.java
欺诈检测器
package spendreport;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {private static final long serialVersionUID = 1L;private static final double SMALL_AMOUNT = 1.00;private static final double LARGE_AMOUNT = 500.00;private static final long ONE_MINUTE = 60 * 1000;@Overridepublic void processElement(Transaction transaction,Context context,Collector<Alert> collector) throws Exception {Alert alert = new Alert();alert.setId(transaction.getAccountId());collector.collect(alert);}
}
欺诈检测器实现为
KeyedProcessFunction
.KeyedProcessFunction#processElement
每个事务事件都会调用它的方法。第一个版本会对每笔交易产生警报,有些人可能会说这过于保守。本教程的后续步骤将指导您使用更有意义的业务逻辑扩展欺诈检测器。
版本2
对于第一个版本,欺诈检测器应该为任何进行小额交易的账户立即输出警报,然后是大笔交易。小是不到 1.00 美元,大是超过 500 美元。想象一下,您的欺诈检测器处理特定帐户的以下交易流。
1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 |
---|---|---|---|---|---|---|---|---|---|
13.01 | 25 | 0.09 | 510 | 102 | 91 | 0.02 | 30 | 700 | 32 |
交易 3 和 4 应标记为欺诈,因为这是一笔小额交易,0.09 美元,然后是一笔大笔交易510 美元。或者,交易 7、8 和 9 不是欺诈,因为 0.02 美元的小额金额没有紧跟大额交易;相反,有一个中间交易打破了这种模式。
为此,欺诈检测器必须记住跨事件的信息;只有前一笔交易规模较小时,一笔大额交易才具有欺诈性。跨事件记住信息需要状态,这就是我们决定使用KeyedProcessFunction的原因。它提供了对状态和时间的细粒度控制,这将使我们能够在整个演练中根据更复杂的要求改进我们的算法。
最直接的实现是在处理小事务时设置的布尔标志。当大笔交易通过时,您可以简单地检查是否为该帐户设置了标志。
但是,仅将标志实现为类中的成员变量是FraudDetector
行不通的。Flink 处理具有相同对象实例的多个账户的交易FraudDetector
,这意味着如果账户 A 和 B 路由通过相同的实例FraudDetector
,账户 A 的交易可以将标志设置为 true,然后账户 B 的交易可以设置关闭虚假警报。我们当然可以使用像 Map
这样的数据结构来跟踪单个键的标志,但是,一个简单的成员变量不会容错,并且在发生故障时它的所有信息都会丢失。因此,如果应用程序必须重新启动以从故障中恢复,欺诈检测器可能会错过警报。
为了应对这些挑战,Flink 提供了容错状态的原语,这些原语几乎与常规成员变量一样易于使用。
Flink 中最基本的状态类型是ValueState,这是一种数据类型,可以为它包装的任何变量添加容错能力。 ValueState
是键控状态的一种形式,这意味着它仅在应用于键控上下文的运算符中可用;紧随其后的任何运算符DataStream#keyBy
。运算符的键控状态自动限定为当前处理的记录的键。在这个例子中,key 是当前交易的账户 ID(由 声明keyBy()
),并FraudDetector
为每个账户维护一个独立的状态。 ValueState
是使用创建的ValueStateDescriptor
其中包含有关 Flink 应如何管理变量的元数据。状态应该在函数开始处理数据之前注册。正确的钩子是open()
方法。
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {private static final long serialVersionUID = 1L;private transient ValueState<Boolean> flagState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag",Types.BOOLEAN);flagState = getRuntimeContext().getState(flagDescriptor);}@Overridepublic void processElement(Transaction transaction,Context context,Collector<Alert> collector) throws Exception {// Get the current state for the current keyBoolean lastTransactionWasSmall = flagState.value();// Check if the flag is setif (lastTransactionWasSmall != null) {if (transaction.getAmount() > LARGE_AMOUNT) {// Output an alert downstreamAlert alert = new Alert();alert.setId(transaction.getAccountId());collector.collect(alert); }// Clean up our stateflagState.clear();}if (transaction.getAmount() < SMALL_AMOUNT) {// Set the flag to trueflagState.update(true);}}}
ValueState
是一个包装类,类似于AtomicReference
或AtomicLong
在 Java 标准库中。它提供了三种与其内容交互的方法;update
设置状态,value
获取当前值,并clear
删除其内容。如果特定键的状态为空,例如在应用程序开始时或调用后ValueState#clear
,ValueState#value
则将返回null
。不保证对返回的对象的修改ValueState#value
被系统识别,因此所有更改都必须使用ValueState#update
. 否则,容错由 Flink 在后台自动管理,因此您可以像使用任何标准变量一样与之交互。对于每笔交易,欺诈检测器都会检查该帐户的标志状态。请记住,
ValueState
总是范围为当前键,即帐户。如果标志不为空,则该帐户的最后一笔交易很小,因此如果该交易的金额很大,则检测器会输出欺诈警报。在那次检查之后,标志状态被无条件地清除。要么当前交易导致欺诈警报,并且模式结束,要么当前交易没有引起警报,并且模式被破坏并且需要重新启动。
最后,检查交易金额是否小。如果是这样,则设置标志以便下一个事件可以检查它。请注意,它
ValueState<Boolean>
具有三个状态,未设置 (null
)true
、 和false
,因为所有ValueState
’ 都可以为空。该作业仅使用 unset (null
) 并true
检查标志是否已设置。
版本3
诈骗者不会等待很长时间进行大量购买,以减少他们的测试交易被注意到的机会。例如,假设您想为欺诈检测器设置 1 分钟的超时时间;即,在前面的示例中,交易 3 和 4 仅在它们发生在 1 分钟内时才会被视为欺诈。FlinkKeyedProcessFunction
允许您设置在未来某个时间点调用回调方法的计时器。
让我们看看如何修改我们的 Job 以符合我们的新要求:
- 每当标志设置为 时
true
,还要在未来设置一个 1 分钟的计时器。 - 当计时器触发时,通过清除其状态来重置标志。
- 如果标志被清除,则应取消计时器。
要取消计时器,您必须记住它设置的时间,并且记住意味着状态,因此您将从创建计时器状态和标志状态开始。
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {private static final long serialVersionUID = 1L;private static final double SMALL_AMOUNT = 1.00;private static final double LARGE_AMOUNT = 500.00;private static final long ONE_MINUTE = 60 * 1000;private transient ValueState<Boolean> flagState;private transient ValueState<Long> timerState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag",Types.BOOLEAN);flagState = getRuntimeContext().getState(flagDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timer-state",Types.LONG);timerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Transaction transaction,Context context,Collector<Alert> collector) throws Exception {// Get the current state for the current keyBoolean lastTransactionWasSmall = flagState.value();// Check if the flag is setif (lastTransactionWasSmall != null) {if (transaction.getAmount() > LARGE_AMOUNT) {//Output an alert downstreamAlert alert = new Alert();alert.setId(transaction.getAccountId());collector.collect(alert);}// Clean up our statecleanUp(context);}if (transaction.getAmount() < SMALL_AMOUNT) {// set the flag to trueflagState.update(true);long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;context.timerService().registerProcessingTimeTimer(timer);timerState.update(timer);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {// remove flag after 1 minutetimerState.clear();flagState.clear();}private void cleanUp(Context ctx) throws Exception {// delete timerLong timer = timerState.value();ctx.timerService().deleteProcessingTimeTimer(timer);// clean up all statetimerState.clear();flagState.clear();}
}
KeyedProcessFunction#processElement``Context
使用包含计时器服务的a 调用。定时器服务可用于查询当前时间、注册定时器和删除定时器。有了这个,您可以在每次设置标志时将计时器设置为 1 分钟,并将时间戳存储在timerState
.处理时间为挂钟时间,由运行操作员的机器的系统时钟决定。
当计时器触发时,它会调用
KeyedProcessFunction#onTimer
. 覆盖此方法是您如何实现回调以重置标志。最后,要取消定时器,需要删除已注册的定时器,并删除定时器状态。您可以将其包装在辅助方法中并调用此方法而不是
flagState.clear()
.
使用提供的代码运行此代码TransactionSource
将为帐户 3 发出欺诈警报。您应该在任务管理器日志中看到以下输出:
2019-08-19 14:22:06,220 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:11,383 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:16,551 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:21,723 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2019-08-19 14:22:26,896 INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
大数据系列教程(4)Flink 使用 DataStream API 进行欺诈检测相关推荐
- Flink基于 DataStream API 实现欺诈检测
目录 系列文章目录 文章目录 前言 一.Flink基于 DataStream API 实现欺诈检测 二.使用步骤 1.引入pom.xml 2.主类 3.欺诈逻辑判断类 4.运行结果: 总结 前言 在当 ...
- namenode无法启动_大数据系列教程003-hadoop伪分布式环境搭建步骤11-启动与验证环境...
声明:大数据系列教程文章由Java潘老师辛苦原创,免费公开供java爱好者学习.本教程学习知识储备:Java SE基础.Linux基础.数据库基础 1.将/usr/hadoop目录赋予777权限,否则 ...
- 大数据系列教程001-初识大数据
声明:大数据系列教程文章由Java潘老师辛苦原创,免费公开供java爱好者学习.如需转载请获得潘老师授权并保留原文链接,如有疑问或建议,可以联系潘老师: Q:1562691348 V:A1562691 ...
- 大数据系列教程003-hadoop伪分布式环境搭建步骤03-安装虚拟机CentOS7
声明:大数据系列教程文章由Java潘老师辛苦原创,免费公开供java爱好者学习.如需转载请获得潘老师授权并保留原文链接,如有疑问或建议,可以联系潘老师: Q:1562691348 V:A1562691 ...
- 大数据系列教程003-hadoop伪分布式环境搭建步骤
声明:大数据系列教程文章由Java潘老师辛苦原创,免费公开供java爱好者学习.如需转载请获得潘老师授权并保留原文链接,如有疑问或建议,可以联系潘老师: Q:1562691348 V:A1562691 ...
- 大数据系列教程003-hadoop伪分布式环境搭建步骤02-设置vmware虚拟网络编辑器
声明:大数据系列教程文章由Java潘老师辛苦原创,免费公开供java爱好者学习.如需转载请获得潘老师授权并保留原文链接,如有疑问或建议,可以联系潘老师: Q:1562691348 V:A1562691 ...
- 12c集群日志位置_大数据系列教程006-开启日志聚合功能
Container日志是hadoop各个container记录的日志,其中会包含错误或失败的重要信息.如果没有打开日志聚合,默认是分布在各个nodemanager节点上的.如果打开了日志聚合选项,则会 ...
- Flink官网实例:基于DataStream API 实现欺诈检测,完整实现
1.官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/try-flink/datastream_api.html ...
- 大数据入门教程系列之Hive内置函数及自定义函数
本篇文章主要介绍Hive内置函数以及自定义UDF函数和UDFT函数,自定义UDF函数通过一个国际转换中文的例子说明. 操作步骤: ①.准备数据和环境 ②.演示Hive内置函数 ③.自定义UDF函数编写 ...
最新文章
- 通过分析exevc系统调用处理过程来理解Linux内核如何装载和启动一个可执行程序...
- Flex使用cookie保存登状态
- ubuntu开启客户端nfs服务_LINUX系统使用NFS文件共享
- python rsa加密长度_python RSA加密最新(RSA/ECB/PKCS1Padding)
- ubuntu修改默认系统启动项
- 面试之函数节流和函数防抖
- 【JAVA程序设计】(C00048)基于springboot酒店宾馆管理系统
- OPPO消息推送服务器,OPPO开放平台消息推送申请教程
- Leetcode刷题-459:重复的子字符串
- python c语言实现_使用C语言为python编写动态模块(3)--在C中实现python中的类
- Unicdoe【真正的完整码表】对照表(一)
- 牛顿法 泰勒二次展开式
- 【中亦安图】导致Oracle性能抖动的参数提醒(4)
- cv2绘图 cv.line(),cv.circle(),cv.rectangle(),cv.ellipse(),cv.putText() python
- hive 的 lateral view用法以及注意事项
- 比利时银行集团KBC创建基于区块链的硬币
- 全球及中国深紫外LED行业十四五规划及前景战略研究报告2021-2027年版
- haxm intel庐_如何开启Intel HAXM功能
- 微型计算机如何跳线,PCB设计之如何使用跳线
- 网易非人学园手游6月22日双端上线,这款漫画恶搞风5V5对战手游你喜欢吗?
热门文章
- centos-linux 查看网关
- 【Libreoffice整合SpringBoot】只需这一篇
- android开发-Listview中显示不同的视图布局
- 48、MyBatis的优缺点
- 碳通宝打造创新绿色金融产品,多元化业务拓宽收益通道
- 军队文职丨试用期薪资7200起,转正过万!军队编制,六险两金+各项福利!了解一下!
- [转] java - 过滤ASCII码中的不可见字符, ASCII三部分, 各控制字符详解
- 2013年美国LBS应用关注度统计(按统计样本的百分比)
- licode服务端总结
- java过滤excel换行符_excel 添加换行符,去除换行符: