Trident是Storm中最为核心的概念,在做Strom开发的过程中,绝大部分情况下我们都会使用Trident,而不是使用传统的Spout、Bolt。Trident是Storm原语的高级封装,学会Trident之后,将会使得我们Storm开发变得非常简单。

一、什么是Storm Trident ?

简而言之:Trident是编写Storm Topology的一套高级框架,是对传统Spout、Bolt的高级封装。在学习Trident之前,我们都是都Spout、Bolt的相关API来编写一个Topology,在学习了Trident之后,我们会使用Trident API来编写Topology。

可以将StormTopology与TridentTopology的关系,类比为JDBC与ORM框架(mybatis、hibernate)之间的关系,后者是前者的高级封装,功能相同,但是可以极大的减少我们的开发的工作量。

当然,就像我们学习JDBC与ORM框架一样,JDBC可能很容易理解,但是学习ORM框架,可能就相对复杂一点。甚至学习ORM框架的时间可能要比学习JDBC的时间要更长,但是一旦我们学会了ORM框架,可能就再也不想去使用JDBC了,因为ORM框架可以帮助我们更高效的进行开发。

学习Trident也一样,可能我们学习理解许多新的概念,但是学会了会极大的提高我们的开发效率。

Storm原语中,最重要的就是Spout、Bolt、Grouping等概念。

Trident对于Storm原语的抽象主要也就是针对这些基本概念的抽象。主要体现在:Trident Spout,Operation、State 。

Trident Spout是针对Storm原语中的Spout进行的抽象

Operation是针对Bolt、Grouping等概念的抽象

State是新提出的概念,实际上就是数据持久化的接口。

通常情况下,新的概念意味着要使用新的API。但是归根结底,还是底层还是通过storm原语来实现。在Trident中,我们使用TridentTopology表示一个拓扑,而在Storm原语中,我们使用StormTopology来表示一个拓扑。TridentTopology最终会被转换成StormTopology。在接下来的内容,我们将首先介绍TridentTopology的构建过程,以及TridentTopology如何转化为StormTopology。

二、TridentTopology与StormToplogy

1、API区别:

在Trident中,有着新的一套构建Topology的API,我们先通过从代码层面上对比来进行分析:

StormTopology:由传统的Spout和Bolt的API编写的Topology,最终是通过TopologyBuilder对象来创建的,返回的结果就是StormTopology对象。StormTopologytopology= topologyBuilder.createTopology();

TridentTopology:由Trident的API编写的Topology,因为在Trident的API中,使用TridentTopology来表示一个Topology,是直接new出来的。TridentTopology tridentTopology = new TridentTopology();

单独提出这两个概念,是为了以后的区分。以后我们提到StormTopology时,表示的就是以Spout、Bolt等这些API创建的Topology,而提到TridentTopology表示的就是以Trident的API创建的Topology。

2、创建Topology的区别

我们以单词计数案例WordCountApp(超链接)进行对比

StormTopologyTopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word-reader" , new WordReader(),4);

builder.setBolt("word-normalizer" , new WordNormalizer(),3).shuffleGrouping("word-reader" );

builder.setBolt("word-counter" , new WordCounter(),1).fieldsGrouping("word-normalizer" , new Fields("word"));

StormTopology topology = builder .createTopology();

TridentTopology:TridentTopology tridentTopology = new TridentTopology();

tridentTopology.newStream("word-reader-stream" , new WordReader()).parallelismHint(16)

.each( new Fields("line" ), new NormalizeFunction(), new Fields("word" ))

.groupBy( new Fields("word" ))

.persistentAggregate( new MemoryMapState.Factory(), new Sum(), new Fields("sum" ));

StormTopology stormTopology = tridentTopology.build();

对比:

在StormTopology中,我们都是通过TopologyBuilder的setSpout、setBolt的方式来创建Topology,然后通过Grouping策略指定Bolt的数据来源和分组策略。

在TridentTopology中,我们使用TridentTopology来创建Topology,整个创建过程中,都是流式编程风格的。要注意的是,在Trident中,我们依然使用了WordReader这个Spout,但是并没有使用Bolt,而是使用了类似于each、persistentAggregate这样方法,来取代Bolt的功能。关于这些方法的作用再之后会详细介绍,目前只要知道Bolt的作用被一些方法取代了即可。

三、TridentTopology与StormToplogy的联系

二者的联系主要是:TridentTopology最终会被编译成StormTopology。请再次查看上述构建构建TridentTopology的最后一句代码。StormTopology stormTopology = tridentTopology.build();

