2019独角兽企业重金招聘Python工程师标准>>>

Storm的acker机制,能够保证消息至少被处理一次(at least once)。也就是说,能够保证不丢消息。这里就详细解析一下acker的实现原理。

消息流

假设我们有一个简单的topology,结构为spout -> bolt。 spout emit了一条消息,发送至bolt。bolt作为最后一个处理者,没有再向下游emit消息。

从 上图可以看到,所有的ack消息都会发送到acker,acker会根据算法计算从特定spout发射出来的tuple tree是否被完全处理。如果成功处理,则发送__acker_ack消息给spout,否则发送__acker_fail消息给spout。然后 spout中可以做相应的逻辑如重发消息等。

在JStorm中,acker是一种bolt,因此它的处理、消息发送跟正常的bolt是一样的。只不过,acker是JStorm框架创建的bolt,用户不能自行创建。如果用户在代码中使用:

Config.setNumAckers(conf, 1);

就会自动创建并行度为1的acker bolt;如果为0,则就没有acker bolt了。

如何判断消息是否被成功处理?

acker的算法非常巧妙,它利用了数学上的异或操作来实现对整个tuple tree的判断。在一个topology中的一条消息形成的tuple tree中,所有的消息,都会有一个MessageId,它内部其实就是一个map:

Map<Long, Long> _anchorsToIds;

存储的是anchor和anchor value。而anchor其实就是root_id,它在spout中生成,并且一路透传到所有的bolt中,属于同一个tuple tree中的消息都会有相同的root_id,它可以唯一标识spout发出来的这条消息(以及从下游bolt根据这个tuple衍生发出的消息)。

下面是一个tuple的ack流程:

  1. spout发送消息时,先生成root_id。
  2. 对每一个目标bolt task,生成<root_id, random()>,即为这个root_id对应一个随机数值,然后随着消息本身发送到下游bolt中。假设有2个bolt,生成的随机数对分别为:<root_id, r1>, <root_id, r2>
  3. spout向acker发送ack_init消息,它的MessageId = <root_id, r1 ^ r2>(即所有task产生的随机数列表的异或值)。
  4. bolt收到spout或上游bolt发送过来的tuple之后,首先它会向acker发送ack消息,MessageId即为收到的值。同时,如果bolt下游还有bolt,则跟步骤2类似,会对每一个bolt,生成随机数对,root_id相同,但是值变为当前值 ^ 新生成的随机数。以此类推。
  5. acker收到消息后,会对root_id下所有的值做异或操作,如果算出来的值为0,表示整个tuple tree被成功处理;否则就会一直等待,直到超时,则tuple tree处理失败。
  6. acker通知spout消息处理成功或失败。

我们以一个稍微复杂一点的topology为例,描述一下它的整个过程。 假设我们的topology结构为: spout -> bolt1/bolt2 -> bolt3即spout同时向bolt1和bolt2发送消息,它们处理完后,都向bolt3发送消息。bolt3没有后续处理节点。

1). spout发射一条消息,生成root_id,由于这个值不变,我们就用root_id来标识。 spout -> bolt1的MessageId = <root_id, 1> spout -> bolt2的MessageId = <root_id, 2> spout -> acker的MessageId = <root_id, 1^2>

2). bolt1收到消息后,生成如下消息: bolt1 -> bolt3的MessageId = <root_id, 3> bolt1 -> acker的MessageId = <root_id, 1^3>

3). 同样,bolt2收到消息后,生成如下消息: bolt2 -> bolt3的MessageId = <root_id, 4> bolt2 -> acker的MessageId = <root_id, 2^4>

4). bolt3收到消息后,生成如下消息: bolt3 -> acker的MessageId = <root_id, 3> bolt3 -> acker的MessageId = <root_id, 4>

5). acker中总共收到以下消息: <root_id, 1^2> <root_id, 1^3> <root_id, 2^4> <root_id, 3> <root_id, 4> 所有的值进行异或之后,即为1^2^1^3^2^4^3^4 = 0。

转载于:https://my.oschina.net/u/1246109/blog/793737

