Flink_来自两条流的订单交易匹配(Connect, CoProcessFunction)
对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理 。这里我们利用connect 将两条流进行连接 , 然后用自定义的CoProcessFunction 进行处理。
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector// 定义到账事件样例类
case class ReceiptEvent(txId: String, payChannel: String, timestamp: Long)object TxMatch {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 1. 读取订单事件数据val orderEventStream = env.readTextFile("D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv").map( data => {val arr = data.split(",")OrderEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)} ).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.eventType == "pay").keyBy(_.txId)// 2. 读取到账事件数据val receiptEventStream = env.readTextFile("D:\\Mywork\\workspace\\Project_idea\\UserBehaviorAnalysis0903\\OrderPayDetect\\src\\main\\resources\\ReceiptLog.csv").map( data => {val arr = data.split(",")ReceiptEvent(arr(0), arr(1), arr(2).toLong)} ).assignAscendingTimestamps(_.timestamp * 1000L).keyBy(_.txId)// 3. 合并两条流,进行处理val resultStream = orderEventStream.connect(receiptEventStream).process(new TxPayMatchResult())resultStream.print("matched")resultStream.getSideOutput(new OutputTag[OrderEvent]("unmatched-pay")).print("unmatched pays")resultStream.getSideOutput(new OutputTag[ReceiptEvent]("unmatched-receipt")).print("unmatched receipts")env.execute()}
}class TxPayMatchResult() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]{// 定义状态,保存当前交易对应的订单支付事件,和到账事件lazy val payEventState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))lazy val receiptEventState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))// 侧输出流标签val unmatchedPayEventOutputTag = new OutputTag[OrderEvent]("unmatched-pay")val unmatchedReceiptEventOutputTag = new OutputTag[ReceiptEvent]("unmatched-receipt")override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 订单支付来了,要判断之前是否有到账事件val receipt = receiptEventState.value()if (receipt != null) {// 如果已经有receipt,正常输出匹配,清空状态out.collect((pay, receipt))receiptEventState.clear()payEventState.clear()} else {// 如果还没来,注册定时器开始等待5秒ctx.timerService().registerEventTimeTimer(pay.timestamp * 1000L + 5000L)// 更新状态payEventState.update(pay)}}override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 到账事件来了,要判断之前是否有pay事件val pay = payEventState.value()if (pay != null) {out.collect((pay, receipt))receiptEventState.clear()payEventState.clear()} else {// 如果还没来,注册定时器开始等待3秒ctx.timerService().registerEventTimeTimer(receipt.timestamp * 1000L + 3000L)receiptEventState.update(receipt)}}override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 定时器触发,判断状态中哪个还存在,就代表另一个没来,输出到侧输出流if (payEventState.value() != null){ctx.output(unmatchedPayEventOutputTag, payEventState.value())}if (receiptEventState.value() != null){ctx.output(unmatchedReceiptEventOutputTag, receiptEventState.value())}//清空状态receiptEventState.clear()payEventState.clear()}
}//输出数据
matched> (OrderEvent(34763,pay,sddf9809ew,1558431068),ReceiptEvent(sddf9809ew,alipay,1558430936))
matched> (OrderEvent(34764,pay,832jksmd9,1558431079),ReceiptEvent(832jksmd9,wechat,1558430938))
matched> (OrderEvent(34765,pay,m23sare32e,1558431082),ReceiptEvent(m23sare32e,wechat,1558430940))
matched> (OrderEvent(34766,pay,92nr903msa,1558431095),ReceiptEvent(92nr903msa,wechat,1558430944))
matched> (OrderEvent(34756,pay,9032n4fd2,1558431951),ReceiptEvent(9032n4fd2,wechat,1558430913))
unmatched receipts> ReceiptEvent(ewr342as4,wechat,1558430845)
unmatched receipts> ReceiptEvent(8fdsfae83,alipay,1558430850)
unmatched pays> OrderEvent(34731,pay,35jue34we,1558430849)
unmatched receipts> ReceiptEvent(sdafen9932,alipay,1558430949)
unmatched pays> OrderEvent(34768,pay,88snrn932,1558430950)
Flink_来自两条流的订单交易匹配(Connect, CoProcessFunction)相关推荐
- websocket 流式传输 交易订单更新
文章目录 1.获取Binance API和Secret 2. 流式订单更新 2.1. 生成监听键 2.2. Websocket 端点 2.3. 流媒体连接 2.4. 消息处理 2.4.1 订单更新 2 ...
- 两条水位线的业务需求分析-Interval JOIN方案(转载+自己分析整理)
虽然我们利用UnBounded的JOIN能解决訂單信息和付款信息join的问题,但是仔细分析用户需求,会发现这个需求场景订单信息和付款信息并不需要长期存储, 比如2018-12-27 14:22:22 ...
- 基于FPGA系统合成两条视频流实现3D视频效果
目录 1.概述 2.时钟架构 3.带锁定视频解码器的同步系统 4.异步视频系统 4.1.时钟三态模式 4.2.两条视频流中的数据对齐误差 4.3.行锁定摄像机对齐误差 4.4.不同的连接长度 4.5. ...
- 华为式创新与海尔式创新——两条道路考验中国制造
来源:企业管理杂志(ID:qyglzz) 作者:吴兴杰,中国管理科学研究院专家咨询委员会副主任.学术委员会委员.研究员 以华为为代表的聚合化创新之路非常难走而又不得不走,否则永远只能当二流甚至三流企业 ...
- 上海远丰:打破双11魔咒,让企业电商两条腿走路
今年以来,聚美优品.京东相继上市,特别是不久之前电商巨无霸阿里巴巴的上市更具有里程碑的意义.据国家商务部的统计数据显示,上半年我国电子商务交易额约为5.66万亿元,同比增长30.1%.网络零售市场交易 ...
- Q2复苏慢,华米科技的“两条腿”还行吗?
配图来自Canva 8月18日,华米科技发布了2020年第二季度财报.对于华米科技,外界一直非常关注其与小米的关系,以及其一直在尝试的"去小米化"效果. 在屡次被传与小米不和后,华 ...
- 一千台无盘工作站,电信网通两条光纤方案
一千台无盘工作站,电信网通两条光纤方案. 申明:此帖非我原创,朋友写的. 我的设备全部采用H3C,至于型号我觉得可选择的很多,我们这里主要讨论的是方案,所以还是以方案为主,设备的型号容后再议. 我的设 ...
- 两条链路实现负载均衡和容错的设计
假设该公司有两条专线,一条电信的,一条网通的线路:本实验用路由器R6 做的Telnet服务器,该服务器有绑定两个IP地址,这两个IP地址,分别是192.168.1.3 .192.168.1.4.公司希 ...
- 微软云服务再添新产品,这次来自两位华裔女科学家创办的AI公司
微软云服务再添新产品,这次来自两位华裔女科学家创办的AI公司 微软的Azure AI又添了一项新产品. 这次是一个用于反欺诈的AI平台,来自DataVisor. 微软全球金融服务副总裁Janet Le ...
最新文章
- 算法工程师必知必会10大基础算法!
- 操作系统:内存连续分配方式采用的几种算法及各自优劣
- 搜索Maven依赖资源_搜索Maven工件_搜索Maven构件_搜索依赖_搜索构件_搜索工件
- python调用linux命令输出结果,Python-运行shell命令并捕获输出
- 互联网晚报 | 1月26日 星期三 | 春晚正式入驻视频号;小红书合并社区与电商业务;中国电信5G消息正式商用...
- [Web开发] 微软的RSS协议扩展 - FeedSync 介绍 (2)
- antd form rules字数限制_Package - antd-form-rules
- linux转为root用户_[R] 展示linux文件树 - collapsibleTree
- ssm 项目记录用户操作日志和异常日志
- Android 84、gc、高德、百度、墨卡托地理坐标转换
- 家庭作业(贪心 + 并查集)
- Thematic与Continuous区别
- jzy3D从入门到弃坑_3使用jzy3D0.9画2D散点图--多条线条
- 正则验证加js大全,真的很给力!!!很全啊有木有
- Vue如何使用iconfont(阿里图标库)
- matlab悬置非线性位移计算公式,动力总成悬置系统运动包络及工况载荷计算方法...
- C语言基础代码(10题)
- 数据响应式的原理(简单理解 为了面试)
- ESP8266-Arduino编程实例-PCF8563实时时钟(RTC)驱动
- 【黑马程序员C++ STL】学习记录
热门文章
- 计算机发展方向图,计算机地图制图原理、特点及发展趋势
- java中负数取余数_数学 - java如何用负数进行模数计算?
- nvidia Multiple Process Service (MPS)
- 人和机器人的自然交互
- Canvas2D属性和方法
- ERROR: mysqld failed while attempting to check config
- 新手安装指南:一步一步在Windows安装苹果雪豹系统
- html文字于图片齐平,CSS控制图片和文字在同一行对齐显示
- 【工具使用】前端好用的测试工具总结
- python绘制边界等值线_pykrig克里金插值后绘制等值线图+边界外白化