简介:

Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入、有状态的流式处理与低延时的分布式查询无缝结合起来。如果你了解 Pig 或者 Cascading 这样的高级批处理工具,你就会发现他们和 Trident 的概念非常相似。Trident 同样有联结(join)、聚合(aggregation)、分组(grouping)、函数(function)以及过滤器(filter)这些功能。Trident 为数据库或者其他持久化存储上层的状态化、增量式处理提供了基础原语。由于 Trident 有着一致的、恰好一次的语义,因此推断出 Trident 拓扑的状态也是一件很容易的事。

Trident 流程图:

Paste_Image.png

就按照流程图来讲吧:

Trident Spouts

查看官方demo中代码:

TridentTopology topology = new TridentTopology();

topology.newStream("myspoutid", new MyRichSpout());

查看newStream()方法源代码:

//上节中BaseRichSpout类就是实现了IRichSpout

public Stream newStream(String txId, IRichSpout spout) {

return newStream(txId, new RichSpoutBatchExecutor(spout));

}

//非事务型 spout,每次会输出一个 batch 的 tuple.接下来的demo会用到

public Stream newStream(String txId, IBatchSpout spout) {

Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);

return addNode(n);

}

//这是最常用的 API,支持事务型和模糊事务型的语义实现。不过一般会根据需要使用它的某个已有的实现,而不是直接实现该接口。

public Stream newStream(String txId, ITridentSpout spout) {

Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);

return addNode(n);

}

//可以从分布式数据源(比如一个集群或者 Kafka 服务器)读取数据的事务型 spout。

public Stream newStream(String txId, IPartitionedTridentSpout spout) {

return newStream(txId, new PartitionedTridentSpoutExecutor(spout));

}

//可以从分布式数据源读取数据的模糊事务型 spout。

public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {

return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));

}

Trident Bolts

主要有 5 类操作:

  1. 针对每个小分区(partition)的本地操作,这类操作不会产生网络数据传输
  2. 针对一个数据流的重新分区操作,这类操作不会改变数据流中的内容,但是会产生一定的网络传输
  3. 通过网络数据传输进行的聚合操作
  4. 针对数据流的分组操作
  5. 融合与联结操作

本地分区操作

函数:

函数负责接收一个输入域的集合并选择输出或者不输出 tuple。输出 tuple 的域会被添加到原始数据流的输入域中。如果一个函数不输出 tuple,那么原始的输入 tuple 就会被直接过滤掉。否则,每个输出 tuple 都会复制一份输入 tuple 。假设你有下面这样的函数:

public class Split extends BaseFunction {

@Override

public void execute(TridentTuple tuple, TridentCollector collector) {

for(String word: tuple.getString(0).split(" ")) {

if(word.length() > 0) {

collector.emit(new Values(word));

}

}

}

}

Paste_Image.png

上图是源码中提供的一个Split函数用于按空格进行分割,分割完成以后继续延续原来的输出。

过滤器

过滤器负责判断输入的 tuple 是否需要保留,直接改变stream 的内容:

public class FilterNull extends BaseFilter {

@Override

public boolean isKeep(TridentTuple tuple) {

for(Object o: tuple) {

if(o==null) return false;

}

return true;

}

}

Paste_Image.png

上图就是一个判断tuple是否为空的filter,如果为false的则不继续留在stream流中

partitionAggregate

会在一批 tuple 的每个分区上执行一个指定的功能操作。与上面的函数不同,由 partitionAggregate

发送出的 tuple 会将输入 tuple 的域替换。以下面这段代码为例:

官方给出的代码:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假设stream 中tuple的内容如下:

Partition 0:

["a", 1]

["b", 2]

Partition 1:

["a", 3]

["c", 8]

Partition 2:

["e", 1]

["d", 9]

["d", 10]

执行上面的分区聚合的结果为:

Partition 0:

[3]

Partition 1:

[11]

Partition 2:

[20]

解释一下上面的函数功能为:分区聚合内容对分区中的内容求和即sum()然后输出key为sum的bolt流。

查看sum函数源代码:

