在4月10日柏林BigData啤酒节上,Pere介绍了Trident,于此同时,来自Continuum Analytics也介绍了Disco。在Storm环节中大家了解了正确使用Trident的基本知识,包括最基本的API,原理,使用场景以及一个流操作简单例子,这次介绍的框架,一些可执行例子和tweet模拟器可以在github上找到。

借助前面提到的github中示例,这篇文章我们将会大致介绍Trident API。

Storm概述

总之,Storm用于实时处理数据流,较之普通的消息传递系统,它是更高级的抽象,允许定义DAG Topology,容错处理,保证至少一次的语义。

一个典型的应用场景是清算,预处理和聚合许多并发消息,就想日志、点击、跟踪数据。一个典型的大数据实时处理系统是从消息队列中读取消息,通过Storm进行处理和聚合,最后持久化到NoSQL中,比如Cassandra,或Hadoop HDSF中,用于后续深入分析。

Trident概述

Trident是基于Storm的抽象,除了提供更高级的级联构造,它还给Tuples分组,让流处理更合理,持久化处理结果,同时提供仅仅处理一次语义的API。
我们意识到将bolt的状态存到内存并不可靠,如果一个节点挂掉,其上的worker会重新分配,但是worker的状态并不会恢复,所有最明智的做法就是持久化到可靠的数据库,这时Trident会很有用。我们处理大数据,一般会分批次而不会每条消息更新一次以防止数据库压力过大。Trident帮我们进行Tuples分组并提供了聚合API。

each

我们从类Skeleton开始, FakeTweetsBatchSpout用于产生一系列随机伪造tweets的spout,可以通过构造函数参数改变Spout的batch大小。each允许通过Filter或者Function操作batch真的每一个tuple,我们可以实现一个filter用于过滤tweets。

public static class PerActorTweetsFilter extends BaseFilter {String actor;public PerActorTweetsFilter(String actor) {this.actor = actor;}@Overridepublic boolean isKeep(TridentTuple tuple) {return tuple.getString(0).equals(actor);}
}

我们可以将filter串联在一起:

topology.newStream("spout", spout)
.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
.each(new Fields("actor", "text"), new Utils.PrintFilter());

在输入中我们选择了actor、text两列,each()的输入也许不止这两列,但我们可以选择输入的子集,这个filter的输入是一系列的tuple,位置0是actor,位置1是tweet text。我们同时串联起了一个仅仅用于打印的filter。这个topology的行为是比较明显的,将会过滤出dave的tweet。
我们再来看一个function的例子:

public static class UppercaseFunction extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {collector.emit(new Values(tuple.getString(0).toUpperCase()));}}

这个function将tuple position为0的字符串转换成大写,我们可以在topology中串起这些function:

topology.newStream("spout", spout).each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text")).each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

在UpperCaseFunciton中我们将text放在了position 0, 同时在function中需要声明输出field,在调用function后,在输出tuple中将会增加一列,以上topology将会把dave的tweet转换成大写,同时打印出原始tweet和转换后的tweet。

each()对tuple通过选择一个子集进行了隐式projection,那些没有projected的列在后续依然可用,有时我们也需要使用project()这个API进行显示选择列。

parallelismHint()和partitionBy()

我们再来看看Filter的示例,如果这样定义topology会发生什么?

topology.newStream("spout", spout)
.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
.parallelismHint(5)
.each(new Fields("actor", "text"), new Utils.PrintFilter());

parallelismHint()将topology的并行度提高为指定的参数,我们暂且这样理解,现在我们将PerActorTweetsFilter改成如下:

public static class PerActorTweetsFilter extends BaseFilter {private int partitionIndex;private String actor;public PerActorTweetsFilter(String actor) {this.actor = actor;}@Overridepublic void prepare(Map conf, TridentOperationContext context) {this.partitionIndex = context.getPartitionIndex();}@Overridepublic boolean isKeep(TridentTuple tuple) {boolean filter = tuple.getString(0).equals(actor);if(filter) {System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);}return filter;}
}

运行topology结果如下:

