
This post takes a look at Flink's TimeCharacteristic.

TimeCharacteristic

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need and special manual form of watermark generation, and events are typically
     * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}
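  • ProcessingTime uses the time at which an operator processes a record: the system clock of the machine running the operator serves as the time of the data stream.
  • IngestionTime uses the time at which a record enters the Flink streaming dataflow.
  • EventTime uses the timestamp carried by the record itself; the application must specify how to extract that timestamp from each record.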
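To make the three options concrete, here is a minimal plain-Java sketch that does not use the Flink API. The names `TimeChoiceSketch`, `Kind`, `Element`, `timestampFor` and `windowStart` are hypothetical and exist only for this illustration. It shows how one and the same record can land in different one-minute windows depending on which clock supplies its timestamp.

```java
/** Hypothetical illustration only; this is not Flink code. */
class TimeChoiceSketch {

    /** Stand-ins for the three TimeCharacteristic values. */
    enum Kind { PROCESSING, INGESTION, EVENT }

    /** A record with the timestamp from its payload and the time it entered the pipeline. */
    static final class Element {
        final long eventTimestamp;     // written by the producer of the record
        final long ingestionTimestamp; // stamped when the record reached the source

        Element(long eventTimestamp, long ingestionTimestamp) {
            this.eventTimestamp = eventTimestamp;
            this.ingestionTimestamp = ingestionTimestamp;
        }
    }

    /** Picks the timestamp that would drive windowing under the given notion of time. */
    static long timestampFor(Kind kind, Element element, long operatorClockMillis) {
        switch (kind) {
            case PROCESSING: return operatorClockMillis;        // wall clock at the operator
            case INGESTION:  return element.ingestionTimestamp; // fixed once, at the source
            case EVENT:      return element.eventTimestamp;     // carried by the record
            default: throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - Math.floorMod(timestamp, sizeMillis);
    }

    public static void main(String[] args) {
        Element element = new Element(1_000L, 61_000L); // created at 1 s, ingested at 61 s
        long operatorClock = 125_000L;                  // processed at 125 s
        for (Kind kind : Kind.values()) {
            long ts = timestampFor(kind, element, operatorClock);
            System.out.println(kind + " -> window starting at " + windowStart(ts, 60_000L));
        }
    }
}
```

With the values in `main`, the record falls into the window starting at 120000 under processing time, 60000 under ingestion time, and 0 under event time. Only the event-time result is independent of how quickly the record travels through the pipeline.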

Differences

[Figure: differences between processing time, ingestion time, and event time]

Example

    public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        popularPlaces.print();

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }

    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            }
            else {
                return ride.endTime.getMillis();
            }
        }
    }
  • When consuming from Kafka, this example sets setStreamTimeCharacteristic to TimeCharacteristic.EventTime and passes a TaxiRideTSExtractor to assignTimestampsAndWatermarks. TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor, and its extractTimestamp returns ride.startTime.getMillis() for a start event and ride.endTime.getMillis() otherwise, which is how the event time is defined.
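The Javadoc on TaxiRideTSExtractor says that watermarks stay a fixed interval behind the largest timestamp seen so far. The following plain-Java sketch illustrates that rule without depending on Flink. The class `BoundedLatenessSketch` and its methods are hypothetical names chosen for this illustration, and it is not the source of BoundedOutOfOrdernessTimestampExtractor.

```java
/** Hypothetical illustration of a bounded-out-of-orderness watermark; not Flink code. */
class BoundedLatenessSketch {

    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedLatenessSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Offset the start value so the first subtraction below cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Mirrors the extractTimestamp logic of the example: start events use the start time. */
    static long eventTimestamp(boolean isStart, long startMillis, long endMillis) {
        return isStart ? startMillis : endMillis;
    }

    /** Records a timestamp; only a new maximum moves the watermark forward. */
    void observe(long timestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
    }

    /** The watermark trails the largest observed timestamp by the configured delay. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        BoundedLatenessSketch sketch = new BoundedLatenessSketch(5_000L);
        sketch.observe(eventTimestamp(true, 10_000L, 30_000L));  // start event at 10 s
        System.out.println(sketch.currentWatermark());           // 5000
        sketch.observe(8_000L);                                  // out of order, no effect
        System.out.println(sketch.currentWatermark());           // 5000
        sketch.observe(eventTimestamp(false, 10_000L, 20_000L)); // end event at 20 s
        System.out.println(sketch.currentWatermark());           // 15000
    }
}
```

A larger delay tolerates records that arrive further out of order, at the cost of windows firing later.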

Summary

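  • Flink's TimeCharacteristic enum defines three values: ProcessingTime, IngestionTime, and EventTime.
  • ProcessingTime uses the time at which an operator processes a record, taking the machine's system clock as the time of the data stream; IngestionTime uses the time at which a record enters the Flink streaming dataflow; EventTime uses the timestamp carried by the record itself, and the application must specify how to extract it from each record.
  • With EventTime, the source must either assign event timestamps and emit watermarks itself, or the program must assign them outside the source via assignTimestampsAndWatermarks.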
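Event time needs watermarks as well as timestamps because a window can only produce its result once the watermark says that no earlier records are expected. The sketch below illustrates that for a tumbling window in plain Java, without the Flink API. `EventTimeWindowSketch` and its methods are hypothetical names for this illustration. It assumes a window fires once the watermark reaches the window's last timestamp (end minus one millisecond) and that records for already-fired windows are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of watermark-driven window firing; not Flink code. */
class EventTimeWindowSketch {

    private final long sizeMillis;
    private final TreeMap<Long, Integer> countsByWindowStart = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    EventTimeWindowSketch(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Buffers one record; returns false if its window is already covered by the watermark. */
    boolean add(long eventTimestamp) {
        long start = eventTimestamp - Math.floorMod(eventTimestamp, sizeMillis);
        long lastTimestampInWindow = start + sizeMillis - 1;
        if (lastTimestampInWindow <= currentWatermark) {
            return false; // late record: dropped in this sketch
        }
        countsByWindowStart.merge(start, 1, Integer::sum);
        return true;
    }

    /** Advances the watermark and returns "windowStart=count" for every window it completes. */
    List<String> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark); // never moves backwards
        List<String> fired = new ArrayList<>();
        while (!countsByWindowStart.isEmpty()) {
            Map.Entry<Long, Integer> oldest = countsByWindowStart.firstEntry();
            if (currentWatermark < oldest.getKey() + sizeMillis - 1) {
                break; // the oldest window may still receive records
            }
            fired.add(oldest.getKey() + "=" + oldest.getValue());
            countsByWindowStart.pollFirstEntry();
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeWindowSketch windows = new EventTimeWindowSketch(60_000L);
        windows.add(1_000L);
        windows.add(59_000L);
        windows.add(61_000L);
        System.out.println(windows.advanceWatermark(30_000L));  // []
        System.out.println(windows.advanceWatermark(59_999L));  // [0=2]
        System.out.println(windows.add(5_000L));                // false
        System.out.println(windows.advanceWatermark(200_000L)); // [60000=1]
    }
}
```

Without any watermark, the first window would never know when it is complete, which is why an EventTime job must obtain watermarks either from the source or from assignTimestampsAndWatermarks.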

doc

  • Event Time

Reposted from: https://my.oschina.net/go4it/blog/2989828
