2019独角兽企业重金招聘Python工程师标准>>>

我们知道Storm有一个很重要的特性,那就是Storm API能够保证它的一个Tuple能够被完全处理,这一点尤为重要,其实storm中的可靠性是由spout和bolt组件共同完成的,下面就从spout和bolt两个方便给大家介绍一下storm中的可靠性,最后会给出一个实现了可靠性的例子。

1.Spout的可靠性保证

在Storm中,消息处理可靠性从Spout开始的。storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fail的处理, 如果一个tuple被处理成功,那么spout便会调用其ack方法,如果失败,则会调用fail方法。而topology中处理tuple的每一个bolt都会通过OutputCollector来告知storm,当前bolt处理是否成功。

我们知道spout必须能够追踪它发射的所有tuples或其子tuples,并且在这些tuples处理失败时能够重发。那么spout如何追踪tuple呢?storm是通过一个简单的anchor机制来实现的(在下面的bolt可靠性中会讲到)。

spout发射根tuple,根tuple产生子tuples。这就形成一个TupleTree。在这个tree中,所有的bolt都会ack或fail一个tuple,如果tree中所有的bolt都ack了经过它的tuple,那么Spout的ack方法就会被调用,表示整个消息被处理完成。如果tree中的任何一个bolt fail一个tuple,或者整个处理过程超时,则Spout的fail方法便会被调用。

另外一点, storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况, 对于成功/失败该如何处理, 必须由应用自己来决定, 因为storm内部也没有保存失败的具体数据, 但是也有办法知道失败记录,因为spout的ack/fail方法会附带一个msgId对象, 我们可以在最初发射tuple的时候将将msgId设置为tuple, 然后在ack/fail中对该tuple进行处理。这里其实有个问题, 就是每个bolt执行完之后要显式的调用ack/fail,否则会出现tuple不释放导致oom. 不知道storm在最初设计的时候,为什么不将bolt的ack设置为默认调用。

Storm的ISpout接口定义了三个与可靠性有关的方法:nextTuple,ack和fail。

public interface ISpout extends Serializable {void open( Map conf, TopologyContext context, SpoutOutputCollector collector);void close();void nextTuple();void ack(Object msgId);void fail(Object msgId);}

我们知道,当Storm的Spout发射一个Tuple后,他便会调用nextTuple()方法,在这个过程中,保证可靠性处理的第一步就是为发射出的Tuple分配一个唯一的ID,并把这个ID传给emit()方法:

collector.emit( new Values("value1" , "value2") , msgId ); 

为Tuple分配一个唯一ID的目的就是为了告诉Storm,Spout希望这个Tuple产生的Tuple tree在处理完成或失败后告知它,如果Tuple被处理成功,Spout的ack()方法就会被调用,相反如果处理失败,Spout的fail()方法就会被调用,Tuple的ID也都会传入这两个方法中。

需要注意的是,虽然spout有可靠性机制,但这个机制是否启用由我们控制的。IBasicBolt在emit一个tuple后自动调用ack()方法,用来实现比较简单的计算,这个是不可靠的。如果是IRichBolt的话,如果想要实现anchor,必须自己调用ack方法,这个保证可靠性。

2.Bolt中的可靠性

Bolt中的可靠性主要靠两步来实现:

  1. 发射衍生Tuple的同时anchor原Tuple
  2. 对各个Tuples做ack或fail处理

anchor一个Tuple就意味着在输入Tuple和其衍生Tuple之间建立了关联,关联之后的Tuple便加入了Tuple tree。我们可以通过如下方式anchor一个Tuple:

collector.emit( tuple, new Values( word));  

如果我们发射新tuple的时候不同时发射元tuple,那么新发射的Tuple不会参与到整个可靠性机制中,它们的fail不会引起root tuple的重发,我们成为unanchor:

collector.emit( new Values( word));

ack和fail一个tuple的操作方法:

this .collector.ack(tuple);
this .collector.fail(tuple);

上面讲过了,IBasicBolt 实现类不关心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail来决定. 其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用. 相当于忽略了该bolt的ack/fail行为。

在 IRichBolt实现类中, 如果OutputCollector.emit(oldTuple,newTuple)这样调用来发射tuple(anchoring), 那么后面的bolt的ack/fail会影响spout ack/fail, 如果collector.emit(newTuple)这样来发射tuple(在storm称之为anchoring), 则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail. 所以某个bolt后面的bolt的成功失败对你来说不关心, 你可以直接通过这种方式来忽略.中间的某个bolt fail了, 不会影响后面的bolt执行, 但是会立即触发spout的fail. 相当于短路了, 后面bolt虽然也执行了, 但是ack/fail对spout已经无意义了. 也就是说, 只要bolt集合中的任何一个fail了, 会立即触发spout的fail方法. 而ack方法需要所有的bolt调用为ack才能触发. 所以IBasicBolt用来做filter或者简单的计算比较合适。

