trident API指南

@(STORM)[storm]

  • trident API指南
  • 零 概述
    • 1 本地分区操作
    • 2 重新分区操作
    • 3 聚合操作
    • 4 流分组操作
    • 5合并与连接
  • 一 本地分区操作
    • 一 函数
    • 二 filter
    • 三 分区聚合
      • 1Aggregator接口
      • 2init方法
      • 3aggregate方法
      • 4complete方法
    • 四态查询与分区持久化
    • 五投影
  • 二重新分区操作
  • 三 聚合操作
  • 四 流分组操作
  • 五 合并与连接
  • 六 一些注意事项
    • 一归纳及思考
    • 二partitionBy与groupBy
    • 三partitionAggregate与aggregate
    • 四partitionPersistence与persistentAggregate
    • 五分组与聚合

官方文档请参考:https://storm.apache.org/documentation/Trident-API-Overview.html

零、 概述

Trident的核心数据模型是一系统批处理的流,流被分发到集群的节点上,对流的操作也并行在各个分区中进行。
在Trident中,分区的概念大致与task对应,并行在各个分区中进行,也就是并行在各个task中进行。
对比于store core的统一bolt,Trident提供了各种各样的函数操作,方便用户直接调用,主要有以下5类:

1、 本地分区操作

(1)应用在本地的每个分区,不需要网络传输
(2)包括Function,filter, partitionAggregate, stateQuery/partitionPersist, projection
(3)以filter为例,它会对这个分区(task)中的数据进行过滤,再发送出去,不会涉及其它分区的内容。

2、 重新分区操作

(1)重新分区一个流,但不改变其内容,需要网络传输,即在不同的task间交换数据
(2)包括shuffle, broadcast, partitionBy, global, batchGlobal,partition等
(3)以partitionBy为例,它会将各个分区中的数据根据字段值分区到各个目标分区中,保证相同字段值在同一个分区中,但消息中的内容是没有被改变的,只是重新作了分区。

3、 聚合操作

(1)对数据流进行聚合,会有网络传输,输出结果是其聚合操作的结果
(2)包括aggregate、persistenAggregate。
(3)一般需要一个聚合函数作为参数,指定如何进行聚合,如trident自身提供的Sum, Count等类,它们都实现了Aggregator接口。

4、 流分组操作

(1)对数据流进行分区并分组,即根据字段的值先分到不同的分区,然后在分区内再根据字段的值进行分组。
(2)主要包括groupBy。
(3)如果在一个流分组中运行聚合器,聚会会在每个组内运行。persistenAggregate也可以运行在一个GroupStream中,在这种情况下,结果将保存在一个按关键字段进行分组的MapState中。

5、合并与连接

(1)合并或者连接几个数据流
(2)包括merger, join等

一、 本地分区操作

(一) 函数

输出元组的字段附加到原输入元组字段后面,一般通过继承BaseFuntion来实现。

(二) filter

把一个元组作为输出,调用filter的isKeep方法,判断是否保留这个元组。

(三) 分区聚合

partitionAggregate的具体使用可以参考trident State应用指南 http://blog.csdn.net/lujinhong2/article/details/49909945

简单的说就是partitionAggregate在分区内调用一个实现了一个Aggregator接口的类,实现聚合操作,如Sum等。
关于partitionAggregate与aggregate请参考后面内容

内容如下:
这里涉及了一些trident常用的API,但project等相对容易理解,这里只介绍partitionAggregate的用法。

再看看上面代码中对partitionAggregate的使用:

Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))

第一,三个参数分别表示输入流的名称与输出流的名称。中间的NameCountAggregator是一个Aggregator的对象,它定义了如何对输入流进行聚合。我们看一下它的代码:

public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判断某个名字是否已经存在于map中,若无,则put,若有,则递增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//将聚合后的结果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}

(1)Aggregator接口

它实现了Aggregator接口,这个接口有3个方法:

public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector);
}

init方法:在处理batch之前被调用。init的返回值是一个表示聚合状态的对象,该对象会被传递到aggregate和complete方法。
aggregate方法:为每个在batch分区的输入元组所调用,更新状态
complete方法:当batch分区的所有元组已经被aggregate方法处理完后被调用。

除了实现Aggregator接口,还可以实现ReducerAggregator或者CombinerAggregator,它们使用更方便。详见《从零开始学storm》或者官方文档
https://storm.apache.org/documentation/Trident-API-Overview.html

下面我们看一下这3个方法的实现。

(2)init方法

@Override
public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();
}

仅初始化了一个HashMap对象,这个对象会作为参数传给aggregate和complete方法。对一个batch只执行一次。

