在Storm Trident中有五种操作类型

  •   Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输
  •   Repartitioning:数据流重定向,单纯的改变数据流向,不会改变数据内容,这部分会有网络传输
  •   Aggragation:聚合操作,会有网络传输
  •   Grouped streams上的操作
  •   Merge和Join

一Apply Locally

  1.functions函数操作

  函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面,如果一个function不输出tuple,那就意味这这个tuple被过滤掉了,例如下面的例子:

1 class AddAndSubFuction extends BaseFunction{
2
3         public void execute(TridentTuple tuple, TridentCollector collector) {
4             int res1 = tuple.getInteger(0);
5             int res2 = tuple.getInteger(1);
6             int sub = res1 > res2 ? res1 - res2 : res2 - res1;
7             collector.emit(new Values(res1+res2,sub));
8         }
9     }

  2.Filter过滤操作

  Filters很简单,接收一个tuple,并决定是否保留这个tuple,例如

1 class ScoreFilter extends BaseFilter{
2
3         public boolean isKeep(TridentTuple tuple) {
4             return tuple.getInteger(0) >= 60;
5         }
6     }

  上述Filter过滤调成绩小于60的tuple.

  3.partitionAggregate

  PartitionAggregate的作用对每个Partition中的tuple进行聚合,与前面的函数在原tuple后面追加数据不同,PartitionAggregate的输出会直接替换掉输入的tuple,仅数据PartitionAggregate中发射的tuple。

  TridentAPI提供了三个聚合器接口:CombinerAggregator,ReducerAggregator,Aggregator

  我们先来看一看CombinerAggregator,CombinerAggregator接口的定义如下:

public interface CombinerAggregator<T> extends Serializable {T init(TridentTuple tuple);T combine(T val1, T val2);T zero();
}

  CombinerAggregator接口只返回一个tuple,并且这个tuple也只包含一个field。init方法会先执行,它负责预处理每一个接收到的tuple,然后再执行combine函数来计算收到的tuples直到最后一个tuple到达,当所有tuple处理完时,CombinerAggregator会发射zero函数的输出,比如CombinerAggregator的实现类Count的定义如下:

public class Count implements CombinerAggregator<Long> {@Overridepublic Long init(TridentTuple tuple) {return 1L;}@Overridepublic Long combine(Long val1, Long val2) {return val1 + val2;}@Overridepublic Long zero() {return 0L;}}

  当你使用aggregate 方法代替PartitionAggregate时,CombinerAggregator的好处就体现出来了,因为Trident会自动优化计算,在网络传输tuples之前做局部聚合。

  我们再来看一下ReducerAggregator,ReducerAggregator的定义如下:

public interface ReducerAggregator<T> extends Serializable {T init();T reduce(T curr, TridentTuple tuple);
}

  ReducerAggregator通过init方法提供一个初始值,然后为输入的每个tuple迭代这个值,最终产生一个唯一的tuple并输出,定义一个实例如下:

 public class ReducerCount implements ReducerAggregator<Long>{@Overridepublic Long init() {return 0L;}@Overridepublic Long reduce(Long curr, TridentTuple tuple) {return curr + 1;}}

  最后看一下通用的聚合器Aggregator,它的定义如下:

public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector);
}

  Aggregator接口可以发射含任意数量属性的任意数据量的tuples,并且可以在执行过程中的任何时候发射:
  init:在处理数据之前被调用,它的返回值会作为一个状态值传递给aggregate和complete方法
  aggregate:用来处理每一个输入的tuple,它可以更新状态值也可以发射tuple
  complete:当所有tuple都被处理完成后被调用

  有时候我们需要执行多个聚合器,这在Trident中称为chaining

  4.projection投影操作

  投影操作的作用是仅仅保留stream指定字段的数据,和关系数据库中投影的概念类似

二Repartitioning重定向操作

  重定向操作是如何在各个任务间对tuples进行分区。分区的数量也有可能改变重定向的结果。重定向需要网络传输,下面介绍下重定向函数:

  1. shuffle:通过随机分配算法来均衡tuple到各个分区
  2. broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
  3. partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区
  4. global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream
  5. batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区
  6. Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping

三Aggragation聚合操作

  Trident有aggregate和 persistentAggregate方法来做聚合操作。aggregate是独立的运行在Stream的每个Batch上的,而persistentAggregate则是运行在Stream的所有Batch上并把运算结果存储在state source中。 运行aggregate方法做全局聚合。当你用到 ReducerAggregator或Aggregator时,Stream首先被重定向到一个分区中,然后其中的聚合函数便在这个分区上运行。当你用到CombinerAggregator时,Trident会首先在每个分区上做局部聚合,然后把局部聚合后的结果重定向到一个分区,因此使用CombinerAggregator会更高效,可能的话我们需要优先考虑使用它。

