集群容错:

1.Nimbus挂掉

如果失去了Nimbus节点,Worker也会继续执行;如果workerye消亡,Supervisor会重启worker。
但是如果没有Nimbus,worker不会被安排到其他主机节点,客户端也无法提交新的任务。

2.zookeeper挂掉

zookeeper有主从结构,挂掉无影响。

3.Supervisor失败

真正执行进程的是worker,所以Supervisor失败不会影响但前运行的任务,且所有状态都保存在zookeeper或磁盘上,Nimbus及时通过zookeeper重启Supervisor即可

4.worker失败

worker是真正的执行节点,每个worker包含数个spout/bolt任务。supervisor负责监控这些任务,当Supervisor重启worker超过了一定的失败重启次数,无法发送心跳到Nimbus,Nimbus将在另一台主机上重新分配worker。

------------------------------------------------

数据容错:

容错机制 :

一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。

面向大规模数据分析,数据检查点操作成本很高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,

而网络带宽往往比内存带宽低的多,同事还需要消耗更多的存储资源。

storm:数据检查

ack机制即,spout发送的每一条消息:

1)在规定时间内,spout收到acker的ack响应,即认为该tuple被后续bolt成功处理
2)在规定时间内,没有收到acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败
3)收到acker发送的fail响应tuple,认为失败,触发fail动作

限流作用:为了避免spout发送数据太快,而bolt处理太慢 ,二选一即可

1)需要设置pending数,当spout有等于或超过peding数的tuple没有收到ack或fail响应时,跳过执行nextTuple,从而限制spout发送数据

2)设置最大超时时间
通过 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来指定

通过conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);设置spout pend数。

关闭Ack机制

1)spout发送数据时不带上msgid 
   collector.emit(new Values("value1","value2"), msgId);

2)设置acker数等于0
   将参数Config.TOPOLOGY_ACKERS设置为0,通过此方法,当Spout发送一个消息的时候,它的ack方法将立刻被调用

------------------------------------
------------------------------------

ack应答机制可以帮助识别消息是否成功消费,but也仅仅是通过messageId标识,消费失败的数据需要我们自己来做缓存,然后在重新发射出去。

保证消息的可靠性前提: emit发射数据一定要发射messageId

代码实现概要思路:

1)成员变量定义一个ConcurrentHashMap (ConcurrentHashMap是线程安全的,分段锁,效率高),且该map只能初始化不能创建对象,创建对象会有运行异常,需要重写open初始化方法中创建对象

2)这个messageId可以是上游中获取的,也可以是在当前定义的

3)nextTuple() 中通过map缓存tuple与messageId,其中key=messageId,value=tuple;发射数据

4)处理成功,调用ack(messageId)。从缓存中删除该数据

5)处理失败,调用fail(messageId)。重新发射根据messageId从缓存中获取的tuple及messageId。

说明:
1)数据处理成功/失败,是指当前的spout/bolt。如果需要下游的数据处理成功上游才删除数据,那么需要缓存数据库缓存数据,内存集合不行。
2)storm是分布式多线程并发操作,跨jvm。此例中的map近对当前spout/bolt的当前线程起作用。横跨上下游的/集群的需要用缓存数据库操作。
3)框架不包含事物操作,更不包含对数据库的事务操作。比如入库操作,需要记录消息的入口状态,再次入库时查询状态判定是否给予执行。

样例代码:

public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L;

// key:messageId,Data
    private ConcurrentHashMap <String, String> waitAck;

private SpoutOutputCollector collector;

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        waitAck =new ConcurrentHashMap <String, String>();
    }

public void nextTuple() {
        String sentence = "i am liu yang";
        String messageId = UUID.randomUUID().toString().replaceAll("-", "");
        waitAck.put(messageId, sentence);
        //指定messageId,开启ackfail机制
        collector.emit(new Values(sentence), messageId);
    }

@Override
    public void ack(Object msgId) {
        System.out.println("消息处理成功:" + msgId);
        System.out.println("删除缓存中的数据...");
        waitAck.remove(msgId);
    }

@Override
    public void fail(Object msgId) {
        System.out.println("消息处理失败:" + msgId);
        System.out.println("重新发送失败的信息...");
        //重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的。
        collector.emit(new Values(waitAck.get(msgId)),msgId);
    }
}

