

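Windows are mainly used for aggregations, joins, pattern matching and similar operations. A window can be viewed as an in-memory table to which events are added and from which they are evicted according to some policies.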

Storm has support for sliding and tumbling windows based on time duration and/or event count.



package org.apache.storm.windowing;

import java.util.List;

/**
 * A view of events in a sliding window.
 *
 * @param <T> the type of event that this window contains. E.g. {@link org.apache.storm.tuple.Tuple}
 */
public interface Window<T> {
    /**
     * Gets the list of events in the window.
     *
     * @return the list of events in the window.
     */
    List<T> get();

    /**
     * Get the list of newly added events in the window since the last time the window was generated.
     *
     * @return the list of newly added events in the window.
     */
    List<T> getNew();

    /**
     * Get the list of events expired from the window since the last time the window was generated.
     *
     * @return the list of events expired from the window.
     */
    List<T> getExpired();
}


package org.apache.storm.windowing;

import org.apache.storm.tuple.Tuple;

/**
 * A {@link Window} that contains {@link Tuple} objects.
 */
public interface TupleWindow extends Window<Tuple> {
}


package org.apache.storm.windowing;

import org.apache.storm.tuple.Tuple;

import java.util.List;

/**
 * Holds the expired, new and current tuples in a window.
 */
public class TupleWindowImpl implements TupleWindow {
    private final List<Tuple> tuples;
    private final List<Tuple> newTuples;
    private final List<Tuple> expiredTuples;

    public TupleWindowImpl(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
        this.tuples = tuples;
        this.newTuples = newTuples;
        this.expiredTuples = expiredTuples;
    }

    @Override
    public List<Tuple> get() {
        return tuples;
    }

    @Override
    public List<Tuple> getNew() {
        return newTuples;
    }

    @Override
    public List<Tuple> getExpired() {
        return expiredTuples;
    }

    @Override
    public String toString() {
        return "TupleWindowImpl{" +
                "tuples=" + tuples +
                ", newTuples=" + newTuples +
                ", expiredTuples=" + expiredTuples +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TupleWindowImpl that = (TupleWindowImpl) o;

        if (tuples != null ? !tuples.equals(that.tuples) : that.tuples != null) return false;
        if (newTuples != null ? !newTuples.equals(that.newTuples) : that.newTuples != null) return false;
        return expiredTuples != null ? expiredTuples.equals(that.expiredTuples) : that.expiredTuples == null;
    }

    @Override
    public int hashCode() {
        int result = tuples != null ? tuples.hashCode() : 0;
        result = 31 * result + (newTuples != null ? newTuples.hashCode() : 0);
        result = 31 * result + (expiredTuples != null ? expiredTuples.hashCode() : 0);
        return result;
    }
}

15.5.1 Sliding Window



  • windowLength: the length or duration of the window
  • slidingInterval: the interval at which the window slides


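  • If the sliding interval equals the window length, the sliding window behaves exactly like a tumbling window.
  • If the sliding interval is larger than the window length, some tuples fall between two consecutive windows, belong to no window, and are never processed.
  • If the sliding interval is smaller than the window length, consecutive windows overlap.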
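The three cases can be made concrete with a small plain-Java sketch. It has no Storm dependency, the class and method names are hypothetical, and it assumes windows aligned at time 0 and modelled as half-open intervals [start, end), which is a simplification of how Storm actually positions windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm: lays out window boundaries for a given length and slide.
public class WindowLayout {
    /** Describes how consecutive windows relate for a given length and sliding interval. */
    public static String classify(long windowLength, long slidingInterval) {
        if (slidingInterval == windowLength) {
            return "tumbling";     // windows touch end-to-start, no overlap and no gap
        } else if (slidingInterval > windowLength) {
            return "gaps";         // tuples between two windows fall into no window
        } else {
            return "overlapping";  // a tuple can belong to several windows
        }
    }

    /** Returns the first n windows as [start, end) pairs, assuming the first one starts at 0. */
    public static List<long[]> windows(long windowLength, long slidingInterval, int n) {
        List<long[]> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            long start = i * slidingInterval;
            result.add(new long[] {start, start + windowLength});
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] configs = {{10, 10}, {10, 15}, {10, 5}};  // {length, slide}
        for (long[] c : configs) {
            StringBuilder sb = new StringBuilder();
            for (long[] w : windows(c[0], c[1], 3)) {
                sb.append("[").append(w[0]).append(",").append(w[1]).append(") ");
            }
            System.out.println("length=" + c[0] + " slide=" + c[1] + " -> "
                    + classify(c[0], c[1]) + ": " + sb.toString().trim());
        }
    }
}
```

With a length of 10 and a slide of 15, the printed windows [0,10) and [15,25) leave the interval [10,15) uncovered, which is the data-loss case from the list above.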

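For example, consider a sliding window with a duration of 10s and a sliding interval of 5s. The window is evaluated every 5s, and windows w2 and w3 share some tuples. When the window slides from w2 to w3, e1 and e2 expire (they are dropped out of the event queue).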
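The difference between two consecutive evaluations is what Window#getNew() and Window#getExpired() expose. The following plain-Java sketch (hypothetical class and event times, no Storm dependency) evaluates a 10s window that slides every 5s, assuming each evaluation at time `end` covers the interval (end - 10s, end], and derives the new and expired events by diffing consecutive windows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Storm's implementation.
public class SlidingWindowSim {
    /** One evaluation, mirroring Window#get(), getNew() and getExpired(). */
    public static final class Snapshot {
        public final long end;
        public final List<String> current;
        public final List<String> added;
        public final List<String> expired;

        Snapshot(long end, List<String> current, List<String> added, List<String> expired) {
            this.end = end;
            this.current = current;
            this.added = added;
            this.expired = expired;
        }
    }

    /** Evaluates every `slide` seconds up to `until`; each window covers (end - length, end]. */
    public static List<Snapshot> run(Map<String, Long> events, long length, long slide, long until) {
        List<Snapshot> out = new ArrayList<>();
        List<String> previous = new ArrayList<>();
        for (long end = slide; end <= until; end += slide) {
            List<String> current = new ArrayList<>();
            for (Map.Entry<String, Long> e : events.entrySet()) {
                long ts = e.getValue();
                if (ts > end - length && ts <= end) {
                    current.add(e.getKey());
                }
            }
            List<String> added = new ArrayList<>(current);    // in this window, not in the last one
            added.removeAll(previous);
            List<String> expired = new ArrayList<>(previous); // in the last window, not in this one
            expired.removeAll(current);
            out.add(new Snapshot(end, current, added, expired));
            previous = current;
        }
        return out;
    }

    /** Hypothetical arrival times in seconds. */
    public static Map<String, Long> sampleEvents() {
        Map<String, Long> events = new LinkedHashMap<>();
        events.put("e1", 1L);
        events.put("e2", 3L);
        events.put("e3", 6L);
        events.put("e4", 8L);
        events.put("e5", 12L);
        return events;
    }

    public static void main(String[] args) {
        for (Snapshot s : run(sampleEvents(), 10, 5, 15)) {
            System.out.println("t=" + s.end + "s window=" + s.current
                    + " new=" + s.added + " expired=" + s.expired);
        }
    }
}
```

With these sample timestamps, the evaluation at t=15s reports e1 and e2 as expired while e3 and e4 remain in the window, the same pattern as the w2 to w3 slide described above.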


For example, a count-based sliding window is attached to a bolt when the topology is defined:
topologyBuilder.setBolt("slidingwindowbolt", new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1).shuffleGrouping("spout");

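Here SlidingWindowBolt extends BaseWindowedBolt, and the window is configured through one of the BaseWindowedBolt.withWindow(...) overloads, for example withWindow(Duration windowLength, Duration slidingInterval). The available overloads are: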

  • withWindow(Count windowLength, Count slidingInterval)
    Tuple count based sliding window that slides after `slidingInterval` number of tuples. (Window length: tuple count; sliding interval: tuple count.)
  • withWindow(Count windowLength)
    Tuple count based window that slides with every incoming tuple. (Window length: tuple count; every incoming tuple slides the window and triggers one evaluation of the windowed bolt.)
  • withWindow(Count windowLength, Duration slidingInterval)
    Tuple count based sliding window that slides after `slidingInterval` time duration. (Window length: tuple count; sliding interval: time duration.)
  • withWindow(Duration windowLength, Duration slidingInterval)
    Time duration based sliding window that slides after `slidingInterval` time duration. (Window length: time duration; sliding interval: time duration.)
  • withWindow(Duration windowLength)
    Time duration based window that slides with every incoming tuple. (Window length: time duration; the window slides on every incoming tuple.)
  • withWindow(Duration windowLength, Count slidingInterval)
    Time duration based sliding window configuration that slides after `slidingInterval` number of tuples. (Window length: time duration; sliding interval: tuple count.)
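import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.starter.spout.RandomSentenceSpout; // from the storm-starter examples
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

public class SlidingWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        // Illustrative computation: total length of the sentences in the current window
        int computedValue = 0;
        for (Tuple tuple : inputWindow.get()) {
            computedValue += tuple.getString(0).length();
        }
        // Emit the result; it is anchored to all tuples in inputWindow
        collector.emit(new Values(computedValue));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("totalLength"));
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        // Count-based sliding window: the last 30 tuples, evaluated every 10 tuples
        builder.setBolt("slidingwindowbolt",
                new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1)
                .shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
}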
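The withWindow(new Count(30), new Count(10)) setting used in the topology above keeps the 30 most recent tuples and evaluates the window after every 10 new tuples. The plain-Java sketch below models only that eviction and trigger behaviour; the class is hypothetical, has no Storm dependency, and is not how Storm implements count windows internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical model of a count-based sliding window, not Storm's implementation.
public class CountSlidingWindow<T> {
    private final int windowLength;     // number of tuples kept in the window
    private final int slidingInterval;  // number of new tuples between evaluations
    private final Deque<T> buffer = new ArrayDeque<>();
    private int sinceLastEvaluation = 0;

    public CountSlidingWindow(int windowLength, int slidingInterval) {
        this.windowLength = windowLength;
        this.slidingInterval = slidingInterval;
    }

    /** Adds a tuple and returns the window contents if this tuple triggers an evaluation. */
    public Optional<List<T>> add(T tuple) {
        buffer.addLast(tuple);
        if (buffer.size() > windowLength) {
            buffer.removeFirst();  // evict the oldest tuple
        }
        if (++sinceLastEvaluation == slidingInterval) {
            sinceLastEvaluation = 0;
            List<T> snapshot = new ArrayList<>(buffer);
            return Optional.of(snapshot);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        CountSlidingWindow<Integer> window = new CountSlidingWindow<>(30, 10);
        for (int i = 1; i <= 50; i++) {
            window.add(i).ifPresent(w -> System.out.println("window of " + w.size()
                    + " tuples: " + w.get(0) + ".." + w.get(w.size() - 1)));
        }
    }
}
```

Feeding tuples 1 to 50 triggers five evaluations; the first two windows are not yet full (10 and 20 tuples), and from the third one on each window holds exactly 30 tuples.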

15.5.2 Tumbling Window


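For example, consider a tumbling window with a duration of 5s: tuples are grouped into consecutive, non-overlapping 5s windows, so each tuple belongs to exactly one window.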

For example, a count-based tumbling window is attached to a bolt when the topology is defined:
topologyBuilder.setBolt("tumblingwindowbolt", new TumblingWindowBolt().withTumblingWindow(new Count(30)), 1).shuffleGrouping("spout");


The window is configured through BaseWindowedBolt.withTumblingWindow(...), which has two overloads:

  • withTumblingWindow(BaseWindowedBolt.Count count)
    Count based tumbling window that tumbles after the specified count of tuples. (Window length: tuple count.)
  • withTumblingWindow(BaseWindowedBolt.Duration duration)
    Time duration based tumbling window that tumbles after the specified time duration. (Window length: time duration.)
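Because tumbling windows never overlap, assigning a tuple to its window reduces to rounding its timestamp down to a window boundary. The plain-Java sketch below (hypothetical class, no Storm dependency) assumes windows aligned to multiples of the window length starting at time 0, which is a simplification; it groups millisecond timestamps into 5s tumbling windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Storm: assigns timestamps to tumbling windows.
public class TumblingAssign {
    /** Groups timestamps (ms) into tumbling windows, keyed by window start time. */
    public static Map<Long, List<Long>> assign(List<Long> timestamps, long windowLengthMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            // Align down to a multiple of the window length, so each timestamp lands in exactly one window
            long start = ts - Math.floorMod(ts, windowLengthMs);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Long> ts = Arrays.asList(100L, 4_900L, 5_000L, 7_300L, 12_100L);
        System.out.println(assign(ts, 5_000));  // {0=[100, 4900], 5000=[5000, 7300], 10000=[12100]}
    }
}
```

Every input timestamp appears in exactly one group, which is the property that distinguishes a tumbling window from an overlapping sliding window.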

15.5.3 Tuple timestamp and out of order tuples



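By default, a window tracks the time at which the bolt processes each tuple, and window calculations are based on that processing timestamp. Storm can instead track windows by a timestamp carried in the tuple itself (for example, one generated at the source), which is specified with:

/**
 * Specify the tuple field that represents the timestamp as a long value. If this field
 * is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
 *
 * @param fieldName the name of the field that contains the timestamp
 */
public BaseWindowedBolt withTimestampField(String fieldName)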


/**
 * Specify the maximum time lag of the tuple timestamp in millis. The tuple timestamps
 * cannot be out of order by more than this amount.
 *
 * @param duration the max lag duration
 */
public BaseWindowedBolt withLag(Duration duration)


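For example, if the lag is 5 seconds and a tuple t1 arrives with timestamp 06:00:05, no tuple may arrive with a timestamp earlier than 06:00:00. If a tuple with timestamp 05:59:59 arrives after t1 and the window has already moved past t1, it is treated as a late tuple and is not processed.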

Currently, late tuples are only logged at INFO level in the worker log files.
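The late-tuple rule from the example above can be sketched in plain Java. The class and method names are hypothetical. The sketch tracks the latest timestamp seen on a single stream and rejects any tuple more than `lag` earlier; unlike Storm, which emits watermarks periodically and drops tuples older than the last emitted watermark, it recomputes the threshold on every tuple.

```java
// Hypothetical sketch of lag-based late-tuple detection, not Storm's implementation.
public class LateTupleCheck {
    private final long lagMs;
    private long maxSeenTs = Long.MIN_VALUE;  // latest tuple timestamp seen so far

    public LateTupleCheck(long lagMs) {
        this.lagMs = lagMs;
    }

    /** Returns true if the tuple can be processed, false if it is a late tuple. */
    public boolean accept(long ts) {
        // Simplification: the threshold (latest timestamp minus lag) is updated on every tuple
        if (maxSeenTs != Long.MIN_VALUE && ts < maxSeenTs - lagMs) {
            return false;
        }
        maxSeenTs = Math.max(maxSeenTs, ts);
        return true;
    }

    /** Milliseconds since midnight for h:m:s, for readable examples. */
    public static long hms(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    public static void main(String[] args) {
        LateTupleCheck check = new LateTupleCheck(5_000);        // 5s lag
        System.out.println(check.accept(hms(6, 0, 5)));    // true: t1
        System.out.println(check.accept(hms(6, 0, 1)));    // true: within the 5s lag
        System.out.println(check.accept(hms(5, 59, 59)));  // false: earlier than 06:00:00
    }
}
```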


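Storm tracks the progress of time with watermarks. A watermark timestamp is the minimum, across all input streams, of the latest tuple timestamp seen, minus the lag; it signals that no tuple with an earlier timestamp is expected. When tuple-based timestamps are in use, watermarks are emitted periodically (every second by default) and act as the clock tick for window calculations. The interval at which watermarks are emitted can be changed with the API below.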

/**
 * Specify the watermark event generation interval. Watermark events
 * are used to track the progress of time
 *
 * @param interval the interval at which watermark events are generated
 */
public BaseWindowedBolt withWatermarkInterval(Duration interval)


When a watermark is received, all windows up to that timestamp are evaluated. For example, consider tuple timestamp based processing with the following window parameters:

Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s

Current timestamp = 09:00:00

Tuples e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36) are received between 9:00:00 and 9:00:01.

At time t = 09:00:01, watermark w1 = 6:00:31 (the latest tuple timestamp 6:00:36 minus the 5s lag) is emitted, since no tuples earlier than 6:00:31 can arrive.

Three windows will be evaluated. The first window end ts (06:00:10) is computed by taking the earliest event timestamp (06:00:03) and computing the ceiling based on the sliding interval (10s).

  1. 5:59:50 - 06:00:10 with tuples e1, e2, e3
  2. 6:00:00 - 06:00:20 with tuples e1, e2, e3, e4
  3. 6:00:10 - 06:00:30 with tuples e4, e5

e6 is not evaluated since the watermark timestamp 6:00:31 is earlier than the tuple timestamp 6:00:36.

Tuples e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39) are received between 9:00:01 and 9:00:02.

At time t = 09:00:02, another watermark w2 = 08:00:34 (the latest tuple timestamp 8:00:39 minus the 5s lag) is emitted, since no tuples earlier than 8:00:34 can arrive now. Three windows will be evaluated:

  1. 6:00:20 - 06:00:40 with tuples e5, e6 (from earlier batch)
  2. 6:00:30 - 06:00:50 with tuple e6 (from earlier batch)
  3. 8:00:10 - 08:00:30 with tuples e7, e8, e9

e10 is not evaluated since the tuple ts 8:00:39 is beyond the watermark time 8:00:34.

The window calculation accounts for gaps in time: instead of evaluating every empty interval between 06:00:50 and 08:00:10, it computes the next window from the tuple timestamps.
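The worked example can be replayed with a small plain-Java simulation. It is a sketch under stated assumptions, not Storm's code, and its names are hypothetical: it models a single input stream (so the watermark is simply the latest timestamp minus the lag, whereas Storm takes the minimum across input streams), treats each window ending at `end` as covering (end - length, end], and, when a window is empty, jumps to the sliding-interval ceiling of the next event timestamp. With the parameters above it reproduces the six windows listed in the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of watermark-driven window evaluation (all times in seconds).
public class WatermarkWindowSim {
    private final long length, slide, lag;
    private final List<String> names = new ArrayList<>();
    private final List<Long> stamps = new ArrayList<>();
    private long maxTs = Long.MIN_VALUE;
    private long nextWindowEnd = 0;                        // 0 = not yet aligned to any event
    public final List<String> evaluated = new ArrayList<>();

    public WatermarkWindowSim(long length, long slide, long lag) {
        this.length = length;
        this.slide = slide;
        this.lag = lag;
    }

    public void add(String name, long ts) {
        names.add(name);
        stamps.add(ts);
        maxTs = Math.max(maxTs, ts);
    }

    /** Emits a watermark (latest timestamp minus lag) and evaluates every window ending at or before it. */
    public long emitWatermark() {
        long watermark = maxTs - lag;
        long end = nextWindowEnd;
        while (end <= watermark) {
            List<String> window = new ArrayList<>();
            for (int i = 0; i < stamps.size(); i++) {
                long ts = stamps.get(i);
                if (ts > end - length && ts <= end) window.add(names.get(i));
            }
            if (!window.isEmpty()) {
                evaluated.add(fmt(end - length) + "-" + fmt(end) + " " + window);
                end += slide;
            } else {
                // Empty window: jump to the sliding-interval ceiling of the earliest event in (end, watermark]
                long earliest = Long.MAX_VALUE;
                for (long ts : stamps) {
                    if (ts > end && ts <= watermark) earliest = Math.min(earliest, ts);
                }
                if (earliest == Long.MAX_VALUE) break;
                end = ((earliest + slide - 1) / slide) * slide;
            }
        }
        nextWindowEnd = end;
        return watermark;
    }

    public static String fmt(long s) {
        return String.format("%02d:%02d:%02d", s / 3600, (s / 60) % 60, s % 60);
    }

    public static long hms(int h, int m, int s) {
        return h * 3600L + m * 60L + s;
    }

    public static void main(String[] args) {
        WatermarkWindowSim sim = new WatermarkWindowSim(20, 10, 5);
        int[] firstBatch = {3, 5, 7, 18, 26, 36};          // e1..e6 at 06:00:xx
        for (int i = 0; i < firstBatch.length; i++) sim.add("e" + (i + 1), hms(6, 0, firstBatch[i]));
        System.out.println("w1 = " + fmt(sim.emitWatermark()));
        int[] secondBatch = {25, 26, 27, 39};               // e7..e10 at 08:00:xx
        for (int i = 0; i < secondBatch.length; i++) sim.add("e" + (i + 7), hms(8, 0, secondBatch[i]));
        System.out.println("w2 = " + fmt(sim.emitWatermark()));
        sim.evaluated.forEach(System.out::println);
    }
}
```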

15.5.4 Guarantee

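Storm's windowing functionality currently provides an at-least-once guarantee. Tuples emitted from the bolt's execute method are automatically anchored to all the tuples in the inputWindow. Downstream bolts are expected to ack the tuples they receive in order to complete the tuple tree; otherwise, the tuples are replayed and the window computation is performed again.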