JStorm如何保证消息不丢失相关推荐

  1. 消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?...

    大家好,我是 yes. 最近我一直扎在消息队列实现细节之中无法自拔,已经写了 3 篇Kafka源码分析,还剩很多没肝完.之前还存着RocketMQ源码分析还没整理.今儿暂时先跳出来盘一盘大方向上的消息 ...

  2. rocketmq怎么保证数据不会重复_rocketmq如何保证消息不丢失

    一.大体可以从三方面来说: 分别从Producer发送机制.Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失 从Producer的视角来看:如果消息未能正确的存储在MQ ...

  3. 回答面试官:如何保证消息不丢失

    rocketmq是阿里开源的一个性能很强大的消息队列,很多公司都在用,而且经历了多次双十一的洗礼,支持多种特性 对于这个技术点不知道大家掌握的如何了,消息队列现在应该是公司必备的技能之一了,无论是Ra ...

  4. 你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你

    我们将消息队列这个组件加入到了我们的商城系统里,并且通过秒杀这个实际的案例进行了实际演练,知道了它对高并发写流量做削峰填谷,对非关键业务逻辑做异步处理,对不同的业务系统做解耦合. 场景: 现在我们的电 ...

  5. 使用消息中间件时,如何保证消息不丢失且仅仅被消费一次

    1.如何保证消息不丢失 一条消息从生产到消费这条链路中,有三个地方可能会造成消息丢失,分别如下: 消息从生产者写入到消息队列的过程投递失败. 消息在消息队列中,持久化失败 消息被消费者消费的过程出现异 ...

  6. RocketMQ如何保证消息不丢失? 如何快速处理积压消息?

    文章目录 1. 哪些环节会有丢消息的可能? 2. 消息生产阶段如何保证消息不丢失 2.1 同步发送 2.2 采用事务消息 3. Broker如何保证接收到的消息不会丢失 4. 消费者如何确保拉取到的消 ...

  7. RocketMQ如何保证消息不丢失(消息可靠性)

    为什么说ROcketMQ更适用于业务型的消息中间件,因为它能够保证消息不丢失且带有事务消息. 先来看一张RocketMQ集群部署结构 其中Name Server主要是提供路由信息,这里暂时忽略,大致流 ...

  8. 硬核图解| Kafka 如何保证消息不丢失?

    Kafka 消息框架,大家一定不陌生,很多人工作中都有接触.它的核心思路,通过一个高性能的MQ服务来连接生产和消费两个系统,达到系统间的解耦,有很强的扩展性. 你可能会有疑问,如果中间某一个环节断掉了 ...

  9. RocketMq怎么保证消息不丢失

    目录 Producer发送消息阶段 手段一:提供SYNC的发送消息方式,等待broker处理结果. 手段二:发送消息如果失败或者超时,则重新发送. 手段三:broker提供多master模式 总结 B ...

最新文章

  1. About me 留言板
  2. windows 修改hosts 立即生效的方法
  3. 正则表达式中的字符类
  4. OpenCASCADE绘制测试线束:布尔运算命令之调试命令
  5. Python3安装(Windows)
  6. [bzoj1036][ZJOI2008]树的统计Count
  7. 暑假前挑战赛1—— A,B题解
  8. pycharm下的xlwings+VBA混合编程注意事项
  9. 神器啊!轻松用 Python 写个 APP!
  10. 新域名 @live.com 和 @windowslive.com 即将上线 (from cnbeta)
  11. (已解决)Android Studio 模拟器连接不上问题
  12. C#支持正负数的数字正则验证表达式
  13. kafka 启动时提示 /brokers/ids/1001 is: NODEEXISTS
  14. (Ubuntu)下载及安装Genymosion模拟器并配置Android Studio
  15. leaflet加载天地图DataServer
  16. 循证护理教育中的移动辅助同伴评估方法
  17. CNN---用于图像分类的经典的卷积神经网络CNN
  18. 【杂谈】关于windows10电脑有网络,但是浏览器打不开网页的情况。
  19. Aptina公司在台湾成立工程中心
  20. HDMI-VGA转换器黑屏的排障方案

热门文章

  1. 监控SQL:通过SQL Server的DDL触发器来监控数据库结构的变化(1)
  2. .NET下,你采用的哪种方式来操作数据库
  3. 康奈尔大学研究员发现“代码投毒”攻击,可触发供应链攻击
  4. Kaseya 修复供应链勒索攻击事件中被利用的缺陷
  5. Rapid7 部分源代码遭泄露,成 Codecov 供应链攻击第四个受害者
  6. 一年三番五次修,卡巴斯基为何依然无法完美修复杀毒软件中的这些洞 (技术详情)?...
  7. 『线段树及扫描线算法 Atlantis』
  8. HYSBZ1061题解
  9. node 单个表加条件查询
  10. OpenCV 访问Mat 像素