本文主要研究一下flink的AscendingTimestampExtractor

AscendingTimestampExtractor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java

/*** A timestamp assigner and watermark generator for streams where timestamps are monotonously* ascending. In this case, the local watermarks for the streams are easy to generate, because* they strictly follow the timestamps.** @param <T> The type of the elements that this function can extract timestamps from*/
@PublicEvolving
public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {private static final long serialVersionUID = 1L;/** The current timestamp. */private long currentTimestamp = Long.MIN_VALUE;/** Handler that is called when timestamp monotony is violated. */private MonotonyViolationHandler violationHandler = new LoggingHandler();/*** Extracts the timestamp from the given element. The timestamp must be monotonically increasing.** @param element The element that the timestamp is extracted from.* @return The new timestamp.*/public abstract long extractAscendingTimestamp(T element);/*** Sets the handler for violations to the ascending timestamp order.** @param handler The violation handler to use.* @return This extractor.*/public AscendingTimestampExtractor<T> withViolationHandler(MonotonyViolationHandler handler) {this.violationHandler = requireNonNull(handler);return this;}// ------------------------------------------------------------------------@Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {final long newTimestamp = extractAscendingTimestamp(element);if (newTimestamp >= this.currentTimestamp) {this.currentTimestamp = newTimestamp;return newTimestamp;} else {violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}@Overridepublic final Watermark getCurrentWatermark() {return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}//......
}
复制代码
  • AscendingTimestampExtractor抽象类实现AssignerWithPeriodicWatermarks接口的extractTimestamp及getCurrentWatermark方法,同时声明抽象方法extractAscendingTimestamp供子类实现
  • AscendingTimestampExtractor适用于elements的时间在每个parallel task里头是单调递增(timestamp monotony)的场景,extractTimestamp这里先是调用子类实现的extractAscendingTimestamp方法从element提取newTimestamp,然后返回,对于违反timestamp monotony的,这里调用MonotonyViolationHandler进行处理
  • getCurrentWatermark方法在currentTimestamp不为Long.MIN_VALUE时返回Watermark(currentTimestamp - 1)

MonotonyViolationHandler

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java

 /*** Interface for handlers that handle violations of the monotonous ascending timestamps* property.*/public interface MonotonyViolationHandler extends java.io.Serializable {/*** Called when the property of monotonously ascending timestamps is violated, i.e.,* when {@code elementTimestamp < lastTimestamp}.** @param elementTimestamp The timestamp of the current element.* @param lastTimestamp The last timestamp.*/void handleViolation(long elementTimestamp, long lastTimestamp);}/*** Handler that does nothing when timestamp monotony is violated.*/public static final class IgnoringHandler implements MonotonyViolationHandler {private static final long serialVersionUID = 1L;@Overridepublic void handleViolation(long elementTimestamp, long lastTimestamp) {}}/*** Handler that fails the program when timestamp monotony is violated.*/public static final class FailingHandler implements MonotonyViolationHandler {private static final long serialVersionUID = 1L;@Overridepublic void handleViolation(long elementTimestamp, long lastTimestamp) {throw new RuntimeException("Ascending timestamps condition violated. Element timestamp "+ elementTimestamp + " is smaller than last timestamp " + lastTimestamp);}}/*** Handler that only logs violations of timestamp monotony, on WARN log level.*/public static final class LoggingHandler implements MonotonyViolationHandler {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);@Overridepublic void handleViolation(long elementTimestamp, long lastTimestamp) {LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);}}
复制代码
  • MonotonyViolationHandler继承了Serializable,它定义了handleViolation方法,这个接口内置有三个实现类,分别是IgnoringHandler、FailingHandler、FailingHandler
  • IgnoringHandler的handleViolation方法不做任何处理;FailingHandler的handleViolation会抛出RuntimeException;LoggingHandler的handleViolation方法会打印warn日志
  • AscendingTimestampExtractor默认使用的是LoggingHandler,也可以通过withViolationHandler方法来进行设置

实例

    @Testpublic void testWithFailingHandler() {AscendingTimestampExtractor<Long> extractor = (new AscendingTimestampExtractorTest.LongExtractor()).withViolationHandler(new FailingHandler());this.runValidTests(extractor);try {this.runInvalidTest(extractor);Assert.fail("should fail with an exception");} catch (Exception var3) {;}}private void runValidTests(AscendingTimestampExtractor<Long> extractor) {Assert.assertEquals(13L, extractor.extractTimestamp(13L, -1L));Assert.assertEquals(13L, extractor.extractTimestamp(13L, 0L));Assert.assertEquals(14L, extractor.extractTimestamp(14L, 0L));Assert.assertEquals(20L, extractor.extractTimestamp(20L, 0L));Assert.assertEquals(20L, extractor.extractTimestamp(20L, 0L));Assert.assertEquals(20L, extractor.extractTimestamp(20L, 0L));Assert.assertEquals(500L, extractor.extractTimestamp(500L, 0L));Assert.assertEquals(9223372036854775806L, extractor.extractTimestamp(9223372036854775806L, 99999L));}private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {Assert.assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));Assert.assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));Assert.assertEquals(999L, extractor.extractTimestamp(999L, 100L));}private static class LongExtractor extends AscendingTimestampExtractor<Long> {private static final long serialVersionUID = 1L;private LongExtractor() {}public long extractAscendingTimestamp(Long element) {return element;}}
