对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理 。这里我们利用connect 将两条流进行连接 , 然后用自定义的CoProcessFunction 进行处理

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector// 定义到账事件样例类
case class ReceiptEvent(txId: String, payChannel: String, timestamp: Long)object TxMatch {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 1. 读取订单事件数据val orderEventStream = env.readTextFile("D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv").map( data => {val arr = data.split(",")OrderEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)} ).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.eventType == "pay").keyBy(_.txId)// 2. 读取到账事件数据val receiptEventStream = env.readTextFile("D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\OrderPayDetect\\src\\main\\resources\\ReceiptLog.csv").map( data => {val arr = data.split(",")ReceiptEvent(arr(0), arr(1), arr(2).toLong)} ).assignAscendingTimestamps(_.timestamp * 1000L).keyBy(_.txId)// 3. 合并两条流,进行处理val resultStream = orderEventStream.connect(receiptEventStream).process(new TxPayMatchResult())resultStream.print("matched")resultStream.getSideOutput(new OutputTag[OrderEvent]("unmatched-pay")).print("unmatched pays")resultStream.getSideOutput(new OutputTag[ReceiptEvent]("unmatched-receipt")).print("unmatched receipts")env.execute()}
}class TxPayMatchResult() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]{// 定义状态,保存当前交易对应的订单支付事件,和到账事件lazy val payEventState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))lazy val receiptEventState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))// 侧输出流标签val unmatchedPayEventOutputTag = new OutputTag[OrderEvent]("unmatched-pay")val unmatchedReceiptEventOutputTag = new OutputTag[ReceiptEvent]("unmatched-receipt")override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 订单支付来了,要判断之前是否有到账事件val receipt = receiptEventState.value()if (receipt != null) {// 如果已经有receipt,正常输出匹配,清空状态out.collect((pay, receipt))receiptEventState.clear()payEventState.clear()} else {// 如果还没来,注册定时器开始等待5秒ctx.timerService().registerEventTimeTimer(pay.timestamp * 1000L + 5000L)// 更新状态payEventState.update(pay)}}override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 到账事件来了,要判断之前是否有pay事件val pay = payEventState.value()if (pay != null) {out.collect((pay, receipt))receiptEventState.clear()payEventState.clear()} else {// 如果还没来,注册定时器开始等待3秒ctx.timerService().registerEventTimeTimer(receipt.timestamp * 1000L + 3000L)receiptEventState.update(receipt)}}override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 定时器触发,判断状态中哪个还存在,就代表另一个没来,输出到侧输出流if (payEventState.value() != null){ctx.output(unmatchedPayEventOutputTag, payEventState.value())}if (receiptEventState.value() != null){ctx.output(unmatchedReceiptEventOutputTag, receiptEventState.value())}//清空状态receiptEventState.clear()payEventState.clear()}
}//输出数据
matched> (OrderEvent(34763,pay,sddf9809ew,1558431068),ReceiptEvent(sddf9809ew,alipay,1558430936))
matched> (OrderEvent(34764,pay,832jksmd9,1558431079),ReceiptEvent(832jksmd9,wechat,1558430938))
matched> (OrderEvent(34765,pay,m23sare32e,1558431082),ReceiptEvent(m23sare32e,wechat,1558430940))
matched> (OrderEvent(34766,pay,92nr903msa,1558431095),ReceiptEvent(92nr903msa,wechat,1558430944))
matched> (OrderEvent(34756,pay,9032n4fd2,1558431951),ReceiptEvent(9032n4fd2,wechat,1558430913))
unmatched receipts> ReceiptEvent(ewr342as4,wechat,1558430845)
unmatched receipts> ReceiptEvent(8fdsfae83,alipay,1558430850)
unmatched pays> OrderEvent(34731,pay,35jue34we,1558430849)
unmatched receipts> ReceiptEvent(sdafen9932,alipay,1558430949)
unmatched pays> OrderEvent(34768,pay,88snrn932,1558430950)

