英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial

----------------

Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批量处理工具很了解的话,那么应该毕竟容易理解Trident,因为他们之间很多的概念和思想都是类似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理。

举例说明

让我们一起来看一个Trident的例子。在这个例子中,我们主要做了两件事情:

  1. 从一个流式输入中读取语句病计算每个单词的个数
  2. 提供查询给定单词列表中每个单词当前总数的功能
因为这只是一个例子,我们会从如下这样一个无限的输入流中读取语句作为输入:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,new Values("the cow jumped over the moon"),new Values("the man went to the store and bought some candy"),new Values("four score and seven years ago"),new Values("how many apples can you eat"),
spout.setCycle(true);

这个spout会循环输出列出的那些语句到sentence stream当中,下面的代码会以这个stream作为输入并计算每个单词的个数:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                .parallelismHint(6);

让我们一起来读一下这段代码。我们首先创建了一个TridentTopology对象。TridentTopology类相应的接口来构造Trident计算过程中的所有内容。我们在调用了TridentTopology类的newStream方法时,传入了一个spout对象,spout对象会从外部读取数据并输出到当前topology当中,从而在topology中创建了一个新的数据流。在这个例子中,我们使用了上面定义的FixedBatchSpout对象。输入数据源同样也可以是如Kestrel或者Kafka这样的队列服务。Trident会再Zookeeper中保存一小部分状态信息来追踪数据的处理情况,而在代码中我们指定的字符串“spout1”就是Zookeeper中用来存储metadata信息的Znode节点

Trident在处理输入stream的时候会把输入转换成若干个tuple的batch来处理。比如说,输入的sentence stream可能会被拆分成如下的batch:

一般来说,这些小的batch中的tuple可能会在数千或者数百万这样的数量级,这完全取决于你的输入的吞吐量。

Trident提供了一系列非常成熟的批量处理的API来处理这些小batch. 这些API和你在Pig或者Cascading中看到的非常类似, 你可以做group by's, joins, aggregations, 运行 functions, 执行 filters等等。当然,独立的处理每个小的batch并不是非常有趣的事情,所以Trident提供了很多功能来实现batch之间的聚合的结果并可以将这些聚合的结果存储到内存,Memcached, Cassandra或者是一些其他的存储中。同时,Trident还提供了非常好的功能来查询实时状态。这些实时状态可以被Trident更新,同时它也可以是一个独立的状态源。

回到我们的这个例子中来,spout输出了一个只有单一字段“sentence”的数据流。在下一行,topology使用了Split函数来拆分stream中的每一个tuple,Split函数读取输入流中的“sentence”字段并将其拆分成若干个word tuple。每一个sentence tuple可能会被转换成多个word tuple,比如说"the cow jumped over the moon" 会被转换成6个 "word" tuples. 下面是Split的定义:

public class Split extends BaseFunction {public void execute(TridentTuple tuple, TridentCollector collector) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {collector.emit(new Values(word));                }}
}

如你所见,真的很简单。它只是简单的根据空格拆分sentence,并将拆分出的每个单词作为一个tuple输出。

topology的其他部分计算单词的个数并将计算结果保存到了持久存储中。首先,word stream被根据“word”字段进行group操作,然后每一个group使用Count聚合器进行持久化聚合。persistentAggregate会帮助你把一个状态源聚合的结果存储或者更新到存储当中。在这个例子中,单词的数量被保持在内存中,不过我们可以很简单的把这些数据保存到其他的存储当中,如 Memcached, Cassandra等。如果我们要把结果存储到Memcached中,只是简单的使用下面这句话替换掉persistentAggregate就可以,这当中的"serverLocations"是Memcached cluster的主机和端口号列表:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))
MemcachedState.transactional()

persistentAggregate存储的数据就是所有batch聚合的结果。

Trident非常酷的一点就是它是完全容错的,拥有者有且只有一次处理的语义。这就让你可以很轻松的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据需要来恢复这些状态。

persistentAggregate方法会把数据流转换成一个TridentState对象。在这个例子当中,TridentState对象代表了所有的单词的数量。我们会使用这个TridentState对象来实现在计算过程中的进行分布式查询。

下面这部分实现了一个低延时的单词数量的分布式查询。这个查询以一个用空格分割的单词列表为输入,并返回这些单词当天的个数。这些查询是想普通的RPC调用那样被执行的,要说不同的话,那就是他们在后台是并行执行的。下面是执行查询的一个例子:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"

如你所见,除了这是并发执行在storm cluster上之外,这看上去就是一个正常的RPC调用。这样的简单查询的延时通常在10毫秒左右。当然,更负责的DRPC调用可能会占用更长的时间,尽管延时很大程度上是取决于你给计算分配了多少资源。

这个分布式查询的实现如下所示:

topology.newDRPCStream("words").each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));

我们仍然是使用TridentTopology对象来创建DRPC stream,并且我们将这个函数命名为“words”。这个函数名会作为第一个参数在使用DRPC Client来执行查询的时候用到。

每个DRPC请求会被当做只有一个tuple的batch来处理。在处理的过程中,以这个输入的单一tuple来表示这个请求。这个tuple包含了一个叫做“args”的字段,在这个字段中保存了客户端提供的查询参数。在这个例子中,这个参数是一个以空格分割的单词列表。

首先,我们使用Splict功能把入参拆分成独立的单词。然后对“word” 进行group by操作,之后就可以使用stateQuery来在上面代码中创建的TridentState对象上进行查询。stateQuery接受一个数据源(在这个例子中,就是我们的topolgoy所计算的单词的个数)以及一个用于查询的函数作为输入。在这个例子中,我们使用了MapGet函数来获取每个单词的出现个数。由于DRPC stream是使用跟TridentState完全同样的group方式(按照“word”字段进行group),每个单词的查询会被路由到TridentState对象管理和更新这个单词的分区去执行。

接下来,我们用FilterNull这个过滤器把从未出现过的单词给去掉,并使用Sum这个聚合器将这些count累加起来。最终,Trident会自动把这个结果发送回等待的客户端。

Trident在如何最大程度的保证执行topogloy性能方面是非常智能的。在topology中会自动的发生两件非常有意思的事情:

  1. 读取和更新状态的操作 (比如说 persistentAggregate 和 stateQuery) 会自动的是batch的形式操作状态。 如果有20次更新需要被同步到存储中,Trident会自动的把这些操作汇总到一起,只做一次读一次写,而不是进行20次读20次写的操作。因此你可以在很方便的执行计算的同时,保证了非常好的性能。
  2. Trident的聚合器已经是被优化的非常好了的。Trident并不是简单的把一个group中所有的tuples都发送到同一个机器上面进行聚合,而是在发送之前已经进行过一次部分的聚合。打个比方,Count聚合器会先在每个partition上面进行count,然后把每个分片count汇总到一起就得到了最终的count。这个技术其实就跟MapReduce里面的combiner是一个思想。

让我们再来看一下Trident的另外一个例子。

Reach

