Storm原理与实现
2019独角兽企业重金招聘Python工程师标准>>>
Storm原理与实现
转自:http://duanple.blog.163.com/blog/static/7097176720138258405697/
作者:phylips@bmy 2013-02
1 Storm简介
1.1 简介
本文主要是从内部实现的角度来认识下Storm(0.7.1版本),因此需要用户对Storm的基本原理和使用具有一定的了解。如果缺乏这方面的知识,建议首先阅读下Storm的官方wiki:https://github.com/nathanmarz/storm/wiki
目前也有一些中文文章,大部分都未超出官方wiki所包含的内容。推荐几个还不错的链接:
http://xumingming.sinaapp.com/ 这里有一些官方wiki的中文翻译以及一些实现分析
http://blog.linezing.com/category/storm-quick-start 关于storm的一个入门教程
1.2 核心API
1.2.1 普通Topology
如果建立自己的Topology(非Transactional的),用户通常需要利用如下接口和对象:
IRichBolt
IRichSpout
TopologyBuilder
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
public interface IBolt extends Serializable {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
void execute(Tuple input);
void cleanup();
}
IRichBolt和IRichSpout与IBolt和ISpout的不同在于多了两个接口:
declareOutputFields(OutputFieldsDeclarer declarer):声明输出字段
getComponentConfiguration() :该接口是在0.7.0引入的,用于支持组件级的配置,即允许用户针对单个Spout或Bolt进行参数配置。
实现了这两个接口后,通过调用TopologyBuilder建立起Topology。TopologyBuilder实际上是封装了StormTopology的thrift接口,也就是说Topology实际上是通过thrift定义的一个struct,TopologyBuilder将这个对象建立起来,然后nimbus实际上会运行一个thrift服务器,用于接收用户提交的结构。由于是采用thrift实现,所以用户可以用其他语言建立Topology,这样就提供了比较方便的多语言操作支持。
对于用户来说,通常需要做的就是提供自己的ISpout和IBlot实现,然后利用TopologyBuilder建立起自己需要的拓扑结构。
Storm框架会拿到用户提供这个拓扑结构及Spout和Blot对象,驱动整个处理过程。简单介绍下ISpout的那些接口的调用时机,在创建Spout对象时,会调用open函数。对象销毁时调用close(),但是框架并不保证close函数一定会被调用,因为进程可能是通过kill -9被杀死的。activate和deactivate是在spout被activate或deactivate时被调用,这两个动作是由用户从外部触发的,Strom的命令行提供两个命令activate和deactivate,允许用户activate和deactivate一个Topology,当用户执行deactivate时,对应Topology的spout会被deactivate,产生影响就是spout的nextTuple此后将不会被调用,直到用户再调用activate。Spout的核心功能是通过nextTuple实现的,用户通过该函数完成Tuple的发射。该函数会被框架周期性的调用。会有类似如下的一个循环:
While(true)
{
if(…)
spout.activate();
if(…)
sput.deactivate();
if(…)
spout.nextTupe();
}
首先这三个函数都是在一个线程中,因此不需要同步。其次,nextTuple()不能阻塞,如果没有Tuple可以发射需要立即返回,用户不能提供一个阻塞式的实现,否则可能阻塞整个后台循环。另外,后台可能会调节nextTuple()的调用频率,比如系统有一个配置参数可以控制当前被pending的Tuple最大数目,如果达到这个限制,可能就会做一些流控。
ack和fail则是两个回调函数。Spout在发射出一个tuple后,该tuple会通过acking机制被acker追踪,除了显式的fail和ack外,每个tuple有一个超时时间,如果超过这个时间还未确定该tuple的状态,那么acker会通知spout,这个tuple处理失败了,然后框架得到这个消息后,就会调用spout的fail函数,如果acker发现这个tuple处理成功了,也会通知spout,然后会调用spout的ack函数。所以通常来说用户在发射tuple时,要确保数据不丢失,都会将已经发射的tuple缓存起来,然后在ack函数中删除对应tuple,在fail函数中重发对应的tuple。
另外需要注意的一点是,Spout使用的collector是SpoutOutputCollector,Bolt使用的collector是OutputCollector。这两个虽然提供的功能类似,都是负责发送tuple的,但是由于一个是面向Spout,一个是面向Bolt的,它们的接口也略有不同。具体如下:
public interface ISpoutOutputCollector {
List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
void reportError(Throwable error);
}
Spout通过调用ISpoutOutputCollector的emit函数进行tuple的发射,当然实际上emit函数并未完成实际的发送,它主要是根据用户提供的streamId,计算出该tuple需要发送到的目标taskID。emitDirect函数,更裸一些,直接指定目标taskID。它们都只是将<tasked,tuple>组成的序列对放到一个队列中,然后会有另一个线程负责将tuple从队列中取出并发送到目标task。
public interface IOutputCollector extends IErrorReporter {
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
}
IOutputCollector是会被Bolt调用的,与ISpoutOutputCollector功能类似。但是区别也很明显,首先我们可以看到它的emit系列函数,多了一个参数Collection<Tuple> anchors,增加这样一个anchors原因在于,对于spout来说,它产生的tuple就是root tuple,但是对于bolt来说,它是通过一个或多个输入tuple,进而产生输出tuple的,这样tuple之间是有一个父子关系的,anchors就是用于指定当前要emit的这个tuple的所有父亲,正是通过它,才建立起tuple树,如果用户给了一个空的anchors,那么这个要emit的tuple将不会被加入tuple树,也就不会被追踪,即使后面它丢失了,也不会被spout感知。
除了anchors参数外,IOutputCollector还多了ack和fail两个接口。这两个接口,与Spout的ack和fail完全不同,对于Spout来说ack和fail是提供给Spout在tuple发送成功或失败时进行处理的一个机会。而IOutputCollector的ack和fail则是向acker汇报当前tuple的处理状态的,是需要Bolt在处理完tuple后主动调用的。
1.2.2 Transactional Topology
对于普通Topology来说,它通过acking机制保证了每个Tuple会至少被处理一次,保证了Tuple不会丢失,但是一个Tuple可能会因为重发而被处理多次。引入Transactional Topology就是为了解决重复处理的问题。同时它暴露给用户的API,ITransactionalSpout与普通的Spout相比有很大的差异,而Bolt则基本保持了一致。
对于Transactional Topology,用户需要提供一个ITransactionalSpout(4.5.1 ITransactionalSpout)实现,对于batch类型的Bolt需要继承自IBatchBolt(4.4.1 IBatchBolt),那些会改变外部状态的关键Bolt需要实现ICommiter接口。用户需要通过专用的TransactionalTopologyBuilder而不是TopologyBuilder来建立Topology。
另外需要注意的一点是Storm已经将TransactionalTopology相关的功能移植到了trident中,而src/jvm/backtype/storm/transactional下的实现实际上会被废弃掉,尽管如此我们下面的分析还是针对src/jvm/backtype/storm/transactional下的实现。二个地方的实现基本上是完全一致的,当然trident可能做了一些改进,比如它暴露出了更丰富的API,允许用户对事务进行更多的控制。
2 Clojure基础
Storm是由两种语言实现的,基本上50%的java,50%的Clojure。框架性的东西基本上都是采用Clojure实现的,因此要真正理解Storm,Clojure是绕不过去的。
2.1 简介
Clojure是一种可以运行在JVM上的函数式编程语言,在 CLR 和 JavaScript 平台上也有各自的实现。属于一种Lisp方言(LISP,全名List Processor,即列表处理语言,由约翰·麦卡锡在1960年左右创造的一种基于λ演算的函数式编程语言)。作为一种函数式编程语言,Clojure基于JVM,可以直接使用现有的java类库,通过SMT(software transactional memory )和异步agent提供了内建的并发支持。
官方网站:http://clojure.org/。
根据getting_started 的步骤,我们可以在本地建立起一个clojure的执行环境。另外这个网站也提供了在线的脚本执行支持:http://himera.herokuapp.com/index.html 。
由于此处主要是为了分析Storm的Clojure源码,因此这里主要关注下Storm里所用到的Clojure语言特性。更深入的内容可以参考:
官方文档:http://clojure.org/documentation 。
中文文档:Clojure API文档,Clojure入门教程: Clojure – Functional Programming for the JVM中文版,Clojure Handbook。
2.2 基本语法
2.3 与java的交互
3 代码结构
Storm Structure-of-the-codebase
4 源码分析
关于实现方面的文章,Storm的官方wiki有一个目录列表,但是基本上只是一个提纲,很多内容还未来得及编写:
4.1 Topology生命周期
Twitter Storm源代码分析之Topology的执行过程
4.2 消息传输机制
Storm Message-passing-implementation
4.3 Acking机制
Storm对用户提供的Topology会在内部进行修改,添加一些系统内部的流和Bolt来实现acking框架,实现代码参见:common.clj。
4.3.1 Q&A
4.3.1.1 MessageID是何作用?与tuple-id的关系?
4.4 Coordination实现
Twitter Storm源代码分析之CoordinatedBolt
src/jvm/backtype/storm/coordination/
4.4.1 IBatchBolt
public interface IBatchBolt<T> extends Serializable, IComponent {
void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
4.4.2 BatchBoltExecutor
4.4.3 CoordinatedBolt
CoordinatedBolt内部会记住所有向它发送Tuple边的入度,保存在_numSourceReports中,以及它会发送tuple的那些目标task,保存在_countOutTasks中。
public static class TrackingInfo {
Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
List<Tuple> ackTuples = new ArrayList<Tuple>();
这里面,还需要注意的一点是,我们看到对于CoordinatedBolt来说Tuple分为三种类型:
此外,我们观察到TrackingInfo里有一个List<Tuple> ackTuples成员,那么这个成员又是来做啥的呢?ackTuples里保存了两种Tuple:ID和COORD类型的。
if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID){
4.4.4 BatchOutputCollectorImpl
4.4.5 Q&A
4.4.5.1 Coordination中并无Spout,那么ack(COORD type tuple)是如何被追踪的?如果某个COORD消息丢失,是如何触发重发的?
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
而对于Emitter来说,tup就是Coordinator发出的那个原始Tuple,tuple树就是这样建立起来的。
4.5 Transactional Topology实现
src/jvm/backtype/storm/transactional;src/jvm/backtype/storm/coordination/。
4.5.1 ITransactionalSpout
public interface ITransactionalSpout<T> extends IComponent {
public interface Coordinator<X> {
X initializeTransaction(BigInteger txid, X prevMetadata);
void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector);
void cleanupBefore(BigInteger txid);
Coordinator<T> getCoordinator(Map conf, TopologyContext context);
Emitter<T> getEmitter(Map conf, TopologyContext context);
4.5.2 TransactionalSpoutCoordinator
initializeTransaction:生成本次事务对应的元数据
4.5.3 TransactionalSpoutBatchExecutor
TransactionalSpoutBatchExecutor继承自IRichBolt,充当了Emitter的运行容器。
TransactionalSpoutBatchExecutor用的collector也是BatchOutputCollectorImpl。关于该类请参考coordination实现部分。
4.5.4 TransactionalTopologyBuilder
4.5.5 Transaction状态存储
代码目录src/jvm/backtype/storm/transactional/state/。
与Transaction相关的状态是通过Zookeeper保存的,具体代码在TransactionalState.java中。在该类中直接使用Curator进行Zookeeper相关操作,
Curator是Netflix开源的一套ZooKeeper客户端框架。Curator主要解决了三类问题:
封装ZooKeeper client与ZooKeeper server之间的连接处理;
提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装。
4.5.6 PartitionedTransactionalSpout实现
代码目录src/jvm/backtype/storm/transactional/partitioned/。
4.5.7 MemoryTransactionalSpout
4.5.8 Acking机制
4.5.9 总结
Here's how transactional spout works:
1. 1.Transactional spout is a subtopology consisting of a coordinator spout and an emitter bolt
2. 2.The coordinator is a regular spout with a parallelism of 1
?Transactional spouts are a sub-topology consisting of a spout and a bolt
othe spout is the coordinator and contains a single task
othe bolt subscribes to the coordinator with an all grouping
?state is stored in zookeeper using RotatingTransactionalState
?commiting bolts subscribe to the coordinators commit stream using an all grouping
?CoordinatedBolt is used to detect when a bolt has received all the tuples for a particular batch.
othis is the same abstraction that is used in DRPC
4.5.10Q&A
4.5.10.1 AttemptID的作用?
TransactionAttempt是在TransactionalSpoutCoordinator.java中的sync()中被首次生成的。
TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
4.5.10.2 有没有emitBolt这样的类?
4.5.10.3 对于Emitter的_sourceArgs 为空吗?它与CoordinatedBolt如何沟通的?
4.5.10.4 TransactionalTopology中的非BatchBlot是否也会被套上一层CoordinatedBolt?
4.5.10.5 Commit Batch时是如何保证顺序执行的?
具体细节请参考: 4.4.3CoordinatedBolt。
4.5.10.6 Emit是否是只能在finishBatch里调用?
4.5.10.7 TransactionalTopology中是否允许所有的Bolt都不是commiter?如果允许那么Bolt又是如何来确定可以进行资源回收的呢?因为在commiter存在的情况下,它可以回收那些已经commit的事务,但是如果没有commiter,它如何判断事务已经可以回收?
4.5.10.8 Transaction何时变为PROCESSED?
4.5.10.9 Coordinator重放某个batch时,BatchBoltExecutor中收到相同txid但是AttemptID不同的batch时,如何处理?之前的batch计算结果何时会被清除?
4.5.10.10 虽然说Emitter需要保证相同TransactionID的batch具有相同的tuple系列,但是有时Emitter可能无法提供这种保证,此时如何保证处理且处理一次语义?
实际证明对于一个非幂等的 transactional spout来说,是可以实现这种语义的,只是用户在开发Topology时需要做更多的事情。
class Value {
Object count;
BigInteger txid;
}
对于非幂等的 transactional spout来说,需要存储如下信息:
class Value {
Object count;
BigInteger txid;
Object prevCount;
}
更新逻辑会变成如下:
1. 如果当前batch中的 transaction id等于数据库中的transaction id:
val.count = val.prevCount + partialCount.
2. 否则:
val.prevCount = val.count
, val.count = val.count + partialCount
and val.txid = batchTxid
。。。
5 相关讨论
https://groups.google.com/forum/?fromgroups=#!searchin/storm-user/transactional
1. 将acker放到spout中,可以减少网络传输,避免id碰撞:
2. task之间的消息是保序的
3. Emitter会缓存transaction直到调用cleanupBefore ,因为一个batch可能会在commit阶段失败,这时就需要重发,因此需要Emitter缓存它们,否则如果直接删除那么就需要在重发时创建,可能会因为丢失了一些状态无法精确地重发。
4. 优化了DRPC和TransactionalTopology中批处理的tuple树,只有coordination tuples会被anchored。用户如果想fail掉一个batch,抛出一个FailedException即可,这会导致该batch的重发。不会对一个batch里的所有tuple进行ack,而是通过将coordination tuple纳入tuple树进行ack。同时Storm的消息机制会保证,task间的消息传递是保序的,也就是说一定是先收到了该batch的tuple,然后才会收到对应的coordination tuple的。
5. 关于acking中xor导致的数据碰撞的概率分析
6. When HBase fails and no rollback is possible
7. Transactional topology Coordinator emits tuple always.
initializeTransaction()不能阻塞,否则可能影响到其他事务的处理,比如事务的提交,fail。确认下spout的ack和fail与nextTuple是否是在一个循环里?
8. Transactional topology not commiting?
9. About TransactionTopology's attemptID
10. Topologies with fan-out and fan-in
11. some questions about Transactional topologies
12. Trident 与老版本spout和bolt的关系
Spout和Bolt抽象将一直保留,用户可以直接使用它们实现Trident无法提供的功能。Trident会自动将stream作为一个一个的batch进行处理,同时自动保证了exactly-once 语义,同时提供了更面向批处理的API。但是以batch为单位进行处理引入了额外的开销,导致延迟增大到至少数百毫秒的级别,与之相比Storm一次处理一个tuple的方式只需要10几毫秒。但是一次处理一个tuple的问题在于无法实现exactly-once 语义,只能实现 at-least-once或 at-most-once 语义。此外当前的Trident无法实现 cyclic topologies,很多机器学习算法可能会用这种拓扑,但是是可以直接用Storm来实现这种拓扑的。
Trident可以做任何 transactional topology可以做的事情,这也是 transactional topology将要被废弃的原因。但是 transactional topology并不等于spout+bolt,如下几点就属于spout+bolt可以完成但却是Trident无法完成的:
1. get access to which tasks receive a message
2. direct groupings
3. cyclic topologies
除此之外,主要不同在于batching模型及与外部状态交互形式上。
Trident的操作将会被放到Bolt中执行,同时Trident本身负责所有acking机制,用户不需要关心。
13. understanding-the-parallelism-of-a-storm-topology
转载于:https://my.oschina.net/u/2326085/blog/391238
Storm原理与实现相关推荐
- Storm原理与实践--大数据技术栈14
回顾:大数据平台技术栈 (ps:可点击查看),今天就来说说其中的Storm! 来自:有米加瓦 一.Storm简介 1. 引例 在介绍Storm之前,我们先看一个日志统计的例子:假如我们想要根据用户的访 ...
- storm 原理详解
1 为什么要使用Storm? Apache Storm是一款免费且开源的分布式实时计算系统. Storm可以轻松地可靠地处理无限数据流,从而实时处理Hadoop进行批处理的操作.Storm很简单,可以 ...
- Storm介绍及安装部署
本节内容: Apache Storm是什么 Apache Storm核心概念 Storm原理架构 Storm集群安装部署 启动storm ui.Nimbus和Supervisor 一.Apache S ...
- Storm学习入门视频教程
Storm流计算从入门到精通之技术篇(高并发策略.批处理事务.Trident精解.运维监控.企业场景) 课程讲师:Cloudy 课程分类:大数据 适合人群:初级 课时数量:28课时 用到技术:Stor ...
- Storm基础(完整版)
Apache Storm 流式计算框架 1.Storm 基础 1.1.Storm是什么 Hadoop在处理数据的时候,时效性不够,市场期望能够尽快得到处理后的数据. Storm是一个流式计算框架(类比 ...
- House of storm学习总结
文章目录 一.前言介绍 二.漏洞产生条件 三.利用方法 四.原理和源码分析 (一)首先要熟悉largebin和unsortedbin的特点 (二)漏洞触发得关键代码 五.网上大佬们得实验样例记录 六. ...
- 一分钟构建Apache Storm简单程序
目录 一 说明 二 步骤 1.创建项目 2.引入依赖 3.主方法 4.创建Spout类 5.创建Bolt01 6.创建Bolt02 7.本地运行结果 8. 提交到Storm集群 三 总结 一 说明 通 ...
- 免费的大数据学习资料,这一份就足够
朋友不在于多,知心就好;资料不在于多,精致就好.一份专业的大数据学习资料才是学习大数据的利刃.小编分享的这套大数据学习资料将从学习大纲.书籍.视频教程分别分享. 在这里还是要推荐下我自己建的大数据 ...
- 分布式系统领域经典论文翻译集
分布式领域论文译序 sql&nosql年代记 SMAQ:海量数据的存储计算和查询 一.google论文系列 1. google系列论文译序 2. The anatomy o ...
最新文章
- HDU 4638 Group(莫队)题解
- 聚焦 AI + 大数据全球视野引领行业创新升级
- C# Span 源码解读和应用实践
- Log4j 2.x XSD的描述不完整
- 非等高cell实战--实现微博页面
- 计算机病毒解析与防范结束语,2016年04月30日计算机病毒解析与防范题纲_向必青.doc...
- linux下copy命令c实现,C语言自己实现linux下cp文件复制命令
- 苹果手机投影到墙上_针对商业用户倾情打造,明基E582智能无线投影仪体验
- android10新特性 视频解码,Android万能视频播放器10-OpenGL ESMediaCodec解码数据t
- android视图动画(ViewAnimation动画)
- 【PS】106个水彩花卉和树叶画笔
- Python flag用法
- Google, with new Pixel and camera, is serious about devices
- 【office相关】excel 中使用 switch函数
- mysql 存储过程
- 【图像算法】pytesseract简单实现图片数字识别
- 2021-11-08笔记本搜不到手机热点的解决
- 单板嵌入式计算机定义,用于嵌入式控制系统的单板计算机
- 修改mysql数据库密码
- oracle bulk select,批量查询 Oracle的bulk collect用法
热门文章
- 李飞飞等6名华人入选ACM 2018 Fellow,无国内学者入选
- 马斯克回应停工事件:Model 3周产量将达6000辆,不开玩笑哦
- 前端利器!让AI根据手绘原型生成HTML | 教程+代码
- 【leetcode】1053. Previous Permutation With One Swap
- JavaScript是如何工作的:引擎,运行时和调用堆栈的概述!
- 网络攻防实验(五)——201521460003王浩洋
- 我要学ASP.NET MVC 3.0(十三): MVC 3.0 防止跨站点请求伪造 (CSRF) 攻击
- dropzonejs中文翻译手册 DropzoneJS是一个提供文件拖拽上传并且提供图片预览的开源类库....
- SpringMVC中自定义类型转换器
- plsql配置连接远程数据库