Flink_来自两条流的订单交易匹配(Connect, CoProcessFunction)相关推荐

  1. websocket 流式传输 交易订单更新

    文章目录 1.获取Binance API和Secret 2. 流式订单更新 2.1. 生成监听键 2.2. Websocket 端点 2.3. 流媒体连接 2.4. 消息处理 2.4.1 订单更新 2 ...

  2. 两条水位线的业务需求分析-Interval JOIN方案(转载+自己分析整理)

    虽然我们利用UnBounded的JOIN能解决訂單信息和付款信息join的问题,但是仔细分析用户需求,会发现这个需求场景订单信息和付款信息并不需要长期存储, 比如2018-12-27 14:22:22 ...

  3. 基于FPGA系统合成两条视频流实现3D视频效果

    目录 1.概述 2.时钟架构 3.带锁定视频解码器的同步系统 4.异步视频系统 4.1.时钟三态模式 4.2.两条视频流中的数据对齐误差 4.3.行锁定摄像机对齐误差 4.4.不同的连接长度 4.5. ...

  4. 华为式创新与海尔式创新——两条道路考验中国制造

    来源:企业管理杂志(ID:qyglzz) 作者:吴兴杰,中国管理科学研究院专家咨询委员会副主任.学术委员会委员.研究员 以华为为代表的聚合化创新之路非常难走而又不得不走,否则永远只能当二流甚至三流企业 ...

  5. 上海远丰:打破双11魔咒,让企业电商两条腿走路

    今年以来,聚美优品.京东相继上市,特别是不久之前电商巨无霸阿里巴巴的上市更具有里程碑的意义.据国家商务部的统计数据显示,上半年我国电子商务交易额约为5.66万亿元,同比增长30.1%.网络零售市场交易 ...

  6. Q2复苏慢,华米科技的“两条腿”还行吗?

    配图来自Canva 8月18日,华米科技发布了2020年第二季度财报.对于华米科技,外界一直非常关注其与小米的关系,以及其一直在尝试的"去小米化"效果. 在屡次被传与小米不和后,华 ...

  7. 一千台无盘工作站,电信网通两条光纤方案

    一千台无盘工作站,电信网通两条光纤方案. 申明:此帖非我原创,朋友写的. 我的设备全部采用H3C,至于型号我觉得可选择的很多,我们这里主要讨论的是方案,所以还是以方案为主,设备的型号容后再议. 我的设 ...

  8. 两条链路实现负载均衡和容错的设计

    假设该公司有两条专线,一条电信的,一条网通的线路:本实验用路由器R6 做的Telnet服务器,该服务器有绑定两个IP地址,这两个IP地址,分别是192.168.1.3 .192.168.1.4.公司希 ...

  9. 微软云服务再添新产品,这次来自两位华裔女科学家创办的AI公司

    微软云服务再添新产品,这次来自两位华裔女科学家创办的AI公司 微软的Azure AI又添了一项新产品. 这次是一个用于反欺诈的AI平台,来自DataVisor. 微软全球金融服务副总裁Janet Le ...

最新文章

  1. 算法工程师必知必会10大基础算法!
  2. 操作系统:内存连续分配方式采用的几种算法及各自优劣
  3. 搜索Maven依赖资源_搜索Maven工件_搜索Maven构件_搜索依赖_搜索构件_搜索工件
  4. python调用linux命令输出结果,Python-运行shell命令并捕获输出
  5. 互联网晚报 | 1月26日 星期三 | 春晚正式入驻视频号;小红书合并社区与电商业务;中国电信5G消息正式商用...
  6. [Web开发] 微软的RSS协议扩展 - FeedSync 介绍 (2)
  7. antd form rules字数限制_Package - antd-form-rules
  8. linux转为root用户_[R] 展示linux文件树 - collapsibleTree
  9. ssm 项目记录用户操作日志和异常日志
  10. Android 84、gc、高德、百度、墨卡托地理坐标转换
  11. 家庭作业(贪心 + 并查集)
  12. Thematic与Continuous区别
  13. jzy3D从入门到弃坑_3使用jzy3D0.9画2D散点图--多条线条
  14. 正则验证加js大全,真的很给力!!!很全啊有木有
  15. Vue如何使用iconfont(阿里图标库)
  16. matlab悬置非线性位移计算公式,动力总成悬置系统运动包络及工况载荷计算方法...
  17. C语言基础代码(10题)
  18. 数据响应式的原理(简单理解 为了面试)
  19. ESP8266-Arduino编程实例-PCF8563实时时钟(RTC)驱动
  20. 【黑马程序员C++ STL】学习记录

热门文章

  1. 计算机发展方向图,计算机地图制图原理、特点及发展趋势
  2. java中负数取余数_数学 - java如何用负数进行模数计算?
  3. nvidia Multiple Process Service (MPS)
  4. 人和机器人的自然交互
  5. Canvas2D属性和方法
  6. ERROR: mysqld failed while attempting to check config
  7. 新手安装指南:一步一步在Windows安装苹果雪豹系统
  8. html文字于图片齐平,CSS控制图片和文字在同一行对齐显示
  9. 【工具使用】前端好用的测试工具总结
  10. python绘制边界等值线_pykrig克里金插值后绘制等值线图+边界外白化