3.总结

storm的可靠性是由spout和bolt共同决定的,storm利用了anchor机制来保证处理的可靠性。如果spout发射的一个tuple被完全处理,那么spout的ack方法即会被调用,如果失败,则其fail方法便会被调用。在bolt中,通过在emit(oldTuple,newTuple)的方式来anchor一个tuple,如果处理成功,则需要调用bolt的ack方法,如果失败,则调用其fail方法。一个tuple及其子tuple共同构成了一个tupletree,当这个tree中所有tuple在指定时间内都完成时spout的ack才会被调用,但是当tree中任何一个tuple失败时,spout的fail方法则会被调用。

IBasicBolt类会自动调用ack/fail方法,而IRichBolt则需要我们手动调用ack/fail方法。我们可以通过TOPOLOGY_MESSAGE_TIMEOUT_SECS参数来指定一个tuple的处理完成时间,若这个时间未被处理完成,则spout也会调用fail方法。

4.一个可靠的WordCount例子

一个实现可靠性的spout:

 public class ReliableSentenceSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;private ConcurrentHashMap<UUID, Values> pending;private SpoutOutputCollector collector;private String[] sentences = { "my dog has fleas", "i like cold beverages" , "the dog ate my homework" , "don't have a cow man" , "i don't think i like fleas" };private int index = 0;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare( new Fields( "sentence"));}public void open( Map config, TopologyContext context, SpoutOutputCollector collector) {this. collector = collector;this. pending = new ConcurrentHashMap<UUID, Values>();}public void nextTuple() {Values values = new Values( sentences[ index]);UUID msgId = UUID. randomUUID();this. pending.put(msgId, values);this. collector.emit(values, msgId);index++;if ( index >= sentences. length) {index = 0;}//Utils.waitForMillis(1);}public void ack(Object msgId) {this. pending.remove(msgId);}public void fail(Object msgId) {this. collector.emit( this. pending.get(msgId), msgId);}}

一个实现可靠性的bolt:

public class ReliableSplitSentenceBolt extends BaseRichBolt {private OutputCollector collector;public void prepare( Map config, TopologyContext context, OutputCollector collector) {this. collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence" );String[] words = sentence.split( " ");for (String word : words) {this. collector.emit(tuple, new Values(word));}this. collector.ack(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare( new Fields( "word"));}}

这个例子中我们实现了storm的可靠性,tuple失败了将会重新发送,直到处理成功。这里pending是一个map,为了实现tuple的失败重发。storm里面topology.max.spout.pending属性解释:
1.同时活跃的batch数量,你必须设置同时处理的batch数量。你可以通过”topology.max.spout.pending” 来指定, 如果你不指定,默认是1。
2.topology.max.spout.pending 的意义在于 ,缓存spout发送出去的tuple,当下流的bolt还有topology.max.spout.pending 个 tuple 没有消费完时,spout会停下来,等待下游bolt去消费,当tuple 的个数少于topology.max.spout.pending个数时,spout 会继续从消息源读取消息。(这个属性仅对可靠消息处理)。

如果使用事务,则表示同时处理的batch数量,如果非事务,则理解成第二种。

总而言之,如果不需要保证可靠性,spout继承BaseRichSpout,bolt继承BaseBasicBolt,它们内部实现了一些方法,自动ack,我们不需要关心ack和fail;如果要保证可靠性,spout实现IRichSpout接口,发tuple的时候,带上msgId,自定义fail和ack方法,bolt继承BaseRichBolt,发送tuple的时候要带上原tuple,要手动ack。

转载于:https://my.oschina.net/u/2000675/blog/807272

storm 可靠性和非可靠性相关推荐

  1. rabbitmq可靠性投递_RabbitMQ可靠性

    添加 amqp 依赖 配置文件 application.properties 生产者 消费者 可以直接使用@RabbitListener注解,声明Queue和Exchange以及Binding关系.消 ...

  2. [架构之路-199] - 可靠性需求与可靠性分析:鱼骨图、故障树分析法FTA、失效模式与影响DFMEA,找到影响故障的主要因素

    目录 引言: 第1章 故障树分析法与鱼骨图的比较 1.1 相同点 1.2 区别点 第2章 鱼骨图 第3章 故障树 3.1 示意图 3.2 故障树解读 3.3 故障树常见符号 第4章 产品失效(Fail ...

  3. TCP的“非”可靠性

    目录 网络中断造成对端无FIN包 操作 read 操作 write 系统崩溃造成对端无FIN包 操作 read 操作 write 对端有FIN包 通过write产生RST,read调用感知RST 向一 ...

  4. kafka架构与原理 ,消息的可靠性与一致性幂等性,数据存储、zookeeper、使用场景

    一.Kafka概述 Kafka作为一个商业级消息中间件 ,发布和订阅记录流,它类似于一个消息队列 先了解下Kafka的基本原理,然后通过对kakfa的存储机制.复制原理.同步原理.可靠性和持久性保证等 ...

  5. [渝粤教育] 西南科技大学 质量与可靠性管理 在线考试复习资料

    质量与可靠性管理--在线考试复习资料 一.单选题 1.下列关于PDCA的内容说法正确的是( ) A.第一阶段进行申请.提交项目等 B.第二阶段按计划实地去做,去落实具体对策. C.第三阶段实施标准化, ...

  6. TCP如何保证可靠性

    TCP传输控制协议  TCP 协议是一种面向连接的,为不同主机进程间提供可靠数据传输的协议.TCP 协议假定其所使用的网络栈下层协议(如IP 协议)是非可靠的,其自身提供机制保证数据的可靠性传输.在目 ...

  7. 云上可靠性测试:让我们一起给开发找点事儿

    摘要:在产品上云之前,云上数据的可信(安全性.可靠性等)成为大家关注的重点. 引言 疫情之下,科技支撑有目共睹,多个产业迎来逆势增长.科技创新赋能的"云技术",不再仅仅是战&quo ...

  8. 测试软件是否丢失数据,11种方法检测软件的可靠性

    软件的安全可靠性是衡量软件好坏的一个重要标准,安全性指与防止对程序及数据的非授权的故意或意外访问的能力有关的软件属性,可靠性指与在规定的一段时间和条件下,软件能维 软件的安全可靠性是衡量软件好坏的一个 ...

  9. 可靠性设计基础知识大全,一起来学

    (一):理解可靠性 01 理解与可靠性定义 我们总是会说:某某公司的东西"好用":某某公司的产品"质量好":我也会经常抱怨某某系统"不稳定" ...

  10. 可靠性技术在医学仪器中的应用前景分析

    可靠性技术在医学仪器中的应用前景分析 1 引言 可靠性研究起源于武器系统,经过近半个世纪的发展,已成为一门遍及各学科各行业的工程技术学科,已经从电子产品的可靠性发展到机械和非电子产品的可靠性,从硬件的 ...

最新文章

  1. 机器视觉中如何选择工业相机与合适的相机镜头
  2. 《C语言及程序设计》实践参考——当年第几天
  3. 没有对“C:/WINDOWS/Microsoft.NET/Framework/v2.0.50727/Temporary ASP.NET Files”的写访问权限...
  4. 计算机实践报告800字,计算机软件专业大学生社会实践报告800字
  5. 入住两年的CSDN,在今天2020年8月27日,成为CSDN博客专家
  6. JS----javascript中使用reverse()方法反转数组
  7. 08-Oracle基本概念
  8. os.path.exists判断文件是否存在
  9. 《30天自制操作系统》 day8 小结
  10. python caffe框架_Caffe(卷积神经网络框架)配置-Windows篇
  11. 苹果电子邮件怎么注册_电子邮件地址怎么写
  12. 计算机恢复桌面,桌面图标打开方式怎么还原_电脑图标打开方式恢复方法-win7之家...
  13. Android开启指纹验证
  14. 携程Java后台开发三面面经
  15. Python爬虫监控(邮件和钉钉)
  16. 计算机毕业设计Java物流信息管理系统录像演示(源码+系统+mysql数据库+Lw文档)
  17. 双足机器人课设报告_双足竞步机器人-智能步行者设计-技术报告
  18. redis:客户端管理
  19. SpringBoot 启动类 @SpringBootApplication 注解 以及执行流程
  20. 【agc004e】Salvage Robots

热门文章

  1. 技巧----时间优化
  2. JavaScript 获取数组对象中某一值封装为数组
  3. 常见的HTTP状态码说明
  4. -bash: wget 未找到命令的解决办法
  5. Redis数据类型之字符串String
  6. 用Vue.js和Webpack开发Web在线钢琴
  7. KV型内存数据库Redis
  8. 【LeetCode】026. Remove Duplicates from Sorted Array
  9. Android中在布局中写ViewPager无法渲染出来的问题
  10. Asp.Net 之 枚举类型的下拉列表绑定