关于storm的基础,参照我这篇文章:流式计算storm
关于并发和并行,参照我这篇文章:并发和并行
关于storm的并行度解释,参照我这篇文章:storm的并行度解释
关于storm的流分组策略,参照我这篇文章:storm的流分组策略
关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机制

storm的消息可靠机制可以确保spout发出的每条tuple消息都会被完整的处理;
主要是由spout和bolt共同完成的.
本文主要讨论storm的消息可靠机制的原理和使用

storm的可靠机制,是storm的一大亮点,那么他是如何实现的呢?
先看效果:1.spout每发一条消息,就新建一个唯一的msgId(比如UUID),然后将这条消息和这个唯一id存在map中;2.每个bolt在处理tuple后,emit的时候带上tulpe,成功,就调用ack方法,代表成功,失败就调用fail方法,代表失败;这样编写代码后,你会发现,失败的消息spout会重新发送,效果就出来了
实现原理:原理很简单,使用了异或的知识点.我们知道,任意两个相同的数字,异或的结果都是0.例如:1^1=0现在请跟着我的思路想:1.首先想象有个服务,叫ack,他的主要作用就是判断每条tuple信息是否都成功处理2.每个spout发送和接收成功,都要给ack发送一个数字,最后由ack计算,判断整条链路是否成功处理3.spout作为发送方,假设他要给3个bolt发送消息,分别是bolt1,bolt2,bolt3;4.假设这3个bolt最后都发给bolt4;5.假设本次要处理的消息叫做root_id;6.开始发送了;7.spout给bolt1发送消息<root_id,1>8.spout给bolt2发送消息<root_id,2>9.spout给bolt3发送消息<root_id,3>10.发送完spout再给ack发送1^2^311.bolt1收到<root_id,1>,处理成功再给bolt4发送<root_id,4>;12.发送完bolt1再给ack发送1^4,处理不成功就不发送了;13.bolt2收到<root_id,2>,处理成功再给bolt4发送<root_id,5>;14.发送完bolt2再给ack发送2^5,处理不成功就不发送了;15.bolt3收到<root_id,3>,处理成功再给bolt4发送<root_id,6>;16.发送完bolt3再给ack发送3^6,处理不成功就不发送了;17.bolt4收到前3个bolt的消息,<root_id,4>,<root_id,5>,<root_id,6>,处理成功后分别给ack发送4,5,6,处理不成功就不发送了;18.我们站在ack的角度来看,对于root_id这条消息来说,如果所有spout和bolt都成功,那么应该会收到:1^2^3,1^4,2^5,3^6,4,5,6;19.将所有收到的数字异或操作,即:1^2^3^1^4^2^5^3^6^4^5^6,由于相同数字异或结果为0,即上面的式子的结果就是0,任意少收到哪个值,最终的结果都不会为0;20.如果ack最终计算的结果是0,那么就代表这个消息root_id处理成功了21.如果ack最终计算结果不为0,那么就代表这个消息root_id处理失败了

如何使用

