本人原创翻译,转载请注明出处

Trident是基于Storm做实时计算的高等级的抽象。它允许你无缝集成高吞吐量(每秒100万级别的消息)、无状态流处理、低延时的分布式查询。 如果你熟悉Pig或Cascading等高级别的批处理工具,就会很熟悉Trident的概念——Trident有joins, aggregations, grouping, functions, and filters。除此以外,Trident原生支持任何数据库或持久化存储之上的有状态的、增加的处理。Trident有一致的、仅一次的语义,所以很容易推出Trident topologies。(最后这句不是很理解,请参考原文)。

示例

这个例子会做两件事:

1.从输入句子流中计算单词的数量

2.实现查询:给出单词列表,返回单词数量之和

这个例子从下面的数据源中读取无限的句子流:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,

new Values("the cow jumped over the moon"),

new Values("the man went to the store and bought some candy"),

new Values("four score and seven years ago"),

new Values("how many apples can you eat"));

spout.setCycle(true);

下面是做单词计数的代码:

TridentTopology topology = new TridentTopology();

TridentState wordCounts =

topology.newStream("spout1", spout)

.each(new Fields("sentence"), new Split(), new Fields("word"))

.groupBy(new Fields("word"))

.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

.parallelismHint(6);

我们一行一行的看代码。第一行,TridentTopology对象被创建,暴露出构成Trident计算的接口。TridentTopology 有一个方法newStream,读取输入,创建一个新的流。在这里,输入源就是之前定义的FixedBatchSpout。输入源也可以是Kestrel或Kafka这样的消息队列。对每个输入源,Trident跟踪一个较小数量的状态(关于已消费消息的元数据),状态信息存放在zookeeper中。zookeeper的节点根据字符串"spout1"来确定状态元数据的存放位置。

Trident以小批量tuples的形式处理流,例如,输入的句子流可能会被拆分成batchs,像这样:

batched-stream.png

batchs的大小与输入流的大小挂钩,输入流吞吐量大,batchs就会大。

Trident提供了成熟的API来处理这些小型batchs。API和Pig或Cascading的很像:可以做group by's, joins, aggregations, 运行functions, 运行filters等等。当然,孤立的处理每个小型batch没什么意义,所以Trident提供了多个batchs聚合及持久化的函数——包括内存, Memcached, Cassandra,等各种存储。最后,Trident有一流的查询实时状态源的函数,状态可以被Trident 更新(就像这个例子),或者也可能是独立的状态源。

回到例子,spout发出"sentence"流,topology 定义的下一行启用了Split函数,把句子转成单词。 Split定义:

public class Split extends BaseFunction {

public void execute(TridentTuple tuple, TridentCollector collector) {

String sentence = tuple.getString(0);

for(String word: sentence.split(" ")) {

collector.emit(new Values(word));

}

}

}

topology剩下的部分计算单词数量并持久化存储。首先以"word"字段分组,然后每个分组以Count聚合器持久化聚合。persistentAggregate 函数知道如何存储聚合的结果。这个例子中,单词计数保存在内存中,但是也可以轻易的切换到Memcached, Cassandra等其他持久化。例如切换到Memcached:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))

MemcachedState.transactional()

"serverLocations"是Memcached集群的host/port列表。

Trident 很酷的一点是它有完全的容错和“仅一次”处理机制。这会使你的实时处理更易理解(reason about)。当失败和重传发生的时候,Trident 保存的状态使它不会为同一个数据多次更新数据库。

persistentAggregate把流传输给TridentState 对象,这个例子中,TridentState 代表了所有的单词计数,我们将使用它来进行分布式查询。

topology 的下一个部分实现了单词计数的低延时分布式查询。输入是以空格分隔的单词组,输出这些单词的数量总和。这些查询像普通RPC调用的一样执行,不过是以分布式形式。例如:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);

System.out.println(client.execute("words", "cat dog the man");

// prints the JSON-encoded result, e.g.: "[[5078]]"

像这种简单查询的延时大概10ms。

topology 的分布式查询实现如下:

topology.newDRPCStream("words")

.each(new Fields("args"), new Split(), new Fields("word"))

.groupBy(new Fields("word"))

.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))