这句代码的的返回结果还是StormTopology对象,这实际上意味着,TridentTopology最终还是会被编译成StandardTopology。这很容易理解,就像ORM框架与JDBC一样,ORM框架只是一层封装,最终还是要通过JDBC操作数据库。而TridentTopology是高级封装,但是最终还是要通过编译StormTopology来运行。

注意:这一点是非常重要的。上面我们已经提到,在Trident中,依然要指定Spout,但是用了一系列其他的方法如each、persistentAggregate等(当然不止这些),代替了Bolt的功能。那么这里又提到,TridentTopology会被编译成StormTopology,实际上就意味着Storm最终会将这些方法转换为一个或多个Bolt。我们要了解Trident是如何工作的,就必须要了解,这些方法的最终是如何被转换为Bolt的。最简单的查看方式,就是查看编译后的StormTopology的getSpout,getBolt方法来看。

在后面我们会详细介绍,TridentTopology是如何转换为StormTopology的。目前,我们只需要知道TridentTopology最终是会转换为StormTopology即可。

事实上,在Trident框架会将调用的所有方法转换为一个个Node。Node类型如下:

newStream方法中参数Spout转换为SpoutNode,将调用的each方法,persistentAggregate等方法,转换为一个个ProcesserNode,而groupBy等操作,转换为一个PartitionNode,最终组成一个对象图,最后根据这个对象图,来将TridentTopology转换为StormTopology。当然,光说不练假把式,我们通过分析源码进行简单说明。

首先说明Spout转换为SpoutNode对象

其次说明each、persistentAggregate转换为ProcesserNode

最后说明如何根据SpoutNode和ProcesserNode将TridentTopology转换为StormTopology

1、Spout转换为SpoutNode

首先,我们看一下TridentTopology对象的newStream方法:

可以看到,可以接受五种类型的Spout,以下是这五个方法的实现:

我们可以看到,这五种方法,最终调用的实际上只有两个,并且在这两个方法中,最终都将Spout转换为了SpoutNode对象。

2、each、persistentAggregate转换为ProcessorNode

我们可以查看Stream对象的源码, 找到each、persistentAggregate两个方法内容

上图显示了这两个方法,这种都被转换为一个ProcessorNode,最终添加到Topology中。

3、最后我们看一下,TridentTopology.build()的实现

由于源码内容比较多,我们只分析感兴趣的地方,其中红色加深的地方,是目前最为关注的:public StormTopology build() {

...

TridentTopologyBuilder builder = new TridentTopologyBuilder();

Map spoutIds = genSpoutIds( spoutNodes);

Map boltIds = genBoltIds( mergedGroups);

// SpoutNode维护了Spout类型,根据类型转换为对应的Spout

for(SpoutNode sn : spoutNodes ) {

Integer parallelism = parallelisms.get(grouper .nodeGroup(sn));

if(sn .type == SpoutNode.SpoutType.DRPC) {

builder.setBatchPerTupleSpout(spoutIds .get(sn), sn.streamId ,

(IRichSpout) sn. spout, parallelism , batchGroupMap.get(sn ));

} else {

ITridentSpout s;

if(sn .spout instanceof IBatchSpout) {

s = new BatchSpoutExecutor((IBatchSpout)sn .spout );

} else if(sn .spout instanceof ITridentSpout) {

s = (ITridentSpout) sn. spout;

} else {

throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");

// TODO: handle regular rich spout without batches (need lots of updates to support this throughout)

}

builder.setSpout(spoutIds .get(sn), sn.streamId, sn. txId, s, parallelism , batchGroupMap .get(sn));

}

}

for(Group g : mergedGroups ) {

if(!isSpoutGroup( g)) {

Integer p = parallelisms.get(g );

Map streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);

//将调用each、processAggregate方法后的ProcessorNode,转换为Bolt

BoltDeclarer d = builder.setBolt(boltIds .get(g), new SubtopologyBolt(graph, g .nodes , batchGroupMap ), p,

committerBatches(g, batchGroupMap), streamToGroup);

Collection inputs = uniquedSubscriptions(externalGroupInputs(g ));             //根据调用GroupBy等方法转换成的PartitionNode进行Grouping策略

for(PartitionNode n : inputs ) {

Node parent = TridentUtils.getParent( graph, n );

String componentId;

if(parent instanceof SpoutNode) {

componentId = spoutIds .get(parent);

} else {

componentId = boltIds.get(grouper .nodeGroup(parent));

}

d.grouping( new GlobalStreamId(componentId , n.streamId ), n.thriftGrouping);

}

}

}

return builder .buildTopology();

}

