Notes on Flink's TimeCharacteristic
Preface
This article takes a look at Flink's TimeCharacteristic.
TimeCharacteristic
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java
/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need and special manual form of watermark generation, and events are typically
     * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}
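- ProcessingTime is based on the moment an operator processes a record; it uses the machine's system clock as the time of the data stream.
- IngestionTime is based on the moment a record enters the Flink streaming dataflow.
- EventTime is based on the timestamp carried by the record itself; the application has to specify how that timestamp is extracted from each record.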
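The three bullets above boil down to one question: which clock supplies a record's timestamp? The following is a toy model in plain Java, not Flink code. The `Characteristic` enum, the `Record` class and the `timestampFor` helper are hypothetical names invented for this illustration; only the three-way distinction comes from the Javadoc quoted earlier.

```java
import java.util.function.LongSupplier;

/** Toy model (not Flink code) of where a record's timestamp comes from. */
class TimeCharacteristicSketch {

    // Mirrors the three constants of Flink's TimeCharacteristic enum.
    enum Characteristic { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

    /** Hypothetical record carrying two of the three possible timestamps. */
    static final class Record {
        final long eventTimestamp;     // embedded in the data itself
        final long ingestionTimestamp; // stamped once, when the record enters the dataflow

        Record(long eventTimestamp, long ingestionTimestamp) {
            this.eventTimestamp = eventTimestamp;
            this.ingestionTimestamp = ingestionTimestamp;
        }
    }

    /** Picks the timestamp an operator would work with under each characteristic. */
    static long timestampFor(Characteristic c, Record r, LongSupplier operatorClock) {
        switch (c) {
            case PROCESSING_TIME:
                return operatorClock.getAsLong(); // wall clock of the operator's machine
            case INGESTION_TIME:
                return r.ingestionTimestamp;      // fixed at the source
            case EVENT_TIME:
                return r.eventTimestamp;          // fixed by the producer of the data
            default:
                throw new IllegalArgumentException("Unknown characteristic: " + c);
        }
    }

    public static void main(String[] args) {
        Record record = new Record(1_000L, 5_000L);
        LongSupplier operatorClock = () -> 9_000L; // fixed clock keeps the demo reproducible
        for (Characteristic c : Characteristic.values()) {
            System.out.println(c + " -> " + timestampFor(c, record, operatorClock));
        }
    }
}
```

Only the processing-time branch reads a clock at the moment of processing, which is why it is the one whose results change from run to run.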
Differences
[Figure: differences between the three kinds of time]
Example
public static void main(String[] args) throws Exception {

    final int popThreshold = 20; // threshold for popular places

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);

    // configure the Kafka consumer
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
    // always read the Kafka topic from the start
    kafkaProps.setProperty("auto.offset.reset", "earliest");

    // create a Kafka consumer
    FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
            "cleansedRides",
            new TaxiRideSchema(),
            kafkaProps);
    // assign a timestamp extractor to the consumer
    consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

    // create a TaxiRide data stream
    DataStream<TaxiRide> rides = env.addSource(consumer);

    // find popular places
    DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
            // match ride to grid cell and event type (start or end)
            .map(new GridCellMatcher())
            // partition by cell id and event type
            .keyBy(0, 1)
            // build sliding window
            .timeWindow(Time.minutes(15), Time.minutes(5))
            // count ride events in window
            .apply(new RideCounter())
            // filter by popularity threshold
            .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
            // map grid cell to coordinates
            .map(new GridToCoordinates());

    popularPlaces.print();

    // execute the transformation pipeline
    env.execute("Popular Places from Kafka");
}

/**
 * Assigns timestamps to TaxiRide records.
 * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
 */
public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

    public TaxiRideTSExtractor() {
        super(Time.seconds(MAX_EVENT_DELAY));
    }

    @Override
    public long extractTimestamp(TaxiRide ride) {
        if (ride.isStart) {
            return ride.startTime.getMillis();
        }
        else {
            return ride.endTime.getMillis();
        }
    }
}
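- Here the Kafka-consuming job calls setStreamTimeCharacteristic with TimeCharacteristic.EventTime and passes a TaxiRideTSExtractor to assignTimestampsAndWatermarks. TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor, and its extractTimestamp returns ride.startTime.getMillis() or ride.endTime.getMillis() depending on whether the ride is a start event, which is how the job defines its own event time.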
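The Javadoc on TaxiRideTSExtractor says the watermark stays a fixed interval behind the largest timestamp seen so far. The plain-Java sketch below models just that rule so it can be run without Flink. It is not Flink's BoundedOutOfOrdernessTimestampExtractor: the class name, the method names and the 2-second delay in `main` are assumptions made for illustration, and the "never move backwards" guard is this sketch's own reading of how a watermark should behave.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark; not Flink's class. */
class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;
    private long lastWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("Delay must not be negative");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start high enough that subtracting the delay cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Called once per record: remember the largest event timestamp seen so far. */
    long onElement(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        return eventTimestamp;
    }

    /** Called periodically: the watermark trails the max timestamp and never decreases. */
    long currentWatermark() {
        long candidate = maxTimestampSeen - maxOutOfOrdernessMillis;
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    public static void main(String[] args) {
        // Hypothetical delay of 2 seconds; the article's MAX_EVENT_DELAY value is not shown.
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(2_000L);
        long[] eventTimestamps = {1_000L, 5_000L, 3_000L, 9_000L}; // 3_000 arrives out of order
        for (long ts : eventTimestamps) {
            sketch.onElement(ts);
            System.out.println("element " + ts + " -> watermark " + sketch.currentWatermark());
        }
    }
}
```

In the demo the out-of-order record at 3000 leaves the watermark unchanged, because the maximum timestamp seen is still 5000. A larger delay tolerates more disorder but makes event-time windows fire later.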
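Summary
- Flink's TimeCharacteristic enum defines three values: ProcessingTime, IngestionTime and EventTime.
- ProcessingTime is based on the moment an operator processes a record and uses the machine's system clock as the time of the data stream; IngestionTime is based on the moment a record enters the Flink streaming dataflow; EventTime is based on the timestamp carried by the record itself, and the application has to specify how that timestamp is extracted from each record.
- With EventTime, the source must define the event time and emit watermarks itself; alternatively, the program can assign them outside the source via assignTimestampsAndWatermarks.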
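The payoff for assigning timestamps and watermarks is the predictability the Javadoc attributes to event time: window contents depend on the timestamps, not on arrival order. The plain-Java sketch below illustrates that with tumbling windows. It is not Flink code, the class and method names are hypothetical, and it assumes every record arrives before its window is evaluated, so watermarks and late data are left out.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch: event-time window counts do not depend on arrival order. */
class EventTimeDeterminismSketch {

    /** Start of the tumbling window (aligned to 0) that contains the given timestamp. */
    static long windowStart(long eventTimestamp, long windowSizeMillis) {
        return eventTimestamp - Math.floorMod(eventTimestamp, windowSizeMillis);
    }

    /** Counts records per window, keyed by window start, using only their event timestamps. */
    static Map<Long, Integer> countPerWindow(long[] eventTimestamps, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimestamps) {
            counts.merge(windowStart(ts, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long windowSizeMillis = 5_000L;
        long[] inOrder = {1_000L, 4_000L, 6_000L, 11_000L};
        long[] shuffled = {6_000L, 1_000L, 11_000L, 4_000L}; // same records, different arrival order

        Map<Long, Integer> first = countPerWindow(inOrder, windowSizeMillis);
        Map<Long, Integer> second = countPerWindow(shuffled, windowSizeMillis);

        System.out.println("in order: " + first);
        System.out.println("shuffled: " + second);
        System.out.println("identical: " + first.equals(second));
    }
}
```

Under processing time the same two runs could produce different counts, since each record would land in whichever window was open when it happened to be processed.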
doc
- Event Time
Reposted from: https://my.oschina.net/go4it/blog/2989828