本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者亲绘大数据生态圈思维导图…持续更新,欢迎star!

本篇是flink 的「电商用户行为数据分析」的第 8 篇文章,为大家带来的是市场营销商业指标统计分析订单支付实时监控的内容!通过本期内容,我们可以实现通过使用CEPProcess Function来实现订单支付实时监控的功能,还能学会通过connectjoin来实现flink双流join的功能,可谓干货满满!受益的朋友记得三连支持一下 ~


订单支付实时监控

在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。在接下来的内容中,我们将实现这两个需求。

模块创建和数据准备

同样地,在UserBehaviorAnalysis下新建一个 maven module作为子项目,命名为OrderTimeoutDetect。在这个子模块中,我们同样将会用到 flinkCEP 库来实现事件流的模式匹配,所以需要在pom文件中引入CEP的相关依赖:

<dependency><groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

同样,在src/main/目录下,将默认源文件目录java改名为scala。

代码实现

在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如15分钟),如果下单后一段时间仍未支付,订单就会被取消

使用CEP实现

我们首先还是利用CEP库来实现这个功能。我们先将事件流按照订单号orderId分流,然后定义这样的一个事件模式:在15分钟内,事件“create”与“pay”非严格紧邻:

    // 1、 定义一个匹配事件序列的模式val orderPayPattern = Pattern.begin[OrderEvent]("create").where(_.eventType == "create")   // 首先是订单的 create 事件.followedBy("pay").where(_.eventType == "pay")  // 后面来的是订单的 pay 事件.within(Time.minutes(15))    // 间隔 15 分钟

这样调用.select方法时,就可以同时获取到匹配出的事件超时未匹配的事件了。

在src/main/scala下继续创建OrderTimeout.scala文件,新建一个单例对象。定义样例类OrderEvent,这是输入的订单事件流;另外还有OrderResult,这是输出显示的订单状态结果。订单数据也本应该从UserBehavior日志里提取,由于UserBehavior.csv中没有做相关埋点,我们从另一个文件OrderLog.csv中读取登录数据。


完整代码如下:

import java.utilimport org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
/** @Author: Alice菌* @Date: 2020/12/13 15:46* @Description:*/
object OrderTimeoutWithOutCep {// 定义输入的订单事件样例类case class OrderEvent(orderId:Long,eventType:String,eventTime:Long)// 定义输出的订单检测结果样例类case class OrderResult(orderId:Long,resultMsg:String)def main(args: Array[String]): Unit = {// 定义流处理环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 设置程序并行度env.setParallelism(1)// 设置时间特征为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 从文件中读取数据,并转换成样例类val orderEventStream: DataStream[OrderEvent] = env.readTextFile("YOUR_PATH\\OrderLog.csv").map(data => {// 样例数据: 34729,pay,sd76f87d6,1558430844val dataArray: Array[String] = data.split(",")OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)})  //  处理数据.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L})  //  设置时间戳// 1、 定义一个匹配事件序列的模式val orderPayPattern = Pattern.begin[OrderEvent]("create").where(_.eventType == "create")   // 首先是订单的 create 事件.followedBy("pay").where(_.eventType == "pay")  // 后面来的是订单的 pay 事件.within(Time.minutes(15))    // 间隔 15 分钟// 2、 将 pattern 应用在按照 orderId分组的数据流上val patterStream: PatternStream[OrderEvent] = CEP.pattern(orderEventStream.keyBy(_.orderId), orderPayPattern)// 3、定义一个侧输出流标签,用来标明超时事件的侧输出流val orderTimeOutputTag: OutputTag[OrderResult] = new OutputTag[OrderResult]("order time out")// 4、调用select方法,提取匹配事件和超时事件,分别进行处理转换输出val result: DataStream[OrderResult] = patterStream.select(orderTimeOutputTag, new OrderTimeOutSelect(), new OrderPaySelect())// 5、打印输出result.print("payed")result.getSideOutput(orderTimeOutputTag).print("timeout")// 执行程序env.execute("order timeout detect job")}// 自定义超时处理函数class OrderTimeOutSelect() extends PatternTimeoutFunction[OrderEvent,OrderResult]{override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimestamp: Long): OrderResult = {val timeOutOrderId: Long = pattern.get("create").iterator().next().orderIdOrderResult(timeOutOrderId,"timeout at" + timeoutTimestamp)}}// 自定义匹配处理函数class OrderPaySelect() extends PatternSelectFunction[OrderEvent,OrderResult]{override def select(pattern: util.Map[String, util.List[OrderEvent]]): OrderResult = {val payedOrderId: Long = pattern.get("pay").get(0).orderIdOrderResult(payedOrderId,"pay successfully")}}
}