(3)aggregate方法

aggregate方法对于batch内的每一个tuple均执行一次。这里将这个batch内的名字出现的次数放到init方法所初始化的map中。

@Override
public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}
}

(4)complete方法

这里在complete将aggregate处理完的结果发送出去,实际上可以在任何地方emit,比如在aggregate里面。
这个方法对于一个batch也只执行一次。

@Override
public void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();}
}

(四)态查询与分区持久化

即stateQuery与partitionPersist,分别用于查询与更新状态源,注意均只对分区内的数据操作

(五)投影

即project操作,如果对字段[“a”,”b”,”c”]进行以下操作:

 myStream.project(new Field(“b”)),则输出流只包含字段b。

二、重新分区操作

重新分区操作运行一个函数改变元组在任务之间的分布,也可以调整分区的数量,它需要网络传输。包括以下方法:
shuffle: Use random round robin algorithm to evenly redistribute tuples across all target partitions
broadcast: Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do a stateQuery on every partition of data.
partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.
global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions.
partition: This method takes in a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping

三、 聚合操作

(1)对数据流进行聚合,会有网络传输,输出结果是其聚合操作的结果
(2)包括aggregate、persistenAggregate。
(3)一般需要一个聚合函数作为参数,指定如何进行聚合,如trident自身提供的Sum, Count等类,它们都实现了Aggregator接口。

四、 流分组操作

(1)对数据流进行分区并分组,即根据字段的值先分到不同的分区,然后在分区内再根据字段的值进行分组。
(2)主要包括groupBy。
(3)如果在一个流分组中运行聚合器,聚会会在每个组内运行。persistenAggregate也可以运行在一个GroupStream中,在这种情况下,结果将保存在一个按关键字段进行分组的MapState中。

五、 合并与连接

(1)合并或者连接几个数据流
(2)包括merger, join等

六、 一些注意事项

(一)归纳及思考

trident每次处理一个batch(先不考虑setMaxSpoutPending的情况),这个batch会被分配到多个task进行处理(即多个分区),然后这些task完成会后继续发送到下游的目标task,直至所有的task都完成操作,再进行下一个batch的处理。
在task与task间的传输就需要用到这里提供的各种函数。

(二)partitionBy与groupBy

partitionBy根据字段的值进行哈希,然后根据目标分区的数量求模,以确定相应的值放到哪个分区中。因此可以保证相同的字段值放在同一个分区。但由于分区数量有限,而不同字段值的数量会较多,因此不同的字段值也可能放在一个分区中
groupBy根据字段的值做先做partitionBy,保证相同字段的值都已经在同一个分区,但同样不同的字段值也有可能在一个分区,然后,在一个分区内再进行分组,保证每一个组内的字段值都是相同的,看看官方的示意图:

  • partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.

  • groupBy: The groupBy operation repartitions the stream by doing a partitionBy on the specified fields, and then within each partition groups tuples together whose group fields are equal.

(三)partitionAggregate()与aggregate()

注意,Stream类和Aggregator类均有aggregate()方法,这里指的是Stream类的aggregate()方法。同时简单说一下二者的关系,Aggregator一般作为Stream类中aggregate()方法的参数,前者的aggregate方法用于对Stream中的每一个tuple进行处理。

partitionAggregate是对分区内的数据先进行一次聚合,而aggregate是对所有分区作聚合,举个例子:

trident.newStream(“TRIDENT_SPOUT”, new MySpout()).partitionAggregate(new Sum(), new Fields(“out1”)).parallelismHint(10).aggregate(new Fields(“out1”), new Sum(), new Fields(“out2”))

上述例子中,Spout发送一些数据,然后partitionAggregate先在各个分区内计算sum,最后把结果发送出去,aggregate汇总各个分区的内容再作一次聚合(sum)得出最后的结果,因此一般而言,aggregate有多个task同时执行的意义不大,除非是允许多个批次同时执行的情况,可以每个aggregate处理一个批次。

* 注意二者均使用一个实现了Aggregator接口的类对象作为参数*,如:

public class Sum implements CombinerAggregator<Number>

(四)partitionPersistence()与persistentAggregate()

  • 一般的state使用partitionPersistence()
  • MapSate使用persistentAggregate()

均是用于将处理结果进行持久化的。

(五)分组与聚合

分组(groupBy)是将数据按照key分成一个一个的组
聚合(aggregate)是将数据根据某种运算规则(如Sum)聚在一起,计算一个结果。
按键聚合(aggregateByKey)是将数据在相同的key范围内聚合在一起,比如同一个key的值相加。(spark的api)
persistenceAggregate接收的是一个GroupStream,经在已经分好组的组内进行聚合计算。

