使用DataStream API进行欺诈检测
Apache Flink提供了一个DataStream API,用于构建强大的、有状态的流式应用。它提供了对状态和时间的精细控制,这使得高级事件驱动系统的实现成为可能。在这个分步指南中,你将学习如何使用Flink的DataStream API来构建一个有状态的流应用。
你在建造什么?
在数字时代,信用卡诈骗是一个日益严重的问题。犯罪分子通过行骗或入侵不安全的系统来盗取信用卡号码。盗取的号码通过进行一次或多次小额购物来测试,通常是一美元或更少。如果这样做有效,他们就会进行更多的大宗交易,以获得他们可以出售或为自己保留的物品。
在本教程中,您将建立一个欺诈检测系统,对可疑的信用卡交易进行报警。使用一组简单的规则,您将看到Flink如何让我们实现高级业务逻辑并实时行动。
先决条件
这个演练假设你对Java或Scala有一定的熟悉,但即使你来自不同的编程语言,你也应该能够跟上。
救命,我被卡住了!
如果你被卡住了,请查看社区支持资源。特别是Apache Flink的用户邮件列表一直被评为Apache项目中最活跃的一个,也是快速获得帮助的好方法。
如何跟进
如果你想跟着走,你需要一台电脑和
Java 8或11
Maven
提供的Flink Maven Archetype会快速创建一个包含所有必要依赖项的骨架项目,因此你只需要专注于填写业务逻辑。这些依赖包括flink-streaming-java,它是所有Flink流应用的核心依赖,以及flink-walkthrough-common,它有数据生成器和其他针对这个演练的类。
注意:为了简洁起见,本演练中的每个代码块可能不包含完整的周边类。完整的代码可以在页面底部找到。
java代码:
$ mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-datastream-java
-DarchetypeVersion=1.12.0
-DgroupId=frauddetection
-DartifactId=frauddetection
-Dversion=0.1
-Dpackage=spendreport
-DinteractiveMode=false
scala代码:
$ mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-datastream-scala
-DarchetypeVersion=1.12.0
-DgroupId=frauddetection
-DartifactId=frauddetection
-Dversion=0.1
-Dpackage=spendreport
-DinteractiveMode=false
如果你愿意,可以编辑groupId、artifactId和包。有了上述参数,Maven会创建一个名为frauddetection的文件夹,其中包含一个项目,该项目包含了完成本教程的所有依赖项。将项目导入编辑器后,你可以找到一个文件FraudDetectionJob.java(或FraudDetectionJob.scala),里面有以下代码,你可以直接在IDE里面运行。试着在数据流中设置断点,并在DEBUG模式下运行代码,以了解一切如何工作。
java代码:
FraudDetectionJob.java
package spendreport; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.walkthrough.common.sink.AlertSink; import org.apache.flink.walkthrough.common.entity.Alert; import org.apache.flink.walkthrough.common.entity.Transaction; import org.apache.flink.walkthrough.common.source.TransactionSource; public class FraudDetectionJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Transaction> transactions = env .addSource(new TransactionSource()) .name("transactions"); DataStream<Alert> alerts = transactions .keyBy(Transaction::getAccountId) .process(new FraudDetector()) .name("fraud-detector"); alerts .addSink(new AlertSink()) .name("send-alerts"); env.execute("Fraud Detection"); } }
FraudDetector.java
package spendreport; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.walkthrough.common.entity.Alert; import org.apache.flink.walkthrough.common.entity.Transaction; public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> { private static final long serialVersionUID = 1L; private static final double SMALL_AMOUNT = 1.00; private static final double LARGE_AMOUNT = 500.00; private static final long ONE_MINUTE = 60 * 1000; @Override public void processElement( Transaction transaction, Context context, Collector<Alert> collector) throws Exception { Alert alert = new Alert(); alert.setId(transaction.getAccountId()); collector.collect(alert); } }
scala代码:
FraudDetectionJob.scala
package spendreport import org.apache.flink.streaming.api.scala._ import org.apache.flink.walkthrough.common.sink.AlertSink import org.apache.flink.walkthrough.common.entity.Alert import org.apache.flink.walkthrough.common.entity.Transaction import org.apache.flink.walkthrough.common.source.TransactionSource object FraudDetectionJob { @throws[Exception] def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val transactions: DataStream[Transaction] = env .addSource(new TransactionSource) .name("transactions") val alerts: DataStream[Alert] = transactions .keyBy(transaction => transaction.getAccountId) .process(new FraudDetector) .name("fraud-detector") alerts .addSink(new AlertSink) .name("send-alerts") env.execute("Fraud Detection") } }
FraudDetector.scala
package spendreport import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.util.Collector import org.apache.flink.walkthrough.common.entity.Alert import org.apache.flink.walkthrough.common.entity.Transaction object FraudDetector { val SMALL_AMOUNT: Double = 1.00 val LARGE_AMOUNT: Double = 500.00 val ONE_MINUTE: Long = 60 * 1000L } @SerialVersionUID(1L) class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] { @throws[Exception] def processElement( transaction: Transaction, context: KeyedProcessFunction[Long, Transaction, Alert]#Context, collector: Collector[Alert]): Unit = { val alert = new Alert alert.setId(transaction.getAccountId) collector.collect(alert) } }
拆解代码
让我们一步步走完这两个文件的代码。FraudDetectionJob类定义了应用程序的数据流,FraudDetector类定义了检测欺诈交易的函数的业务逻辑。
我们从FraudDetectionJob类的主方法中开始描述Job是如何组装的。
执行环境
第一行设置了你的StreamExecutionEnvironment。执行环境是你为你的Job设置属性,创建你的源,并最终触发Job的执行。
Java代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Scala代码:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
创建源
源从外部系统(如Apache Kafka、Rabbit MQ或Apache Pulsar)摄取数据到Flink Jobs中。本演练使用了一个源,它能生成无限的信用卡交易流供您处理。每笔交易都包含一个账户ID(accountId)、交易发生的时间戳(timestamp)和美元金额(金额)。附在源码上的名字只是为了调试的目的,所以如果出了问题,我们会知道错误的来源。
Java代码:
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
Scala代码:
val transactions: DataStream[Transaction] = env
.addSource(new TransactionSource)
.name("transactions")
分区事件和检测欺诈
交易流包含大量用户的交易,因此需要由多个欺诈检测任务并行处理。由于欺诈行为是以每个账户为单位发生的,因此您必须确保同一账户的所有交易都由欺诈检测操作员的同一个并行任务处理。
为了确保同一个物理任务处理某个特定密钥的所有记录,您可以使用 DataStream#keyBy 分割一个流。process()调用添加了一个操作符,该操作符将一个函数应用于流中的每个分区元素。通常在keyBy(在本例中为FraudDetector)在键控上下文中执行后立即说操作符。
Java代码:
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
Scala代码:
val alerts: DataStream[Alert] = transactions
.keyBy(transaction => transaction.getAccountId)
.process(new FraudDetector)
.name("fraud-detector")
输出结果
水槽将DataStream写入外部系统;如Apache Kafka、Cassandra和AWS Kinesis。AlertSink会将每个Alert记录用日志级别INFO记录下来,而不是写到持久化存储中,这样你就可以很容易地看到你的结果。
Java代码:
alerts.addSink(new AlertSink());
Scala代码:
alerts.addSink(new AlertSink)
执行工作
Flink应用是懒惰构建的,只有在完全成型后才会运到集群中执行。调用StreamExecutionEnvironment#execute开始执行我们的Job,并给它起一个名字。
Java代码:
env.execute("Fraud Detection");
Scala代码:
env.execute("Fraud Detection")
欺诈检测器
欺诈检测器是作为一个KeyedProcessFunction实现的。它的方法KeyedProcessFunction#processElement被调用为每一个交易事件。第一个版本在每个事务上都会产生一个警报,有人可能会说这太保守了。
本教程的下一步将引导你用更有意义的业务逻辑来扩展欺诈检测器。
Java代码:
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
}
Scala代码:
object FraudDetector {
val SMALL_AMOUNT: Double = 1.00
val LARGE_AMOUNT: Double = 500.00
val ONE_MINUTE: Long = 60 * 1000L
}
@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
@throws[Exception]
def processElement(
transaction: Transaction,
context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
collector: Collector[Alert]): Unit = {
val alert = new Alert
alert.setId(transaction.getAccountId)
collector.collect(alert)
}
}
编写一个真实的应用程序(v1)
对于第一个版本,欺诈检测器应该为任何紧随大额交易之后进行小额交易的账户输出警报。其中,小额是指小于1.00美元的交易,大额是指超过500美元的交易。想象一下,您的欺诈检测器为一个特定的账户处理了以下交易流。
交易3和4应被标记为欺诈,因为它是一笔小额交易,0.09美元,然后是一笔大额交易,510美元。另外,交易7、8、9不是欺诈,因为0.02美元的小额交易之后并没有紧接着大额交易,而是有一个中间交易打破了这个模式。
要做到这一点,欺诈检测器必须记住跨事件的信息;只有当前一个交易金额较小时,大额交易才是欺诈的。跨事件记忆信息需要状态,这就是为什么我们决定使用KeyedProcessFunction。它提供了对状态和时间的精细控制,这将使我们在整个演练过程中以更复杂的要求来演化我们的算法。
最直接的实现是一个布尔标志,每当处理一个小的事务时,就会设置这个标志。当有大额交易通过时,你可以简单地检查该账户的标志是否被设置。
然而,仅仅将标志作为FraudDetector类中的成员变量来实现是不行的。Flink用FraudDetector的同一个对象实例来处理多个账户的交易,这意味着如果账户A和账户B通过FraudDetector的同一个实例进行路由,那么账户A的交易可能会将flag设置为true,然后账户B的交易可能会触发虚假警报。当然,我们可以使用像Map这样的数据结构来跟踪各个键的标志,但是,一个简单的成员变量并不具备容错性,一旦发生故障,它的所有信息就会丢失。因此,如果应用程序不得不重新启动以从故障中恢复,欺诈检测器可能会错过警报。
为了解决这些挑战,Flink提供了容错状态的基元,这些基元几乎和普通成员变量一样容易使用。
Flink中最基本的状态类型是ValueState,这是一种为其封装的任何变量增加容错性的数据类型。ValueState是键控状态的一种形式,这意味着它只有在键控上下文中应用的操作符中才可用;任何操作符紧跟DataStream#keyBy。一个操作符的键控状态会自动作用于当前处理的记录的键。在这个例子中,键是当前交易的账户id(由keyBy()声明),FraudDetector为每个账户维护一个独立的状态。ValueState是使用ValueStateDescriptor创建的,它包含了关于Flink应该如何管理变量的元数据。该状态应该在函数开始处理数据之前被注册。正确的钩子是open()方法。
Java代码:
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private transient ValueState<Boolean> flagState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
}
Scala代码:
@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
@transient private var flagState: ValueState[java.lang.Boolean] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
flagState = getRuntimeContext.getState(flagDescriptor)
}
ValueState是一个包装类,类似于Java标准库中的AtomicReference或AtomicLong。它提供了三个方法来与它的内容进行交互;update设置状态,value获取当前值,clear删除其内容。如果某个键的状态为空,比如在应用程序开始时或调用ValueState#clear后,那么ValueState#value将返回null。对ValueState#value返回的对象的修改不能保证被系统识别,因此所有的修改都必须使用ValueState#update进行。否则,容错是由Flink在引擎盖下自动管理的,所以你可以像与任何标准变量一样与它交互。
下面,你可以看到一个例子,说明你如何使用标志状态来跟踪潜在的欺诈交易。
Java代码:
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
// Get the current state for the current key
Boolean lastTransactionWasSmall = flagState.value();
// Check if the flag is set
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
// Output an alert downstream
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
// Clean up our state
flagState.clear();
}
if (transaction.getAmount() < SMALL_AMOUNT) {
// Set the flag to true
flagState.update(true);
}
}
Scala代码:
override def processElement(
transaction: Transaction,
context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
collector: Collector[Alert]): Unit = {
// Get the current state for the current key
val lastTransactionWasSmall = flagState.value
// Check if the flag is set
if (lastTransactionWasSmall != null) {
if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
// Output an alert downstream
val alert = new Alert
alert.setId(transaction.getAccountId)
collector.collect(alert)
}
// Clean up our state
flagState.clear()
}
if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
// set the flag to true
flagState.update(true)
}
}
对于每一笔交易,欺诈检测器都会检查该账户的标志状态。请记住,ValueState总是被扩展到当前的键,即账户。如果标志是非空的,那么该账户最后一次看到的交易金额很小,所以如果这次交易的金额很大,那么检测器就会输出一个欺诈警报。
经过该检查后,标志状态无条件清零。要么是当前交易引起了欺诈警报,模式结束;要么是当前交易没有引起警报,模式被破坏,需要重新开始。
最后,检查交易金额是否偏小。如果是,则设置标志,以便下一个事件检查。注意ValueState<Boolean>实际上有三种状态,unset ( null)、true和false,因为所有ValueState的都是可空的。这个工作只利用unset ( null)和true来检查标志是否被设置。
欺诈检测器v2: 状态+时间=❤️
骗子不会等很久才进行大额购买,以减少他们的测试交易被发现的机会。例如,假设你想给你的欺诈检测器设置一个1分钟的超时时间;也就是说,在前面的例子中,只有当交易3和4发生在1分钟之内时,它们才会被认为是欺诈。Flink的KeyedProcessFunction允许你设置定时器,在未来的某个时间点调用回调方法。
让我们看看如何修改我们的Job以符合我们的新要求。
每当标志被设置为真时,就设置一个未来1分钟的定时器。
当定时器发射时,通过清除标志的状态来重置标志。
如果标志曾经被清除,则应取消定时器。
要取消一个定时器,你必须记住它被设置的时间,而记住意味着状态,所以你将首先创建一个定时器状态以及你的标志状态。
Java代码:
private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
Scala代码:
@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
@transient private var flagState: ValueState[java.lang.Boolean] = _
@transient private var timerState: ValueState[java.lang.Long] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
flagState = getRuntimeContext.getState(flagDescriptor)
val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
timerState = getRuntimeContext.getState(timerDescriptor)
}
KeyedProcessFunction#processElement是用一个包含定时器服务的Context调用的。定时器服务可以用来查询当前时间、注册定时器和删除定时器。有了它,你可以在每次设置标志时,设置未来1分钟的定时器,并将时间戳存储在timerState中。
Java代码:
if (transaction.getAmount() < SMALL_AMOUNT) {
// set the flag to true
flagState.update(true);
// set the timer and timer state
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
Scala代码:
if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
// set the flag to true
flagState.update(true)
// set the timer and timer state
val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
context.timerService.registerProcessingTimeTimer(timer)
timerState.update(timer)
}
处理时间为挂钟时间,由运行操作机的系统时钟决定。
当定时器发射时,它会调用KeyedProcessFunction#onTimer。覆盖这个方法就是如何实现你的回调来重置标志。
Java代码:
private void cleanUp(Context ctx) throws Exception {
// delete timer
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
// clean up all state
timerState.clear();
flagState.clear();
}
Scala代码:
@throws[Exception]
private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
// delete timer
val timer = timerState.value
ctx.timerService.deleteProcessingTimeTimer(timer)
// clean up all states
timerState.clear()
flagState.clear()
}
就是这样,一个全功能、有状态、分布式的流媒体应用!
最终程序
Java代码:
package spendreport;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
// Get the current state for the current key
Boolean lastTransactionWasSmall = flagState.value();
// Check if the flag is set
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
//Output an alert downstream
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
// Clean up our state
cleanUp(context);
}
if (transaction.getAmount() < SMALL_AMOUNT) {
// set the flag to true
flagState.update