运行结果:

使用Process Function实现

我们同样可以利用Process Function,自定义实现检测订单超时的功能。为了简化问题,我们只考虑超时报警的情形,在pay事件超时未发生的情况下,输出超时报警信息。

一个简单的思路是,可以在订单的 create 事件到来后注册定时器,15分钟后触发;然后再用一个布尔类型的Value状态来作为标识位,表明pay事件是否发生过。如果pay事件已经发生,状态被置为true,那么就不再需要做什么操作;而如果pay事件一直没来,状态一直为false,到定时器触发时,就应该输出超时报警信息。

具体代码实现如下:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector/** @Author: Alice菌* @Date: 2020/12/23 19:35* @Description: */
object OrderTimeout {// 定义输入的订单事件样例类case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)// 定义输出的订单检测结果样例类case class OrderResult(orderId: Long, resultMsg: String)def main(args: Array[String]): Unit = {// 定义流处理环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 设置程序并行度env.setParallelism(1)// 设置时间特征为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 读取输入的订单数据流val orderEventStream: DataStream[OrderEvent] = env.readTextFile("YOUR_PATH\\OrderLog.csv").map(data => {// 示例数据: 34729,pay,sd76f87d6,1558430844val dataArray: Array[String] = data.split(",")OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)})// 设置水印.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L})// 自定义 Process Function,做精细化的流程控制val orderResultStream: DataStream[OrderResult] = orderEventStream.keyBy(_.orderId).process(new OrderPayMatchDetect())// 打印输出orderResultStream.print("payed")orderResultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print("timeout")// 执行程序env.execute("order timeout without cep job")}class OrderPayMatchDetect() extends KeyedProcessFunction[Long,OrderEvent,OrderResult]{// 定义状态,用来保存是否来过 create 和 pay 事件的标识位,以及定时器的时间戳lazy val isPayState:ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed", classOf[Boolean]))lazy val isCreateState:ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-created", classOf[Boolean]))// 定义一个状态,保存每次定时器的时间戳lazy val timerTsState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))// 定义一个侧输出流val orderTimeOutputTag = new OutputTag[OrderResult]("timeout")override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {// 取出当前的状态val isPayed: Boolean = isPayState.value()val isCreated: Boolean = isCreateState.value()val timeTs: Long = timerTsState.value()// 判断当前事件的类型,分成不同的情况讨论:// 情况1: 来的是 create,要继续判断之前是否有 pay 来过if (value.eventType == "create"){// 情况 1.1 : 如果已经pay过,匹配成功,输出到主流,清空状态if (isPayed){out.collect(OrderResult(value.orderId,"payed successfully"))// 清除状态isPayState.clear()timerTsState.clear()// 删除定时器ctx.timerService().deleteEventTimeTimer(timeTs)}// 情况 1.2:如果没有pay过,那么就注册一个15分钟后的定时器,开始等待else{val ts: Long = value.eventTime * 1000L + 15 * 60 *1000L// 设置一个15分钟的定时器ctx.timerService().registerEventTimeTimer(ts)timerTsState.update(ts)isCreateState.update(true)}}// 情况2:来的是pay,要继续判断是否来过 createelse if (value.eventType == "pay"){// 情况2.1 : 如果 create 已经来过,匹配成功,要继续判断间隔时间是否超过了15分钟if (isCreated){// 情况 2.1.1:如果没有超时,正常输出结果到主流if (value.eventTime * 1000L < timeTs){out.collect(OrderResult(value.orderId,"payed successfully"))}else{// 情况2.1.2: 如果已经超时,那么输出 timeout 报警到侧输出流ctx.output(orderTimeOutputTag,OrderResult(value.orderId,"payed but already timeout"))}// 无论哪种情况,都已经有了输出,清空状态isCreateState.clear()timerTsState.clear()ctx.timerService().deleteEventTimeTimer(timeTs)}// 情况2.2 :如果 create 没来,需要等待乱序 create,注册一个当前pay时间戳的定时器else{val ts: Long = value.eventTime * 1000L// 设置定时器ctx.timerService().registerEventTimeTimer(ts)// 更新状态timerTsState.update(ts)isPayState.update(true)}}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {// 定时器触发,需要判断是哪种情况if (isPayState.value()){// 如果 pay 过,那么说明 create没来,可能出现了数据丢失异常的情况ctx.output(orderTimeOutputTag,OrderResult(ctx.getCurrentKey,"already payed but not found created log"))}else{// 如果 没有 pay过,那么说明真正 15 分钟 超时 [提交了订单,但是超过了15分钟仍未支付]ctx.output(orderTimeOutputTag,OrderResult(ctx.getCurrentKey,"order timeout"))}// 清空状态isPayState.clear()isCreateState.clear()timerTsState.clear()}}
}

