一. Window是什么

在流式处理框架中, window的概念并非Storm独有,例如Flink中也有相同的概念,可以参考博文流处理中的Window概念来帮助理解,看完前面图文部分应该可以理解Window的含义以及作用。本小白拙见:Window就像一个监视器,可以在满足某些条件时对流入Window中的tuple做出相应的处理。目前,Storm只支持两种触发Window进行操作的条件,即经过一个固定时间段触发和在tuple数量达到某个值时触发(据说以后会加入类似Flink的”自定义条件”,见Future improvements)

二. Sliding Window(滑动窗口) & Tumbling Window(滚动窗口)

先解释滑动窗口。滑动窗口有两个参数,长度windowLength和滑动间隔slidingInternal,长度限制了窗口能容纳tuple的量,滑动间隔则是指每次滑动的距离。滑动窗口会在(时间上或数量上)满足了滑动间隔的时候触发操作,然后产生新的滑动窗口

在官网的例子中,长度是10 s,滑动间隔是5 s,但是对于窗口的触发时机没有写的很清楚,实际上,假设程序从time轴的0点开始运行,运行到第5秒时就会产生第一个窗口,窗口内是e1和e2,有一半在-5s到0s,到了10会产生第二个窗口,即图中的w1,窗口内是e1到e6,以此类推。

再来看滚动窗口。滚动窗口只有一个参数,即窗口的长度,在窗口(时间上或数量上)满了后,会水平“翻滚”一次,然后产生新的窗口,其实本质上和滑动窗口是一样的,只是滑动间隔就等于窗口长度而已

在官网的例子中,窗口长度是5s,假设程序在0s时开始运行,5s时就会产生第一个窗口(包括了e1和e2),10s时就会产生第二个窗口,以此类推

在讨论窗口的滑动和翻滚时,需要注意单位问题,即要理解“滑动窗口满足滑动间隔”是什么意思,滚动的“窗口满了”又意味着什么,实际上,前面已经说到,触发条件可以是时间段或者tuple数量,所以单位有两种:时间和数量。对于上文中滑动窗口的图文例子,如果长度是10 ,滑动间隔是5 ,那么程序将在e5到来时产生第一个滑动窗口,窗口内是e1~e5,同样,窗口有一半在负半轴,在e10(图中没画出)到来时产生第二个滑动窗口,窗口内是e1~e10;对于上文的滚动窗口的例子,如果窗口长度是5 ,那么程序将在e5到来时产生第一个窗口,窗口内是e1~e5,在e10(图中没画出)到来时产生第二个窗口,窗口内是e6到e10。

三. API以及代码层面的解释

如果对概念还有不清楚的地方,结合代码来理解可能好一点

先看两个最简单的例子,功能都是在窗口产生时打印出窗口中的tuple的信息,第一个是Sliding Window的例子,第二个是 Tumbling Window的例子 (ps:例子中写了一些辅助获取信息的类方法,看名字就知道作用,就不贴上来了,可以下载完整代码查看)