trident API指南相关推荐

  1. Django REST framework API 指南(2):响应

    Django REST framework API 指南(1):请求 Django REST framework API 指南(2):响应 Django REST framework API 指南(3 ...

  2. Zookeeper C API 指南一(转)

    Zookeeper 监视(Watches) 简介 Zookeeper C API 的声明和描述在 include/zookeeper.h 中可以找到,另外大部分的 Zookeeper C API 常量 ...

  3. 微信公众平台自定义菜单接口API指南

    微信公众平台开发模式自定义菜单接口API指南 开发实现方法,请查看 微信公众平台开发(58)自定义菜单 简介 开发者获取使用凭证(如何获取凭证)后,可以使用该凭证对公众账号的自定义菜单进行创建.查询和 ...

  4. Zookeeper C API 指南

    以前自己的博客中转载.翻译或写过(不过自己才疏学浅,写的不好)一些 Zookeeper 方面的文章,但是都没有涉及到 Zookeeper C API 的内容,今天的这篇博客是我农历新年的第一篇技术博客 ...

  5. sqoop2 java api实现_Sqoop2 Java客户端API指南

    原文连接:http://sqoop.apache.org/docs/1.99.6/ClientAPI.html Sqoop Java客户端API指南 这篇文章秒描述了额如何在外部应用中使用sqoop ...

  6. Microsoft REST API指南

    经过3个月的碎片时间的翻译和校验,由长沙.NET技术社区翻译的英文原文文档<Microsoft REST API指南 >已经翻译完成,现刊载前十一章如下,欢迎大家点击"查看原文& ...

  7. 微软HTTP API指南

    微软发布了创建"RESTful" API的指南.Roy Fielding将这些与REST没有多大关系的API称为HTTP API. 许多组织都发布了创建面向Web的HTTP API ...

  8. Trident API 概览

    Trident API 概览 在网上看到了很多有TRIDENT相关API的翻译,看来看去,总觉得没有说清楚很多东西,所以自己结合使用的经验翻译了一篇出来:翻译完以后,也发现 在自己的翻译中也有很多地方 ...

  9. Android API 指南

    Android API 指南 最好的 Android 学习资料非官方 API 指南莫属. 众所周知,Android开发者有中文网站了,API 指南一眼看去最左侧的菜单都是中文,然而点进去内容还是很多是 ...

最新文章

  1. 【leetcode 968. 监控二叉树】解题报告
  2. EXCEL数字前补零且转换成文本型
  3. [设计模式]8. C++与中介者模式(mediator pattern)
  4. 静物摄影用光技巧_摄影技巧:冬天的阳光怎样拍才更美?
  5. mysql 5.6升级8.0_Mysql数据库从5.6.28版本升到8.0.11版本部署项目时遇到的问题及解决方法...
  6. 国家可持续发展议程创新示范区创建工作推进会在北京召开
  7. js 请求接口获取不到登录cookie xhrFields 配置
  8. element 让日期选择器一直显示选择面板
  9. heap python_python topN max heap,使用heapq还是自实现?
  10. 修图必备:Photosho 2022 for Mac
  11. 后缀树c语言算法,C语言数据结构之中缀树转后缀树的实例
  12. 计算机网络超详细笔记(三):数据链路层
  13. LabWindows操作SQL SERVER
  14. ffmpeg 视频码率压缩、质量控制 -crf 和 -qp 参数详解
  15. C语言实现:输出明天的日期
  16. 商业智能,数据仓库,ETL,数仓调度工具informatica介绍手账(三)
  17. Vue 使用 video 标签实现视频播放
  18. 打开微信时站着的小人是谁?
  19. TTL、CMOS、LVTTL、LVCMOS、LVDS
  20. 面试中问什么问题最能让面试官记忆犹新?

热门文章

  1. 【超全解析】原码、反码、补码、移码的相互转化和解读(取值范围)
  2. [leetcode]102.二叉树的层序遍历
  3. linux curl 多线程,CURL多线程不执行一直在请求
  4. 远程图片保存到服务器 php,保存远程图片到本地服务器几种方法[php,asp]网
  5. android activity 被notification启动,Android通知Notification全面剖析
  6. linux sh脚本 递增,Linux shell 脚本实现进度框
  7. access工具_工具篇之pycharm小技巧-httpclient
  8. numpy 随机数_数据分析numpy基础看着一篇就够了
  9. vuecli启动的服务器位置,在vue cli 3生成的项目中启动dev服务器
  10. 预编码 matlab,无线通信-预编码-MATLAB代码合集