四Grouped streams

  GroupBy操作是根据特定的字段对流进行重定向的,还有,在一个分区内部,每个相同字段的tuple也会被Group到一起。如果你在grouped Stream上面运行aggregators,聚合操作会运行在每个Group中而不是整个Batch。persistentAggregate也能运行在GroupedSteam上,不过结果会被保存在MapState中,其中的key便是分组的字段。 当然,aggregators在GroupedStreams上也可以串联。

五Merge和Join

  api的最后一部分便是如何把各种流汇聚到一起。最简单的方式就是把这些流汇聚成一个流。我们可以这么做:  

topology.merge(stream1, stream2, stream3); 

  另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的Stream。join应用在来自Spout的每一个小Batch中。join时候的tuple会包含:
  1.join的字段,如Stream1中的key和Stream2中的x

  2.所有非join的字段,根据传入join方法的顺序,a和b分别代表steam1的val1和val2,c代表Stream2的val1

  当join的是来源于不同Spout的stream时,这些Spout在发射数据时需要同步,一个Batch所包含的tuple会来自各个Spout。

转载于:https://www.cnblogs.com/senlinyang/p/8081447.html

Storm Trident API相关推荐

  1. Storm Trident API实践

    译 在4月10日柏林BigData啤酒节上,Pere介绍了Trident,于此同时,来自Continuum Analytics也介绍了Disco.在Storm环节中大家了解了正确使用Trident的基 ...

  2. [Trident] Storm Trident 教程,state详解、trident api详解及实例

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...

  3. trident API指南

    trident API指南 @(STORM)[storm] trident API指南 零 概述 1 本地分区操作 2 重新分区操作 3 聚合操作 4 流分组操作 5合并与连接 一 本地分区操作 一 ...

  4. Apache Storm 官方文档 —— Trident API 概述

    转载自并发编程网 – ifeve.com本文链接地址: Apache Storm 官方文档 -- Trident API 概述 窗口部分的内容是我自己翻译的 Trident 的核心数据模型是" ...

  5. Trident API 概览

    Trident API 概览 在网上看到了很多有TRIDENT相关API的翻译,看来看去,总觉得没有说清楚很多东西,所以自己结合使用的经验翻译了一篇出来:翻译完以后,也发现 在自己的翻译中也有很多地方 ...

  6. Storm Trident简介

    转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...

  7. storm trident mysql,storm_Trident

    简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascadi ...

  8. Storm Trident 详细介绍

    一.概要 1.1 Storm(简介)      Storm是一个实时的可靠地分布式流计算框架.      具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息( ...

  9. Storm Trident拓扑中的错误处理

    这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...

最新文章

  1. Xamarin iOS开发实战第1章使用C#编写第一个iOS应用程序
  2. jquery 异步调用方法中不能给全局变量赋值的原因及解决办法
  3. 7款Flash和Javascript网页视频播放器
  4. 按汇总分组/多维数据集
  5. em模型补缺失值_基于EM算法数据单变量缺失处理方法研究
  6. http respose status code (记)
  7. HEVC---xCompressCU()函数作用及位置
  8. 【HDFS】HDFS与getconf结合使用,获取配置信息
  9. Android Testing学习02 HelloTesting 项目建立与执行
  10. debian 配置linuxptp 软件时间戳
  11. Win10下Pytorch的安装和使用[斗之力三段]
  12. SilverLight跨域访问及其常用的几种解决方法
  13. DG导入mysql依赖包_mysql 命令行快速导出数据,导入数据
  14. multisim中pwl_multisim元器件
  15. 医学DICOM文件解析(笔记整理)
  16. 如何在百度收录平台注册账号获取Token
  17. 用matlab求roc曲线的面积Auc,sklearn计算ROC曲线下面积AUC
  18. Python Web开发:Django+BootStrap实现简单的博客项目
  19. 【论文笔记09】Differentially Private Hypothesis Transfer Learning 差分隐私迁移学习模型, ECMLPKDD 2018
  20. iOS 导入自定义字体不生效

热门文章

  1. 国家发改委:春运期间推动“健康码”全国一码通行
  2. 多线程编程之死锁已经死锁产生的原因
  3. extern C的主要作用简单解释
  4. pdo mysql 建库_一帖让PHP小白彻底了解PDO操作数据库的方法
  5. java classname.this_java 中 类名.this与类名.class
  6. pythonset操作教程_Python集合(set)方式和使用方法
  7. 海信电视root工具_海信璀璨系列家电:一次购买便能享受全方位智能家居生活...
  8. 贴片按键开关_轻触开关的常用类型和规格型号
  9. HashMap之tableSizeFor
  10. Uri跟Url的区别