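Windowing computations are one of the most common use cases in stream processing. The unbounded data stream is split into finite sets based on some criterion (for example, time), and a computation is applied to each group of events. One example is computing the top trending Twitter topics in the last hour.

Windowing is primarily used for aggregations, joins, pattern matching and more. A window can be seen as an in-memory table to which events are added, and from which they are evicted, according to some policy.
Until now, Storm relied on developers to build their own windowing logic. There were no recommended or high-level abstractions that developers could use to define a window in a standard way in a topology.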

Storm has support for sliding and tumbling windows based on time duration and/or event count.

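In Storm, the Window interface is the abstraction of a window. It has three generic methods that return events: get returns the events currently in the window, getNew returns the events newly added to the window since it was last generated, and getExpired returns the events that expired from the window since it was last generated. The events returned by get include those returned by getNew.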

package org.apache.storm.windowing;

import java.util.List;

/**
 * A view of events in a sliding window.
 *
 * @param <T> the type of event that this window contains. E.g. {@link org.apache.storm.tuple.Tuple}
 */
public interface Window<T> {
    /**
     * Gets the list of events in the window.
     *
     * @return the list of events in the window.
     */
    List<T> get();

    /**
     * Get the list of newly added events in the window since the last time the window was generated.
     *
     * @return the list of newly added events in the window.
     */
    List<T> getNew();

    /**
     * Get the list of events expired from the window since the last time the window was generated.
     *
     * @return the list of events expired from the window.
     */
    List<T> getExpired();
}

The TupleWindow interface extends Window and restricts its contents to Tuple objects.

package org.apache.storm.windowing;
import org.apache.storm.tuple.Tuple;
/**
 * A {@link Window} that contains {@link Tuple} objects.
 */
public interface TupleWindow extends Window<Tuple> {
}

The TupleWindowImpl class implements the TupleWindow interface.

package org.apache.storm.windowing;
import org.apache.storm.tuple.Tuple;
import java.util.List;
/**
 * Holds the expired, new and current tuples in a window.
 */
public class TupleWindowImpl implements TupleWindow {
    private final List<Tuple> tuples;
    private final List<Tuple> newTuples;
    private final List<Tuple> expiredTuples;

    public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
        this.tuples = tuples;
        this.newTuples = newTuples;
        this.expiredTuples = expiredTuples;
    }

    @Override
    public List<Tuple> get() {
        return tuples;
    }

    @Override
    public List<Tuple> getNew() {
        return newTuples;
    }

    @Override
    public List<Tuple> getExpired() {
        return expiredTuples;
    }

    @Override
    public String toString() {
        return "TupleWindowImpl{" +
                "tuples=" + tuples +
                ", newTuples=" + newTuples +
                ", expiredTuples=" + expiredTuples +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        TupleWindowImpl that = (TupleWindowImpl) o;
        if (tuples != null ? !tuples.equals(that.tuples) : that.tuples != null) return false;
        if (newTuples != null ? !newTuples.equals(that.newTuples) : that.newTuples != null) return false;
        return expiredTuples != null ? expiredTuples.equals(that.expiredTuples) : that.expiredTuples == null;
    }

    @Override
    public int hashCode() {
        int result = tuples != null ? tuples.hashCode() : 0;
        result = 31 * result + (newTuples != null ? newTuples.hashCode() : 0);
        result = 31 * result + (expiredTuples != null ? expiredTuples.hashCode() : 0);
        return result;
    }
}

15.5.1 Sliding Window

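Tuples are grouped into windows, and the window slides every sliding interval. A tuple can belong to more than one window. This is called a sliding window.

Configuring a sliding window requires two intervals:

  • windowLength: the length or duration of the window
  • slidingInterval: the interval at which the window slides

The window slides by a fixed time interval or by a fixed number of tuples:

  • If the sliding interval equals the window length, the window is equivalent to a tumbling window.
  • If the sliding interval is larger than the window length, tuples that arrive between two windows are never evaluated, so data is lost.
  • If the sliding interval is smaller than the window length, consecutive windows overlap.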
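The three cases above can be checked with a small stdlib-only Java sketch (not Storm code) that lists which time windows contain a tuple with timestamp t. For illustration only, it assumes that window ends are aligned to multiples of the sliding interval and that a window covers the half-open range (end - windowLength, end]; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm: which sliding windows contain a timestamp?
class WindowMembership {

    // Returns the end times of all windows (end - length, end] that contain t,
    // assuming window ends are aligned to multiples of slide and t >= 0.
    static List<Long> windowEndsContaining(long t, long length, long slide) {
        List<Long> ends = new ArrayList<>();
        long firstEnd = ((t + slide - 1) / slide) * slide; // smallest aligned end >= t
        for (long end = firstEnd; end - length < t; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // slide == length: tumbling, the tuple is in exactly one window
        System.out.println(windowEndsContaining(7, 10, 10)); // [10]
        // slide < length: windows overlap, the tuple is in length / slide windows
        System.out.println(windowEndsContaining(7, 10, 5));  // [10, 15]
        // slide > length: windows are (7,10], (17,20], ... so t = 3 is in none
        System.out.println(windowEndsContaining(3, 3, 10));  // []
    }
}
```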

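For example, consider a sliding window with a window length of 10 s and a sliding interval of 5 s. The window is evaluated every 5 s, and some tuples belong to both w2 and w3. At the moment the window slides from w2 to w3, e1 and e2 expire (they are dropped out of the event queue).

Note: the window first slides at t = 5 s, and at that point it contains the events received during the first 5 seconds.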
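The following sketch replays a 10 s / 5 s sliding window with hypothetical event times (e1 at 1 s through e9 at 14 s) and prints what get, getNew and getExpired would return at each evaluation. It is a plain Java simulation that assumes half-open windows (end - 10 s, end]; it is not Storm's WindowManager, and the class name is made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation, not Storm code: time-based sliding window,
// length 10 s, sliding interval 5 s, evaluated at t = 5, 10, 15.
class SlidingWindowDemo {
    static final long LENGTH = 10, SLIDE = 5;

    // hypothetical arrival times (seconds) of events e1..e9
    static final Map<String, Long> EVENTS = new LinkedHashMap<>();
    static {
        long[] ts = {1, 3, 6, 7, 8, 9, 11, 12, 14};
        for (int i = 0; i < ts.length; i++) EVENTS.put("e" + (i + 1), ts[i]);
    }

    // names of events whose time lies in the half-open range (from, to]
    static List<String> between(long from, long to) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : EVENTS.entrySet()) {
            if (e.getValue() > from && e.getValue() <= to) out.add(e.getKey());
        }
        return out;
    }

    // like get(): everything in the window ending at `end`
    static List<String> get(long end) { return between(end - LENGTH, end); }

    // like getNew(): events that arrived since the previous evaluation
    static List<String> getNew(long end) { return between(end - SLIDE, end); }

    // like getExpired(): events of the previous window that fell out of this one
    static List<String> getExpired(long end) { return between(end - SLIDE - LENGTH, end - LENGTH); }

    public static void main(String[] args) {
        for (long end = SLIDE; end <= 15; end += SLIDE) {
            System.out.println("t=" + end + " get=" + get(end)
                    + " new=" + getNew(end) + " expired=" + getExpired(end));
        }
    }
}
```

At t = 15 the sketch reports e1 and e2 as expired, and at every step the get() list contains everything in the getNew() list.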

The windowed bolt is declared in the topology like this:

topologyBuilder.setBolt("slidingwindowbolt", new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1).shuffleGrouping("spout");

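Here SlidingWindowBolt extends BaseWindowedBolt, and the window is configured with one of the BaseWindowedBolt.withWindow overloads below. For example, a time-based sliding window uses withWindow(Duration windowLength, Duration slidingInterval).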

  • withWindow(Count windowLength, Count slidingInterval): tuple count based sliding window that slides after `slidingInterval` number of tuples (window length: tuple count; sliding interval: tuple count).
  • withWindow(Count windowLength): tuple count based window that slides with every incoming tuple (window length: tuple count; every incoming tuple slides the window and triggers one evaluation of the window bolt).
  • withWindow(Count windowLength, Duration slidingInterval): tuple count based sliding window that slides after `slidingInterval` time duration (window length: tuple count; sliding interval: time duration).
  • withWindow(Duration windowLength, Duration slidingInterval): time duration based sliding window that slides after `slidingInterval` time duration (window length: time duration; sliding interval: time duration).
  • withWindow(Duration windowLength): time duration based window that slides with every incoming tuple (window length: time duration; the window slides on every incoming tuple).
  • withWindow(Duration windowLength, Count slidingInterval): time duration based sliding window configuration that slides after `slidingInterval` number of tuples (window length: time duration; sliding interval: tuple count).
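To see what the count-based overloads mean in practice, here is a plain Java simulation in the spirit of withWindow(new Count(30), new Count(10)) and withWindow(new Count(3)). It is not Storm's implementation. In particular, evaluating partially filled windows before 30 tuples have arrived is an assumption of this sketch, and the class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative simulation of count-based sliding windows; not Storm code.
class CountSlidingDemo {

    // Feeds tuples numbered 1..total and returns "first-last" for every evaluated window.
    static List<String> evaluate(int total, int length, int slide) {
        Deque<Integer> window = new ArrayDeque<>();
        List<String> evaluations = new ArrayList<>();
        for (int tuple = 1; tuple <= total; tuple++) {
            window.addLast(tuple);
            if (window.size() > length) {
                window.removeFirst();          // the oldest tuple expires
            }
            if (tuple % slide == 0) {          // slide after every `slide` tuples
                evaluations.add(window.peekFirst() + "-" + window.peekLast());
            }
        }
        return evaluations;
    }

    public static void main(String[] args) {
        // window length 30 tuples, sliding interval 10 tuples, fed with 50 tuples
        System.out.println(evaluate(50, 30, 10)); // [1-10, 1-20, 1-30, 11-40, 21-50]
        // window length 3 tuples, sliding with every incoming tuple
        System.out.println(evaluate(5, 3, 1));    // [1-1, 1-2, 1-3, 2-4, 3-5]
    }
}
```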
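import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomSentenceSpout; // spout from the storm-starter examples
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.topology.base.BaseWindowedBolt.Count;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

public class SlidingWindowTopology {

    public static class SlidingWindowBolt extends BaseWindowedBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(TupleWindow inputWindow) {
            int computedValue = 0;
            for (Tuple tuple : inputWindow.get()) {
                // do the windowing computation (placeholder: count the tuples in the window)
                computedValue++;
            }
            // emit the results; they are anchored to all tuples in inputWindow
            collector.emit(new Values(computedValue));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("slidingwindowbolt",
                new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1)
                .shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
}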

15.5.2 Tumbling Window

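Tuples are grouped into a single window based on time or count, and any tuple belongs to exactly one window. This is called a tumbling window. The window length is specified either as a tuple count or as a time duration, and the window slides by its full length.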

For example, consider a tumbling window with a duration of 5 s: the window is evaluated every 5 s, and no two windows overlap.
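A 5 s tumbling window amounts to bucketing each timestamp by integer division, so every event lands in exactly one window. The stdlib-only sketch below shows this. The half-open window bounds [start, start + 5 s), the millisecond timestamps and the class name are illustrative assumptions, not Storm internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: assign non-negative event timestamps (ms) to 5-second tumbling windows.
class TumblingWindowDemo {
    static final long LENGTH_MS = 5_000;

    // Each timestamp goes to exactly one window, keyed by the window's start time.
    static Map<Long, List<Long>> assign(long[] timestamps) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            long start = (ts / LENGTH_MS) * LENGTH_MS; // floor to a window boundary
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 3_500, 5_000, 7_200, 9_999, 12_000};
        System.out.println(assign(ts));
        // {0=[1000, 3500], 5000=[5000, 7200, 9999], 10000=[12000]}
    }
}
```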

The tumbling window bolt is declared in the topology like this:

topologyBuilder.setBolt("slidingwindowbolt", new TumblingWindowBolt().withTumblingWindow(new Count(30)), 1).shuffleGrouping("spout");

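Here TumblingWindowBolt extends BaseWindowedBolt, which provides the APIs for specifying the window length and sliding interval.

A tumbling window is defined with BaseWindowedBolt.withTumblingWindow, which takes either a Count or a Duration, e.g. withTumblingWindow(Duration windowLength) for a time-based window.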

  • withTumblingWindow(BaseWindowedBolt.Count count): count based tumbling window that tumbles after the specified count of tuples (window length: tuple count).
  • withTumblingWindow(BaseWindowedBolt.Duration duration): time duration based tumbling window that tumbles after the specified time duration (window length: time duration).

15.5.3 Tuple timestamp and out of order tuples

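By default, the timestamp tracked in the window is the time when the tuple is processed by the bolt, and window calculations are performed based on this processing timestamp. Storm also supports tracking windows based on a timestamp generated by the source. This is useful for processing events according to the time at which they occurred, for example when processing log entries that carry timestamps.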

/**
* Specify the tuple field that represents the timestamp as a long value. If this field
* is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName)

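The fieldName passed to withTimestampField names the field that is looked up in each input tuple and used for the windowing calculations. If this option is specified, every tuple must contain the timestamp field. If the field is absent from a tuple, an IllegalArgumentException is thrown and the topology terminates. To recover, the offending tuple must be removed manually from the spout source (e.g. Kafka), and the topology must be restarted.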
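To illustrate this contract, here is a hypothetical stdlib-only sketch in which a tuple is modelled as a Map from field name to value. The class TimestampExtractor and the field name "ts" are assumptions for illustration; only the rule that a missing field raises IllegalArgumentException comes from the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a tuple is modelled as a Map from field name to value.
// A missing timestamp field raises IllegalArgumentException, as described above.
class TimestampExtractor {
    private final String fieldName;

    TimestampExtractor(String fieldName) {
        this.fieldName = fieldName;
    }

    long extract(Map<String, Object> tuple) {
        Object value = tuple.get(fieldName);
        if (value == null) {
            throw new IllegalArgumentException("Missing timestamp field '" + fieldName + "' in " + tuple);
        }
        return ((Number) value).longValue(); // the field is expected to hold a long value
    }

    public static void main(String[] args) {
        TimestampExtractor extractor = new TimestampExtractor("ts");

        Map<String, Object> good = new HashMap<>();
        good.put("ts", 1_500_000_000_000L);
        good.put("line", "GET /index.html");
        System.out.println(extractor.extract(good)); // 1500000000000

        Map<String, Object> bad = new HashMap<>();
        bad.put("line", "no timestamp here");
        try {
            extractor.extract(bad);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```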

/**
* Specify the maximum time lag of the tuple timestamp in millis. The tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration)

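Along with the timestamp field name, a time lag can be specified with the withLag method. It indicates the maximum amount of time by which tuple timestamps may be out of order.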

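For example, if the lag is 5 seconds and a tuple t1 arrives with timestamp 06:00:05, no tuple may arrive with a timestamp earlier than 06:00:00. If a tuple with timestamp 05:59:59 arrives after t1 and the window has already moved past t1, it is treated as a late tuple and is not processed.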
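The arithmetic of this example (06:00:05 minus a 5 s lag gives 06:00:00, so 05:59:59 is late) can be sketched as follows. This is a simplified, hypothetical model: it updates the watermark immediately on every tuple, whereas Storm emits watermarks periodically, and the class name is made up.

```java
import java.time.LocalTime;

// Simplified sketch of the lag rule: the watermark trails the largest timestamp
// seen so far by `lagSeconds`, and a tuple earlier than the watermark is late.
// Assumption: the watermark is updated on every tuple (Storm emits it periodically).
class LateTupleCheck {
    private final long lagSeconds;
    private long maxSeen = Long.MIN_VALUE; // largest timestamp seen so far (second of day)

    LateTupleCheck(long lagSeconds) {
        this.lagSeconds = lagSeconds;
    }

    // Returns true if the tuple is accepted, false if it is late and would be dropped.
    boolean accept(String hhmmss) {
        long ts = LocalTime.parse(hhmmss).toSecondOfDay();
        if (maxSeen != Long.MIN_VALUE && ts < maxSeen - lagSeconds) {
            return false; // earlier than the watermark: treated as late
        }
        maxSeen = Math.max(maxSeen, ts);
        return true;
    }

    public static void main(String[] args) {
        LateTupleCheck check = new LateTupleCheck(5);
        System.out.println(check.accept("06:00:05")); // true: watermark becomes 06:00:00
        System.out.println(check.accept("06:00:01")); // true: out of order, but within the lag
        System.out.println(check.accept("05:59:59")); // false: earlier than 06:00:00, late
    }
}
```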

Currently, late tuples are only logged in the worker log files at INFO level.

15.5.3.1 Watermarks

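When tuples are processed using a timestamp field, Storm internally computes watermarks based on the incoming tuple timestamps. The watermark is the minimum, across all input streams, of the latest tuple timestamp minus the lag.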
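The watermark rule can be written as a small stdlib-only sketch: track the latest timestamp per input stream, take the minimum over streams, and subtract the lag. The class name, stream ids and millisecond timestamps are hypothetical; this mirrors the rule as stated, not Storm's internal code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the watermark rule: min over streams of (latest ts) minus lag.
class WatermarkTracker {
    private final long lagMs;
    private final Map<String, Long> latestPerStream = new HashMap<>();

    WatermarkTracker(long lagMs) {
        this.lagMs = lagMs;
    }

    void track(String streamId, long ts) {
        latestPerStream.merge(streamId, ts, Long::max); // keep the latest timestamp per stream
    }

    // Current watermark, or Long.MIN_VALUE if nothing has been tracked yet.
    long watermark() {
        if (latestPerStream.isEmpty()) return Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        for (long ts : latestPerStream.values()) min = Math.min(min, ts);
        return min - lagMs;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(5_000);
        tracker.track("clicks", 20_000);
        tracker.track("clicks", 18_000); // out of order: latest for "clicks" stays 20_000
        tracker.track("orders", 12_000); // the slower stream holds the watermark back
        System.out.println(tracker.watermark()); // 7000
    }
}
```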

Periodically (by default every second), the watermark timestamps are emitted, and this is considered the clock tick for the window calculation when tuple-based timestamps are in use. The interval at which watermarks are emitted can be changed with the API below.

/**
* Specify the watermark event generation interval. Watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval)

Example:

When a watermark is received, all windows up to that timestamp will be evaluated. For example, consider tuple timestamp based processing with the following window parameters:

Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s

Current timestamp = 09:00:00

Tuples e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36) are received between 9:00:00 and 9:00:01.

At time t = 09:00:01, watermark w1 = 6:00:31 is emitted since no tuples earlier than 6:00:31 can arrive.

Three windows will be evaluated. The first window end ts (06:00:10) is computed by taking the earliest event timestamp (06:00:03) and computing the ceiling based on the sliding interval (10s).

  1. 5:59:50 - 06:00:10 with tuples e1, e2, e3
  2. 6:00:00 - 06:00:20 with tuples e1, e2, e3, e4
  3. 6:00:10 - 06:00:30 with tuples e4, e5

e6 is not evaluated since the watermark timestamp 6:00:31 is less than the tuple ts 6:00:36.

Tuples e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39) are received between 9:00:01 and 9:00:02.

At time t = 09:00:02 another watermark w2 = 08:00:34 is emitted since no tuples earlier than 8:00:34 can arrive now. Three windows will be evaluated:

  1. 6:00:20 - 06:00:40 with tuples e5, e6 (from earlier batch)
  2. 6:00:30 - 06:00:50 with tuple e6 (from earlier batch)
  3. 8:00:10 - 08:00:30 with tuples e7, e8, e9

e10 is not evaluated since the tuple ts 8:00:39 is beyond the watermark time 8:00:34.
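The window arithmetic in this example can be re-enacted with the following simplified Java sketch, which feeds e1 to e10 and the two watermarks from the text and prints the evaluated windows. The alignment rule (ceiling of the earliest event timestamp to the sliding interval), the (end - length, end] window bounds and the skipping of empty windows are assumptions chosen to reproduce the example; this is not a copy of Storm's trigger policy, and the class name is hypothetical.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified re-enactment of the worked example (length 20 s, sliding interval 10 s).
class WatermarkWindowExample {
    static final long LENGTH = 20, SLIDE = 10; // seconds

    private final Map<String, Long> received = new LinkedHashMap<>(); // event -> second of day
    private long nextWindowEnd = Long.MIN_VALUE;                      // not aligned yet

    void receive(String name, String hhmmss) {
        received.put(name, (long) LocalTime.parse(hhmmss).toSecondOfDay());
    }

    static String fmt(long s) {
        return String.format("%02d:%02d:%02d", s / 3600, (s / 60) % 60, s % 60);
    }

    // ceiling of ts to a multiple of SLIDE
    static long align(long ts) {
        return ((ts + SLIDE - 1) / SLIDE) * SLIDE;
    }

    // earliest received timestamp in (after, upTo], or Long.MAX_VALUE if there is none
    private long earliestBetween(long after, long upTo) {
        long min = Long.MAX_VALUE;
        for (long ts : received.values()) {
            if (ts > after && ts <= upTo) min = Math.min(min, ts);
        }
        return min;
    }

    // Evaluates every window whose end is <= the watermark; returns one line per window.
    List<String> onWatermark(String watermarkHhmmss) {
        long wm = LocalTime.parse(watermarkHhmmss).toSecondOfDay();
        List<String> evaluated = new ArrayList<>();
        long end = nextWindowEnd;
        if (end == Long.MIN_VALUE) {
            long first = earliestBetween(Long.MIN_VALUE, wm);
            if (first == Long.MAX_VALUE) return evaluated; // no events yet
            end = align(first);
        }
        while (end <= wm) {
            List<String> members = new ArrayList<>();
            for (Map.Entry<String, Long> e : received.entrySet()) {
                long ts = e.getValue();
                if (ts > end - LENGTH && ts <= end) members.add(e.getKey());
            }
            if (!members.isEmpty()) {
                evaluated.add(fmt(end - LENGTH) + " - " + fmt(end) + " " + members);
                end += SLIDE;
            } else {
                long next = earliestBetween(end, wm); // skip the time gap between bursts
                if (next == Long.MAX_VALUE) break;
                end = align(next);
            }
        }
        nextWindowEnd = end;
        return evaluated;
    }

    public static void main(String[] args) {
        WatermarkWindowExample ex = new WatermarkWindowExample();
        ex.receive("e1", "06:00:03"); ex.receive("e2", "06:00:05"); ex.receive("e3", "06:00:07");
        ex.receive("e4", "06:00:18"); ex.receive("e5", "06:00:26"); ex.receive("e6", "06:00:36");
        ex.onWatermark("06:00:31").forEach(System.out::println); // w1
        ex.receive("e7", "08:00:25"); ex.receive("e8", "08:00:26"); ex.receive("e9", "08:00:27");
        ex.receive("e10", "08:00:39");
        ex.onWatermark("08:00:34").forEach(System.out::println); // w2
    }
}
```

Run as shown, it prints the same six windows as the example, in the same order, including the jump from the 06:00:30 - 06:00:50 window to the 08:00:10 - 08:00:30 window.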

The window calculation considers the time gaps and computes the windows based on the tuple timestamp.

15.5.4 Guarantee

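The windowing functionality in Storm currently provides an at-least-once guarantee. The values emitted from the bolt's execute(TupleWindow inputWindow) method are automatically anchored to all the tuples in inputWindow. The downstream bolts are expected to ack the received tuple (the tuple emitted by the windowed bolt) to complete the tuple tree. If they do not, the tuples will be replayed and the windowing computation will be re-evaluated.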

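The tuples in the window are automatically acked when they expire, i.e. when they fall out of the window after windowLength + slidingInterval. Note that for time-based windows the configuration topology.message.timeout.secs should be sufficiently larger than windowLength + slidingInterval; otherwise the tuples will time out and be replayed, which can result in duplicate evaluations. For count-based windows, the configuration should be adjusted so that windowLength + slidingInterval tuples can be received within the timeout period.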
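These two rules of thumb can be written as simple checks. The helper names and the throughput-based formulation for count windows (expected tuples per second times the timeout) are illustrative assumptions; the 30 s figure used in the example is Storm's default value of topology.message.timeout.secs. In practice, leave more headroom than the strict inequality below.

```java
// Rule-of-thumb checks derived from the guarantee section; helper names are hypothetical.
class TimeoutCheck {

    // Time-based windows: topology.message.timeout.secs must exceed windowLength + slidingInterval.
    static boolean timeWindowOk(long timeoutSecs, long windowLengthSecs, long slidingIntervalSecs) {
        return timeoutSecs > windowLengthSecs + slidingIntervalSecs;
    }

    // Count-based windows: at the expected input rate, windowLength + slidingInterval
    // tuples must arrive within the timeout period.
    static boolean countWindowOk(long timeoutSecs, long windowLengthCount, long slidingIntervalCount,
                                 double tuplesPerSecond) {
        return tuplesPerSecond * timeoutSecs >= windowLengthCount + slidingIntervalCount;
    }

    public static void main(String[] args) {
        // 30 s timeout vs a 20 s window sliding every 10 s: too tight
        System.out.println(timeWindowOk(30, 20, 10));       // false
        System.out.println(timeWindowOk(60, 20, 10));       // true
        // Count(30) / Count(10) at 2 tuples/s needs 20 s, so a 30 s timeout suffices
        System.out.println(countWindowOk(30, 30, 10, 2.0)); // true
    }
}
```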

Reposted from: https://my.oschina.net/yulongblog/blog/1506229
