2019独角兽企业重金招聘Python工程师标准>>>

上星期在学习计算机网络的运输层时,发现TCP协议中有window机制,看完之后觉得概念有相通之处,如果懂了再去看storm中的window就可以很快理解。

以下是原文:

请先看官网对于Window机制的说明:Storm Window

一. Window是什么

在流式处理框架中, window的概念并非Storm独有,例如Flink中也有相同的概念,可以参考博文流处理中的Window概念来帮助理解,看完该博文前面图文部分应该可以理解Window的含义以及作用。本小白拙见:Window就像一个监视器,可以在满足某些条件时对流入Window中的tuple做出相应的处理。目前,Storm只支持两种触发Window进行操作的条件,即经过一个固定时间段触发和在tuple数量达到某个值时触发(据说以后会加入类似Flink的”自定义条件”,见Future improvements)

二. Sliding Window(滑动窗口) & Tumbling Window(滚动窗口)

先解释滑动窗口。滑动窗口有两个参数,长度windowLength和滑动间隔slidingInternal,长度限制了窗口能容纳tuple的量,滑动间隔则是指每次滑动的距离。滑动窗口会在(时间上或数量上)满足了滑动间隔的时候触发操作,然后产生新的滑动窗口 

在官网的例子中,长度是10 s,滑动间隔是5 s,但是对于窗口的触发时机没有写的很清楚,实际上,假设程序从time轴的0点开始运行,运行到第5秒时就会产生第一个窗口,窗口内是e1和e2,有一半在-5s到0s,到了10会产生第二个窗口,即图中的w1,窗口内是e1到e6,以此类推。

再来看滚动窗口。滚动窗口只有一个参数,即窗口的长度,在窗口(时间上或数量上)满了后,会水平“翻滚”一次,然后产生新的窗口,其实本质上和滑动窗口是一样的,只是滑动间隔就等于窗口长度而已 

在官网的例子中,窗口长度是5s,假设程序在0s时开始运行,5s时就会产生第一个窗口(包括了e1和e2),10s时就会产生第二个窗口,以此类推

在讨论窗口的滑动和翻滚时,需要注意单位问题,即要理解“滑动窗口满足滑动间隔”是什么意思,滚动的“窗口满了”又意味着什么,实际上,前面已经说到,触发条件可以是时间段或者tuple数量,所以单位有两种:时间和数量。对于上文中滑动窗口的图文例子,如果长度是10 ,滑动间隔是5 ,那么程序将在e5到来时产生第一个滑动窗口,窗口内是e1~e5,同样,窗口有一半在负半轴,在e10(图中没画出)到来时产生第二个滑动窗口,窗口内是e1~e10;对于上文的滚动窗口的例子,如果窗口长度是5 ,那么程序将在e5到来时产生第一个窗口,窗口内是e1~e5,在e10(图中没画出)到来时产生第二个窗口,窗口内是e6到e10。

三. API以及代码层面的解释

如果对概念还有不清楚的地方,结合代码来理解可能好一点

先看两个最简单的例子,功能都是在窗口产生时打印出窗口中的tuple的信息,第一个是Sliding Window的例子,第二个是 Tumbling Window的例子 (ps:例子中写了一些辅助获取信息的类方法,看名字就知道作用,就不贴上来了)