I am partition [4] and I have kept a tweet by: dave
I am partition [3] and I have kept a tweet by: dave
I am partition [0] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [1] and I have kept a tweet by: dave

这显示了Filter被5个并行的任务执行,现在我们也有了5个Spouts, 可以在log中grep “Open Spout instance”查看。如果我们只需要2个Spouts和5个Filter可以这样:

topology.newStream("spout", spout)
.parallelismHint(2)
.shuffle()
.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
.parallelismHint(5)
.each(new Fields("actor", "text"), new Utils.PrintFilter());

Shuffle()是一个重分配操作,partitionBy()和global()也是。Repartition允许我们指定Tuple到达下一层处理方式的规则,从而可以指定不同处理层的并行度。Shuffle()是随机路由tuple,partitionBy()则是根据指定Fields的一致性Hash进行路由。现在我们介绍了所有这些概念后,重新来看parallelismHint()的意义:它将指定所有位于它之前操作的并行度,直到一些排序之类的重分配。
现在将shuffle()替换成partitionBy(new Fields(“actor”)),结果将会这样:

I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave

partitionBy(new Fields(“actor”))使相同actor的tuple分配到同一个task,所以5个中只有1个会接收到dave并过滤出来。

Aggregation

Trident是批量处理Tuples. 提到batch,自然想到的就是聚合操作,Trident提供了原生地batch聚合操作。

public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {@Overridepublic Map<String, Integer> init(Object batchId, TridentCollector collector) {return new HashMap<String, Integer>();}@Overridepublic void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {String location = tuple.getString(0);val.put(location, MapUtils.getInteger(val, location, 0) + 1);}@Overridepublic void complete(Map<String, Integer> val, TridentCollector collector) {collector.emit(new Values(val));}
}

这个聚合很简单,用来统计各个地点总数,这个例子中我们看到了Aggregator各个接口,Trident将会在batch开始时调用init(),调用each()处理batch中每个tuple,在batch()结束时调用complete(),三个函数中都可以使用TridentCollector,出于效率考虑一般仅在最后才使用collector,使用aggregator的输出更新数据库。

使用aggregate()函数可以测试这个功能,aggregate()也是一个重分配操作,它将会聚合batch中所有tuple到一个task中。为了尽可能的减少网络传输,如果逻辑上允许本地聚合,可以使用CombinerAggregator,现在我们仍然来看低级的聚合接口:

topology.newStream("spout", spout)
.aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
.each(new Fields("location_counts"), new Utils.PrintFilter());

结果像这样:

[{USA=3, Spain=1, UK=1}]
[{USA=3, Spain=2}]
[{France=1, USA=4}]
[{USA=4, Spain=1}]
[{USA=5}]

可以看到每行输出总和都是5,因为spout的batch大小就是5.

这样可以增大batch大小:

FakeTweetsBatchSpout spout = new FakeTweetsBatchSpout(100);

让我们对topology做稍微修改:

topology.newStream("spout", spout).partitionBy(new Fields("location")).partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts")).parallelismHint(3).each(new Fields("location_counts"), new Utils.PrintFilter());

输出大致会这样:

[{France=10, Spain=5}]
[{USA=63}]
[{UK=22}]

其实,partitionAggregate()并不是一个重分配操作,它会在每个batch的各个partition上做聚合,我们按照location进行partition,总共三个partition,4个location,因此France和Spain在一个partition,USA、UK则分别一个。

上面例子有点晦涩,但是这些是理解trident的关键,耐心点,后面的就会比较直观。

groupBy

下面的代码比较简单:

topology.newStream("spout", spout).groupBy(new Fields("location")).aggregate(new Fields("location"), new Count(), new Fields("count")).each(new Fields("location", "count"), new Utils.PrintFilter());

输出这样:

...
[France, 25]
[UK, 2]
[USA, 25]
[Spain, 44]
[France, 26]
[UK, 3]
...

即使没有指定parallelism,每个country依然一行,我们使用了一个相当简单的Aggregator:内置count(),groupBy()创建GroupedStream,按指定field进行逻辑group,group会改变接下来的aggregate()行为,不再是聚合整个batch,而是分别聚合每个group,就像将当前stream拆分成多个stream,batch中有多个不同的group。

