1. Overview

fxjwind
Siddhi CEP Window Mechanism
https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window

https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows

http://wso2.com/library/articles/2013/06/understanding-siddhi-powers-wso2-cep-2x/

https://docs.wso2.com/display/CEP400/Samples+on+Processing+Events

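The window mechanism is somewhat obscure, and the documentation's examples are not very thorough, so let's look at it in detail here.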

2. Basic syntax:

from <input stream name>[<filter condition>]#window.<window name>(<parameter>, <parameter>, ... )
select <attribute name>, <attribute name>, ...
insert [current events | expired events | all events] into <output stream name>

2.1 window.length

Let's go straight to an example. Here I use expired events, although in practice you usually won't use expired events.


"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.length(6)" +
"select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
"group by symbol " +
"insert expired events into outputStream;";

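A quick explanation:

  1. define: defines the stream, i.e. the structure of each event in it
  2. @info: optional, gives the query a name
  3. What the query means: for cseEventStream, events with price < 700 go into a sliding window of length 6
  4. Once the window holds more than 6 events, an expired event is produced, and that triggers the insert
  5. What gets inserted is determined by select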

I feed in the following stream data:

int i = 0;
while (i < 10) {
    float p = i * 10;
    inputHandler.send(new Object[]{"WSO2", p, 100L}); System.out.println("\"WSO2\", " + p); // volume is a long attribute
    inputHandler.send(new Object[]{"IBM", p, 100L}); System.out.println("\"IBM\", " + p);
    Thread.sleep(1000); i++;
}

Part of the output:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
receive events: 1
Event{timestamp=1447906176329, data=[WSO2, 0.0, 15.0, 30.0, 2], isExpired=false}
"IBM", 30.0
receive events: 1
Event{timestamp=1447906176331, data=[IBM, 0.0, 15.0, 30.0, 2], isExpired=false}
"WSO2", 40.0
receive events: 1
Event{timestamp=1447906177331, data=[WSO2, 10.0, 25.0, 50.0, 2], isExpired=false}
"IBM", 40.0
receive events: 1
Event{timestamp=1447906177331, data=[IBM, 10.0, 25.0, 50.0, 2], isExpired=false}

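This shows a few things:

  1. window length = 6, so sending the 7th event triggers an expiry

  2. At that point, outputStream receives the expired event

  3. From that event we naturally get all of its attributes, and the aggregate functions additionally give us statistics over all events currently in the window

This is the hard part to grasp: the output event is only the expired one, so you cannot get at all the events in the window, yet the aggregate functions do compute statistics over the events in the window.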

We computed three statistics here: average, sum and count, so you can see how avg is calculated.
For example, take

Event{timestamp=1447906176329, data=[WSO2, 0.0, 15.0, 30.0, 2], isExpired=false}

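Because we added group by, only events with symbol=WSO2 are counted.
Sending "WSO2", 30.0 causes "WSO2", 0.0 to expire, and you'll find that both of these events are excluded from the statistics. The events taking part are: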

"IBM", 0.0 "WSO2", 10.0 "IBM", 10.0 "WSO2", 20.0 "IBM", 20.0

Of these, only "WSO2", 10.0 and "WSO2", 20.0 belong to the WSO2 group, so count is 2, sum is 30, and avg = 15.

Without group by, the result is:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
receive events: 1
Event{timestamp=1447913986723, data=[WSO2, 0.0, 12.0, 60.0, 5], isExpired=false}
"IBM", 30.0
receive events: 1
Event{timestamp=1447913986725, data=[IBM, 0.0, 18.0, 90.0, 5], isExpired=false}

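Now symbol is ignored and everything left in the window is added up: for the first row, 0 + 10 + 10 + 20 + 20 = 60 over 5 events, so avg = 12.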
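To double-check the avg/sum/count arithmetic above, here is a small self-contained Java simulation. It is a hypothetical model reconstructed from the printed output, not Siddhi's implementation, and `LengthWindowSim` is an illustrative name: when a new event arrives at a full window, the oldest event is evicted and emitted as the expired event, and its aggregates are computed over what remains, before the new event is added. With `groupBySymbol` set to true or false it reproduces both sets of numbers above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of #window.length(n) + "insert expired events",
// reconstructed from the printed output above; not Siddhi's source code.
class LengthWindowSim {
    static final class Ev {
        final String symbol;
        final double price;
        Ev(String symbol, double price) { this.symbol = symbol; this.price = price; }
    }

    // Returns one "symbol,price,avg,sum,count" line per expired event.
    // Aggregates cover the window after the oldest event is evicted and
    // before the incoming event is added.
    static List<String> run(List<Ev> input, int n, boolean groupBySymbol) {
        Deque<Ev> window = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (Ev e : input) {
            if (window.size() == n) {
                Ev expired = window.pollFirst();
                double sum = 0;
                int count = 0;
                for (Ev w : window) {
                    if (!groupBySymbol || w.symbol.equals(expired.symbol)) {
                        sum += w.price;
                        count++;
                    }
                }
                double avg = count == 0 ? 0 : sum / count;
                out.add(expired.symbol + "," + expired.price + "," + avg + "," + sum + "," + count);
            }
            window.addLast(e);
        }
        return out;
    }

    // The article's input: WSO2 and IBM alternate, price = i * 10.
    static List<Ev> articleInput(int rounds) {
        List<Ev> in = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            in.add(new Ev("WSO2", i * 10.0));
            in.add(new Ev("IBM", i * 10.0));
        }
        return in;
    }
}
```

For example, `LengthWindowSim.run(LengthWindowSim.articleInput(5), 6, true)` starts with "WSO2,0.0,15.0,30.0,2", the first row printed above.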

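expired events is only one option; there are also current events and all events.
expired events fires when an event expires, current events fires when an event arrives, and all events fires in both cases.

Let's see what happens with all events instead. In my test the result was the same as with current events: output was only triggered when an event arrived. Possibly a bug?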

"WSO2", 10.0
"IBM", 10.0
receive events: 1
Event{timestamp=1447914310502, data=[WSO2, 10.0, 5.0, 10.0, 2], isExpired=false}
receive events: 1
Event{timestamp=1447914310502, data=[IBM, 10.0, 5.0, 10.0, 2], isExpired=false}
"WSO2", 20.0
"IBM", 20.0
receive events: 1
Event{timestamp=1447914311503, data=[WSO2, 20.0, 10.0, 30.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914311503, data=[IBM, 20.0, 10.0, 30.0, 3], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
receive events: 1
Event{timestamp=1447914312503, data=[WSO2, 30.0, 20.0, 60.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914312503, data=[IBM, 30.0, 20.0, 60.0, 3], isExpired=false}
"WSO2", 40.0
"IBM", 40.0
receive events: 1
Event{timestamp=1447914313503, data=[WSO2, 40.0, 30.0, 90.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914313503, data=[IBM, 40.0, 30.0, 90.0, 3], isExpired=false}

window.time

This works the same as length, except that the trigger condition is time.

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.time(2 sec)" +
"select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
"group by symbol " +
"insert expired events into outputStream;";

The result:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
receive events: 1
Event{timestamp=1447915287974, data=[WSO2, 0.0, 10.0, 10.0, 1], isExpired=false}
receive events: 1
Event{timestamp=1447915287977, data=[IBM, 0.0, 15.0, 30.0, 2], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
receive events: 2
Event{timestamp=1447915288975, data=[WSO2, 10.0, 20.0, 20.0, 1], isExpired=false}
Event{timestamp=1447915288975, data=[IBM, 10.0, 20.0, 20.0, 1], isExpired=false}

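As you can see, expiry here is time-based, so it is not necessarily evaluated when an event arrives; it is driven by a scheduled timer.

So the statistics depend on how many events are in the window at the moment the timer fires, which is why the count above can be either 1 or 2.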
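The timer-driven behaviour can be sketched with a simulated clock so that the outcome is deterministic. This is a hypothetical model (`TimeWindowSim` and `onTimer` are illustrative names, not Siddhi API); it assumes each event expires once `windowMs` has passed since its arrival and that expiry is only evaluated when the timer fires. The test replays one plausible interleaving of the run above: the tick that expires "WSO2", 0.0 happens before "WSO2", 20.0 arrives (count 1), while the tick that expires "IBM", 0.0 happens after "IBM", 20.0 arrives (count 2).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of #window.time(t): expiry happens on timer ticks,
// not on event arrival. The clock is simulated; this is not Siddhi's code.
class TimeWindowSim {
    static final class Ev {
        final String symbol;
        final double price;
        final long arrivalMs;
        Ev(String symbol, double price, long arrivalMs) {
            this.symbol = symbol; this.price = price; this.arrivalMs = arrivalMs;
        }
    }

    private final long windowMs;
    private final Deque<Ev> window = new ArrayDeque<>();

    TimeWindowSim(long windowMs) { this.windowMs = windowMs; }

    void arrive(Ev e) { window.addLast(e); }

    // Called when the (simulated) scheduler fires at nowMs: removes and returns
    // every event that has been in the window for at least windowMs.
    List<Ev> onTimer(long nowMs) {
        List<Ev> expired = new ArrayList<>();
        while (!window.isEmpty() && window.peekFirst().arrivalMs + windowMs <= nowMs) {
            expired.add(window.pollFirst());
        }
        return expired;
    }

    // count(price) for one symbol over the events still in the window
    int countInWindow(String symbol) {
        int c = 0;
        for (Ev e : window) {
            if (e.symbol.equals(symbol)) c++;
        }
        return c;
    }
}
```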

window.lengthBatch;timeBatch

This kind of window does not slide. Straight to an example:

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.lengthBatch(4)" +
"select symbol, price " +
"insert expired events into outputStream;";

Same input as above; the result:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
"IBM", 30.0
receive events: 4
Event{timestamp=1447923776094, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447923776094, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447923776094, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447923776094, data=[IBM, 10.0], isExpired=false}
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
receive events: 4
Event{timestamp=1447923778094, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447923778094, data=[IBM, 20.0], isExpired=false}
Event{timestamp=1447923778094, data=[WSO2, 30.0], isExpired=false}
Event{timestamp=1447923778094, data=[IBM, 30.0], isExpired=false}

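As you can see, with lengthBatch set to 4, the first expiry is only triggered once 8 events have arrived, i.e. when the second batch fills up.

Expiry happens one whole batch at a time, so each delivery contains 4 events and no event is delivered twice; the window does not slide.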
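The batch behaviour can be modelled like this. It is a hypothetical sketch (`LengthBatchSim` is an illustrative name, not Siddhi's implementation) based on the output above: a full batch is handed on as current events, and it is only emitted as expired events when the next batch fills up, which is why the first expired output appears at the 8th event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of #window.lengthBatch(n); not Siddhi's source code.
class LengthBatchSim {
    private final int n;
    private List<String> collecting = new ArrayList<>();
    private List<String> lastBatch = new ArrayList<>();
    final List<List<String>> currentOut = new ArrayList<>();  // one entry per closed batch
    final List<List<String>> expiredOut = new ArrayList<>();  // previous batch, expired when the next one closes

    LengthBatchSim(int n) { this.n = n; }

    void arrive(String event) {
        collecting.add(event);
        if (collecting.size() == n) {
            if (!lastBatch.isEmpty()) expiredOut.add(lastBatch);
            currentOut.add(collecting);
            lastBatch = collecting;
            collecting = new ArrayList<>();
        }
    }
}
```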

Next, a timeBatch example, this time using all events:

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.timeBatch(3 sec)" +
"select symbol, price " +
"insert all events into outputStream;";

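The result is below. We sleep 1 s after each pair of events, so the first batch closes after 3 pairs and its 6 events are delivered as current events. At the next batch boundary those 6 events are expired (note that they carry a new timestamp) and delivered together with the 6 new current events, 12 in total.
So this time the current events are output as well as the expired ones, because we used all events.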

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
receive events: 6
Event{timestamp=1447924146613, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447924146614, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447924147614, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447924147614, data=[IBM, 10.0], isExpired=false}
Event{timestamp=1447924148614, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447924148614, data=[IBM, 20.0], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
receive events: 12
Event{timestamp=1447924152571, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447924152571, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 10.0], isExpired=false}
Event{timestamp=1447924152571, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 20.0], isExpired=false}
Event{timestamp=1447924149614, data=[WSO2, 30.0], isExpired=false}
Event{timestamp=1447924149614, data=[IBM, 30.0], isExpired=false}
Event{timestamp=1447924150614, data=[WSO2, 40.0], isExpired=false}
Event{timestamp=1447924150614, data=[IBM, 40.0], isExpired=false}
Event{timestamp=1447924151614, data=[WSO2, 50.0], isExpired=false}
Event{timestamp=1447924151614, data=[IBM, 50.0], isExpired=false}
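The timeBatch run can be modelled the same way, with the batch boundary driven by a (simulated) scheduler instead of an event count. Again this is a hypothetical sketch: `TimeBatchSim` and `onBatchBoundary` are illustrative names, and the "expired:"/"current:" prefixes only stand in for the expired/current distinction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of #window.timeBatch(t) with "insert all events":
// at each boundary the previous batch is emitted as expired, followed by
// the batch collected during the last interval as current. Not Siddhi's code.
class TimeBatchSim {
    private List<String> collecting = new ArrayList<>();
    private List<String> lastBatch = new ArrayList<>();

    void arrive(String event) { collecting.add(event); }

    // Called by the simulated scheduler every t; returns what the callback receives.
    List<String> onBatchBoundary() {
        List<String> out = new ArrayList<>();
        for (String e : lastBatch) out.add("expired:" + e);
        for (String e : collecting) out.add("current:" + e);
        lastBatch = collecting;
        collecting = new ArrayList<>();
        return out;
    }
}
```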

In this kind of scenario, though, what we usually want is to compute statistics over each batch. Example:

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.lengthBatch(4) " +
"select symbol, price, avg(price) as avgPrice " +
"group by symbol " +
"insert into outputStream;";

The result:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
receive events: 2
Event{timestamp=1447991871794, data=[WSO2, 10.0, 5.0], isExpired=false}
Event{timestamp=1447991871794, data=[IBM, 10.0, 5.0], isExpired=false}
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
"IBM", 30.0
receive events: 2
Event{timestamp=1447991873795, data=[WSO2, 30.0, 25.0], isExpired=false}
Event{timestamp=1447991873795, data=[IBM, 30.0, 25.0], isExpired=false}

So the data within a batch can be grouped and averaged.
Note: don't use expired events here, or the aggregates will always be 0, because with a batch window the window is empty after each expiry.
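The per-batch aggregation can be sketched as follows: a hypothetical helper (`BatchGroupAvg.closeBatch`) that, when a batch closes, emits one row per symbol with the last price seen for that symbol and the average over the batch. It mirrors the rows printed above; it is not how Siddhi computes them internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of lengthBatch(n) + group by symbol + avg(price),
// evaluated once per closed batch. Not Siddhi's source code.
class BatchGroupAvg {
    static List<String> closeBatch(List<String> symbols, List<Double> prices) {
        // symbol -> {sum, count, lastPrice}, in first-seen order
        Map<String, double[]> acc = new LinkedHashMap<>();
        for (int i = 0; i < symbols.size(); i++) {
            double[] a = acc.computeIfAbsent(symbols.get(i), k -> new double[3]);
            a[0] += prices.get(i);
            a[1] += 1;
            a[2] = prices.get(i);
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, double[]> en : acc.entrySet()) {
            double[] a = en.getValue();
            rows.add(en.getKey() + "," + a[2] + "," + (a[0] / a[1]));
        }
        return rows;
    }
}
```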

window.externalTime

https://docs.wso2.com/display/CEP400/Sample+0114+-+Using+External+Time+Windows

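This one is quite useful: it slides the window based on an external time, since most of the time you want to aggregate by collection time rather than by arrival time.

Its limitation is that the external time must be increasing, and in real scenarios strict ordering often cannot be guaranteed.

An example: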

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.externalTime(time, 3 sec) " +
"select symbol, price, time, sum(price) as ap, count(price) as cp " +
"group by symbol " +
"insert expired events into outputStream;";

The sending code:

int i = 0;
long time = 1447921187000L;
while (i < 10) {
    float p = i * 10;
    inputHandler.send(new Object[]{"WSO2", p, time}); System.out.println("\"WSO2\", " + p + ", " + time);
    inputHandler.send(new Object[]{"IBM", p, time}); System.out.println("\"IBM\", " + p + ", " + time);
    Thread.sleep(1000); i++; time = time + 1000;
}

The goal is a sliding window over the external time attribute. The result:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
receive events: 2
Event{timestamp=1447921190000, data=[WSO2, 0.0, 1447921187000, 30.0, 2], isExpired=false}
Event{timestamp=1447921190000, data=[IBM, 0.0, 1447921187000, 30.0, 2], isExpired=false}
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1447921191000, data=[WSO2, 10.0, 1447921188000, 50.0, 2], isExpired=false}
Event{timestamp=1447921191000, data=[IBM, 10.0, 1447921188000, 50.0, 2], isExpired=false}

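As you can see, based on the supplied time, receiving "WSO2", 30.0, 1447921190000 triggers the 3-second expiry of both events stamped 1447921187000.
Otherwise it is no different from an ordinary sliding window.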
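A hypothetical model of the externalTime run (`ExternalTimeSim` is an illustrative name, not Siddhi API): expiry is triggered by the time attribute of each arriving event, and every event whose time is at least 3 s older than the newest one expires. Because external time must be increasing, this sketch simply rejects an event whose time goes backwards; that rejection is a choice made for the sketch, not documented Siddhi behaviour.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of #window.externalTime(time, t) + group by + sum/count
// with "insert expired events". Not Siddhi's source code.
class ExternalTimeSim {
    static final class Ev {
        final String symbol;
        final double price;
        final long time;
        Ev(String symbol, double price, long time) { this.symbol = symbol; this.price = price; this.time = time; }
    }

    private final long windowMs;
    private final Deque<Ev> window = new ArrayDeque<>();
    private long lastTime = Long.MIN_VALUE;

    ExternalTimeSim(long windowMs) { this.windowMs = windowMs; }

    // Returns "symbol,price,time,sum,count" for every event expired by e.
    List<String> arrive(Ev e) {
        if (e.time < lastTime) throw new IllegalArgumentException("external time went backwards");
        lastTime = e.time;
        List<Ev> expired = new ArrayList<>();
        while (!window.isEmpty() && window.peekFirst().time <= e.time - windowMs) {
            expired.add(window.pollFirst());
        }
        List<String> out = new ArrayList<>();
        for (Ev x : expired) {
            double sum = 0;
            int count = 0;
            for (Ev w : window) {
                if (w.symbol.equals(x.symbol)) { sum += w.price; count++; }
            }
            out.add(x.symbol + "," + x.price + "," + x.time + "," + sum + "," + count);
        }
        window.addLast(e);  // the arriving event itself is not part of the aggregates
        return out;
    }
}
```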

window.cron

https://docs.wso2.com/display/CEP400/Sample+0115+-+Quartz+scheduler+based+alerts

This is for scheduled output. timeBatch can achieve the same thing; cron is just more convenient.

Example:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.cron('*/4 * * * * ?') " +
"select symbol, time, sum(price) as ap, count(price) as cp " +
"group by symbol " +
"insert into outputStream;";

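The key is understanding cron syntax; see http://www.cnblogs.com/wangyuyu/p/4230742.html

Siddhi's (Quartz-style) cron syntax has an extra seconds field, so the first field is seconds: */4 means fire every 4 seconds.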
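To make the seconds field concrete, here is a tiny helper that evaluates only the first field of a Quartz-style expression. It is a hypothetical illustration (`CronSecondsField` is not part of Siddhi or Quartz) and supports just `*`, `*/n` and a single number; real cron parsing handles far more.

```java
// Hypothetical helper, NOT the Quartz parser: decides whether the seconds
// field of an expression such as "*/4 * * * * ?" matches a given second.
class CronSecondsField {
    static boolean fires(String expression, int second) {
        String field = expression.trim().split("\\s+")[0];
        if (field.equals("*")) return true;                 // every second
        if (field.startsWith("*/")) {                       // every n seconds from 0
            return second % Integer.parseInt(field.substring(2)) == 0;
        }
        return Integer.parseInt(field) == second;           // one fixed second
    }
}
```

With "*/4", the window fires at seconds 0, 4, 8, ..., 56 of each minute, which is the 4-second rhythm in the output.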

The result below shows that it does indeed fire every 4 seconds (e.g. 10 + 20 + 30 + 40 = 100 over 4 events):

"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448006719652, data=[WSO2, 1447921191000, 100.0, 4], isExpired=false}
Event{timestamp=1448006719652, data=[IBM, 1447921191000, 100.0, 4], isExpired=false}
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
receive events: 2
Event{timestamp=1448006723653, data=[WSO2, 1447921195000, 260.0, 4], isExpired=false}
Event{timestamp=1448006723653, data=[IBM, 1447921195000, 260.0, 4], isExpired=false}

window.unique, window.firstUnique

They do what their names say. Straight to examples:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.unique(symbol) " +
"select symbol, price, time " +
"insert into outputStream;";

From the result, it looks just like an ordinary stream passing through,
because every update of a symbol triggers an event:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
receive events: 2
Event{timestamp=1448009613618, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448009613620, data=[IBM, 0.0, 1447921187000], isExpired=false}
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
receive events: 1
Event{timestamp=1448009614633, data=[WSO2, 10.0, 1447921188000], isExpired=false}
receive events: 1
Event{timestamp=1448009614633, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
receive events: 2
Event{timestamp=1448009615650, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448009615650, data=[IBM, 20.0, 1447921189000], isExpired=false}
"WSO2", 30.0, 1447921190000
receive events: 1
"IBM", 30.0, 1447921190000
Event{timestamp=1448009616650, data=[WSO2, 30.0, 1447921190000], isExpired=false}
receive events: 1
Event{timestamp=1448009616650, data=[IBM, 30.0, 1447921190000], isExpired=false}

Now firstUnique:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.firstUnique(symbol) " +
"select symbol, price, time " +
"insert into outputStream;";

In the result, output is only triggered the first time each symbol appears:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
receive events: 1
Event{timestamp=1448008769827, data=[WSO2, 0.0, 1447921187000], isExpired=false}
receive events: 1
Event{timestamp=1448008769831, data=[IBM, 0.0, 1447921187000], isExpired=false}
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
"WSO2", 90.0, 1447921196000
"IBM", 90.0, 1447921196000
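The two windows can be modelled as follows. This is a hypothetical sketch (`UniqueSim` is an illustrative name, not Siddhi's implementation): `unique` keeps the latest event per key and emits every arrival as a current event, while `firstUnique` keeps the first event per key and silently drops the rest, matching the outputs above. Treating the replaced event in `unique` as an expired event is an assumption; the runs above only show current events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical models of #window.unique(symbol) and #window.firstUnique(symbol).
class UniqueSim {
    // events are {symbol, price}; returns current events, collects replaced ones in expired
    static List<String> unique(List<String[]> events, List<String> expired) {
        Map<String, String> latest = new HashMap<>();
        List<String> current = new ArrayList<>();
        for (String[] e : events) {
            String prev = latest.put(e[0], e[1]);   // newer event replaces the older one
            if (prev != null) expired.add(e[0] + "," + prev);
            current.add(e[0] + "," + e[1]);
        }
        return current;
    }

    static List<String> firstUnique(List<String[]> events) {
        Set<String> seen = new HashSet<>();
        List<String> current = new ArrayList<>();
        for (String[] e : events) {
            if (seen.add(e[0])) current.add(e[0] + "," + e[1]);  // only the first per key
        }
        return current;
    }
}
```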

This is often used together with a join, e.g.

from SymbolStream#window.length(1) unidirectional join StockExchangeStream#window.unique(symbol)
select StockExchangeStream.symbol as symbol, StockExchangeStream.price as lastTradedPrice
insert into StockQuote;

Output Rate Limiting

The reason I introduce it here is that it fits well together with unique.

Basic syntax:

output ({<output-type>} every (<time interval>|<event interval> events) | snapshot every <time interval>)

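where <output-type> is "first", "last" or "all"; the default is all.

For an ordinary window, triggering on every event can be too frequent; often I only want output once every fixed number of events or fixed time interval.
This suits unique especially well: with unique you usually only want the latest state, so triggering on every event is pointless and periodic output is enough.

Using the earlier example again: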

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.unique(symbol) " +
"select symbol, price, time " +
"group by symbol " +
"output last every 5 events " +
"insert into outputStream;";

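In the result, because of group by symbol, each output contains two rows, one for WSO2 and one for IBM.
But the event count is shared across the groups: output is triggered after every 5 events in total, not after 5 WSO2 or 5 IBM events. That is why the first output, triggered by the 5th event ("WSO2", 20.0), still shows IBM at 10.0.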
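The shared counter can be sketched like this. `OutputLastEveryN` is a hypothetical name, and clearing the per-group state after each flush is an assumption of this sketch, not confirmed Siddhi behaviour; with the article's input it produces the same two outputs as above, including their row order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "group by symbol ... output last every n events":
// one event counter shared by all groups, "last" tracked per group.
class OutputLastEveryN {
    private final int n;
    private int seen = 0;
    private final Map<String, String> lastPerGroup = new LinkedHashMap<>();

    OutputLastEveryN(int n) { this.n = n; }

    // Returns the rows flushed by this event (empty unless it is the n-th one).
    List<String> onEvent(String symbol, double price) {
        lastPerGroup.put(symbol, symbol + "," + price);
        if (++seen < n) return Collections.emptyList();
        List<String> flushed = new ArrayList<>(lastPerGroup.values());
        lastPerGroup.clear();
        seen = 0;
        return flushed;
    }
}
```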

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
receive events: 2
Event{timestamp=1448010405404, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448010404405, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448010407404, data=[IBM, 40.0, 1447921191000], isExpired=false}
Event{timestamp=1448010407404, data=[WSO2, 40.0, 1447921191000], isExpired=false}

It works the same way with time:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.unique(symbol) " +
"select symbol, price, time " +
"group by symbol " +
"output last every 5 sec " +
"insert into outputStream;";

The result:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448010645533, data=[WSO2, 40.0, 1447921191000], isExpired=false}
Event{timestamp=1448010645533, data=[IBM, 40.0, 1447921191000], isExpired=false}
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
"WSO2", 90.0, 1447921196000
"IBM", 90.0, 1447921196000
receive events: 2
Event{timestamp=1448010650533, data=[WSO2, 90.0, 1447921196000], isExpired=false}
Event{timestamp=1448010650533, data=[IBM, 90.0, 1447921196000], isExpired=false}

snapshot: emits all current events that have arrived so far. I generally wouldn't use it like this directly; I can't think of a use case for it.

Example:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.unique(symbol) " +
"select symbol, price, time " +
"group by symbol " +
"output snapshot every 2 sec " +
"insert into outputStream;";

The result:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
receive events: 4
Event{timestamp=1448011434403, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011434405, data=[IBM, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011435405, data=[WSO2, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011435405, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
receive events: 8
Event{timestamp=1448011434403, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011434405, data=[IBM, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011435405, data=[WSO2, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011435405, data=[IBM, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011436405, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448011436405, data=[IBM, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448011437405, data=[WSO2, 30.0, 1447921190000], isExpired=false}
Event{timestamp=1448011437405, data=[IBM, 30.0, 1447921190000], isExpired=false}

window.sort

Sorts the events within the window:

<event> sort(<int> windowLength, <string> attribute, <string> order, .. , <string> attributeN, <string> orderN)
where order is "asc" or "desc"; the default is asc.

Example:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.sort(3, price, 'asc') " +
"select symbol, price, time " +
"group by symbol " +
"insert all events into outputStream;";

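The length is 3 and events are sorted by price in ascending order. This means that once the window length exceeds 3, i.e. reaches 4, the last event in ascending price order (the one with the highest price) is expired and output.

The result: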

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
Events{ @timeStamp = 1448875633289, inEvents = [Event{timestamp=1448875633289, data=[WSO2, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
Events{ @timeStamp = 1448875633290, inEvents = [Event{timestamp=1448875633290, data=[IBM, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
Events{ @timeStamp = 1448875634291, inEvents = [Event{timestamp=1448875634291, data=[WSO2, 10.0, 1447921188000], isExpired=false}], RemoveEvents = null }
Events{ @timeStamp = 1448875634291, inEvents = [Event{timestamp=1448875634291, data=[IBM, 10.0, 1447921188000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875634291, data=[IBM, 10.0, 1447921188000], isExpired=true}] }
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
Events{ @timeStamp = 1448875635292, inEvents = [Event{timestamp=1448875635292, data=[WSO2, 20.0, 1447921189000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875635292, data=[WSO2, 20.0, 1447921189000], isExpired=true}] }
Events{ @timeStamp = 1448875635292, inEvents = [Event{timestamp=1448875635292, data=[IBM, 20.0, 1447921189000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875635292, data=[IBM, 20.0, 1447921189000], isExpired=true}] }

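As you can see, once there are more than 3 events, the current event and the expired event are identical: with ascending order and increasing input prices, each new event is the largest, so everything beyond the first 3 is expired.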
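A hypothetical model of `#window.sort(3, price, 'asc')` (`SortWindowSim` is an illustrative name, not Siddhi's implementation): the window is kept sorted by price, a new event is inserted after any events with an equal price, and once the window holds more than 3 events the last one in sort order is expired. With increasing prices that is always the event that just arrived, matching the output above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a length-bounded window sorted by price ascending.
class SortWindowSim {
    static final class Ev {
        final String symbol;
        final double price;
        Ev(String symbol, double price) { this.symbol = symbol; this.price = price; }
        @Override public String toString() { return symbol + "," + price; }
    }

    private final int length;
    private final List<Ev> window = new ArrayList<>();

    SortWindowSim(int length) { this.length = length; }

    // Inserts e in ascending price order (after equal prices) and returns the
    // expired event, or null while the window is within its length.
    Ev arrive(Ev e) {
        int pos = 0;
        while (pos < window.size() && window.get(pos).price <= e.price) pos++;
        window.add(pos, e);
        return window.size() > length ? window.remove(window.size() - 1) : null;
    }
}
```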

window.frequent;window.lossyFrequent

<event> frequent(<int> eventCount, <string> attribute, .. , <string> attributeN), based on the Misra-Gries counting algorithm; see http://www.zhihu.com/question/23480657

For how this processor is implemented, see http://mail.wso2.org/mailarchive/dev/2015-September/055230.html

Frankly, if you are not familiar with this algorithm, it is quite obscure.

"define stream cseEventStream (symbol string, price float, time long);" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.frequent(2, symbol) " +
"select symbol, price, time " +
"insert all events into outputStream;";

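What frequent means: you receive current events, and if an event in the stream belongs to the top-frequent set it is output; otherwise it is dropped.
Put plainly, through current events you keep receiving events that belong to the top-frequent set, and everything else is dropped.

The input: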

int i = 0; long time = 1447921187000L;
String str = "attributes to attributes to to events. If no no no no attributes";
String[] strs = str.split(" ");
for (String s : strs) {
    float p = i * 10;
    inputHandler.send(new Object[]{s, p, time}); System.out.println(s + ", " + p + ", " + time);
    Thread.sleep(1000); i++; time = time + 1000;
}

The result; let's analyze it:

attributes, 0.0, 1447921187000
Events{ @timeStamp = 1448873866506, inEvents = [Event{timestamp=1448873866506, data=[attributes, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
to, 10.0, 1447921188000
Events{ @timeStamp = 1448873867509, inEvents = [Event{timestamp=1448873867509, data=[to, 10.0, 1447921188000], isExpired=false}], RemoveEvents = null }
attributes, 20.0, 1447921189000
Events{ @timeStamp = 1448873868509, inEvents = [Event{timestamp=1448873868509, data=[attributes, 20.0, 1447921189000], isExpired=false}], RemoveEvents = null }
to, 30.0, 1447921190000
Events{ @timeStamp = 1448873869509, inEvents = [Event{timestamp=1448873869509, data=[to, 30.0, 1447921190000], isExpired=false}], RemoveEvents = null }
to, 40.0, 1447921191000
Events{ @timeStamp = 1448873870509, inEvents = [Event{timestamp=1448873870509, data=[to, 40.0, 1447921191000], isExpired=false}], RemoveEvents = null }
events., 50.0, 1447921192000
If, 60.0, 1447921193000
Events{ @timeStamp = 1448873872509, inEvents = [Event{timestamp=1448873872509, data=[If, 60.0, 1447921193000], isExpired=false}], RemoveEvents = [Event{timestamp=1448873868509, data=[attributes, 20.0, 1447921189000], isExpired=true}] }
no, 70.0, 1447921194000
Events{ @timeStamp = 1448873873509, inEvents = [Event{timestamp=1448873873509, data=[no, 70.0, 1447921194000], isExpired=false}], RemoveEvents = [Event{timestamp=1448873870509, data=[to, 40.0, 1447921191000], isExpired=true}, Event{timestamp=1448873872509, data=[If, 60.0, 1447921193000], isExpired=true}] }

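Everything is fine at first, while only attributes and to come in (after "to, 40.0" their frequencies are attributes=2, to=3).
Then events. arrives. attributes and to already occupy both slots, so an expiry is triggered: the frequency of every event in the window is decremented by 1, and any event whose frequency reaches 0 is expired.
But attributes and to still have frequencies above 0 (1 and 2), so nothing in the window can be expired,
and the only option is to drop the incoming 'events.' itself. That is why 'events.' never shows up in the current events:
we only receive top-frequent events.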

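When If arrives, another expiry is triggered and every frequency is decremented again.
This time the frequency of attributes drops to 0, so attributes is expired and the event 'If' is put into the window.
So we see 'If' in the current events and 'attributes' in the expired events.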
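The trace can be reproduced with a Misra-Gries style counter. This is a hypothetical model written to match the output above (`FrequentSim` is an illustrative name; the mail-archive link describes Siddhi's actual implementation): an existing key is incremented and emitted; a new key takes a free slot if there is one; otherwise every counter is decremented, keys that reach 0 are expired, and the new key takes a freed slot if one opened up, else it is dropped. That last rule also explains the "no" line above, where both 'to' and 'If' expire.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of #window.frequent(k, attr); not Siddhi's source code.
class FrequentSim {
    private final int k;
    private final Map<String, Integer> counts = new LinkedHashMap<>();

    FrequentSim(int k) { this.k = k; }

    // Returns "in:key" for an emitted current event and "out:key" for each
    // expired key; an empty list means the incoming event was dropped.
    List<String> arrive(String key) {
        List<String> out = new ArrayList<>();
        if (counts.containsKey(key)) {
            counts.merge(key, 1, Integer::sum);
            out.add("in:" + key);
            return out;
        }
        if (counts.size() >= k) {
            // No free slot: decrement every counter and expire those that reach 0.
            Iterator<Map.Entry<String, Integer>> it = counts.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Integer> en = it.next();
                en.setValue(en.getValue() - 1);
                if (en.getValue() == 0) {
                    out.add("out:" + en.getKey());
                    it.remove();
                }
            }
        }
        if (counts.size() < k) {
            counts.put(key, 1);
            out.add(0, "in:" + key);  // list the current event before the expired ones
        }
        return out;
    }
}
```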

<event> lossyFrequent(<double> supportThreshold, <double> errorBound, <string> attribute, .. , <string> attributeN), based on the Lossy Counting algorithm; see http://stackoverflow.com/questions/8033012/what-is-lossy-counting

I haven't tested this one. Presumably only the algorithm that decides expiry is different, and everything else is similar.