复制代码
  • 这里使用withViolationHandler设置了violationHandler为FailingHandler,在遇到999这个时间的时候,由于比之前的1000小,因而会调用MonotonyViolationHandler.handleViolation方法

小结

  • flink为了方便开发提供了几个内置的Pre-defined Timestamp Extractors / Watermark Emitters,其中一个就是AscendingTimestampExtractor
  • AscendingTimestampExtractor抽象类实现AssignerWithPeriodicWatermarks接口的extractTimestamp及getCurrentWatermark方法,同时声明抽象方法extractAscendingTimestamp供子类实现
  • AscendingTimestampExtractor适用于elements的时间在每个parallel task里头是单调递增的,对于违反timestamp monotony的,这里调用MonotonyViolationHandler的handleViolation方法进行处理;MonotonyViolationHandler继承了Serializable,它定义了handleViolation方法,这个接口内置有三个实现类,分别是IgnoringHandler、FailingHandler、FailingHandler

doc

  • Pre-defined Timestamp Extractors / Watermark Emitters

聊聊flink的AscendingTimestampExtractor相关推荐

  1. 聊聊flink的FsStateBackend

    序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...

  2. 聊聊flink的HistoryServer

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下flink的HistoryServer HistoryServer flink-1.7.2/flink-runti ...

  3. 聊聊flink的TimeCharacteristic

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下flink的TimeCharacteristic TimeCharacteristic flink-streami ...

  4. 聊聊flink JobManager的heap大小设置

    序 本文主要研究一下flink JobManager的heap大小设置 JobManagerOptions flink-core-1.7.1-sources.jar!/org/apache/flink ...

  5. 聊聊flink的InternalTimeServiceManager

    序 本文主要研究一下flink的InternalTimeServiceManager InternalTimeServiceManager flink-streaming-java_2.11-1.7. ...

  6. 聊聊flink的StateTtlConfig

    序 本文主要研究一下flink的StateTtlConfig 实例 import org.apache.flink.api.common.state.StateTtlConfig; import or ...

  7. 聊聊flink的CheckpointScheduler

    序 本文主要研究一下flink的CheckpointScheduler CheckpointCoordinatorDeActivator flink-runtime_2.11-1.7.0-source ...

  8. 聊聊flink的NetworkEnvironmentConfiguration

    序 本文主要研究一下flink的NetworkEnvironmentConfiguration NetworkEnvironmentConfiguration flink-1.7.2/flink-ru ...

  9. 聊聊flink Table的groupBy操作

    序 本文主要研究一下flink Table的groupBy操作 Table.groupBy flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/t ...

最新文章

  1. linux 添加banner,OpenWrt:Linux下生成banner
  2. 飞机游戏项目完整代码(详解JAVA300集)
  3. 关于obs的录制时黑屏问题
  4. oracle将存储过程导出,Oracle如何导出存储过程
  5. 一维数组所有元素是否大于_【C语言】- 指向一维数组元素的指针!你今天学习了吗?...
  6. vector 结构体排序_指下码上横戈行——排序
  7. Java基础学习总结(172)——手写Java 重试机制
  8. Codeforces 1039D You Are Given a Tree (看题解)
  9. [多图]Maclean的巴厘岛游记
  10. 《网络是怎样连接的》了解网络连接的全貌
  11. 计算机黑屏但是有鼠标,电脑桌面黑屏怎么解决 电脑黑屏怎么办 - 云骑士一键重装系统...
  12. kali2020出现中文乱码解决
  13. Redis 与 Mysql 的数据一致性
  14. 计算机联锁论文开题报告,学生论文开题报告评语
  15. 三角公式 - 记忆版
  16. android 紫外线传感器,Arduino光线传感器-UV Sensor V1.0-ML8511紫外线传感器
  17. 惠普 hp3414 笔记本 电脑 驱动 drivers
  18. tablueau地图标记圆形_多点钉图标记-简单易用的地图位置标记标注工具
  19. 修改hosts不必重启 立刻生效
  20. hexo图片展示-blog图床迁移至七牛云

热门文章

  1. 计算机的图形渲染机制
  2. LoadRunner模拟Json请求
  3. (组合)Binomial Showdown
  4. 怎样解决输入法不能切换?
  5. Foundation 框架
  6. df命令、du命令、磁盘分区(fdisk命令)
  7. 《Swift 权威指南》——第6章,第6.7节常量和变量参数
  8. I00002 打印九九乘法表
  9. Java字符串处理技巧
  10. Android存储之SQLiteDatbase