public class Sum implements CombinerAggregator {

@Override

public Number init(TridentTuple tuple) {

return (Number) tuple.getValue(0);

}

@Override

public Number combine(Number val1, Number val2) {

return Numbers.add(val1, val2);

}

@Override

public Number zero() {

return 0;

}

}

Paste_Image.png

CombinerAggregator类只提供了sum和count求和函数

Storm 有三个用于定义聚合器的接口:CombinerAggregator、ReducerAggregator

、 Aggregator。ReducerAggregator

融合(Merge)与联结(join)

Trident API 的最后一部分是联结不同的数据流的操作。联结数据流最简单的方式就是将所有的数据流融合到一个流中。你可以使用 TridentTopology 的 merge 方法实现该操作,比如这样:

topology.merge(stream1, stream2, stream3);

Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。

联结数据流的另外一种方法是使用 join。像 SQL 那样的标准 join 操作只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每个从 spout 中输出的小 batch。

下面是两个流的 join 操作的示例,其中一个流含有 [“key”, “val1″, “val2″] 域,另外一个流含有 [“x”, “val1″] 域:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

上面的例子会使用 “key” 和 “x” 作为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,因为输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含:

join 域的列表。在这个例子里,输出的 “key” 域与 stream1 的 “key” 域以及 stream2 的 “x” 域对应。

来自所有流的非 join 域的列表。这个列表是按照传入 join 方法的流的顺序排列的。在这个例子里,“ a” 和 “b” 域与 stream1 的 “val1” 和 “val2” 域对应;而 “c” 域则与 stream2 的 “val1” 域相对应。

在对不同的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每个 spout 发送出的 tuple。

最后的结果查询你可以使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。然后就可以使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。

我想还是上个demo吧,要不然都要睡过去啦:

public class Print extends BaseFilter {

//分区索引号从0开始标示

private int partitionIndex;

//总的分区数

private int numPartitions;

@Override

public void prepare(Map conf, TridentOperationContext context) {

//获取当前分区以及总的分区数

this.partitionIndex = context.getPartitionIndex();

this.numPartitions = context.numPartitions();

}

//过滤条件,其实这边就是用来打印输出,对最后的tuple元数据没有任何改变

@Override

public boolean isKeep(TridentTuple tuple) { System.err.println(String.format("Partition idx: %s out of %s partitions got %s/%s", partitionIndex, numPartitions, tuple.get(0).toString(),tuple.get(1).toString()));

return true;

}

//构造StormTopology

public static StormTopology buildTopology(LocalDRPC drpc) {

//构造一个固定的batch数的spout,这个类代码上面有大概分析过

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 5,

new Values("the cow jumped over the moon"),

new Values("the man went to the store and bought some candy"),

new Values("four score and seven years ago"), new Values("how many apples can you eat"),

new Values("to be or not to be the person"));

//循环发送数据

spout.setCycle(true);

TridentTopology topology = new TridentTopology();

//TridentState对象最终代表了所有的单词的数量。我们会使用这个TridentState对象来实现在计算过程中的进行分布式查询。

TridentState wordCounts = topology.newStream("testSpout", spout)

//对每个tuple内容用空格来分隔,然后通过相同的字符串来分组

.each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word"))

//persistentAggregate函数使用三个并行度(三个线程)对源源不断发送过来数据流做一个总的聚合,对出现的次数累加,然后加结果缓存在当前节点的内存中

.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(3);

topology.newDRPCStream("print", drpc)

.stateQuery(wordCounts, new TupleCollectionGet(), new Fields("word", "count"))

.each(new Fields("word", "count"), new Print());

return topology.build();

}

public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {

Config conf = new Config();

LocalDRPC drpc = new LocalDRPC();

LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordCounter", conf, buildTopology(drpc));

for (int i = 0; i < 10; i++) {

drpc.execute("print", "");

Thread.sleep(1000);

}

cluster.deactivate("wordCounter"); cluster.killTopology("wordCounter");

}

}

在main方法控制台输出为:

Partition idx: 1 out of 3 partitions got the/20

Partition idx: 1 out of 3 partitions got or/4

Partition idx: 1 out of 3 partitions got score/4

