java flink(二十七) 实战之电商 订单超时取消报警计算 CEP与ProcessFunction 对比
题目:根据订单文件,读取订单状态:创建-支付-取消,如果15分钟未支付,输出该订单报警。
1、首先利用CEP实现
目录结构:
文件内容:
文件内容转换成pojo包装类:
package Beans;public class OrderEvent {private Long orderId;private String eventType;private String txId;private Long timestamp;public OrderEvent() {}public OrderEvent(Long orderId, String eventType, String txId, Long timestamp) {this.orderId = orderId;this.eventType = eventType;this.txId = txId;this.timestamp = timestamp;}public Long getOrderId() {return orderId;}public void setOrderId(Long orderId) {this.orderId = orderId;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public String getTxId() {return txId;}public void setTxId(String txId) {this.txId = txId;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}@Overridepublic String toString() {return "OrderEvent{" +"orderId=" + orderId +", eventType='" + eventType + '\'' +", txId='" + txId + '\'' +", timestamp=" + timestamp +'}';}
}
报警信息包装类:
package Beans;public class OrderResult {private Long orderId;private String resultState;public OrderResult() {}public OrderResult(Long orderId, String resultState) {this.orderId = orderId;this.resultState = resultState;}public Long getOrderId() {return orderId;}public void setOrderId(Long orderId) {this.orderId = orderId;}public String getResultState() {return resultState;}public void setResultState(String resultState) {this.resultState = resultState;}@Overridepublic String toString() {return "OrderResult{" +"orderId=" + orderId +", resultState='" + resultState + '\'' +'}';}
}
CEP实现:
package Project;import Beans.OrderEvent;
import Beans.OrderResult;
import akka.stream.impl.QueueSink;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;import java.net.URL;
import java.util.List;
import java.util.Map;public class OrderpayTimeout {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//读取数据URL resource = OrderpayTimeout.class.getResource("/OrderLog.csv");DataStream<OrderEvent> orderEventStream = env.readTextFile(resource.getPath()).map(line -> {String[] fields = line.split(",");return new OrderEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {@Overridepublic long extractAscendingTimestamp(OrderEvent orderEvent) {return orderEvent.getTimestamp()*1000L;}});//定义一个带时间限制的模式Pattern<OrderEvent, OrderEvent> orderPayPattern = Pattern.<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() {@Overridepublic boolean filter(OrderEvent orderEvent) throws Exception {return "create".equals(orderEvent.getEventType());}}).followedBy("pay").where(new SimpleCondition<OrderEvent>() { //不用紧邻@Overridepublic boolean filter(OrderEvent orderEvent) throws Exception {return "pay".equals(orderEvent.getEventType());}}).within(Time.minutes(15));//定义侧输出流标签 用来表示超时时间OutputTag<OrderResult> orderResultOutputTag = new OutputTag<OrderResult>("order-timeout"){};//将pattern应用到输入输出流 得到patternStreamPatternStream<OrderEvent> patternStream = CEP.pattern(orderEventStream.keyBy(OrderEvent::getOrderId), orderPayPattern);//调用select实现对匹配复杂事件和超时时间的提取和处理SingleOutputStreamOperator<OrderResult> resultStream = patternStream.select(orderResultOutputTag, new OrderTimeoutSelect(), new OrderPaySelect());resultStream.print("payed normally");resultStream.getSideOutput(orderResultOutputTag).print("payed timeout");env.execute("order timeout detect job");}//实现自定义的超时事件处理函数public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent, OrderResult>{@Overridepublic OrderResult timeout(Map<String, List<OrderEvent>> map, long l) throws Exception {Long timeoutOrderId = map.get("create").get(0).getOrderId();return new OrderResult(timeoutOrderId,"timeout"+l);}}//实现自定义的正常匹配事件处理函数public static class OrderPaySelect implements PatternSelectFunction<OrderEvent, OrderResult>{@Overridepublic OrderResult select(Map<String, List<OrderEvent>> map) throws Exception {Long payedOrderId = map.get("pay").get(0).getOrderId();return new OrderResult(payedOrderId,"payed success");}}
}
ProcessFunction实现:
package Project;import Beans.OrderEvent;
import Beans.OrderResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.net.URL;public class OrderTimeoutWithoutCep {//定义超时事件侧输出流标签private final static OutputTag<OrderResult> orderTimeoutTag = new OutputTag<OrderResult>("order-timeout") {};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//读取数据URL resource = OrderTimeoutWithoutCep.class.getResource("/OrderLog.csv");DataStream<OrderEvent> orderEventStream = env.readTextFile(resource.getPath()).map(line -> {String[] fields = line.split(",");return new OrderEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {@Overridepublic long extractAscendingTimestamp(OrderEvent orderEvent) {return orderEvent.getTimestamp() * 1000L;}});//自定义处理函数 主流输出正常匹配订单 侧输出流输出超时报警SingleOutputStreamOperator<OrderResult> resultStream = orderEventStream.keyBy(OrderEvent::getOrderId).process(new OrderPayMatchDetect());resultStream.print("payed normally");resultStream.getSideOutput(orderTimeoutTag).print("timeout order");env.execute("order timeout detect without cep job");}//实现自定义KeyedProcessFunctionpublic static class OrderPayMatchDetect extends KeyedProcessFunction<Long, OrderEvent, OrderResult> {//定义状态 保存之前订单是否已经create payValueState<Boolean> isPayedState;ValueState<Boolean> isCreatedState;//保存定时器时间戳ValueState<Long> timerTsState;//注册状态@Overridepublic void open(Configuration parameters) throws Exception {isPayedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-payed", Boolean.class, false));isCreatedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-created", Boolean.class, false));timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));}@Overridepublic void processElement(OrderEvent orderEvent, Context context, Collector<OrderResult> collector) throws Exception {//取状态Boolean isPayed = isPayedState.value();Boolean isCreated = isCreatedState.value();Long timerTs = timerTsState.value();//判断当前时间类型if ("create".equals(orderEvent.getEventType())) {//如果是create 判断是否支付过if (isPayed) {//如果已经正常支付 输出正常匹配结果collector.collect(new OrderResult(orderEvent.getOrderId(), "payed successfully"));//清空状态isCreatedState.clear();isPayedState.clear();timerTsState.clear();context.timerService().deleteEventTimeTimer(timerTs);} else {//如果没有支付过 注册15分钟之后的定时器 开始等待支付Long ts = (orderEvent.getTimestamp() + 15 * 60) * 1000L;context.timerService().registerEventTimeTimer(ts);//更新状态timerTsState.update(ts);isCreatedState.update(true);}} else if ("pay".equals(orderEvent.getEventType())) {if (isCreated) {if (orderEvent.getTimestamp() * 1000L < timerTs) {collector.collect(new OrderResult(orderEvent.getOrderId(),"payed successfully"));} else {context.output(orderTimeoutTag, newOrderResult(orderEvent.getOrderId(), "payed but already timeout"));}isCreatedState.clear();timerTsState.clear();context.timerService().deleteEventTimeTimer(timerTs);} else {context.timerService().registerEventTimeTimer(orderEvent.getTimestamp() * 1000L);isPayedState.update(true);timerTsState.update(orderEvent.getTimestamp() * 1000L);}}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx,Collector<OrderResult> out) throws Exception {if (isPayedState.value()) {ctx.output(orderTimeoutTag,new OrderResult(ctx.getCurrentKey(), "already payed but not found created log"));} else {ctx.output(orderTimeoutTag,new OrderResult(ctx.getCurrentKey(), "order pay timeout"));}isPayedState.clear();isCreatedState.clear();timerTsState.clear();}}
}
java flink(二十七) 实战之电商 订单超时取消报警计算 CEP与ProcessFunction 对比相关推荐
- Gavin老师Transformer直播课感悟 - Rasa项目实战之电商零售智能业务对话机器人配置详解与Debugging演示(八十七)
本文继续围绕工业级业务对话平台和框架Rasa,对Rasa项目实战之电商零售智能业务对话机器人系统所使用的各项配置进行详细剖析,并通过debug模式来理解在下面展示的Rasa graph archite ...
- Gavin老师Transformer直播课感悟 - Rasa项目实战之电商零售智能业务对话机器人业务功能微服务解析与调试演示(八十二)
本文继续围绕工业级业务对话平台和框架Rasa,对Rasa项目实战之电商零售Customer Service智能业务对话机器人主要业务功能所使用的微服务进行解析,并通过Rasa Interactive的 ...
- spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)
spark项目实战:电商分析平台之各个范围Session步长.访问时长占比统计(需求一) 项目基本信息,架构,需要一览 各个范围Session步长.访问时长占比统计概述 各个范围Session步长.访 ...
- Java电商平台-电商订单系统全解析
说明:Java电商平台-电商订单系统全解析主要讲解OMS的内容,设计,开发,架构等知识 今天分享将会分为以下三个环节来阐述: 1.订单系统的介绍 2.订单系统的解构 3.垂直电商订单系统设计思路 一. ...
- 微信小程序实战篇-电商(一)
哈喽,大家好,端午节过的怎么样啊,代码君可是没休息,专心的写电商文章哦,也是蛮拼的,如果对代码君认可的话,欢迎点赞转发,你们的点赞转发是对我最大的支持!好啦,言归正传,我们今天要讲解微信小程序的实战篇 ...
- spark项目实战:电商分析平台之项目概述
spark项目实战:电商分析平台之项目概述 目录 项目概述 程序架构分析 需求解析 初始代码和完成代码存放在github上面 1. 项目概述 在访问电商网站时,我们的一些访问行为会产生相应的埋点日志( ...
- 亿级流量电商详情页系统设计与实战-小型电商架构VS大型电商架构
一.电商详情页架构大致分析 电商网站里,大概可以说分成两种: 传统小型电商网站,主要采用页面静态化的架构方案. 大型电商网站,使用较复杂的一套架构方案,商品详情页的系统架构 -> 缓存架构 -& ...
- java计算机毕业设计O2O生鲜果蔬电商设计与实现源码+数据库+系统+lw文档
java计算机毕业设计O2O生鲜果蔬电商设计与实现源码+数据库+系统+lw文档 java计算机毕业设计O2O生鲜果蔬电商设计与实现源码+数据库+系统+lw文档 本源码技术栈: 项目架构:B/S架构 开 ...
- java毕业生设计新型农村消费贷电商平台计算机源码+系统+mysql+调试部署+lw
java毕业生设计新型农村消费贷电商平台计算机源码+系统+mysql+调试部署+lw java毕业生设计新型农村消费贷电商平台计算机源码+系统+mysql+调试部署+lw 本源码技术栈: 项目架构:B ...
- Gavin老师Transformer直播课感悟 - Rasa项目实战之电商零售对话机器人通过Rasa interactive进行问题修正与调试演示(八十四)
本文继续围绕工业级业务对话平台和框架Rasa,对Rasa项目实战之电商零售Customer Service智能业务对话机器人通过Rasa Interactive的调试来定位NLU的问题,并通过交互式模 ...
最新文章
- Vue body样式修改
- oracle 备份导出,oracle 怎么备份或导入导出表
- concurrenthashmap实现原理_Mybatis:PageHelper分页插件源码及原理剖析
- vb读出二进制文件,合并两个文件
- Android播放器之SurfaceView与GLSurfaceView
- java每个月某天,java – 查找一年中的某一天
- 【日本软件外包】设计书中常用到的文型
- 【HDU4312】Meeting point-2(切比雪夫距离和曼哈顿距离的转化+前缀和后缀和去绝对值)
- JAVA应用程序设计上机报告
- idea软件控制台Console里没有查找快捷键
- java实现短信发送
- 小程序运营推广的方法
- GetFLV.v9.1.1.8-kg-REPT
- L1-054 福到了
- 红楼梦评论--王国维
- 认识linux内核结构
- [解决]通常每个套接字地址只允许使用一次
- Java练习、每日一题、共100题
- 用论文写作平台Overleaf写中文论文
- 什么是“Ground truth”