private class WatchSlidingWindowBolt extends BaseWindowedBolt {@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(TupleWindow inputWindow) {/** The inputWindow gives a view of* (a) all the events in the window* (b) events that expired since last activation of the window* (c) events that newly arrived since last activation of the window*/List<Tuple> tuplesInWindow = inputWindow.get();List<Tuple> newTuples = inputWindow.getNew();List<Tuple> expiredTuples = inputWindow.getExpired();System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");System.out.println("Time: " + relativeTime() + "in WatchSlidingWindowBolt");System.out.println("tuplesInWindow: " + tuplesToString(tuplesInWindow));System.out.println("newTuples: " + tuplesToString(newTuples));System.out.println("expiredTuples: " + tuplesToString(expiredTuples));System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
private class WatchTumblingWindowBolt extends BaseWindowedBolt {@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(TupleWindow inputWindow) {List<Tuple> tuplesInWindow = inputWindow.get();if (tuplesInWindow.size() > 0) {System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");System.out.println("Time: "+relativeTime()+"in WatchTumblingWindowBolt");for (Tuple tuple : tuplesInWindow) {System.out.println(" values: "+tuple.getValues());}System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++");}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

两种窗口的Bolt编写都是继承BaseWindowedBolt,然后重写execute等方法即可。 每个窗口的产生对应一次BaseWindowedBolt中execute方法的调用。不同的是,Tumbling Window只有get()方法有意义,Sliding Window还有getNew()和getExpired()。两个地方的get()都是返回当前窗口内的tuple,而滑动窗口还有两个概念:“新来的”和“过期的”tuple,用上文中滑动窗口的图文例子来解释(长度10s,滑动间隔5s),对于w1,get()返回的是e1~e6,getNew()返回的是e3~e6,getExpired()返回空集,对于w2,get()返回的是e3~e9,getNew()返回的是e7~e9,getExpired()返回的是e1~e2。

可以这样理解,窗口只有一个,w1和w2都是这个窗口,只是滑动前和滑动后的区别,而滑动后(成了所谓的w2),“新来的”就是e7~e9,考虑到窗口长度为10s,e3~e9就占了10s,再往前的e1~e2自然就会被“挤出”窗口,即“过期”了。

给Topology设置BasedWindowedBlot和设置普通的Bolt没有区别,注意Sliding Window要调用withWindow方法设置好 windowLength和slidingInternal ,Tumbling Window要调用withTumblingWindow()设置好长度。前面说过,这些参数的单位很关键,将影响到execute方法调用的时机,来看一下支持的配置 :

withWindow(Count windowLength, Count slidingInterval)
Tuple count based sliding window that slides after slidingInterval number of tuples.
//长度用tuple数量衡量的滑动窗口,每当有slidingInternal个tuple到来后触发一次withWindow(Count windowLength)
Tuple count based window that slides with every incoming tuple.
//长度用tuple数量衡量的滑动窗口,每个tuple到来都会触发一次withWindow(Count windowLength, Duration slidingInterval)
Tuple count based sliding window that slides after slidingInterval time duration.
//长度用tuple数量衡量的滑动窗口,每经过slidingInternal时间段后触发一次withWindow(Duration windowLength, Duration slidingInterval)
Time duration based sliding window that slides after slidingInterval time duration.
//长度用时间段衡量的滑动窗口,每经过slidingInternal时间段后触发一次withWindow(Duration windowLength)
Time duration based window that slides with every incoming tuple.
//长度用时间段衡量的滑动窗口,每个tuple到来都会触发一次withWindow(Duration windowLength, Count slidingInterval)
Time duration based sliding window that slides after slidingInterval number of tuples.
//长度用时间段衡量的滑动窗口,每当有slidingInternal个tuple到来后触发一次withTumblingWindow(BaseWindowedBolt.Count count)
Count based tumbling window that tumbles after the specified count of tuples.
//长度用tuple数量衡量的滚动窗口,每当有count个tuple到来触发一次withTumblingWindow(BaseWindowedBolt.Duration duration)
Time duration based tumbling window that tumbles after the specified time duration.
//长度用时间段衡量的滚动窗口,每经过duration时间段后触发一次

四. Timestamp(时间戳)、Watermark(水印)和Lag(延迟)

对于以时间段作为滑动间隔来触发的滑动窗口和以时间段作为窗口长度的滚动窗口来说,还有一些细节问题。这些因素将会影响到窗口触发时机。

时间戳: 默认情况下tuple的时间戳是他们被bolt处理的时机(到达窗口的时机),窗口的触发(滑动或滚动)是建立在这些时间戳的基础上的。Storm支持把tuple的原生时间戳作为时间衡量的标准来触发窗口。可以用withTimestampField()设置时间戳字段,注意该字段对应的值应为long类型的时间

水印和延迟:水印和延迟是为了解决“迟到的tuple”的问题。 
设置Bolt时有个withWatermarkInterval()方法,用来设置产生水印的时间间隔,默认是1秒(经过测试我发现调用这个方法并不能达到目的,即还是1s,可能是我哪里理解错了或者测试用例不科学,有待验证)。延迟Lag可以在设置Bolt是调用withLag()方法设置。水印的计算方法是最新的tuple的时间戳减去延迟。每个tuple到来时,如果它的时间戳比水印还早,这个tuple将被认为是“迟到的”,它将不会被处理,而只是在worker的日记中以INFO的级别打印一条消息。

现在可以分析程序在这些机制下的运行过程了。 
1.产生一个新的水印:用旧水印对前一秒进入bolt的tuple进行筛选,比旧水印还要早的tuple即为“迟到的tuple”,将不予处理,剩下的tuple是真正进入窗口的。在窗口的tuple中选取时间戳最晚的tuple,用他的时间戳减去设置好的延迟Lag,即为新的水印 
2.产生窗口:有个大前提,每个窗口的终点都不能超过新的水印。第一个窗口的终点怎么确定呢?先在窗口的tuple中取时间戳最早的tuple,将这个时间向下取整,然后加上滑动间隔的时间段长度(对于滚动窗口,则是加上窗口长度),即为第一个窗口的终点,而它的起点则是终点减去窗口长度。有了第一个窗口,接下来的窗口只需在第一个窗口的基础上滑动或者翻滚即可产生。窗口产生的时候就是execute方法被调用的时候。要注意的是,必须满足大前提——窗口的终点不超过最新的水印,如果计算出第一个窗口的终点就已经超过了当前水印,那么一个窗口都不会产生,只能等待下一个水印产生了。

五. 测试用例及其结果解释

Github链接 
注意要使用1.0.0版本以后的storm,之前的版本可能还没有Window机制或者没有时间戳等机制