运行结果:

来自两条流的订单交易匹配

对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。这里我们利用connect将两条流进行连接,然后用自定义的CoProcessFunction进行处理。

具体代码如下:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector/** @Author: Alice菌* @Date: 2020/12/13 15:57* @Description: 来自两条流的订单交易匹配  ( connect 实现 )*/
object OrderPayTxMatch {// 输入输出的样例类case class ReceiptEvent(txId:String, payChannel:String, timestamp:Long)case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long)def main(args: Array[String]): Unit = {// 创建流处理的环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 设置程序并行度env.setParallelism(1)// 设置时间特征为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 从 OrderLog.csv 文件中读取数据 ,并转换成样例类val orderEventStream: KeyedStream[OrderEvent, String] = env.readTextFile("G:\\idea arc\\BIGDATA\\project\\src\\main\\resources\\OrderLog.csv").map(data => {// 样例数据 :  34731,pay,35jue34we,1558430849val dataArray: Array[String] = data.split(",")OrderEvent(dataArray(0).toLong,dataArray(1),dataArray(2),dataArray(3).toLong)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L})       //  为数据流中的元素分配时间戳.filter(_.eventType != "") // 只过滤出pay事件.keyBy(_.txId)    // 根据 订单id 分组// 从 ReceiptLog.csv 文件中读取数据 ,并转换成样例类val receiptStream: KeyedStream[ReceiptEvent, String] = env.readTextFile("YOUR_PATH\\ReceiptLog.csv").map(data => {// 样例数据: 3hu3k2432,alipay,1558430848val dataArray: Array[String] = data.split(",")ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)}).assignAscendingTimestamps(_.timestamp * 1000L) // 设置水印.keyBy(_.txId)   // 根据 txId 进行分组// connect 连接两条流,匹配事件进行处理val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream.connect(receiptStream).process(new OrderPayTxDetect())// 定义侧输出流val unmatchedPays: OutputTag[OrderEvent] = new OutputTag[OrderEvent]("unmatched-pays")val unmatchedReceipts: OutputTag[ReceiptEvent] = new OutputTag[ReceiptEvent]("unmatched-receipts")// 打印输出resultStream.print("matched")resultStream.getSideOutput(unmatchedPays).print("unmatched-pays")resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts")env.execute("order pay tx match job")}// 定义 CoProcessFunction,实现两条流数据的匹配检测class OrderPayTxDetect() extends CoProcessFunction[OrderEvent,ReceiptEvent,(OrderEvent,ReceiptEvent)]{// 定义两个 ValueState,保存当前交易对应的支付事件和到账事件lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))//定义侧输出流val unmatchedPays: OutputTag[OrderEvent] = new OutputTag[OrderEvent]("unmatched-pays")val unmatchedReceipts: OutputTag[ReceiptEvent] = new OutputTag[ReceiptEvent]("unmatched-receipts")override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// pay 来了,考察有没有对应的 receipt 来过val receipt: ReceiptEvent = receiptState.value()if (receipt != null){// 如果已经有 receipt,正常输出到主流out.collect((pay,receipt))receiptState.clear()}else{// 如果 receipt 还没来,那么把 pay 存入状态,注册一个定时器等待 5 秒payState.update(pay)ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L + 5000L)}}override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {//receipt来了,考察有没有对应的pay来过val pay: OrderEvent = payState.value()if (pay != null) {//如果已经有pay,那么正常匹配,输出到主流out.collect((pay, receipt))payState.clear()}else{// 如果 pay 还没来,那么把 receipt 存入状态,注册一个定时器等待 3 秒receiptState.update(receipt)ctx.timerService().registerEventTimeTimer(receipt.timestamp * 1000L + 3000L)}}// 定时触发, 有两种情况,所以要判断当前有没有pay和receiptoverride def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {//如果 pay 不为空,说明receipt没来,输出unmatchedPaysif (payState.value() != null){ctx.output(unmatchedPays,payState.value())}if (receiptState.value() != null){ctx.output(unmatchedReceipts,receiptState.value())}// 清除状态payState.clear()receiptState.clear()}}
}