class WatchSlidingWindowBolt extends BaseWindowedBolt {@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(TupleWindow inputWindow) {/** The inputWindow gives a view of* (a) all the events in the window* (b) events that expired since last activation of the window* (c) events that newly arrived since last activation of the window*/List tuplesInWindow = inputWindow.get();List newTuples = inputWindow.getNew();List expiredTuples = inputWindow.getExpired();System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");System.out.println("Time: " + relativeTime() + "in WatchSlidingWindowBolt");System.out.println("tuplesInWindow: " + tuplesToString(tuplesInWindow));System.out.println("newTuples: " + tuplesToString(newTuples));System.out.println("expiredTuples: " + tuplesToString(expiredTuples));System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
class WatchTumblingWindowBolt extends BaseWindowedBolt {@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(TupleWindow inputWindow) {List tuplesInWindow = inputWindow.get();if (tuplesInWindow.size() > 0) {System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");System.out.println("Time: "+relativeTime()+"in WatchTumblingWindowBolt");for (Tuple tuple : tuplesInWindow) {System.out.println(" values: "+tuple.getValues());}System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

两种窗口的Bolt编写都是继承BaseWindowedBolt,然后重写execute等方法即可。 每个窗口的产生对应一次BaseWindowedBolt中execute方法的调用。不同的是,Tumbling Window只有get()方法有意义,Sliding Window还有getNew()和getExpired()。两个地方的get()都是返回当前窗口内的tuple,而滑动窗口还有两个概念:“新来的”和“过期的”tuple,用上文中滑动窗口的图文例子来解释(长度10s,滑动间隔5s),对于w1,get()返回的是e1~e6,getNew()返回的是e3~e6,getExpired()返回空集,对于w2,对于w2,get()返回的是e3~e9,getNew()返回的是e7~e9,getExpired()返回的是e1~e2。

可以这样理解,窗口只有一个,w1和w2都是这个窗口,只是滑动前和滑动后的区别,而滑动后(成了所谓的w2),“新来的”就是e7~e9,考虑到窗口长度为10s,e3~e9就占了10s,再往前的e1~e2自然就会被“挤出”窗口,即“过期”了。

给Topology设置BasedWindowedBlot和设置普通的Bolt没有区别,注意Sliding Window要调用withWindow方法设置好 windowLength和slidingInternal ,Tumbling Window要调用withTumblingWindow()设置好长度。前面说过,这些参数的单位很关键,将影响到execute方法调用的时机,来看一下支持的配置 :

withWindow(Count windowLength, Count slidingInterval)
Tuple count based sliding window that slides after slidingInterval number of tuples.
长度用tuple数量衡量的滑动窗口,每当有slidingInternal个tuple到来后触发一次

withWindow(Count windowLength)
Tuple count based window that slides with every incoming tuple.
长度用tuple数量衡量的滑动窗口,每个tuple到来都会触发一次

withWindow(Count windowLength, Duration slidingInterval)
Tuple count based sliding window that slides after slidingInterval time duration.
长度用tuple数量衡量的滑动窗口,每经过slidingInternal时间段后触发一次

withWindow(Duration windowLength, Duration slidingInterval)
Time duration based sliding window that slides after slidingInterval time duration.
长度用时间段衡量的滑动窗口,每经过slidingInternal时间段后触发一次

withWindow(Duration windowLength)
Time duration based window that slides with every incoming tuple.
长度用时间段衡量的滑动窗口,每个tuple到来都会触发一次

withWindow(Duration windowLength, Count slidingInterval)
Time duration based sliding window that slides after slidingInterval number of tuples.
长度用时间段衡量的滑动窗口,每当有slidingInternal个tuple到来后触发一次

withTumblingWindow(BaseWindowedBolt.Count count)
Count based tumbling window that tumbles after the specified count of tuples.
长度用tuple数量衡量的滚动窗口,每当有count个tuple到来触发一次

withTumblingWindow(BaseWindowedBolt.Duration duration)
Time duration based tumbling window that tumbles after the specified time duration.
长度用时间段衡量的滚动窗口,每经过duration时间段后触发一次

四. Timestamp(时间戳)、Watermark(水印)和Lag(延迟)

对于以时间段作为滑动间隔来触发的滑动窗口和以时间段作为窗口长度的滚动窗口来说,还有一些细节问题。这些因素将会影响到窗口触发时机。

时间戳: 默认情况下tuple的时间戳是他们被bolt处理的时机(到达窗口的时机),窗口的触发(滑动或滚动)是建立在这些时间戳的基础上的。Storm支持把tuple的原生时间戳作为时间衡量的标准来触发窗口。可以用withTimestampField()设置时间戳字段,注意该字段对应的值应为long类型的时间

水印和延迟:水印和延迟是为了解决“迟到的tuple”的问题。
设置Bolt时有个withWatermarkInterval()方法,用来设置产生水印的时间间隔,默认是1秒(经过测试我发现调用这个方法并不能达到目的,即还是1s,可能是我哪里理解错了或者测试用例不科学,有待验证)。延迟Lag可以在设置Bolt是调用withLag()方法设置。水印的计算方法是最新的tuple的时间戳减去延迟。每个tuple到来时,如果它的时间戳比水印还早,这个tuple将被认为是“迟到的”,它将不会被处理,而只是在worker的日记中以INFO的级别打印一条消息。

现在可以分析程序在这些机制下的运行过程了。
1.产生一个新的水印:用旧水印对前一秒进入bolt的tuple进行筛选,比旧水印还要早的tuple即为“迟到的tuple”,将不予处理,剩下的tuple是真正进入窗口的。在窗口的tuple中选取时间戳最晚的tuple,用他的时间戳减去设置好的延迟Lag,即为新的水印
2.产生窗口:有个大前提,每个窗口的终点都不能超过新的水印。第一个窗口的终点怎么确定呢?先在窗口的tuple中取时间戳最早的tuple,将这个时间向下取整,然后加上滑动间隔的时间段长度(对于滚动窗口,则是加上窗口长度),即为第一个窗口的终点,而它的起点则是终点减去窗口长度。有了第一个窗口,接下来的窗口只需在第一个窗口的基础上滑动或者翻滚即可产生。窗口产生的时候就是execute方法被调用的时候。要注意的是,必须满足大前提——窗口的终点不超过最新的水印,如果计算出第一个窗口的终点就已经超过了当前水印,那么一个窗口都不会产生,只能等待下一个水印产生了。

五. 测试用例及其结果解释

Github链接
注意要使用1.0.0版本以后的storm,之前的版本可能还没有Window机制或者没有时间戳等机制

BaseWindowTopology是用来测试两种窗口的基本用法和常用API的,不涉及时间戳、水印和延迟,主要用来理解滑动和滚动的含义
BaseWindowTopology运行结果解释:由于示例中滑动窗口长度是30 ,滑动间距是10 ,故每10个tuple到来就会触发一次滑动窗口的bolt中的execute方法,计算出sum,而滚动窗口长度是3 ,在滑动窗口的下游,所以每接收到3个来自滑动窗口的sum就会触发一次execute方法,计算avg

TimestampWindowTopology则是用来验证第四点中所说的运行机制的,要注意窗口的单位改为了秒,即使用时间段来衡量窗口长度和滑动间距。
TimestampWindowTopology运行结果解释:由于每一秒产生一个水印,故从第二次开始每一次都会有worker的INFO消息打印在窗口,那个是筛选过程,然后按照上文中所说来计算好第一个窗口的终点起点,开始滑动,直到不能再滑动就为止(即再滑动就会遇到水印),这几个窗口就是这次水印触发的窗口。

六. Trident中的Window机制

在Trident中,Window中的tuple可以进行聚合,API方面需要了解WindowsStoreFactory,这个类作用是储存Window收到的tuple和聚合的结果。有两个实现可以参考和使用,一个是储存在内存中的 InMemoryWindowsStoreFactory和InMemoryWindowsStore,一个是储存在HBase中的HBaseWindowsStateFactory和HBaseWindowsStore。其他信息见官网相关介绍

七. 具有类似作用的Tick Tuple机制

转自  https://www.2cto.com/net/201702/602256.html

storm windows 机制相关推荐