Partition idx: 1 out of 3 partitions got moon/4

Partition idx: 1 out of 3 partitions got four/4

Partition idx: 1 out of 3 partitions got over/4

Partition idx: 1 out of 3 partitions got bought/4

Partition idx: 1 out of 3 partitions got can/4

Partition idx: 0 out of 3 partitions got went/8

Partition idx: 0 out of 3 partitions got candy/8

Partition idx: 0 out of 3 partitions got seven/8

Partition idx: 0 out of 3 partitions got jumped/8

Partition idx: 0 out of 3 partitions got ago/8

Partition idx: 0 out of 3 partitions got store/8

Partition idx: 0 out of 3 partitions got cow/8

Partition idx: 0 out of 3 partitions got many/8

Partition idx: 0 out of 3 partitions got years/8

Partition idx: 0 out of 3 partitions got eat/8

Partition idx: 0 out of 3 partitions got person/8

Partition idx: 0 out of 3 partitions got to/24

Partition idx: 0 out of 3 partitions got apples/8

Partition idx: 2 out of 3 partitions got be/24

Partition idx: 2 out of 3 partitions got not/12

Partition idx: 2 out of 3 partitions got some/12

Partition idx: 2 out of 3 partitions got and/24

Partition idx: 2 out of 3 partitions got man/12

Partition idx: 2 out of 3 partitions got how/12

Partition idx: 2 out of 3 partitions got you/12

Partition idx: 2 out of 3 partitions got be/32

Partition idx: 2 out of 3 partitions got not/16

Partition idx: 2 out of 3 partitions got some/16

Partition idx: 2 out of 3 partitions got and/32

Partition idx: 2 out of 3 partitions got man/16

Partition idx: 2 out of 3 partitions got how/16

Partition idx: 2 out of 3 partitions got you/16

Partition idx: 2 out of 3 partitions got be/38

Partition idx: 2 out of 3 partitions got not/19

Partition idx: 2 out of 3 partitions got some/19

Partition idx: 2 out of 3 partitions got and/38

Partition idx: 2 out of 3 partitions got man/19

Partition idx: 2 out of 3 partitions got how/19

Partition idx: 2 out of 3 partitions got you/19

Partition idx: 0 out of 3 partitions got went/23

Partition idx: 0 out of 3 partitions got candy/23

Partition idx: 0 out of 3 partitions got seven/23

Partition idx: 0 out of 3 partitions got jumped/23

Partition idx: 0 out of 3 partitions got ago/23

Partition idx: 0 out of 3 partitions got store/23

Partition idx: 0 out of 3 partitions got cow/23

Partition idx: 0 out of 3 partitions got many/23

Partition idx: 0 out of 3 partitions got years/23

Partition idx: 0 out of 3 partitions got eat/23

Partition idx: 0 out of 3 partitions got person/23

Partition idx: 0 out of 3 partitions got to/69

Partition idx: 0 out of 3 partitions got apples/23

Partition idx: 0 out of 3 partitions got went/25

Partition idx: 0 out of 3 partitions got candy/25

Partition idx: 0 out of 3 partitions got seven/25

Partition idx: 0 out of 3 partitions got jumped/25

Partition idx: 0 out of 3 partitions got ago/25

Partition idx: 0 out of 3 partitions got store/25

Partition idx: 0 out of 3 partitions got cow/25

Partition idx: 0 out of 3 partitions got many/25

Partition idx: 0 out of 3 partitions got years/25

Partition idx: 0 out of 3 partitions got eat/25

Partition idx: 0 out of 3 partitions got person/25

Partition idx: 0 out of 3 partitions got to/75

Partition idx: 0 out of 3 partitions got apples/25

Partition idx: 2 out of 3 partitions got be/56

Partition idx: 2 out of 3 partitions got not/28

Partition idx: 2 out of 3 partitions got some/28

Partition idx: 2 out of 3 partitions got and/56

Partition idx: 2 out of 3 partitions got man/28

Partition idx: 2 out of 3 partitions got how/28

Partition idx: 2 out of 3 partitions got you/28