运行结果:

        对于flink的双流join通过connect的做法,肯定会有小伙伴觉得过程比较冗复杂,那还有没有其他的方法也能实现类似的效果呢?

        当然是有的,下面就为大家展示另一种通过intervalJoin方法实现的方式:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector/** @Author: Alice菌* @Date: 2020/12/12 20:23* @Description: 来自两条流的订单交易匹配  (  JOIN 实现 )*/
object OrderPayTxMatchWithJoin {// 输入输出的样例类case class ReceiptEvent(txId:String, payChannel:String, timestamp:Long)case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long)def main(args: Array[String]): Unit = {// 创建流处理的环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 设置程序并行度env.setParallelism(1)// 设置时间特征为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 从 OrderLog.csv 文件中读取数据 ,并转换成样例类val orderEventStream: KeyedStream[OrderEvent, String] = env.readTextFile("YOUR_PATH\\OrderLog.csv").map(data => {// 样例数据 :  34731,pay,35jue34we,1558430849val dataArray: Array[String] = data.split(",")OrderEvent(dataArray(0).toLong,dataArray(1),dataArray(2),dataArray(3).toLong)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {override def extractTimestamp(element: OrderEvent): Long = element.eventTime * 1000L})       //  为数据流中的元素分配时间戳.filter(_.eventType != "") // 只过滤出pay事件.keyBy(_.txId)    // 根据 订单id 分组// 从 ReceiptLog.csv 文件中读取数据 ,并转换成样例类val receiptStream: KeyedStream[ReceiptEvent, String] = env.readTextFile("YOUR_PATH\\ReceiptLog.csv").map(data => {// 样例数据: 3hu3k2432,alipay,1558430848val dataArray: Array[String] = data.split(",")ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)}).assignAscendingTimestamps(_.timestamp * 1000L) // 设置水印.keyBy(_.txId)   // 根据 txId 进行分组// 使用 join 连接两条流val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream.intervalJoin(receiptStream).between(Time.seconds(-5), Time.seconds(3))    .process(new OrderPayTxDetectWithJoin())resultStream.print()env.execute("order pay tx match with join job")}// 自定义 ProcessJoinFunctionclass OrderPayTxDetectWithJoin() extends ProcessJoinFunction[OrderEvent,ReceiptEvent,(OrderEvent,ReceiptEvent)]{override def processElement(left: OrderEvent, right: ReceiptEvent, ctx: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {collector.collect((left,right))}}
}

虽然这种方法看似代码简单了不少,但是也存在局限性。只能匹配对应上的,不能输出没有匹配上的。


小结

好了,当你看到这里的时候,意味着电商用户行为数据分析暂时完结了,不对,下一篇文章会为大家再总结一些电商常见指标的干货,敬请期待!!!考虑到部分小伙伴对于中间的部分代码有疑问,所以我每行都写上了注释,因此详细的过程笔者就不在这里详细赘述了。看了注释仍有疑惑的小伙伴们欢迎添加我的个人微信询问,互相学习,共同进步!你知道的越多,你不知道的也越多,我是Alice,我们下一期见!

文章持续更新,可以微信搜一搜「 猿人菌 」第一时间阅读,思维导图,大数据书籍,大数据高频面试题,海量一线大厂面经…期待您的关注!