下一个例子是一个纯粹的DRPC topology。这个topology会计算一个给定URL的reach。那么什么事reach呢,这里我们将reach定义为有多少个独立用户在Twitter上面expose了一个给定的URL,那么我们就把这个数量叫做这个URL的reach。要计算reach,你需要tweet过这个URL的所有人,然后找到所有follow这些人的人,并将这些follower去重,最后就得到了去重后的follower的数量。如果把计算reach的整个过程都放在一台机器上面,就太勉强了,因为这会需要进行数千次数据库调用以及上一次的tuple的读取。如果使用Storm和Trident,你就可以把这些计算步骤在整个cluster中进行并发。

这个topology会读取两个state源。一个用来保存URL以及tweet这个URL的人的关系的数据库。还有一个保持人和他的follower的关系的数据库。topology的定义如下:

TridentState urlToTweeters =topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers =topology.newStaticState(getTweeterToFollowersState());topology.newDRPCStream("reach").stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).parallelismHint(200).each(new Fields("followers"), new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields("one")).parallelismHint(20).aggregate(new Count(), new Fields("reach"));

这个topology使用newStaticState方法创建了TridentState对象来代表一种外部存储。使用这个TridentState对象,我们就可以在这个topology上面进行动态查询了。和所有的状态源一样,在数据库上面的查找会自动被批量执行,从而最大程度的提升效率。

这个topology的定义是非常直观的 - 只是一个简单的批量处理job。首先,查询urlToTweeters数据库来得到tweet过这个URL的人员列表。这个查询会返回一个列表,因此我们使用ExpandList函数来把每一个反悔的tweeter转换成一个tuple。

接下来,我们来获取每个tweeter的follower。我们使用shuffle来把要处理的tweeter分布到toplology运行的每一个worker中并发去处理。然后查询follower数据库从而的到每个tweeter的follower。你可以看到我们为topology的这部分分配了很大的并行度,这是因为这部分是整个topology中最耗资源的计算部分。

然后我们在follower上面使用group by操作进行分组,并对每个组使用一个聚合器。这个聚合器只是简单的针对每个组输出一个tuple “One”,再count “One” 从而的到不同的follower的数量。“One”聚合器的定义如下:

public class One implements CombinerAggregator<Integer> {public Integer init(TridentTuple tuple) {return 1;}public Integer combine(Integer val1, Integer val2) {return 1;}public Integer zero() {return 1;}
}

这是一个"汇总聚合器", 它会在传送结果到其他worker汇总之前进行局部汇总,从而来最大程度上提升性能。Sum也是一个汇总聚合器,因此以Sum作为topology的最终操作是非常高效的。

接下来让我们一起来看看Trident的一些细节。

Fields and tuples

Trident的数据模型就是TridentTuple - 一个有名的值的列表。在一个topology中,tuple是在一系列的处理操作(operation)中增量生成的。operation一般以一组子弹作为输入并输出一组功能字段。Operation的输入字段经常是输入tuple的一个子集,而功能字段则是operation的输出。

看一下如下这个例子。假定你有一个叫做“stream”的stream,它包含了“x”,"y"和"z"三个字段。为了运行一个读取“y”作为输入的过滤器MyFilter,你可以这样写:

stream.each(new Fields("y"), new MyFilter())

假定MyFilter的实现是这样的:

public class MyFilter extends BaseFilter {public boolean isKeep(TridentTuple tuple) {return tuple.getInteger(0) < 10;}
}

这会保留所有“y”字段小于10的tuples。TridentTuple传个MyFilter的输入将只有字段“y”。这里需要注意的是,当选择输入字段时,Trident会自动投影tuple的一个子集,这个操作是非常高效的。

让我们一起看一下“功能字段”是怎样工作的。假定你有如下这个功能:

public class AddAndMultiply extends BaseFunction {public void execute(TridentTuple tuple, TridentCollector collector) {int i1 = tuple.getInteger(0);int i2 = tuple.getInteger(1);collector.emit(new Values(i1 + i2, i1 * i2));}
}

这个函数接收两个数作为输入并输出两个新的值:“和”和“乘积”。假定你有一个stream,其中包含“x”,"y"和"z"三个字段。你可以这样使用这个函数:

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));

输出的功能字段被添加到输入tuple中。因此这个时候,每个tuple中将会有5个字段"x", "y", "z", "added", 和 "multiplied". "added" 和"multiplied"对应于AddAndMultiply输出的第一和第二个字段。

另外,我们可以使用聚合器来用输出字段来替换输入tuple。如果你有一个stream包含字段"val1"和"val2",你可以这样做:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

output stream将会只包含一个叫做“sum”的字段,这个sum字段就是“val2”的累积和。

在group之后的stream上,输出将会是被group的字段以及聚合器输出的字段。举例如下:

stream.groupBy(new Fields("val1")).aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

在这个例子中,输出将包含字段"val1" 和 "sum".

State

在实时计算领域的一个主要问题就是怎么样来管理状态并能轻松应对错误和重试。消除错误的影响是非常重要的,因为当一个节点死掉,或者一些其他的问题出现时,那行batch需要被重新处理。问题是-你怎样做状态更新来保证每一个消息被处理且只被处理一次?

这是一个很棘手的问题,我们可以用接下来的例子进一步说明。假定你在做一个你的stream的计数聚合,并且你想要存储运行时的count到一个数据库中去。如果你只是存储这个count到数据库中,并且想要进行一次更新,我们是没有办法知道同样的状态是不是以前已经被update过了的。这次更新可能在之前就尝试过,并且已经成功的更新到了数据库中,不过在后续的步骤中失败了。还有可能是在上次更新数据库的过程中失败的,这些你都不知道。

Trident通过做下面两件事情来解决这个问题:

  1. 每一个batch被赋予一个唯一标识id“transaction id”。如果一个batch被重试,它将会拥有和之前同样的transaction id
  2. 状态更新是以batch为单位有序进行的。也就是说,batch 3的状态更新必须等到batch 2的状态更新成功之后才可以进行。

有了这2个原则,你就可以达到有且只有一次更新的目标。你可以将transaction id和count一起以原子的方式存到数据库中。当更新一个count的时候,需要判断数据库中当前batch的transaction id。如果跟要更新的transaction id一样,就跳过这次更新。如果不同,就更新这个count。

当然,你不需要在topology中手动处理这些逻辑。这些逻辑已经被封装在Stage的抽象中并自动进行。你的Stage object也不需要自己去实习transaction id的跟踪操作。如果你想了解更多的关于如何实现一个Stage以及在容错过程中的一些取舍问题,可以参照这篇文章.

一个Stage可以采用任何策略来存储状态。它可以存储到一个外部的数据库,也可以在内存中保持状态并备份到HDFS中。Stage并不需要永久的保持状态。比如说,你有一个内存版的Stage实现,它保存最近X个小时的数据并丢弃老的数据。可以把 Memcached integration 作为例子来看看State的实现.

Execution of Trident topologies

Trident的topology会被编译成尽可能高效的Storm topology。只有在需要对数据进行repartition的时候(如groupby或者shuffle)才会把tuple通过network发送出去,如果你有一个trident如下:

它将会被编译成如下的storm topology:

Conclusion

Trident使得实时计算更加优雅。你已经看到了如何使用Trident的API来完成大吞吐量的流式计算,状态维护,低延时查询等等功能。Trident让你在获取最大性能的同时,以更自然的一种方式进行实时计算。

专题一:Trident State 详解

一、什么是Trident State

直译过来就是trident状态,这里的状态主要涉及到Trident如何实现一致性语义规则,Trident的计算结果将被如何提交,如何保存,如何更新等等。我们知道Trident的计算都是以batch为单位的,但是batch在中的tuple在处理过程中有可能会失败,失败之后bach又有可能会被重播,这就涉及到很多事务一致性问题。Trident State就是管理这些问题的一套方案,与这套方案对应的就是Trident State API。这样说可能还比较抽象,下面就用一个例子具体说明一下。

1.1 举例具体例子来说明

假设有这么一个需求,统计一个数据流中各个单词出现的数量,并把单词和其数量更新到数据库中。假设我们在数据库中只有两个字段,单词和其数量,在计数过程中,如果遇到相同的单词则就把其数量加一。但是这么做有一个问题,如果某个单词是被重播的单词,就有可能导致这个单词被多加了一遍。因此,在数据库中只保存单词和其数量两个字段是无法做到“数据只被处理一次”的语义要求的。

1.2 Trident是怎么解决这个问题的呢?

Trident定义了如下语义规则:

1. 所有的Tuple都是以batch的形式处理的
2. 每个batch都会被分配一个唯一的“transaction id”(txid),如果batch被重发,txid不变
3. 各个batch状态的更新是有序的,也就是说batch2一定会在batch3之前更新

有了这三个规则,我们就可以通过txid知道batch是否被处理过,然后就可以根据实际情况来更新状态信息了。很明显,要满足这几个语义规则,就需要spout来支持,因为把tuple封装成batch,分配txid等等都是有spout来负责的。

但是在具体应用场景中,storm应该能够提供不同的容错级别,因为某些情况下我们并不需要强一致性。为了更灵活的处理,Trident提供了三类spout,分别是:

1. Transactional spouts : 事务spout,提供了强一致性
2. Opaque Transactional spouts:不透明事务spout,提供了弱一致性
3. No-Transactional spouts:非事务spout,对一致性无法保证
  • 注意,所有的Trident Spout都是以batch的形式发送数据,每个batch也都会分配一个唯一的txid,决定它们有不同性质的地方在于它们对各自的batch提供了什么样的保证。

1.3 Trident State的类型

我们已经知道Trident 提供了三种类型的spout来服务Trident State管理,那么对应的Trident State也有三种类型:

1. Transactional
2. Opaque Transactional
3. No-Transactional
  • 二、各类Trident Spout详解

2.1 Transactional spouts

Transactional spouts对batch的发送提供了如下保证:

1. 相同txid的batch完全一样,如果一个batch被重播,重播的batch的txid及其所有tuple和原batch的完全一致
2. 两个batch中的tuple不会有重合
3. 每个tuple都在batch中,不会有batch漏掉某个tuple

这三个特性是“最完美”的保证,也最容易理解,Stream被分割成固定的batch,而且不会改变。Storm就提供了一个Transactional spout的实现:TransactionalTridentKafkaSpout。

我们现在再看上面1.1节提到的那个实例,我们要把单词和其数量保存在数据库中,为了保证“数据只被处理一次”,除了要保存单词和数量两个字段之外,我们再加一个字段txid。在更新数据时,我们先对比一下当前的数据的txid和数据库中数据的txid,若txid相同,说明是被重播的数据,直接跳过即可,如果不同,则把两个数值相加即可。

下面具体说明一下,假设当前处理的batch的txid=3,其中的tuples为:

[man]
[man]
[dog]
  • 再假设数据库中保存的数据为:
man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
  • 数据库中“man”单词的txid为1,而当前batch的txid为3,说明当前batch中的“man”单词未被累加过,所以需要把当前batch中”man”的个数累加到数据库中。数据库中“dog”单词的txid为3,和当前batch的txid相同,说明已经被累计过了直接跳过。最终数据库中的结果变为:
man => [count=5, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
  • 总结一下整个处理过程:
if(database txid=current txid){//两次更新的txid相同跳过;
}else{用current value替换掉database value;
}
  • 2.2 Opaque Transactional spouts

上面已经提到过,并不是所有情形下都需要保证强一致性。例如在TransactionalTridentKafkaSpout中(关于Kafka相关介绍,点这里),如果它的一个batch中的tuples来自一个topic的所有partitions,如果要满足Transactionnal Spout语义的话,一旦这个batch因为某些失败而被重发,重发batch中的所有tuple必须与这个batch中的完全一致,而恰好kafka集群某个节点down掉导致这个topic其中一个partition无法使用,那么就会导致这个batch无法凑齐所有tuple(无法获取失败partition上的数据),整个处理过程被挂起。而Opaque Transactional spouts就可以解决这个问题。

Opaque Transactional spouts提供了如下保证:

- 每个tuple只在一个batch中被成功处理,如果一个batch中的tuple处理失败的话,会被后面的batch继续处理
  • 怎么理解这个特性呢,简要来说就OpaqueTransactional spout和Transactional spouts基本差不多,只是在Opaque Transactional spout中,相同txid的batch中的tuple集合可能不一样。OpaqueTridentKafkaSpout就是符合这种特性的spout的,所以它可以容忍kafka节点失败。

因为重播的batch中的tuple集合可能不一样,所以对于Opaque Transactional Spout,就不能根据txid是否一致来决定是否需要更新状态了。我们需要在数据库中保存更多的状态信息,除了单词名,数量、txid之外,我们还需要保存一个pre-value来记录前一次计算的值。我们再用上面例子具体说明一下。

假设数据库中的记录如下:

{ value = 4,preValue = 1,txid = 2
}
  • 假设当前batch的count值为2,txid=3。因为当前txid和数据库中的不同,我们需要把preValue替换成value的值,累计value值,然后更新txid值为3,结果如下:
{ value = 6,prevValue = 4,txid = 3
}
  • 再假设当前batch的count值为1,txid=2。这是当前txid和数据库中的相同,虽然两个txid值相同,但由于两个batch的内容已经变了,所以上次的更新可以忽略掉,需要对数据库中的value值进行重新计算,即把当前值和preValue值相加,结果如下:
{ value =3,prevValue = 1,txid = 2
}
  • 总结一下整个处理过程:
if(database txid=current txid){value=preValue+current value;//重新更新value//preValue不变;
}else{preValue=value;//更新preValuevalue=preValue+current value;//更新valuetxid=current txid;//更新txid
}
  • 2.3 No-Transactional spouts

No-Transactional spouts对每个batch的内容不做任何保证。如果失败的batch没被重发,它有会出现“最多被处理一次”的请况,如果tuples被多个batch处理,则会发生“最少被处理一次的情况”,很难保证“数据只被处理一次”的情况。

三、Spout和State的类型总结

下面这个表格描述了“数据只被处理一次”的spout/state的类型组合:

总的来说, Opaque transactional states即有一定的容错性又能保证数据一致性,但它的代价是需要在数据库中保存更多的状态信息(txid和preValue)。Transactional states虽然需要较少的状态信息(txid),但是它需要transactional spouts的支持。non-transactional states需要在数据库中保存最少的状态信息但难以保证“数据只被处理一次”的语义。

因此,在实际应用中,spout和state类型的选择需要根据我们具体应用需求来决定,当然在容错性和增加存储代价之间也需要做个权衡。

四、State API

上面讲的看上去有点啰嗦,庆幸的是Trident State API 在内部为我们实现了所有状态管理的逻辑,我们不需要再进行诸如对比txid,在数据库中存储多个值等操作,仅需要简单调用Trident API即可,例如:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))               .parallelismHint(6);
  • 所有的管理Opaque transactional states状态的逻辑都在MemcachedState.opaque()方法内部实现了。另外,所有的更新操作都是以batch为单位的,这样减少了对数据库的调用次数,极大的提高了效率。下面就向大家介绍一下和Trident State 相关的API。