.each(new Fields("count"), new FilterNull())

.aggregate(new Fields("count"), new Sum(), new Fields("sum"));

storm trident mysql,Storm Trident(一)官方Tutorial相关推荐

  1. storm trident mysql,storm_Trident

    简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascadi ...

  2. storm mysql trident_Apache Storm 官方文档 —— Trident 教程

    Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascading 这 ...

  3. Apache Storm 官方文档 —— Trident API 概述

    转载自并发编程网 – ifeve.com本文链接地址: Apache Storm 官方文档 -- Trident API 概述 窗口部分的内容是我自己翻译的 Trident 的核心数据模型是" ...

  4. redis storm mysql_flume+kafka+storm+redis/mysql启动命令记录

     1.flume启动 bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name fks -Dflum ...

  5. Storm 01之 Storm基本概念及第一个demo

    2.1 Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies :[tə'pɑ:lədʒɪ]拓扑结构 Streams Spouts:[spaʊt]喷出; 喷射; 滔 ...

  6. Storm入门教程 Storm安装部署步骤

    本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以"注意事项"的形式给出. 3.1 St ...

  7. storm apache java_Apache Storm 示例 Java 拓扑 - Azure HDInsight | Microsoft Docs

    您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn. 以 Java 语言创建 ...

  8. 大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度

    大数据技术之_17_Storm学习 一 Storm 概述 1.1 离线计算是什么? 1.2 流式计算是什么? 1.3 Storm 是什么? 1.4 Storm 与 Hadoop 的区别 1.5 Sto ...

  9. storm笔记:storm集群

    storm笔记:storm集群 Strom集群结构是有一个主节点(nimbus)和多个工作节点(supervisor)组成的主从结构,主节点通过配置静态指定(还有一种主从结构是在运行时动态选举,比如z ...

最新文章

  1. 操作系统结构-单体内核结构
  2. 写给过去的自己-No.2-数据结构篇-初尝柔性数组
  3. TiDB 官方设计文档翻译(三)
  4. android datepicker使用方法,android中DatePicker和TimePicker的使用方法详解
  5. 工作138:git使用
  6. array_agg_探索强大SQL模式:ARRAY_AGG,STRUCT和UNNEST
  7. 关于shiro授权 This subject is anonymous - it does not have any identifying principals and authorization
  8. L3-011 直捣黄龙 (30 分)-PAT 团体程序设计天梯赛 GPLT
  9. shell脚本:监控MySQL服务是否正常
  10. 深蓝超级计算机象棋人机大战,象棋人机大战绝唱:超级计算机“浪潮天梭”vs“象棋第一人”许银川的巅峰之战...
  11. EXCEL中关于角度的输入、输出及转换计算技巧
  12. AI+一统互联网和物联网
  13. 百度ueditor实现word图片自动转存
  14. 阿里靠什么武功秘籍渡过“双十一“的天量冲击
  15. 三个条件的if函数c语言,if函数如何设置多个条件
  16. 人工智能---深度学习是什么
  17. 假设检验中原假设和备择假设的选取问题
  18. python里面的爬虫爬取网页
  19. mac电脑使用小技巧
  20. 9部最值得一看的黑客电影

热门文章

  1. VHDL语言设计8421码加法器(使用quartus)
  2. 【屏蔽贴吧广告】贴吧广告屏蔽办法-解决方案 【普通用户 非会员】
  3. 【PTA题目】7-1 计算摄氏温度 (5 分)
  4. 新乡职业技术学校计算机,新乡职业技术学院校园计算机网络用户管理办法
  5. css预处理器sass/scss入门
  6. Linux平台设备框架驱动
  7. React 界面样式设计
  8. loopback回环接口
  9. 字符串(varchar)和二进制(varbinary)互转
  10. 电脑音量,电脑声音最大了还是小如何解决_电脑声音开到最大还是很小怎么办...