基于 flink 的电商用户行为数据分析【8】| 订单支付实时监控相关推荐

  1. 大数据培训 | 电商用户行为分析之订单支付实时监控

    在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要.对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被 ...

  2. 基于 flink 的电商用户行为数据分析【9】| 电商常见指标汇总 + 项目总结

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

  3. 【Flink】基于 Flink 的电商用户行为分析(二)

    1.市场营销商业指标统计分析 模块创建和数据准备 继续在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 MarketAnalysis. 这个模块中 ...

  4. 尚硅谷大数据技术之电商用户行为数据分析

    尚硅谷大数据技术之电商用户行为分析 第1章 项目整体介绍 1.1 电商的用户行为 电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘 ...

  5. 对电商用户的数据分析!

    一.以淘宝等为主的用户分析场景 以淘宝.京东和拼多多为典型的用户+商品消费场景,是国内用户量最多的业务场景,也是产生利润最多的业务场景.在这其中,客户拥有最大的自主权,如何有效地加以利用或辨别客户在行 ...

  6. 电商用户行为数据分析实战(MySQL +PowerBI)

    目录 一.项目概况 二.数据源 三.数据清洗 3.1 选择子集导入,匹配适合的数据类型 3.2 列重命名 3.3重复值处理 3.4 缺失值处理 3.5 异常值处理 从timestamps字段中提取日期 ...

  7. 淘宝电商用户行为数据分析及可视化—基于MySQL/Power BI(含代码)

    本项目以阿里巴巴移动电商平台的真实用户-商品行为数据为基础,使用MySQL进行数据清洗,以AARRR模型.RFM模型为基础展开分析,再用Power BI做可视化,最后从提升用户活跃度.促进商品成交.差 ...

  8. 淘宝电商用户行为数据分析及可视化—基于MySQL/Tableau/PPT(含分析报告及代码)

    本项目以阿里巴巴移动电商平台-淘宝APP的真实用户-商品行为数据 User Behavior Data on Taobao App 为基础,使用MySQL进行数据清洗,使用AARRR模型.RFM模型. ...

  9. 电商用户行为数据分析

    前言 本文针对淘宝app的运营数据,以行业常见指标对用户行为进行分析,包括UV.PV.新增用户分析.漏斗流失分析.留存分析.用户价值分析.复购分析等内容: 本文使用的分析工具以MySQL为主,涉及分组 ...

最新文章

  1. Java 匿名类也能使用构造函数
  2. stm32捕获占空比_基于STM32超声波避障小车
  3. SpringCloud 教程 | 第一篇: 服务的注册与发现Eureka
  4. B站升级HDR10真彩画质,开启4K+120帧+HDR创作新时代
  5. 【毕设狗】【单片机毕业设计】基于单片机的空气质量检测-仿真设计
  6. 计算机cad查询,电脑端如何快速查阅CAD图纸
  7. KindEditor上传图片和修改图片
  8. 入手kindle 3
  9. 无线网络安全技术复习重点
  10. 【PART 1】OAK-D+TurtleBot3机器人项目全解析:SLAM、ROS、深度图、点云。
  11. 图神经网络详解(四)
  12. 【Audio】声临其境——杜比音效介绍
  13. EI收录中国大陆期刊名录(2012年)
  14. 如何搭建Filecoin测试网挖矿节点 | 开发者专区系列01
  15. MySQL查询语句in子查询的优化
  16. WM8960调试记录
  17. Segment 2:Introduction Number Theory——Fermat and Euler【费马定理和欧拉定理】
  18. 高速接口之USB 3.0
  19. MQL4编程初探:从零开始学习EA编写
  20. 如何找到更老版本的anaconda和package

热门文章

  1. 从一线城市回家工作的体验
  2. Polychain重仓的Findora公链,想带领DeFi脱虚向实
  3. 基于树莓派的遥控小车
  4. 偏振光及其在摄影上的应用
  5. 金额数字转换(英文、中文)
  6. gitlab快速入门
  7. 创建一个node 项目,node 知识点
  8. android 高德地图定位圈,android ------ 实现高德定位并获取相应信息 ( 最新版高德SDK 和 Android SDK版本)...
  9. java 双列集合Map 万字详解
  10. 台积电股价突破万亿大关 创下台股历史新高