然而,groupBy()并不总是重分配操作,后跟aggregation()是重分配,但后跟partitionAggregation()就不是,可以自己思考并试验。

总结

我们介绍了Trident的基础理论,还有一些没有涉及,比如state API,然而对于前面介绍的概念,希望已经解释清楚。
你可以访问github,实现一些简单的例子:

  • Per-hashtag counts
  • Last three tweets for every actor
  • Most used words per actor
  • Most used words
  • Trending hashtags in a window of time

有几个会涉及到维持一些状态,可以使用Trident State,也可以在Aggregator或Function中直接连接数据,另外一种方式就是在内存中维持状态,当请注意这并不是高可靠的,在生产环境中并不鼓励这样做。

Storm Trident API实践相关推荐

  1. Storm Trident API

    在Storm Trident中有五种操作类型 Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯的改变数据流向,不会 ...

  2. [Trident] Storm Trident 教程,state详解、trident api详解及实例

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...

  3. Storm Trident 详细介绍

    一.概要 1.1 Storm(简介)      Storm是一个实时的可靠地分布式流计算框架.      具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息( ...

  4. trident API指南

    trident API指南 @(STORM)[storm] trident API指南 零 概述 1 本地分区操作 2 重新分区操作 3 聚合操作 4 流分组操作 5合并与连接 一 本地分区操作 一 ...

  5. Apache Storm 官方文档 —— Trident API 概述

    转载自并发编程网 – ifeve.com本文链接地址: Apache Storm 官方文档 -- Trident API 概述 窗口部分的内容是我自己翻译的 Trident 的核心数据模型是" ...

  6. Storm Trident拓扑中的错误处理

    这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...

  7. Trident API 概览

    Trident API 概览 在网上看到了很多有TRIDENT相关API的翻译,看来看去,总觉得没有说清楚很多东西,所以自己结合使用的经验翻译了一篇出来:翻译完以后,也发现 在自己的翻译中也有很多地方 ...

  8. Storm Trident简介

    转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...

  9. storm trident mysql,storm_Trident

    简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascadi ...

最新文章

  1. 【TensorFlow】tf.nn.conv2d是怎样实现卷积的?
  2. YYCache 源码分析(一)
  3. VS2019配置opencv环境时找不到Microsoft.Cpp.x64.user.props
  4. “开始菜单”按钮今年8月将重回Windows 8
  5. Halcon算子:min_max_gray和gray_histo的区别
  6. 8086汇编-实验1、2-debug调试命令
  7. 深度解析LSTM神经网络的设计原理
  8. python没有tkinter_Python升级提示Tkinter模块找不到的解决方法
  9. 原生html开发环境,推荐HTML5/Javascript的开发环境?
  10. vs资源视图加载失败
  11. 理解MapReduce计算构架
  12. 《企业IT架构转型之道》边读边想——内容主线
  13. 学会这个方法,轻松为PDF文件加密,快来码住
  14. ios 手游SDK 开发教程
  15. c语言用数字定义字符串,c语言怎么定义数字字符串 c语言怎么把数字字符定义字符串...
  16. Ubuntu16.04桌面版pxe启动实现自动安装
  17. element-ui el-descriptions取消冒号
  18. 1.(python)阿拉伯数字转中文大写
  19. KK凯文.凯利:第一届中国社群领袖峰会演讲实录(全部版)
  20. 设计模式——代理模式详解(Java版)

热门文章

  1. python制作英语小词典_Python 爬虫:自制简易词典
  2. 孕期饮食新理念——初光孕妇餐 让孕期营养更科学
  3. [DB][Oracle]Oracle格式化数字的方法(指定小数点位数,每3位加逗号)
  4. 啊,CET6----六级高频词2
  5. 阿里云SDK播放器集成
  6. SQL取日期时间部分
  7. CAS TGT 校验不成功:No principal was found in the response from the CAS server.WHO: audit:unknown
  8. 实习日志_2022/3/11
  9. 热爱云南的100个理由 [转]
  10. 软件工程(一)------软件生存周期