4.1 State接口

Trident API中最基本的State接口只有两个方法:

public interface State {void beginCommit(Long txid);void commit(Long txid);
}
  • State接口只定义了状态什么时候开始更新,什么时候结束更新,并且我们都能获得一个txid。具体这个State如何工作,如何更新State,如何查询State,Trident并没有对此作出限制,我们可以自己任意实现。

假设我们有一个Location数据库,我们要通过Trident查新和更新这个数据库,那么我们可以自己实现这样一个LocationDB State,因为我们需要查询和更新,所以我们为这个LocationDB 可以添加对Location的get和set的实现:

public class LocationDB implements State {public void beginCommit(Long txid) {   }public void commit(Long txid) {   }public void setLocation(long userId, String location) {// code to access database and set location}public String getLocation(long userId) {// code to get location from database}}
  • 4.2 StateFactory工厂接口

Trident提供了State Factory接口,我们实现了这个接口之后,Trident 就可以通过这个接口获得具体的Trident State实例了,下面我们就实现一个可以制造LocationDB实例的LocationDBFactory:

public class LocationDBFactory implements StateFactory {public State makeState(Map conf, int partitionIndex, int numPartitions) {return new LocationDB();}
}
  • 4.3 QueryFunction接口

这个接口是用来帮助Trident查询一个State,这个接口定义了两个方法:

public interface QueryFunction<S extends State, T> extends EachOperation {List<T> batchRetrieve(S state, List<TridentTuple> args);void execute(TridentTuple tuple, T result, TridentCollector collector);
}
  • 接口的第一个方法batchRetrieve()有两个参数,分别是要查询的State源和查询参数,因为trident都是以batch为单位处理的,所以这个查询参数是一个List<TridentTuple>集合。关于第二个方法execute()有三个参数,第一个代表查询参数中的某个tuple,第二个代表这个查询参数tuple对应的查询结果,第三个则是一个消息发送器。下面就看一个QuaryLocation的实例:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {List<String> ret = new ArrayList();for(TridentTuple input: inputs) {ret.add(state.getLocation(input.getLong(0)));}return ret;
}public void execute(TridentTuple tuple, String location, TridentCollector collector) {collector.emit(new Values(location));}
}
  • QueryLocation接收到Trident发送的查询参数,参数是一个batch,batch中tuple内容是userId信息,然后batchRetrieve()方法负责从State源中获取每个userId对应的的location。最终batchRetrieve()查询的结果会被execute()方法发送出去。

但这里有个问题,batchRetrieve()方法中针对每个userid都做了一次查询State操作,这样处理显然效率不高,也不符合Trident所有操作都是针对batch的原则。所以,我们要对LocationDB这个State做一下改造,提供一个bulkGetLocations()方法来替换掉getLocation()方法,请看改造后的LocationDB的实现:

public class LocationDB implements State {public void beginCommit(Long txid) {   }public void commit(Long txid) {   }public void setLocationsBulk(List<Long> userIds, List<String> locations) {// set locations in bulk}public List<String> bulkGetLocations(List<Long> userIds) {// get locations in bulk}}
  • 我们可以看到,改造的LocationDB对Location的查询和更新都是批量操作的,这样显然可以提高处理效率。此时,我们再稍微改一下QueryFunction中batchRetrieve()方法:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {List<Long> userIds = new ArrayList<Long>();for(TridentTuple input: inputs) {userIds.add(input.getLong(0));}return state.bulkGetLocations(userIds);}public void execute(TridentTuple tuple, String location, TridentCollector collector) {collector.emit(new Values(location));}
}
  • QueryLocation在topology中可以这么使用:
TridentTopology topology = new TridentTopology();
topology.newStream("myspout", spout).stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
  • 4.4 UpdateState接口

当我们要更新一个State源时,我们需要实现一个UpdateState接口。UpdateState接口只提供了一个方法:

public interface StateUpdater<S extends State> extends Operation {void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);
}
  • 下面我们来具体看一下LocationUpdater的实现:
public class LocationUpdater extends BaseStateUpdater<LocationDB> {public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {List<Long> ids = new ArrayList<Long>();List<String> locations = new ArrayList<String>();for(TridentTuple t: tuples) {ids.add(t.getLong(0));locations.add(t.getString(1));}state.setLocationsBulk(ids, locations);}
}
  • 对于LocationUpdater在topology中可以这么使用:
TridentTopology topology = new TridentTopology();
TridentState locations =
topology.newStream("locations", locationsSpout)
  • 通过调用Trident Stream的partitionPersist方法可以更新一个State。在上面这个实例中,LocationUpdater接收一个State和要更新的batch,最终通过调用LocationFactory制造的LocationDB中的setLocationsBulk()方法把batch中的userid及其location批量更新到State中。

partitionPersist操作会返回一个TridentState对象,这个对象即是被TridentTopology更新后的LocationDB,所以,我们可以在topology中续继续对这个返回的State做查询操作。

另外一点需要注意的是,从上面StateUpdater接口可以看出,在它的updateState()方法中还提供了一个TridentCollector,因此在执行StateUpdate的同时仍然可以形成一个新的Stream。若要操作StateUpdater形成的Stream,可以通过调用TridentState.newValueStream()方法实现。

五、persistentAggregate

Trident另一个update state的方法时persistentAggregate,请看下面word count的例子:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
  • 5.1 MapState接口

persistentAggregate是在partitionPersist之上的另一个抽象,它会对Trident Stream进行聚合之后再把聚合结果更新到State中。在上面这个例子中,因为聚合的是一个groupedStream,Trident要求这种情况下State需要实现MapState接口,被grouped的字段会被做为MapSate的key,被grouped的数据计算的结果会被做为MapSate的value。MapSate接口定义如下:

public interface MapState<T> extends State {List<T> multiGet(List<List<Object>> keys);List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);void multiPut(List<List<Object>> keys, List<T> vals);
}
  • 对于MapSate的实现有MemoryMapState。