  1. BaseWindowTopology是用来测试两种窗口的基本用法和常用API的,不涉及时间戳、水印和延迟,主要用来理解滑动和滚动的含义 
    BaseWindowTopology运行结果解释:由于示例中滑动窗口长度是30 ,滑动间距是10 ,故每10个tuple到来就会触发一次滑动窗口的bolt中的execute方法,计算出sum,而滚动窗口长度是3 ,在滑动窗口的下游,所以每接收到3个来自滑动窗口的sum就会触发一次execute方法,计算avg

  2. TimestampWindowTopology则是用来验证第四点中所说的运行机制的,要注意窗口的单位改为了秒,即使用时间段来衡量窗口长度和滑动间距。 
    TimestampWindowTopology运行结果解释:由于每一秒产生一个水印,故从第二次开始每一次都会有worker的INFO消息打印在窗口,那个是筛选过程,然后按照上文中所说来计算好第一个窗口的终点起点,开始滑动,直到不能再滑动就为止(即再滑动就会遇到水印),这几个窗口就是这次水印触发的窗口。

六. Trident中的Window机制

在Trident中,Window中的tuple可以进行聚合,API方面需要了解WindowsStoreFactory,这个类作用是储存Window收到的tuple和聚合的结果。有两个实现可以参考和使用,一个是储存在内存中的 InMemoryWindowsStoreFactory和InMemoryWindowsStore,一个是储存在Hbase中的HBaseWindowsStateFactory和HBaseWindowsStore。其他信息见官网相关介绍

七. 具有类似作用的Tick Tuple机制

传送门

转载于:https://my.oschina.net/u/2300159/blog/879867

Storm中的Window机制相关推荐

  1. 彻底搞清 Flink 中的 Window 机制

    [CSDN 编者按]Window是处理无限流的核心.Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.Flink提 ...

  2. 「前端面试题系列7」Javascript 中的事件机制(从原生到框架)