不过对于TridentState 中的数据在分布式存储的环境如何存取的?

DRPCClient client = new DRPCClient("drpc.server.location", 3772);

System.out.println(client.execute("print", "cat dog the man");

整合在我们自己的代码中就需要这么使用了:

topology.newDRPCStream("print", drpc)

.stateQuery(wordCounts, new TupleCollectionGet(), new Fields("word", "count"))

.each(new Fields("word", "count"), new Print());

查看newDRPCStream源码:

public Stream newDRPCStream(String function, ILocalDRPC server) {

DRPCSpout spout;

if(server==null) {

spout = new DRPCSpout(function);

} else {

spout = new DRPCSpout(function, server);

}

return newDRPCStream(spout);

}

Paste_Image.png

发现是一个比较简单的spout

最后在main方法中执行execute,就这么跑起来了。

storm trident mysql,storm_Trident相关推荐

  1. storm trident mysql,Storm Trident(一)官方Tutorial

    本人原创翻译,转载请注明出处 Trident是基于Storm做实时计算的高等级的抽象.它允许你无缝集成高吞吐量(每秒100万级别的消息).无状态流处理.低延时的分布式查询. 如果你熟悉Pig或Casc ...

  2. Storm Trident拓扑中的错误处理

    这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...

  3. storm trident mysql_Trident-MySQL

    使用事物TridentTopology 持久化数据到MySQL1.构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays; ...

  4. Storm Trident API

    在Storm Trident中有五种操作类型 Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯的改变数据流向,不会 ...

  5. Storm Trident示例shuffleparallelismHint

    本例包括Storm Trident中shuffle与parallelismHint的使用. 代码当中包括注释 maven <dependency><groupId>org.ap ...

  6. Storm Trident简介

    转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...

  7. [Trident] Storm Trident 教程,state详解、trident api详解及实例

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...

  8. Storm Trident 详细介绍

    一.概要 1.1 Storm(简介)      Storm是一个实时的可靠地分布式流计算框架.      具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息( ...

  9. Storm - Trident

    [align=center][size=large]Trident[/size][/align] 一.Storm 保证性 1.数据一定会发送 通过 ack / fail 方法确认,若失败,则提供重新发 ...

最新文章

  1. javascript运动系列第九篇——碰撞运动
  2. 公钥和私钥怎么生成_科普 | Eth2 验证者如何生成和保护取款密钥
  3. Github管理Eclipse分布式项目
  4. 蓝桥杯 123 二分+打表
  5. Centos7.x Hadoop 3.x HDFS 写入文件
  6. 泛微OA流程表单验证附加验证条件-js代码块
  7. java 聚合函数_如何使用Java流计算两个聚合函数?
  8. lambda表达式java_Java Lambda表达式
  9. 一些学习笔记和工作布置
  10. 几何画板椭圆九种画法_几何画板怎么画椭圆 几何画板椭圆绘制教程
  11. 协同过滤推荐算法总结(转载)
  12. 3DMAX解决Vray渲染材质溢色问题的三种方法
  13. 单片机的串口实验 串口介绍 串口原理
  14. 各大洲时区以及Linux环境下修改时区
  15. 京东物流一体化供应链建设实践
  16. 小秘书智能app登录
  17. 惊了,深圳房价比北京还高。。。
  18. 深入理解java虚拟机(五)GC垃圾回收-经典垃圾收集器
  19. android studio JSON Viewer
  20. LeetCode算法,每日一题,冲击字节跳动

热门文章

  1. Sentencepiece构建词典
  2. RPG的地牢猎手(优先队列广搜)
  3. I.MX6ULL开发板基于阿里云项目实战 3 :阿里云iot-SDK 移植到arm开发板
  4. 从GPT到chatGPT(一):GPT1
  5. EDVR: Video Restoration with Enhanced Deformable Convolutional Networks阅读笔记
  6. SVN+Gitee配置版本控制库
  7. 环回接口(Loopback Interface)【转】
  8. GithubPages教程 在GithubPages上搭建个人主页
  9. 看完后,你将离成功不远了...让我们一起奋斗吧!【转】
  10. C语言如何打开txt文件