5.2 Snapshottable接口

如果我们聚合的不是一个groupedStream,Trident要求我们的State实现Snapshottable接口:

public interface Snapshottable<T> extends State {T get();T update(ValueUpdater updater);void set(T o);
}
  • 对于Snapshottable的实现有 MemcachedState。

六、Map States的实现

在Trident中实现MapState很简单,大部分工作Trident已经替我们做了。OpaqueMap,TransactionalMap, 和NonTransactionalMap类已经替我们完成了和容错相关的处理逻辑. 我们仅仅提供一个 IBackingMap的实现类即可, IBackingMap的接口定义如下:

public interface IBackingMap<T> {List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals);
}
  • OpaqueMap’s调用的multiPut将会把value值自动封装成OpaqueValue来处理, TransactionalMap’s 将会把value封装成TransactionalValue再进行处理, 而NonTransactionalMaps 则不会对value做处理,直接传递给topology。

另外, 
Trident提供的CachedMap 类会对Map中的key/value做自动的LRU缓存 。 
Trident提供的SnapshottableMap类会把MapState转换成SnapShottable对象(把MapState中的所有key/value对聚合成一个固定的key)。

详细更详细的了解整个MapState的实现过程,请查看 MemcachedState 的实现,MemcachedState除了把上面介绍的相关接口整合到一起之外,还提供了对opaque transactional, transactional, non-transactional三个语义规则的支持。

Storm专题二:Storm Trident API 使用详解

在Trident中有五种操作类型:
  1. Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输
  2. Repartitioning:数据流重定向,单纯的改变数据流向,不会改变数据内容,这部分会有网络传输
  3. Aggragation:聚合操作,会有网络传输
  4. Grouped streams上的操作
  5. Merge和Join
小结:上面提到了Trident实际上是通过把函数应用到每个节点的Batch上的数据以实现并行,而应用的这些函数就是TridentAPI,下面我们就具体介绍一下TridentAPI的各种操作。  
二、Trident五种操作详解
2.1 Apply Locally本地操作:操作都应用在本地节点的Batch上,不会产生网络传输
2.1.1 Functions:函数操作
     函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面,如果一个function不输出tuple,那就意味这这个tuple被过滤掉了,下面举例说明:
  • 定义一个Function:
[java] view plain copy
  1. public class MyFunction extends BaseFunction {
  2. @Override
  3. public void execute(TridentTuple tuple, TridentCollector collector) {
  4. for ( int i = 0; i < tuple.getInteger(0); i++) {
  5. collector.emit( new Values(i));
  6. }
  7. }
     小结:Function实际上就是对经过Function函的tuple做一些操作以改变其内容。
  • 比如我们处理一个“mystream”的数据流,它有三个字段分别是[“a”, “b”, “c”] ,数据流中tuple的内容是:
     [1, 2, 3] [4, 1, 6] [3, 0, 8]
  • 我们运行我们的Function:
[java] view plain copy
  1. java mystream.each(new Fields("b"), new MyFunction(), new Fields("d")));
     它意思是接收输入的每个tuple “b”字段得值,把函数结算结果做为新字段“d”追加到每个tuple后面,然后发射出去。
  • 最终运行结果会是每个tuple有四个字段[“a”, “b”, “c”, “d”],每个tuple的内容变成了:
     [1, 2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]
    小结:我们注意到,如果一个function发射多个tuple时,每个发射的新tuple中仍会保留原来老tuple的数据。
2.1.2 Filters:过滤操作
  • Filters很简单,接收一个tuple并决定是否保留这个tuple。举个例子,定义一个Filter:
[java] view plain copy
  1. public class MyFilter extends BaseFilter {
  2. public boolean isKeep(TridentTuple tuple) {
  3. return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
  4. }
  5. }
  • 假设我们的tuples有这个几个字段 [“a”, “b”, “c”]:
     [1, 2, 3] [2, 1, 1] [2, 3, 4]
  • 然后运行我们的Filter:
[java] view plain copy
  1. java mystream.each(new Fields("b", "a"), new MyFilter());
  • 则最终得到的tuple是 :
     [2, 1, 1]
     说明第一个和第三个不满足条件,都被过滤掉了。
     小结:Filter就是一个过滤器,它决定是否需要保留当前tuple。
2.1.3 PartitionAggregate
    PartitionAggregate的作用对每个Partition中的tuple进行聚合,与前面的函数在原tuple后面追加数据不同,PartitionAggregate的输出会直接替换掉输入的tuple,仅数据PartitionAggregate中发射的tuple。下面举例说明:
  • 定义一个累加的PartitionAggregate:
[java] view plain copy
  1. java mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));
  • 假设我们的Stream包含两个字段 [“a”, “b”],各个Partition的tuple内容是:
     ``` Partition 0: [“a”, 1] [“b”, 2]

Partition 1: [“a”, 3] [“c”, 8]

     Partition 2: [“e”, 1] [“d”, 9] [“d”, 10] ```
  • 输出的内容只有一个字段“sum”,值是:
     ``` Partition 0: [3]

Partition 1: [11]

     Partition 2: [20] ```
    TridentAPI提供了三个聚合器的接口:CombinerAggregator, ReducerAggregator, and Aggregator.
我们先看一下CombinerAggregator接口:   
[java] view plain copy
  1. public interface CombinerAggregator <T> extends Serializable {
  2. T init(TridentTuple tuple);
  3. T combine(T val1, T val2);
  4. T zero();
  5. }
    CombinerAggregator接口只返回一个tuple,并且这个tuple也只包含一个field。init方法会先执行,它负责预处理每一个接收到的tuple,然后再执行combine函数来计算收到的tuples直到最后一个tuple到达,当所有tuple处理完时,CombinerAggregator会发射zero函数的输出,举个例子:
  • 定义一个CombinerAggregator实现来计数:
[java] view plain copy
  1. public class CombinerCount implements CombinerAggregator<Integer>{
  2. @Override
  3. public Integer init(TridentTuple tuple) {
  4. return 1;
  5. }
  6. @Override
  7. public Integer combine(Integer val1, Integer val2) {
  8. return val1 + val2;
  9. }
  10. @Override
  11. public Integer zero() {
  12. return 0;
  13. }
  14. }
     小结:当你使用aggregate 方法代替PartitionAggregate时,CombinerAggregator的好处就体现出来了,因为Trident会自动优化计算,在网络传输tuples之前做局部聚合。
我们再看一下ReducerAggregator:
[java] view plain copy
  1. public interface ReducerAggregator <T> extends Serializable {
  2. T init();
  3. T reduce(T curr, TridentTuple tuple);
  4. }
     ReducerAggregator通过init方法提供一个初始值,然后为每个输入的tuple迭代这个值,最后生产处一个唯一的tuple输出,下面举例说明:
  • 定义一个ReducerAggregator接口实现技术器的例子:
[java] view plain copy
  1. public class ReducerCount implements ReducerAggregator<Long>{
  2. @Override
  3. public Long init() {
  4. return 0L;
  5. }
  6. @Override
  7. public Long reduce(Long curr, TridentTuple tuple) {
  8. return curr + 1;
  9. }
  10. }