    前言 这是前端面试题系列的第 7 篇,你可能错过了前面的篇章,可以在这里找到: 理解函数的柯里化 ES6 中箭头函数的用法 this 的原理以及用法 伪类与伪元素的区别及实战 如何实现一个圣杯布局? ...

  3. jQuery中的事件机制深入浅出

    昨天呢,我们大家一起分享了jQuery中的样式选择器,那么今天我们就来看一下jQuery中的事件机制,其实,jQuery中的事件机制与JavaScript中的事件机制区别是不大的,只是,JavaScr ...

  4. 深度解析ASP.NET2.0中的Callback机制

    callback的一般使用方法还算简单,直接参照msdn的帮助和范例就足够了.但是想要真正用好.用精,或者想开发一些基于callback机制的WEB组件,那么,就要先深入了解callback的实现机制 ...

  5. android串口补位,Rust多线程中的消息传递机制

    代码说话. use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc ...

  6. 【NLP】四万字全面详解 | 深度学习中的注意力机制(四,完结篇)

    作者 | 蘑菇先生 知乎 | 蘑菇先生学习记 深度学习Attention小综述系列: 四万字全面详解 | 深度学习中的注意力机制(一) 四万字全面详解 | 深度学习中的注意力机制(二) 四万字全面详解 ...

  7. ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

    1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Me ...

  8. epoll监听文件_介绍一下 Android Handler 中的 epoll 机制?

    介绍一下 Android Handler 中的 epoll 机制? 目录: IO 多路复用 select.poll.epoll 对比 epoll API epoll 使用示例 Handler 中的 e ...

  9. Storm程序的并发机制原理总结

    文章目录 目录 前言: 1.概念 2.配置并行度 总结: 目录 前言: 为了在以后的实践中提高Storm程序执行的效率,我们还是有必要了解下对应的Storm程序的并发机制.(哈哈,虽然以博主小菜鸟的水 ...

最新文章

  1. java 默认字符集 iso_当服务器的默认字符集是UTF-8时,Perl并使用ISO-8859-1字符集...
  2. 安卓开发之使用viewpager+fragment实现滚动tab页
  3. 江苏小高考计算机知识点,江苏小高考知识点
  4. 【渝粤题库】广东开放大学 文化投资与贸易 形成性考核
  5. mysql binlog查看_MySQL--17 配置binlog-server 及中间件
  6. APP设计|搜索页面设计灵感
  7. C++类的成员变量和成员函数的介绍
  8. ubuntu服务器系统不识别,U盘安装16.04server版 安装好后重启 无法进入系统
  9. mysql binlog 恢复
  10. php 5.5 preg replace,解决ecshop在php5.5以上环境preg_replace报错方法
  11. 【转】从事IT行业的应该如何学习最高效的休息方式
  12. 键盘按键的各种编码对照表
  13. 用计算机术语写毕业寄语,毕业寄语唯美句子(精选55句)
  14. OPENCV入门教程十四:medianBlur中值平滑
  15. oracle-临时表
  16. 网络7层协议,4层,5层?理清容易混淆的几个概念
  17. linux 欢迎语,一日一技 | 如何让你的终端欢迎语好看又有趣
  18. 爆路径写后门拿shell的一些姿势
  19. 利用faac进行编码
  20. 计算摄影:基于深度学习的畸变校正

热门文章

  1. mathematics中如何计算排列组合
  2. IDEA常用的代码模板使用
  3. marvin java_使用Java中的Marvin框架去除轮廓
  4. 详解Unity中的粒子系统Particle System (十)
  5. 开源项目分析解读——基于Spring Cloud的在线考试系统
  6. 在Excel中将人民币金额小写转成大写(转)
  7. 详谈redis命令之集合(SET)
  8. Visual C++实现黑白棋游戏项目实战二:界面的设计与实现(附源码和资源 超详细)
  9. GetPixel算法
  10. 曙光“城市大数据平台”冲破数据孤岛、创造数据价值