举个项目中的例子:
spout中:

    这个类 extends BaseRichSpoutprivate OutputCollector collector;private ConcurrentHashMap<UUID, Values> pending;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {this.collector = collector;this.pending = new ConcurrentHashMap<>();}@Overridepublic void nextTuple() {//具体业务...Values value = new Values("要传的业务数据");UUID msgId = UUID.randomUUID();this.pending.put(msgId, value);this.collector.emit(value, msgId);}@Overridepublic void ack(Object msgId) {//收到成功消息,就删除这条msgIdthis.pending.remove(msgId);}@Overridepublic void fail(Object msgId) {//收到失败消息就重新发送一遍//一般成熟的做法是会再记录个失败次数,不会一直失败重发的this.collector.emit(this.pending.get(msgId), msgId);}

bolt中:

    这个类 extends BaseRichBoltprivate OutputCollector collector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {this.collector=collector;}@Overridepublic void execute(Tuple tuple) {try {//具体业务...//注意,这里发送的时候,一定要带上tuplethis.collector.emit(tuple,new Values("业务数据"));collector.ack(tuple);} catch (Exception e) {collector.fail(tuple);e.printStackTrace();}}

storm消息可靠机制(ack)的原理和使用相关推荐

  1. MQ 入门(四)—— 消息确认机制Ack

    一.ACK机制简介 ACK (Acknowledgement),即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符.表示发来的数据已确认接收无误. JMS API中约定了Client端可以 ...

  2. iOS之深入解析objc_msgSend消息转发机制的底层原理

    一.抛砖引玉 objc_msgSend() 消息发送的过程就是 通过 SEL 查找 IMP 的过程 . objc_msgSend() 是用 汇编语言 实现的,使用汇编实现的优势是: 消息发送的过程需要 ...

  3. RabbitMQ ACK消息确认机制 快速入门

    RabbitMQ 消息确认机制ACK ack机制保证的是broker和消费者之间的可靠性 ack表示的是消费端收到消息后的确认方式,有三种确认方式 自动确认:acknowledge="non ...

  4. RabbitMQ学习笔记(四)-消息确认机制

    引入 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 return ...

  5. ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

    1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Me ...

  6. Storm编程入门API系列之Storm的可靠性的ACK消息确认机制

    概念,见博客 Storm概念学习系列之storm的可靠性  什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...

  7. 【转】ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> 本文转载自 http://shift-alt-ctrl.iteye.com/blog/2020182 AcitveMQ是作为一 ...

  8. ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全 ...

  9. 理解storm的ACKER机制原理

    一.简介:       storm中有一个很重要的特性: 保证发出的每个tuple都会被完整处理.一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所产生的所有的子tuple都被成 ...

最新文章

  1. Mysql共享锁实例_mysql共享锁与排他锁用法实例分析
  2. 1亿美元,苹果把网易投资的VR公司收了
  3. JavaScript判断图片是否加载完成的三种方式
  4. .NET特性(Attribute)应用一例
  5. [解决]Win7 操作系统不能安装VMware
  6. HDLBits答案(13)_Verilog移位寄存器附加题
  7. MATLAB仿真TSC在哪里找,-bash:tsc:找不到命令
  8. 关于未能启用约束。一行或多行中包含违反非空、唯一或外键约束的值的解决方法...
  9. EPIC《禅意花园》项目开放下载
  10. 明晚9点 华为新款折叠屏手机MateXs即将闪亮登场
  11. ssm框架requestmapping找不到_从MVC原理开始手敲一个MVC框架,带你体会当大神的乐趣...
  12. Hough(霍夫变换) 基于Opencv2.4.9 和VS2012平台下编写
  13. 【UOJ78】二分图最大匹配
  14. 计算机网络之网络安全基础-消息完整性与数字签名
  15. Python在气象与海洋中的实践技术应用
  16. app抓包服务器证书错误,Fiddler抓包iOS出现证书错误的解决办法
  17. 使用微博自动记录俯卧撑个数
  18. SAP销售发票会计凭证汇率跟随客户汇率类型
  19. Oracle EBS GL日记账批“选定以过账”状态数据修复
  20. 总结两个最近遇到 校园网连不上或丢失WLAN如何解决的方案

热门文章

  1. zabbix编译安装
  2. 网络对抗技术—-网络对抗实验四
  3. (六)Spark-Eclipse开发环境WordCount-JavaPython版Spark
  4. 【ARM-Linux开发】U-Boot启动过程--详细版的完全分析
  5. 团队博客 一 需求分析
  6. cocos2dx 父元素影响子元素
  7. 《Cracking the Coding Interview》——第11章:排序和搜索——题目7
  8. CRITICAL_SECTION 学习
  9. 实现一个压缩Remoting传输数据的Sink:CompressionSink (转载)
  10. 《动手学深度学习 PyTorch版》学习笔记(一):数据操作