storm的容错机制相关推荐

  1. Storm 06_Storm 容错机制

    消息的完整性 从Spout中发出的Tuple,以及基于他所产生Tuple(例如上个例子当中Spout发出的句子,以及句子当中单词的tuple等) 由这些消息就构成了一棵tuple树 当这棵tuple树 ...

  2. ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

    1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Me ...

  3. Storm的ack机制在项目应用中的坑

    正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tup ...

  4. Flink状态管理和容错机制介绍

    作者: 施晓罡 本文来自2018年8月11日在北京举行的 Flink Meetup会议,分享来自于施晓罡,目前在阿里大数据团队部从事Blink方面的研发,现在主要负责Blink状态管理和容错相关技术的 ...

  5. 2021年大数据Flink(二十七):Flink 容错机制 Checkpoint

    目录 Flink 容错机制 Checkpoint State Vs Checkpoint Checkpoint执行流程 简单流程 复杂流程 State状态后端/State存储介质 MemStateBa ...

  6. 常见的容错机制+failover+failback

    常见的容错机制+failover+failback 什么是failover? [电脑][数据库]失效备援 (为系统备援能力的一种,当系统中其中一项设备失效而无法运作时,另一项设备即可自动接手原失效系统 ...

  7. ack strom 保证只有一次_Storm容错机制(一):ACK机制

    前言 好久没有写文章了,然后一连就写了三篇, 前两篇文章 Storm入门(一):编程模型 Storm入门(二):架构模型和集群部署 都是一些比较简单的入门教程,这一篇我们来聊一聊稍微高级点的话题, 关 ...

  8. 《循序渐进学Spark》一3.5 容错机制及依赖

    本节书摘来自华章出版社<循序渐进学Spark>一书中的第3章,第3.5节,作者 小象学院 杨 磊,更多章节内容可以访问云栖社区"华章计算机"公众号查看. 3.5 容错机 ...

  9. 实时流处理系统容错机制(二):Apache Flink 基于State的异步容错机制

    1. Introduce Apache Flink 提供了可以恢复数据流应用到一致状态的容错机制.确保在发生故障时,程序的每条记录只会作用于状态一次(exactly-once),当然也可以降级为至少一 ...

最新文章

  1. oracle执行先决条件检查失败的解决方法
  2. scala与python混合调用实验
  3. 获得本机的IP,掩码和网关
  4. Linux ifconfig指令
  5. ASP NET Core --- HTTP 翻页、过滤、排序
  6. cookies默认过期时间_「图」Chrome Canary新版已启动“增强版cookies控制”预览测试...
  7. Mybatis基于XML配置SQL映射器(一)
  8. String s=hello;s+=world;s变化了吗?原始的String对象的内容变了吗?
  9. MYSQL到ORACLE法式迁徙的注意变乱
  10. SQL优化--使用内连接(inner join)代替外连接(left join,right join) (转)
  11. Linux注册服务(chkconfig)
  12. 【语音识别】基于matlab GUI HMM 0~9数字和汉字语音识别(带面板)【含Matlab源码 1716期】
  13. 自动化立体仓库AS/RS货架|分离式仓库货架与整体式仓库货架如何运用?
  14. 网络抖动多少ms算正常_网络延迟多少ms算正常 - 卡饭网
  15. MATLAB中dsolve与syms
  16. 什么是浏览器指纹,如何完整修改浏览器指纹?
  17. java中台阶问题_编程:跳台阶问题
  18. 山下英子(日)《断舍离》读书笔记
  19. 关于Navicat Premium连接Oracle数据库闪退(失败)的解决办法(带ocl.dll)
  20. 简单保护动物网页制作stu-works.com学生保护动物网页设计作品HTML 濒危动物静态网页成品下载

热门文章

  1. C语言主函数返回值含义
  2. 什么是编程,学习编程的意义是?
  3. ffmpeg将gif转换成mp4
  4. C++ throw()关键词:一个被C++标准抛弃的玩意儿
  5. Java - 谷歌邮箱发送邮件详解
  6. 初学电子快速入门的方法
  7. atof()函数实现
  8. rtthread 线程
  9. colorAccent,colorPrimary,colorPrimaryDark做什么的?
  10. 【K8S专栏】Kubernetes工作负载管理