storm mysql trident_Trident简介相关推荐

  1. MySQL数据库简介数据库介绍

    MySQL数据库简介 一.数据库介绍 (一)数据库相关概念 数据库是"按照数据结构来组织.存储和管理数据的仓库".是一个长期存储在计算机内的.有组织的.有共享的.统一管理的数据集合 ...

  2. 【十三】景区人流量统计:python日志生成+logstash+kafka+storm+mysql+springBoot+高德地图

    storm+kafka+logstash+springBoot+高德地图 项目概述: 作用:交通信息化,智慧城市 需求:实时统计人流量并通过热力图展示. 类似于腾讯热力图的景区人流量统计 如何采集某个 ...

  3. MYSQL数据库简介和常用的基本SQL语句

    1.MYSQL数据库简介 MySQL是一种开放源代码的关系型数据库管理系统(RDBMS),MySQL数据库系统使用最常用的数据库管理语言--结构化查询语言(SQL)进行数据库管理. 2.常用数据类型 ...

  4. 【①MySQL】浅谈数据库系统:MySQL的简介与安装配置

    前言 欢迎来到小K的MySQL专栏,本节将为大家带来MySQL的简介与安装配置的详细讲解~ 目录 前言 一.数据库系统概述 数据(Data) 数据库(Database) 数据库管理系统(Databas ...

  5. kfaka storm写入mysql_flume+kafka+storm+mysql架构设计

    序言 前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考.这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql如果有需要测 ...

  6. mysql storm_flume+kafka+storm+mysql架构设计

    前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考. 这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql(项目是mave ...

  7. php mysql数据库简介,mysql数据库

    数据库简介 什么是数据库 保存和管理数据的仓库,数据库 什么是数据 文件,图片,视频,订单,用户名,密码等等.这些数据都需要有专门的地方来保存和管理 数据库的分类 关系型:mysql ... 非关系型 ...

  8. mysql mgr简介_mysql8.0初探:(二)MySQL Group Replication-MGR集群简介

    mysql8.0初探:(二)MySQL Group Replication-MGR集群简介 发布时间:2020-06-12 23:59:17 来源:51CTO 阅读:49832 作者:arthur37 ...

  9. MySql数据库简介(一)

    数据库简介 人类在进化的过程中,创造了数字.文字.符号等来进行数据的记录,但是承受着认知能力和创造能力的提升,数据量越来越大,对于数据的记录和准确查找,成为了一个重大难题 计算机诞生后,数据开始在计算 ...

最新文章

  1. mysql部署 linux_linux 怎么部署mysql数据库
  2. Rust 阴阳谜题,及纯基于代码的分析与化简
  3. hessian java php_探讨Hessian在PHP中的使用分析
  4. Seafile 1.4 发布,文件同步和协作平台
  5. android各目录大小,Android 基础篇 — 放不同drawable文件夹中图片的大小
  6. spring boot:创建一个简单的web(maven web project)
  7. 简单struts,spring,mybatis组合使用
  8. 高能力成熟度软件企业中软件质量工程师的职责
  9. 【DDD】--好文收藏
  10. Intel 64/x86_64/IA-32/x86处理器指令集 - CPUID (2) - 起源
  11. Windows 操作小技巧 之一(持续更新)
  12. Teamcenter 入门开发系列问答(1)
  13. 芯片大神Jim Keller从特斯拉离职,转投“宿敌”英特尔
  14. django orm 以列表作为筛选条件进行查询
  15. Windows server 2003 下载
  16. 微信手写板 android,微信小程序:手写板功能实现(canvas)
  17. dorado java_概述-Dorado JDBC Addon
  18. 最全的肱三头肌训练图解,漂亮手臂必备
  19. C# winform中打开网页的方法
  20. 大数据Hadoop之——Spark SQL+Spark Streaming

热门文章

  1. Mapbox矢量瓦片pbf文件信息解析
  2. 百度智能云人脸识别java_demo完整实例
  3. Arduino实现数码管动态显示
  4. 财务管理计算机实验日志5天,关于财务管理的实习日记范文
  5. 创造习惯 — 如何科学地养成习惯
  6. 推荐爱码哥移动开发平台十大常用的原生UI控件
  7. stem教育小学制度管理
  8. keras 受限玻尔兹曼机_目前深度学习的模型有哪几种,适用于哪些问题?
  9. csharp基础练习题:noobCode 03:检查这些信件...查看是否在“字符串1”的信件出现在“字符串2”【难度:1级】--景越C#经典编程题库,不同难度C#练习题,适合自学C#的新手进阶训练
  10. tkinter 出现两个窗口 tk(未响应) 解决方法