  1. Storm通信机制,Worker进程间通信,Worker进程间通信分析,Worker进程间技术(Netty、ZeroMQ),Worker 内部通信技术(Disruptor)(来自学习资料)

    Storm通信机制 Worker间的通信经常需要通过网络跨节点进行,Storm使用ZeroMQ或Netty(0.9以后默认使用)作为进程间通信的消息框架. Worker进程内部通信:不同worker的 ...

  2. 【Storm篇】--Storm并发机制

    一.前述 为了提高Storm的并行能力,通常需要设置并行. 二.具体原理 1. Storm并行分为几个方面: Worker – 进程 一个Topology拓扑会包含一个或多个Worker(每个Work ...

  3. Windows机制下的游戏编程实例一

    稍微接触一下Windows游戏编程基础,虽然不大喜欢游戏,但是为了扩大知识面,为以后的创新与启发打下基础,还是稍微了解一下: 先来看一个实例吧: 这是程序的文件目录: 里面包含背景音乐与背景图像模板 ...

  4. windows机制下的游戏编程实例二

    通过调用DirectX SDK库实现游戏空间的三维化,也就是我们通常玩的网游,鼠标拖着主角到处跑去完成任务(个人觉得这种游戏超无聊).下面先给出效果图: 还是先给出其文件目录吧,从总体把握实例,也是很 ...

  5. Apache Storm 实时流处理系统通信机制源码分析

    我们今天就来仔细研究一下Apache Storm 2.0.0-SNAPSHOT的通信机制.下面我将从大致思想以及源码分析,然后我们细致分析实时流处理系统中源码通信机制研究. 1. 简介 Worker间 ...

  6. 在线实时大数据平台Storm并行和通信机制理解

    1.storm系统角色和应用组件基本理解: 和Hadoop一起理解,清晰点. 1)物理节点Nimubus,负责资源分配和任务调度: 2)物理节点Supervisor负责接受nimbus分配的任务,启动 ...

