大数据培训 | 电商用户行为分析之订单支付实时监控
在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。在接下来的内容中,我们将实现这两个需求。
模块创建和数据准备
同样地,在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 OrderTimeoutDetect。在这个子模块中,我们同样将会用到 flink 的 CEP 库来实现事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相关依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
同样,在 src/main/目录下,将默认源文件目录 java 改名为 scala。
更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)
代码实现
在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。
使用 CEP 实现
我们首先还是利用 CEP 库来实现这个功能。我们先将事件流按照订单号 orderId分流,然后定义这样的一个事件模式:在 15 分钟内,事件“create”与“pay”非严格紧邻:
val orderPayPattern = Pattern.begin[OrderEvent]("begin")
.where(_.eventType == "create")
.followedBy("follow")
.where(_.eventType == "pay")
.within(Time.seconds(5))
这样调用.select 方法时,就可以同时获取到匹配出的事件和超时未匹配的事件了。
在 src/main/scala 下继续创建 OrderTimeout.scala 文件,新建一个单例对象。定义样例类 OrderEvent,这是输入的订单事件流;另外还有 OrderResult,这是输出显示 的 订 单 状 态 结 果 。 订 单 数 据 也 本 应 该 从 UserBehavior 日 志 里 提 取 , 由 于UserBehavior.csv 中没有做相关埋点,我们从另一个文件 OrderLog.csv 中读取登录数据_大数据培训。
完整代码如下:
OrderTimeoutDetect/src/main/scala/OrderTimeout.scala
case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
case class OrderResult(orderId: Long, eventType: String)
object OrderTimeout {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")
.map( data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000)
// 定义一个带匹配时间窗口的模式
val orderPayPattern = Pattern.begin[OrderEvent]("begin")
.where(_.eventType == "create")
.followedBy("follow")
.where(_.eventType == "pay")
.within(Time.minutes(15))
// 定义一个输出标签
val orderTimeoutOutput = OutputTag[OrderResult]("orderTimeout")
// 订单事件流根据 orderId 分流,然后在每一条流中匹配出定义好的模式
val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)
val completedResult = patternStream.select(orderTimeoutOutput) {
// 对于已超时的部分模式匹配的事件序列,会调用这个函数
(pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
val createOrder = pattern.get("begin")
OrderResult(createOrder.get.iterator.next().orderId, "timeout")
}
} {
// 检测到定义好的模式序列时,就会调用这个函数
pattern: Map[String, Iterable[OrderEvent]] => {
val payOrder = pattern.get("follow")
OrderResult(payOrder.get.iterator.next().orderId, "success")
}
}
// 拿到同一输出标签中的 timeout 匹配结果(流)
val timeoutResult = completedResult.getSideOutput(orderTimeoutOutput)
completedResult.print()
timeoutResult.print()
env.execute("Order Timeout Detect Job")
}
}
使用 Process Function 实现
我们同样可以利用 Process Function,自定义实现检测订单超时的功能。为了简化问题,我们只考虑超时报警的情形,在 pay 事件超时未发生的情况下,输出超时报警信息。
一个简单的思路是,可以在订单的 create 事件到来后注册定时器,15 分钟后触发;然后再用一个布尔类型的 Value 状态来作为标识位,表明 pay 事件是否发生过。如果 pay 事件已经发生,状态被置为 true,那么就不再需要做什么操作;而如果 pay事件一直没来,状态一直为 false,到定时器触发时,就应该输出超时报警信息_大数据视频。
具体代码实现如下:
OrderTimeoutDetect/src/main/scala/OrderTimeoutWithoutCep.scala
object OrderTimeoutWithoutCep {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")
.map( data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)
})
更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)
.assignAscendingTimestamps(_.eventTime * 1000)
.keyBy(_.orderId)
// 自定义一个 process function,进行 order 的超时检测,输出超时报警信息
val timeoutWarningStream = orderEventStream
.process(new OrderTimeoutAlert)
timeoutWarningStream.print()
env.execute()
}
class OrderTimeoutAlert extends KeyedProcessFunction[Long, OrderEvent, OrderResult]
{
lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new
ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))
override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long,
OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
val isPayed = isPayedState.value()
if (value.eventType == "create" && !isPayed) {
ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L + 15 * 60 *
1000L)
} else if (value.eventType == "pay") {
isPayedState.update(true)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent,
OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
val isPayed = isPayedState.value()
if (!isPayed) {
out.collect(OrderResult(ctx.getCurrentKey, "order timeout"))
}
isPayedState.clear()
}
}
}
来自两条流的订单交易匹配
对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做 合 并 处 理 。 这 里 我 们 利 用 connect 将 两 条 流 进 行 连 接 , 然 后 用 自 定 义 的CoProcessFunction 进行处理。
具体代码如下:
TxMatchDetect/src/main/scala/TxMatch
case class OrderEvent( orderId: Long, eventType: String, txId: String, eventTime: Long )
case class ReceiptEvent( txId: String, payChannel: String, eventTime: Long )
object TxMatch {
val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")
val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")
.map( data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2),
dataArray(3).toLong)
})
.filter(_.txId != "")
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.txId)
val receiptEventStream = env.readTextFile("YOUR_PATH\\resources\\ReceiptLog.csv")
.map( data => {
val dataArray = data.split(",")
ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.txId)
val processedStream = orderEventStream
.connect(receiptEventStream)
.process(new TxMatchDetection)
processedStream.getSideOutput(unmatchedPays).print("unmatched pays")
processedStream.getSideOutput(unmatchedReceipts).print("unmatched receipts")
processedStream.print("processed")
env.execute()
}
class TxMatchDetection extends CoProcessFunction[OrderEvent, ReceiptEvent,
(OrderEvent, ReceiptEvent)]{
lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new
ValueStateDescriptor[OrderEvent]("pay-state",classOf[OrderEvent]) )
lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new
ValueStateDescriptor[ReceiptEvent]("receipt-state", classOf[ReceiptEvent]) )
override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent,
ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent,
ReceiptEvent)]): Unit = {
val receipt = receiptState.value()
if( receipt != null ){
receiptState.clear()
out.collect((pay, receipt))
} else{
payState.update(pay)
ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L)
}
}
override def processElement2(receipt: ReceiptEvent, ctx:
CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out:
Collector[(OrderEvent, ReceiptEvent)]): Unit = {
val payment = payState.value()
if( payment != null ){
payState.clear()
out.collect((payment, receipt))
} else{
receiptState.update(receipt)
ctx.timerService().registerEventTimeTimer(receipt.eventTime * 1000L)
}
}
更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)
override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent,
ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent,
ReceiptEvent)]): Unit = {
if ( payState.value() != null ){
ctx.output(unmatchedPays, payState.value())
}
if ( receiptState.value() != null ){
ctx.output(unmatchedReceipts, receiptState.value())
}
payState.clear()
receiptState.clear()
}
}
}
大数据培训 | 电商用户行为分析之订单支付实时监控相关推荐
- 北京大数据培训 | 电商用户行为分析之实时流量统计
模块创建和数据准备 在 UserBehaviorAnalysis 下 新 建 一 个 maven module 作 为 子 项 目 , 命 名 为NetworkFlowAnalysis.在这个子模块中 ...
- Flink_大数据技术之电商用户行为分析
大数据技术之电商用户行为分析 第1章 项目整体介绍 1.1 电商的用户行为 电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析 ...
- 电商用户行为分析大数据平台相关系列1-环境介绍
最近在自学Spark,看了一些书籍和视频,总是感觉无从下手.拿着一个想法总是无从下手.追其原因,主要是没有系统的学习和使用.对于IT,一切新技术都需要不断实践.不断动手.本着动手的原则,本人通过各种渠 ...
- Spark项目实战—电商用户行为分析
文章目录 一.[SparkCore篇]项目实战-电商用户行为分析 前言:数据准备 1.数据规则如下: 2.详细字段说明: 3.样例类 (一)需求1:TOP10热门品类 1.需求说明 2.代码实现方案1 ...
- 【Flink】基于 Flink 的电商用户行为分析(二)
1.市场营销商业指标统计分析 模块创建和数据准备 继续在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 MarketAnalysis. 这个模块中 ...
- 基于 flink 的电商用户行为数据分析【8】| 订单支付实时监控
本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...
- 电商用户行为分析大数据平台
简介 对电商用户行为: 访问行为 购物行为 广告点击 进行复杂的分析 结果 辅助PM 数据分析师 管理分析现有状况 改进产品设计 调整公司战略业务 提升业绩 营业额以及市场占有率提升 技术简介 业务模 ...
- 数据分析实战(六):英国电商用户行为分析
案例:英国电商用户行为数据分析 Part 1. 数据获取 1.1 数据集简介 https://archive.ics.uci.edu/ml/datasets/online+retail# 该数据集为英 ...
- 某电商用户行为分析-提升GMV
目录 1. 明确项目背景和需求 项目目的: 分析思路: 2 数据探索(EDA) 2.1 数据集信息 2.2数据预览 2.2.1 new_user 2.2.2 age 2.2.3 sex 2.2.4 m ...
最新文章
- Iptables 表和链之间的关系【未完成】
- 世博、城市云和2020
- linux写时复制技术初探
- CSharpGL(5)解析3DS文件并用CSharpGL渲染
- [java] javax.el.PropertyNotFoundException: Property 'id' not found on type bean.Student
- java做一个简单的数据库,哪个嵌入式数据库用Java写成一个简单的键/值存储?
- Orchard中如何配置远端发布
- stack.pop()方法_C.示例中的Stack.Pop()方法
- linux mysql移植_linux 下mysql 移植设置方法
- mysql 索引原理_MySQL InnoDB索引原理和算法
- python——socket实现简单C/S交互开发
- deepin安装realtek c821无线网卡驱动
- GoogleNet家族
- 如何安装ioncube扩展
- 外企重修课:商人高通、任性微软、老姜IBM
- linux重启数据库11g,linux下重启oracle数据库
- 移动端下拉刷新,兼容ios,Android及微信浏览器
- 企业网站首页设计常见的6种布局方式
- 不用P图!用Python给头像加圣诞帽并制作成可执行软件!
- Windows安装pyserial