最后一个是Aggregator接口,它是最通用的聚合器,它的形式如下:
  
[java] view plain copy
  1. public interface Aggregator<T> extends Operation {
  2. T init(Object batchId, TridentCollector collector);
  3. void aggregate(T val, TridentTuple tuple, TridentCollector collector);
  4. void complete(T val, TridentCollector collector);
  5. }
    Aggregator接口可以发射含任意数量属性的任意数据量的tuples,并且可以在执行过程中的任何时候发射:
  1. init:在处理数据之前被调用,它的返回值会作为一个状态值传递给aggregate和complete方法
  2. aggregate:用来处理每一个输入的tuple,它可以更新状态值也可以发射tuple
  3. complete:当所有tuple都被处理完成后被调用
    下面举例说明:
  • 定义一个实现来完成一个计数器:
[java] view plain copy
  1. public class CountAgg extends BaseAggregator<CountState>{
  2. static class CountState { long count = 0; }
  3. @Override
  4. public CountState init(Object batchId, TridentCollector collector) {
  5. return new CountState();
  6. }
  7. @Override
  8. public void aggregate(CountState val, TridentTuple tuple, TridentCollector collector) {
  9. val. count+=1;
  10. }
  11. @Override
  12. public void complete(CountState val, TridentCollector collector) {
  13. collector.emit( new Values(val. count));
  14. }
  15. }
    有时候我们需要同时执行多个聚合器,这在Trident中被称作chaining,使用方法如下:
[java] view plain copy
  1. java mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd();
    这点代码会在每个Partition上运行count和sum函数,最终输出一个tuple:[“count”, “sum”]
projection:投影操作
     投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: [“a”, “b”, “c”, “d”]
     运行如下代码:   
[java] view plain copy
  1. java mystream.project(new Fields("b", "d"))
    则输出的流仅包含 [“b”, “d”]字段。
2.2 Repartitioning重定向操作
     重定向操作是如何在各个任务间对tuples进行分区。分区的数量也有可能改变重定向的结果。重定向需要网络传输,下面介绍下重定向函数:
  1. shuffle:通过随机分配算法来均衡tuple到各个分区
  2. broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
  3. partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区
  4. global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream
  5. batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区
  6. Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping
2.3 Aggragation聚合操作
     Trident有aggregate和 persistentAggregate方法来做聚合操作。aggregate是独立的运行在Stream的每个Batch上的,而persistentAggregate则是运行在Stream的所有Batch上并把运算结果存储在state source中。运行aggregate方法做全局聚合。当你用到  ReducerAggregator或Aggregator时,Stream首先被重定向到一个分区中,然后其中的聚合函数便在这个分区上运行。当你用到CombinerAggregator时,Trident会首先在每个分区上做局部聚合,然后把局部聚合后的结果重定向到一个分区,因此使用CombinerAggregator会更高效,可能的话我们需要优先考虑使用它。下面举个例子来说明如何用aggregate进行全局计数:
[java] view plain copy
  1. java mystream.aggregate(new Count(), new Fields("count"));

和paritionAggregate一样,aggregators的聚合也可以串联起来,但是如果你把一个 CombinerAggregator和一个非CombinerAggregator串联在一起,Trident是无法完成局部聚合优化的。

2.4 grouped streams
      GroupBy操作是根据特定的字段对流进行重定向的,还有,在一个分区内部,每个相同字段的tuple也会被Group到一起,下面这幅图描述了这个场景:
     如果你在grouped Stream上面运行aggregators,聚合操作会运行在每个Group中而不是整个Batch。persistentAggregate也能运行在GroupedSteam上,不过结果会被保存在MapState中,其中的key便是分组的字段。当然,aggregators在GroupedStreams上也可以串联。
2.5 Merge和Joins:
api的最后一部分便是如何把各种流汇聚到一起。最简单的方式就是把这些流汇聚成一个流。我们可以这么做:   
[java] view plain copy
  1. java topology.merge(stream1, stream2, stream3);

另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的Stream。join应用在来自Spout的每一个小Batch中。join时候的tuple会包含:

 1. join的字段,如Stream1中的key和Stream2中的x    
 2. 所有非join的字段,根据传入join方法的顺序,a和b分别代表steam1的val1和val2,c代表Stream2的val1          
     当join的是来源于不同Spout的stream时,这些Spout在发射数据时需要同步,一个Batch所包含的tuple会来自各个Spout。    

专题三:Storm Trident API 实践

一、概要

1.1 Storm(简介)

Storm是一个实时的可靠地分布式流计算框架。
     具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息(可以是logs,clicks,sensor data)、通过Storm对消息进行计算聚合等预处理、把处理结果持久化到NoSQL数据库或者HDFS做进一步深入分析。

1.2 Trident(简介)

Trident是对Storm的更高一层的抽象,除了提供一套简单易用的流数据处理API之外,它以batch(一组tuples)为单位进行处理,这样一来,可以使得一些处理更简单和高效。
     我们知道把Bolt的运行状态仅仅保存在内存中是不可靠的,如果一个node挂掉,那么这个node上的任务就会被重新分配,但是之前的状态是无法恢复的。因此,比较聪明的方式就是把storm的计算状态信息持久化到database中,基于这一点,trident就变得尤为重要。因为在处理大数据时,我们在与database打交道时通常会采用批处理的方式来避免给它带来压力,而trident恰恰是以batch groups的形式处理数据,并提供了一些聚合功能的API。

二、Trident API 实践

Trident其实就是一套API,但现阶段网上关于Trident API中各个函数的用法含义资料不多,下面我就根据一些英文资料和自己的理解,详细介绍一下Trident API各个函数的用法和含义。阅读本文需要有一定的Trident API基础。

2.1 each() 方法

作用:操作batch中的每一个tuple内容,一般与Filter或者Function函数配合使用。
     下面通过一个例子来介绍each()方法,假设我们有一个FakeTweetsBatchSpout,它会模拟一个Stream,随机产生一个个消息。我们可以通过设置这个Spout类的构造参数来改变这个Spout的batch Size的大小。

2.1.1 Filter类:过滤tuple

一个通过actor字段过滤消息的Filter:

[java] view plain copy
  1. public static class PerActorTweetsFilter extends BaseFilter {
  2. String actor;
  3. public PerActorTweetsFilter(String actor) {
  4. this.actor = actor;
  5. }
  6. @Override
  7. public boolean isKeep(TridentTuple tuple) {
  8. return tuple.getString(0).equals(actor);
  9. }
  10. }

Topology:

[java] view plain copy
  1. topology.newStream("spout", spout)
  2. .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  3. .each(new Fields("actor", "text"), new Utils.PrintFilter());

从上面例子看到,each()方法有一些构造参数

  1. 第一个构造参数:作为Field Selector,一个tuple可能有很多字段,通过设置Field,我们可以隐藏其它字段,仅仅接收指定的字段(其它字段实际还在)。
  2. 第二个是一个Filter:用来过滤掉除actor名叫"dave"外的其它消息。