  7. Storm 05_Storm并发机制通信机制

    一.Storm并发机制 Worker processes Executors (threads) Tasks Worker – 进程 一个Topology拓扑会包含一个或多个Worker(每个Work ...

  8. Storm ack和fail机制再论

    之前对这个的理解有些问题,今天用到有仔细梳理了一遍,记录一下 首先开启storm tracker机制的前提是, 1. 在spout emit tuple的时候,要加上第3个参数messageid  2 ...

  9. Windows窗口分析

    (本文尝试通过一些简单的实验,来分析Windows的窗口机制,并对微软的设计理由进行一定的猜测,需要读者具备C++.Windows编程及MFC经验,还得有一定动手能力.文中可能出现一些术语不统一的现象 ...

最新文章

  1. windows监控——再见zmq
  2. 查找 framework 文件中是否包含 WKWebView
  3. 日本搞笑研究:猫睡哪我睡哪,居然对睡眠质量没影响,还有这4点好处?
  4. python的安装教程-python安装教程 Pycharm安装详细教程
  5. 【计算机网络】应用层 : 电子邮件 ( SMTP 协议 | MIME 协议 | POP3 协议 | IMAP 协议 | 基于万维网的电子邮件 )
  6. 美团王庆:当老板对指标进行灵魂拷问时,该如何诊断分析?
  7. 为什么这么多烂代码?
  8. 光纤收发器的分类介绍
  9. 大数据之-Hadoop完全分布式_完全分布式配置总结---大数据之hadoop工作笔记0040
  10. linux蜂鸣器驱动指令,linux蜂鸣器驱动 蜂鸣器--LINUX.doc
  11. Bailian2726 采药【模拟】
  12. 关于 Eureka 2.x,别再人云亦云了!
  13. ubuntu解决软件下载速度过慢
  14. 设计模式 -- 简单工厂模式
  15. 微信小程序从零开始开发步骤(一)
  16. 自制COCO 实例分割dataset并测试效果(从采集到测试)
  17. 大学文科生vs大学理科生
  18. 几种自动化测试工具的比较
  19. centos7安装bazel,亲测可行
  20. 绿幕背景抠图,去除掉物体周围一圈的绿边 OpenCVForUnity

热门文章

  1. Java实现 计算数的平方根
  2. oracle全角改半角,Oracle全角数字转换半角数字
  3. ubuntu 从刷机到yolov5环境搭建训练记录
  4. oracle收集snop,SNAP收集服务器信息
  5. 详解Unity中的粒子系统Particle System (九)
  6. Excel添加固定文本到开头的2种操作方法
  7. 分布式事务解决方案 dbpack 和 hptx 的演进
  8. 动态规划java实现数塔问题_动态规划入门_数塔问题
  9. 为什么动漫比游戏建模精致?3大不同,一看就明白
  10. lr背景虚化_LR调色教程,用LR调出后期唯美cosplay人像思路及案例(3)