Storm Trident 详细介绍
一、概要
public static class PerActorTweetsFilter extends BaseFilter {String actor;public PerActorTweetsFilter(String actor) {this.actor = actor;}@Overridepublic boolean isKeep(TridentTuple tuple) {return tuple.getString(0).equals(actor);}
}
topology.newStream("spout", spout).each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).each(new Fields("actor", "text"), new Utils.PrintFilter());
- 第一个构造参数:作为Field Selector,一个tuple可能有很多字段,通过设置Field,我们可以隐藏其它字段,仅仅接收指定的字段(其它字段实际还在)。
- 第二个是一个Filter:用来过滤掉除actor名叫"dave"外的其它消息。
public static class UppercaseFunction extends BaseFunction {@Overridepublic void execute(TridentTuple tuple, TridentCollector collector) {collector.emit(new Values(tuple.getString(0).toUpperCase()));}
}
topology.newStream("spout", spout).each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text")).each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());
Fields(
"text"
,
"actor"
),其作用是把其中的
"text"字段内容都变成大写。
java mystream.project(new Fields("b", "d"))
则输出的流仅包含 [“b”, “d”]字段。
public static class PerActorTweetsFilter extends BaseFilter {private int partitionIndex;private String actor;public PerActorTweetsFilter(String actor) {this.actor = actor;}@Overridepublic void prepare(Map conf, TridentOperationContext context) {this.partitionIndex = context.getPartitionIndex();}@Overridepublic boolean isKeep(TridentTuple tuple) {boolean filter = tuple.getString(0).equals(actor);if(filter) {System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);}return filter;}
}
topology.newStream("spout", spout).each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5).each(new Fields("actor", "text"), new Utils.PrintFilter());
I am partition [4] and I have kept a tweet by: dave I am partition [3] and I have kept a tweet by: dave I am partition [0] and I have kept a tweet by: dave I am partition [2] and I have kept a tweet by: dave I am partition [1] and I have kept a tweet by: dave
topology.newStream("spout", spout).parallelismHint(2).shuffle().each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5).each(new Fields("actor", "text"), new Utils.PrintFilter());
I am partition [2] and I have kept a tweet by: dave I am partition [2] and I have kept a tweet by: dave I am partition [2] and I have kept a tweet by: dave I am partition [2] and I have kept a tweet by: dave
- shuffle:通过随机分配算法来均衡tuple到各个分区
- broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
- partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区
- global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream
- batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区
- Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping
public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {@Overridepublic Map<String, Integer> init(Object batchId, TridentCollector collector) {return new HashMap<String, Integer>();}@Overridepublic void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {String location = tuple.getString(0);val.put(location, MapUtils.getInteger(val, location, 0) + 1);}@Overridepublic void complete(Map<String, Integer> val, TridentCollector collector) {collector.emit(new Values(val));}
}
topology.newStream("spout", spout).aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts")).each(new Fields("location_counts"), new Utils.PrintFilter());
- init():当刚开始接收到一个batch时执行
- aggregate():在接收到batch中的每一个tuple时执行
- complete():在一个batch的结束时执行
[{USA=3, Spain=1, UK=1}] [{USA=3, Spain=2}] [{France=1, USA=4}] [{USA=4, Spain=1}] [{USA=5}]
topology.newStream("spout", spout).partitionBy(new Fields("location")).partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts")).parallelismHint(3).each(new Fields("location_counts"), new Utils.PrintFilter());
[{France=10, Spain=5}] [{USA=63}] [{UK=22}]
topology.newStream("spout", spout).groupBy(new Fields("location")).aggregate(new Fields("location"), new Count(), new Fields("count")).each(new Fields("location", "count"), new Utils.PrintFilter());
... [France, 25] [UK, 2] [USA, 25] [Spain, 44] [France, 26] [UK, 3] ...
Storm Trident 详细介绍
标签:storm trident trident api
原文:http://blog.csdn.net/suifeng3051/article/details/41117929
Storm Trident 详细介绍相关推荐
- [Trident] Storm Trident 教程,state详解、trident api详解及实例
英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...
- Storm Trident API
在Storm Trident中有五种操作类型 Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯的改变数据流向,不会 ...
- Hadoop生态系统的详细介绍
hadoop生态系统的详细介绍 简介 Hadoop是一个开发和运行处理大规模数据的软件平台,是Appach的一个用java语言实现开源软件框架,实现在大量计算机组成的集群中对海量数据进行分布式计算.今 ...
- Storm Trident拓扑中的错误处理
这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...
- python教程推荐-关于推荐系统的详细介绍
推荐系统中经常需要处理类似user_id, item_id, rating这样的数据,其实就是数学里面的稀疏矩阵,scipy中提供了sparse模块来解决这个问题,但scipy.sparse有很多问题 ...
- hadoop生态系统的详细介绍-详细一点
前提 日常喜欢看一些微信分享的好文,总结下来,可以作为过滤器吧(节约更多人的时间!),在这里引用的是别人的文章!对原文的作者表示感谢!确实写的很好! hadoop生态系统的详细介绍 简介 Hadoop ...
- 浏览器的排行榜及详细介绍
关于浏览器的排行榜 来自Net Market Share 的 数据,7月份,占据全球浏览器排行榜首位的为Chrome浏览器,总市场份额为48.65%.IE浏览器以31.65%的占比,位居第二位.排在第 ...
- HTML页面加载和解析流程详细介绍
浏览器加载和渲染html的顺序.如何加快HTML页面加载速度.HTML页面加载和解析流程等等,在本文将为大家详细介绍下,感兴趣的朋友不要错过 浏览器加载和渲染html的顺序 1. IE下载的顺序是从上 ...
- mysql为什么要压测_mysql集群压测的详细介绍
本篇文章给大家带来的内容是关于mysql集群压测的详细介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. mysql压测 mysql自带就有一个叫mysqlslap的压力测试工具,通 ...
最新文章
- ListView和数据适配器SimpleAdapter例子
- win10系统下载-靠谱推荐
- Linux vi格式化文件命令
- 转整型_SPI转can芯片CSM300详解、Linux驱动移植调试笔记
- antd tree搜索并定位_KD-Tree原理详解
- iOS设计模式 - 单例
- linux7.3系统如何修改系统时间,两招修改​Centos7 系统时区!
- springboot集成购买阿里的rocketmq
- android xml 设置半透明
- 重定向后路径上自动添加jsessionid=
- Jmeter的基本功能使用方法
- Kafka 过期数据清理 详解
- 【计算机网络】路由器与交换机
- Oracle开发 之 主-外键约束FK及约束的修改
- Java项目:springboot ERP管理系统
- QtAndroid详解 6 集成信鸽推送
- switch分支语句注意事项及注册界面的使用思路
- 阿里面试应该注意什么?通过5轮面试的阿里实习生亲述
- JMS入门(一)--JMS基础
- python高德 查询县_Python和高德开放平台——地名地址空间化及采集POI信息
热门文章
- https协议能否让网站,优先被百度收录,个人观点
- Debian 11 DNS服务器修改不生效解决方法
- Linux/Unix操作系统mac地址怎么查
- 变分自编码器的推导,VAE的推导,ELBO|证据下界|训练方法
- 30000台苹果电脑遭恶意软件入侵,包括最新的M1系列!快检查一下自己的电脑
- 支付宝扫码验签实例php,PHP 支付宝支付,支付宝回调
- 木鱼《觉醒年代》观后感
- SQLSERVER binary 和 varbinary 用法全解
- 如何在visual studio下编译zxing cpp,以及zxing c++的使用
- 高级移动位置(AML)与紧急位置服务(ELS)