2.1.2 Function类:加工处理tuple内容

一个能把tuple中text内容变成大写的Function:

[java] view plain copy
  1. public static class UppercaseFunction extends BaseFunction {
  2. @Override
  3. public void execute(TridentTuple tuple, TridentCollector collector) {
  4. collector.emit(new Values(tuple.getString(0).toUpperCase()));
  5. }
  6. }

Topology:

[java] view plain copy
  1. topology.newStream("spout", spout)
  2. .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  3. .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
  4. .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

首先,UppercaseFunction函数的输入是Fields("text", "actor"),其作用是把其中的"text"字段内容都变成大写。
     其次,它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后面,在本例中,执行完Function之后的tuple内容多了一个"uppercased_text",并且这个字段排在最后面。

2.1.3 Field Selector与project

我们需要注意的是,上面每个each()方法的第一个Field字段仅仅是隐藏掉没有指定的字段内容,实际上被隐藏的字段依然还在tuple中,如果想要彻底丢掉它们,我们就需要用到project()方法。
   投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: [“a”, “b”, “c”, “d”],运行如下代码:

[java] view plain copy
  1. java mystream.project(new Fields("b", "d"))

则输出的流仅包含 [“b”, “d”]字段。

2.2 parallelismHint()方法和partitionBy()

2.2.1 parallelismHint()

指定Topology的并行度,即用多少线程执行这个任务。我们可以稍微改一下我们的Filter,通过打印当前任务的partitionIndex来区分当前是哪个线程。
Filter:

[java] view plain copy
  1. public static class PerActorTweetsFilter extends BaseFilter {
  2. private int partitionIndex;
  3. private String actor;
  4. public PerActorTweetsFilter(String actor) {
  5. this.actor = actor;
  6. }
  7. @Override
  8. public void prepare(Map conf, TridentOperationContext context) {
  9. this.partitionIndex = context.getPartitionIndex();
  10. }
  11. @Override
  12. public boolean isKeep(TridentTuple tuple) {
  13. boolean filter = tuple.getString(0).equals(actor);
  14. if(filter) {
  15. System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);
  16. }
  17. return filter;
  18. }
  19. }

Topology:

[java] view plain copy
  1. topology.newStream("spout", spout)
  2. .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  3. .parallelismHint(5)
  4. .each(new Fields("actor", "text"), new Utils.PrintFilter());

如果我们指定执行Filter任务的线程数量为5,那么最终的执行结果会如何呢?看一下我们的测试结果:

[plain] view plain copy
  1. I am partition [4] and I have kept a tweet by: dave
  2. I am partition [3] and I have kept a tweet by: dave
  3. I am partition [0] and I have kept a tweet by: dave
  4. I am partition [2] and I have kept a tweet by: dave
  5. I am partition [1] and I have kept a tweet by: dave

我们可以很清楚的发现,一共有5个线程在执行Filter。
     如果我们想要2个Spout和5个Filter怎么办呢?如下面代码所示,实现很简单。

[java] view plain copy
  1. topology.newStream("spout", spout)
  2. .parallelismHint(2)
  3. .shuffle()
  4. .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
  5. .parallelismHint(5)
  6. .each(new Fields("actor", "text"), new Utils.PrintFilter());

2.2.2 partitionBy()和重定向操作(repartitioning operation)

我们注意到上面的例子中用到了shuffle(),shuffle()是一个重定向操作。那什么是重定向操作呢?重定向定义了我们的tuple如何被route到下一处理层,当然不同的层之间可能会有不同的并行度,shuffle()的作用是把tuple随机的route下一层的线程中,而partitionBy()则根据我们的指定字段按照一致性哈希算法route到下一层的线程中,也就是说,如果我们用partitionBy()的话,同一个字段名的tuple会被route到同一个线程中。
     比如,如果我们把上面代码中的shuffle()改成partitionBy(new Fields("actor")),猜一下结果会怎样?

[plain] view plain copy
  1. I am partition [2] and I have kept a tweet by: dave
  2. I am partition [2] and I have kept a tweet by: dave
  3. I am partition [2] and I have kept a tweet by: dave
  4. I am partition [2] and I have kept a tweet by: dave

测试结果正如我们上面描述的那样,相同字段的tuple被route到了同一个partition中。
重定向操作有如下几种:

  1. shuffle:通过随机分配算法来均衡tuple到各个分区
  2. broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
  3. partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区
  4. global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream
  5. batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区
  6. Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping

2.3 聚合(Aggregation)

我们前面讲过,Trident的一个很重要的特点就是它是以batch的形式处理tuple的。我们可以很容易想到的针对一个batch的最基本操作应该就是聚合。Trident提供了聚合API来处理batches,来看一个例子:

2.3.1 Aggregator:

[java] view plain copy
  1. public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {
  2. @Override
  3. public Map<String, Integer> init(Object batchId, TridentCollector collector) {
  4. return new HashMap<String, Integer>();
  5. }
  6. @Override
  7. public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
  8. String location = tuple.getString(0);
  9. val.put(location, MapUtils.getInteger(val, location, 0) + 1);
  10. }
  11. @Override
  12. public void complete(Map<String, Integer> val, TridentCollector collector) {
  13. collector.emit(new Values(val));
  14. }
  15. }

Topology:

[java] view plain copy
  1. topology.newStream("spout", spout)
  2. .aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
  3. .each(new Fields("location_counts"), new Utils.PrintFilter());

这个aggregator很简单:计算每一个batch的location的数量。通过这个例子我们可以看到Aggregator接口:

  1. init():当刚开始接收到一个batch时执行
  2. aggregate():在接收到batch中的每一个tuple时执行
  3. complete():在一个batch的结束时执行

我们前面讲过aggregate()方法是一个重定向方法,因为它会随机启动一个单独的线程来进行这个聚合操作。
     下面我们来看一下测试结果:

[plain] view plain copy
  1. [{USA=3, Spain=1, UK=1}]
  2. [{USA=3, Spain=2}]
  3. [{France=1, USA=4}]
  4. [{USA=4, Spain=1}]
  5. [{USA=5}]

我们可以看到打印的结果,其中每一条的和都是5,这是因为我们的Spout的每个batch中tuple数量设置的是5,所以每个线程的计算结果也会是5。 除此之外,Trident还提供了其它两个Aggregator接口: CombinerAggregator, ReducerAggregator,具体使用方法请参考Trident API。

2.3.2 partitionAggregate():

如果我们将上面的Topology稍微改造一下,猜一下结果会是如何?

[java] view plain copy
  1. topology.newStream("spout", spout)
  2. .partitionBy(new Fields("location"))
  3. .partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
  4. .parallelismHint(3)
  5. .each(new Fields("location_counts"), new Utils.PrintFilter());

我们一起来分析一下,首先partitionBy()方法将tuples按其location字段重定向到下一处理逻辑,而且相同location字段的tuple一定会被分配到同一个线程中处理。其次,partitionAggregate()方法,注意它与Aggregate不同,它不是一个重定向方法,它仅仅是对当前partition上的各个batch执行聚合操作。因为我们根据location进行了重定向操作,测试数据一共有4个location,而当前一共有3个partition,因此可以猜测我们的最终测试结果中,有一个partition会处理两个location的batch,最终测试结果如下:

[plain] view plain copy
  1. [{France=10, Spain=5}]
  2. [{USA=63}]
  3. [{UK=22}]

需要注意的是,partitionAggregate虽然也是聚合操作,但与上面的Aggregate完全不同,它不是一个重定向操作。

2.4 groupBy

我们可以看到上面几个例子的测试结果,其实我们通常想要的是每个location的数量是多少,那该怎么处理呢?看下面这个Topology:

[java] view plain copy
  1. topology.newStream("spout", spout)
  2. .groupBy(new Fields("location"))
  3. .aggregate(new Fields("location"), new Count(), new Fields("count"))
  4. .each(new Fields("location", "count"), new Utils.PrintFilter());

我们先看一下执行的结果:

[plain] view plain copy
  1. ...
  2. [France, 25]
  3. [UK, 2]
  4. [USA, 25]
  5. [Spain, 44]
  6. [France, 26]
  7. [UK, 3]
  8. ...

上面这段代码计算出了每个location的数量,即使我们的Count函数没有指定并行度。这就是groupBy()起的作用,它会根据指定的字段创建一个GroupedStream,相同字段的tuple都会被重定向到一起,汇聚成一个group。groupBy()之后是aggregate,与之前的聚合整个batch不同,此时的aggregate会单独聚合每个group。我们也可以这么认为,groupBy会把Stream按照指定字段分成一个个stream group,每个group就像一个batch一样被处理。

不过需要注意的是,groupBy()本身并不是一个重定向操作,但如果它后面跟的是aggregator的话就是,跟的是partitionAggregate的话就不是。

三、总结

Storm是一个实时流计算框架,Trident是对storm的一个更高层次的抽象,Trident最大的特点以batch的形式处理stream。
     一些最基本的操作函数有Filter、Function,Filter可以过滤掉tuple,Function可以修改tuple内容,输出0或多个tuple,并能把新增的字段追加到tuple后面。
     聚合有partitionAggregate和Aggregator接口。partitionAggregate对当前partition中的tuple进行聚合,它不是重定向操作。Aggregator有三个接口:CombinerAggregator, ReducerAggregator,Aggregator,它们属于重定向操作,它们会把stream重定向到一个partition中进行聚合操作。
     重定向操作会改变数据流向,但不会改变数据内容,重定向操会产生网络传输,可能影响一部分效率。而Filter、Function、partitionAggregate则属于本地操作,不会产生网络传输。
     GroupBy会根据指定字段,把整个stream切分成一个个grouped stream,如果在grouped stream上做聚合操作,那么聚合就会发生在这些grouped stream上而不是整个batch。如果groupBy后面跟的是aggregator,则是重定向操作,如果跟的是partitionAggregate,则不是重定向操作。

上面所以的例子都可以在github上找到: https://github.com/pereferrera/trident-hackaton/

[Trident] Storm Trident 教程,state详解、trident api详解及实例相关推荐

  1. Android基础入门教程——8.3.18 Canvas API详解(Part 3)Matrix和drawBitmapMash

    Android基础入门教程--8.3.18 Canvas API详解(Part 3)Matrix和drawBitmapMash 标签(空格分隔): Android基础入门教程 本节引言: 在Canva ...

  2. hibernate教程--常用配置和核心API详解

    一.Hibernate的常用的配置及核心API. 1.1 Hibernate的常见配置: 1.1.1.核心配置: 核心配置有两种方式进行配置:  1)属性文件的配置: * hibernate.prop ...

  3. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

  4. fullPage教程 -- 整屏滚动效果插件 fullpage详解

    为什么80%的码农都做不了架构师?>>>    本文为 H5EDU 机构官方 HTML5培训 教程,主要介绍:fullPage教程 -- 整屏滚动效果插件 fullpage详解 1. ...

  5. ASP.NET MVC 5 学习教程:Details 和 Delete 方法详解

    ASP.NET MVC 5 学习教程:Details 和 Delete 方法详解 原文 ASP.NET MVC 5 学习教程:Details 和 Delete 方法详解 在教程的这一部分,我们将研究一 ...

  6. Stale branches 设置_Mac OS 网络设置教程 wifi设置与宽带设置详解

    虽然所有设备连接无线网络的步骤都相差无几,但是Mac与windows系统还是不相同的,那么,苹果Mac怎么连接无线网络呢?针对此问题,本文就为大家介绍Mac网络的设置教程,有兴趣的朋友们可以了解下. ...

  7. mount: 未知的文件系统类型“vboxsf”_好程序员云计算学习路线教程大纲课件:Mount 挂载详解...

    好程序员云计算学习路线教程大纲课件:Mount 挂载详解: ====================================================================== ...

  8. k8s教程(Volume篇)-PVC详解

    文章目录 01 引言 02 PVC详解 2.1 参数配置 2.1.1 资源请求(Resources) 2.1.2 访问模式 (Access Modes) 2.1.3 存储卷模式(Volume Mode ...

  9. php laravel入口文件,Laravel学习教程之从入口到输出过程详解

    php 的 Laravel学习教程之从入口到输出过程详解 本文主要给大家介绍了关于Laravel从入口到输出过程的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. I. 预备 ...

最新文章

  1. python argsort排序结果_numpy.argsort()可以对元组或列表进行排序
  2. 宝塔面板备份网站方法
  3. 我总结的 jsonp
  4. 复杂纹理复制及纹理叠加效果
  5. Google基本语法二,指令
  6. 【转载】AssetBundle资源打包加载管理
  7. 《零秒工作》的一些总结
  8. raft论文阅读理解翻译
  9. diskpart给u盘分区
  10. Vue活动倒计时的功能
  11. 如何关闭服务器系统防火墙设置方法,怎么关闭防火墙 Windows自带防火墙关闭方法...
  12. 从万物归零到虚拟与现实交错
  13. 《梵高》-孤独的天才
  14. Map map=new HashMap(); 为什么是这样
  15. 空中“撒网”有商机 各行各业争相分一杯羹
  16. linux挂载移动硬盘乱码
  17. 便携式心电监护仪——LabVIEW心电信号采集系统设计
  18. 关于使用echarts堆叠柱状图百分比显示的问题
  19. 我家云粒子云刷机教程刷机助手下载
  20. 公司注册购买企业邮箱的意义

热门文章

  1. Redis内存淘汰策略LRU、LFU详解
  2. 专访天联世纪总裁朱威廉:暴利网游遭遇拐点
  3. Google智能家居控制ESP8266
  4. Rao-Cramer下界
  5. SQL 入门的必读好书
  6. 需求定律的4个准则——《可以量化的…
  7. 数字孪生与3D可视化
  8. ola.hallengren的SQL Server维护脚本
  9. android微信支付毁掉,android微信支付 需要注意的坑
  10. 30套免费的响应式 HTML5 CSS3 模板下载