参考网址:https://www.cnblogs.com/hzorac/p/5570723.html

1 Storm和JStorm简介

Storm是开源的分布式容错实时计算系统,为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息;Storm也可被用于“连续计算”(continuous computation),对数据流做连续处理,在计算时就将结果以流的形式输出给用户;它还可被用于“分布式RPC”,以并行的方式执行运算。

Storm主要特点如下:

0、简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了实时处理的复杂性。

1、语言无关。Storm的消息处理组件可以用任何语言来定义。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。

2、容错性。如果在消息处理过程中出了一些异常,Storm会重新调度出问题的处理逻辑。Storm保证一个处理单元永远运行,除非显式杀掉。

3、可伸缩性。Storm的可伸缩性可以使其每秒处理的消息量达到很高。为了扩展一个实时计算任务,需要做的就是增加节点并且提高计算任务的并行度设置(parallelism setting)。Storm应用在10个节点的集群上每秒可以处理高达1000000个消息,包括每秒一百多次的数据库调用。同时Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易扩展。

4、保证无数据丢失。实时系统必须保证所有的数据被成功的处理。 那些会丢失数据的系统的适用场景非常窄,而Storm保证每一条消息都会被处理。

5、适用场景广泛。消息流处理、持续计算、分布式方法调用等是Storm适用场景广泛的基础,Storm的这些基础原语可以满足大量的场景。

虽然Storm具备诸多优势,但也存在不足:

0、Storm目前还存在Nimbus SPOF的问题;

1、存在雪崩问题;

2、资源粒度较粗;

3、Clojure实现引入了学习成本;

JStorm继承了Storm的所有优点,同时与Storm相比JStorm所特有的如下特点:

0、兼容Storm接口。开发者在Storm上运行的程序无需任何修改即可运行在JStorm上。

1、Nimbus HA。解决了Storm的Nimbus单点问题,支持自动热备切换Nimbus。

2、更细粒度的资源划分。JStorm从CPU、MEMORY、DISK和NET四个维度进行任务调度,同时不存在任务抢占问题。

3、可定制的任务调度机制。(Storm的任务调度目前也可定制)

4、更好的性能。通过底层ZeroMQ和Netty使JStorm具有更好的性能,同时具有更好的稳定性。

5、解决了Storm的雪崩问题。通过Netty和disruptor机制实现RPC保证可以匹配的数据发送和接收速度避免雪崩问题。

此外,JStorm通过减少对zookeeper的访问量、增加反序列化线程、优化ACK、增加监控内容及JAVA本身优势等各个方面优化了Storm的性能和稳定性。总之,JStorm比Storm更强大、更稳定、性能更好。

2 数据模型

JStorm通过一系列基本元素实现实时计算的目标,其中包括了Topology、Stream、Spout、Bolt等等。JStorm在模型上和MapReduce有很多相似的地方,下表从不同维度对JStorm和MapReduce进行了比较。

实时计算任务需要打包成Topology提交,和MapReduce Job相似,不同的是,MapReduce Job在计算完成后结束,而JStorm的Topology任务一旦提交永远不会结束,除非显式停止。

计算任务Topology是由不同的Spout和Bolt通过Stream连接起来的DAG图。其中:

Spout:JStorm的消息源。用于生产消息,一般是从外部数据源(如MQ/RDBMS/NoSQL/RTLog等)不间断读取数据并向下游发送消息。

Bolt:JStorm的消息处理者。用于为Topology进行消息处理,Bolt可以执行查询、过滤、聚合及各种复杂运算操作,Bolt的消息处理结果可以作为下游Bolt的输入不断迭代。

Stream:JStorm中对数据进行的抽象,它是时间上无界的Tuple元组序列。在Topology中Spout是Stream的源头,负责从特定数据源发射Stream;Bolt可以接收任意多个Stream输入然后进行数据的加工处理,如果需要Bolt还可以发射出新Stream给下游Bolt。

Tuple:JStorm使用Tuple作为数据模型,存在于任意两个有数据交互的组件(Spout/Bolt)之间。每个Tuple是一组具有各自名称的值,值可以是任何类型,JStorm支持所有的基本类型、字符串以及字节数组,也可以使用自定义类型(需实现对应序列化器)作为值类型。简单来说,Tuple就是一组实现了序列化器带有名称的Java对象集合。

从整个Topology上看,Spout/Bolt可以看作DAG的节点,Stream是连接不同节点之间的有向边,Tuple则是流过Stream的数据集合。

Topology中每一个计算组件(Spout和Bolt)都有一个并行度,在创建Topology时指定(默认为1),JStorm在集群内分配对应个数的线程Task并行。

既然对于Spout/Bolt都会有多个线程来并行执行,那么如何在两个组件(Spout和Bolt)之间发送Tuple会成为新的问题。

JStorm通过定义Topology时为每个Bolt指定输入Stream以及指定提供的若干种数据流分发(Stream Grouping)策略用来解决这一问题。

JStorm提供了以下几种Stream Grouping策略:

0) Shuffle Grouping:随机分组,随机派发Stream里面的Tuple,保证每个Bolt接收到的Tuple数目大致相同,通过轮询随机的方式使得下游Bolt之间接收到的Tuple数目差值不超过1。

1) Fields Grouping:按字段分组,具有同样字段值的Tuple会被分到相同Bolt里的Task,不同字段值则会被分配到不同Task。

2) All Grouping:广播分组,每一个Tuple,所有的Bolt都会收到。

3) Global Grouping:全局分组,Tuple被分配到Bolt中ID值最低的一个Task。

4) Non Grouping:不分组,Tuple会按照完全随机的方式分发到下游Bolt。

5) Direct Grouping:直接分组,Tuple需要指定由Bolt的哪个Task接收。 只有被声明为Direct Stream的消息流可以声明这种分组方法。

6) Local or Shuffle Grouping:基本同Shuffle Grouping。

7) Custom Grouping:用户自定义分组策略,CustomStreamGrouping是自定义分组策略时用户需要实现的接口。

3 系统架构

JStorm与Hadoop相似,保持了Master/Slave的简洁优雅架构。与Hadoop不同,JStorm的M/S之间不是直接通过RPC交换心跳信息,而是借助ZK来实现,这样的设计虽然引入了第三方依赖,但是简化了Nimbus/Supervisor的设计,同时也极大提高了系统的容错能力。

整个JStorm系统中共存三类不同的Daemon进程,分别是Nimbus,Supervisor和Worker。

Nimbus:JStorm中的主控节点,Nimbus类似于MR的JT,负责接收和验证客户端提交的Topology,分配任务,向ZK写入任务相关的元信息,此外,Nimbus还负责通过ZK来监控节点和任务健康情况,当有Supervisor节点变化或者Worker进程出现问题时及时进行任务重新分配。Nimbus分配任务的结果不是直接下发给Supervisor,也是通过ZK维护分配数据进行过渡。特别地,JStorm 0.9.0领先Apache Storm实现了Nimbus HA,由于Nimbus是Stateless节点,所有的状态信息都交由ZK托管,所以HA相对比较简单,热备Nimbus subscribe ZK关于Master活跃状态数据,一旦发现Master出现问题即从ZK里恢复数据后可以立即接管。

Supervisor:JStorm中的工作节点,Supervisor类似于MR的TT,subscribe ZK分配到该节点的任务数据,根据Nimbus的任务分配情况启动/停止工作进程Worker。Supervisor需要定期向ZK写入活跃端口信息以便Nimbus及时监控。Supervisor不执行具体的数据处理工作,所有的数据处理工作都交给Worker完成。

Worker:JStorm中任务执行者,Worker类似于MR的Task,所有实际的数据处理工作最后都在Worker内执行完成。Worker需要定期向Supervsior汇报心跳,由于在同一节点,同时为保持节点的无状态,Worker定期将状态信息写入本地磁盘,Supervisor通过读本地磁盘状态信息完成心跳交互过程。Worker绑定一个独立端口,Worker内所有单元共享Worker的通信能力。

Nimbus、Supervisor和Worker均为Stateless节点,支持Fail-Fast,这为JStorm的扩展性和容错能力提供了很好的保障。

还剩一个问题是Topology的各个计算组件(Spout/Bolt)如何映射到计算资源上。梳理这个问题前需要先明确Worker/Executor/Task之间的关系:

0、Worker:完整的Topology任务是由分布在多个Supervisor节点上的Worker进程(JVM)来执行,每个Worker都执行且仅执行Topology任务的一个子集。

1、Executor:Worker内部会有一个或多个Executor,每个Executor对应一个线程。Executor包括SpoutExecutor和BoltExecutor,同一个Worker里所有的*Executor只能属于某一个Topology里的执行单元。

2、Task:执行具体数据处理实体,也就是用户实现的Spout/Blot实例。一个Executor可以对应多个Task,定义Topology时指定,默认Executor和Task一一对应。这就是说,系统中Executor数量一定是小于等于Task数量(#Executor≤#Task)。

4 关键流程

0、Topology提交

JStorm为用户提供了StormSubmitter. submitTopology用来向集群提交Topology,整个提交流程:

Client端:

0)客户端简单验证;

1)检查是否已经存在同名Topology;

2)提交jar包;

3)向Nimbus提交Topology;

Nimbus端:

0)Nimbus端简单合法性检查;

1)生成Topology Name;

2)序列化配置文件和Topology Code;

3)Nimbus本地准备运行时所需数据;

4)向ZK注册Topology和Task;

5)将Task压入分配队列等待TopologyAssign分配;

1、任务调度策略

从0.9.0开始,JStorm提供非常强大的调度功能,基本上可以满足大部分的需求,同时支持自定义任务调度策略。JStorm的资源不再仅是Worker的端口,而从CPU/Memory/Disk/Net等四个维度综合考虑。

Nimbus任务调度算法如下:

0)优先使用自定义任务分配算法,当资源无法满足需求时,该任务放到下一级任务分配算法;

1)使用历史任务分配算法(如果打开使用历史任务属性),当资源无法满足需求时,该任务放到下一级任务分配算法;

2)使用默认资源平衡算法,计算每个Supervisor上剩余资源权值,取权值最高的Supervisor分配任务。

2、Acker机制

为保证无数据丢失,Storm/JStorm使用了非常漂亮的可靠性处理机制,如图当定义Topology时指定Acker,JStorm除了Topology本身任务外,还会启动一组称为Acker的特殊任务,负责跟踪Topolgogy DAG中的每个消息。每当发现一个DAG被成功处理完成,Acker就向创建根消息的Spout任务发送一个Ack信号。Topology中Acker任务的并行度默认parallelism hint=1,当系统中有大量的消息时,应该适当提高Acker任务的并行度。

Acker按照Tuple Tree的方式跟踪消息。当Spout发送一个消息的时候,它就通知对应的Acker一个新的根消息产生了,这时Acker就会创建一个新的Tuple Tree。当Acker发现这棵树被完全处理之后,他就会通知对应的Spout任务。

Acker任务保存了数据结构Map<MessageID,Map< TaskID, Value>>,其中MessageID是Spout根消息ID,TaskID是Spout任务ID,Value表示一个64bit的长整型数字,是树中所有消息的随机ID的异或结果。通过TaskID,Acker知道当消息树处理完成后通知哪个Spout任务,通过MessageID,Acker知道属于Spout任务的哪个消息被成功处理完成。Value表示了整棵树的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的MessageID发送过来做异或。当Acker发现一棵树的Value值为0的时候,表明这棵树已经被成功处理完成。

例如,对于前面Topology中消息树,Acker数据的变化过程:

Step0.A发送T0给B后:

R0=r0

<id0,<taskA,R0>>

Step1.B接收到T0并成功处理后向C发送T1,向D发送T2:

R1=R0^r1^r2=r0^r1^r2

<id0,<taskA,R0^R1>>

=<id0,<taskA,r0^r0^r1^r2>>

=<id0,<taskA,r1^r2>>

Step2.C接收到T1并成功处理后:

R2=r1

<id0,<taskA,r1^r2^R2>>

=<id0,<taskA,r1^r2^r1>>

=<id0,<taskA,r2>>

Step3.D接收到T2并成功处理后:

R3=r2

<id0,<taskA,r2^R3>>

=<id0,<taskA,r2^r2>>

=<id0,<taskA,0>>

当结果为0时Acker可以通知taskA根消息id0的消息树已被成功处理完成。

需要指出的是,Acker并不是必须的,当实际业务可以容忍数据丢失情况下可以不用Acker,对数据丢失零容忍的业务必须打开Acker,另外当系统的消息规模较大是可适当增加Acker的并行度。

3、故障恢复

0)节点故障

Nimbus故障。Nimbus本身无状态,所以Nimbus故障不会影响正在正常运行任务,另外Nimbus HA保证Nimbus故障后可以及时被备份Nimbus接管。

Supervisors节点故障。Supervisor故障后,Nimbus会将故障节点上的任务迁移到其他可用节点上继续运行,但是Supervisor故障需要外部监控并及时手动重启。

Worker故障。Worker健康状况监控由Supervisor负责,当Woker出现故障时,Supervisor会及时在本机重试重启。

Zookeeper节点故障。Zookeeper本身具有很好的故障恢复机制,能保证至少半数以上节点在线就可正常运行,及时修复故障节点即可。

1)任务失败

Spout失败。消息不能被及时被Pull到系统中,造成外部大量消息不能被及时处理,而外部大量计算资源空闲。

Bolt失败。消息不能被处理,Acker持有的所有与该Bolt相关的消息反馈值都不能回归到0,最后因为超时最终Spout的fail将被调用。

Acker失败。Acker持有的所有反馈信息不管成功与否都不能及时反馈到Spout,最后同样因为超时Spout的fail将被调用。

任务失败后,需要Nimbus及时监控到并重新分配失败任务。

ELK之JStorm相关推荐

  1. 2021年大数据ELK(二十八):制作Dashboard

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 制作Dashboard 一.点击第三个组件图标,并创建一个新的Dashboar ...

  2. 2021年大数据ELK(二十七):数据可视化(Visualize)

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 数据可视化(Visualize) 一.数据可视化的类型 二.以饼图展示404与 ...

  3. 2021年大数据ELK(二十六):探索数据(Discovery)

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 探索数据(Discovery) 一.使用探索数据功能 二.导入更多的Apach ...

  4. 2021年大数据ELK(二十五):添加Elasticsearch数据源

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 添加Elasticsearch数据源 一.Kibana索引模式 添加Elast ...

  5. 2021年大数据ELK(二十四):安装Kibana

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 安装Kibana 在Linux下安装Kibana,可以使用Elastic stack ...

  6. 2021年大数据ELK(二十三):Kibana简介

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. Kibana简介 通过上面的这张图就可以看到,Kibana可以用来展示丰富的图表. ...

  7. 2021年大数据ELK(二十二):采集Apache Web服务器日志

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 采集Apache Web服务器日志 一.需求 二.准备日志数据 三.使用Fil ...

  8. 2021年大数据ELK(二十一):Logstash简介和安装

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Logstash简介和安装 一.简介 1.经典架构 2.对比Flume 3.对 ...

  9. 2021年大数据ELK(二十):FileBeat是如何工作的

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 FileBeat是如何工作的 一.input和harvester 1.inpu ...

最新文章

  1. C++ 类的大小计算
  2. DOSBOX使用的一些方法和注意点(汇编实验中遇到的)
  3. thinkphp模版调用函数方法
  4. linux服务器配置https访问
  5. nyoj-37 回文字符串
  6. cocos2d-x游戏实例(26)-简易动作游戏(4)
  7. Selenium脚本编写技巧和窍门
  8. 2021中国短视频和直播电商行业人才发展报告
  9. 用SYS本地登录或远程登录引起ORA-01031错误
  10. 2020山东大学计算机组成原理课程设计报告
  11. PCS7 数据库解析
  12. linux服务器生成密钥后无法登陆,securecrt用密钥安全登陆服务器
  13. VS中多字节字符集和UNICODE字符集的使用说明
  14. plc编程语言是c语言吗,PLC各种编程语言特点你了解多少?
  15. 一年级古诗风语文知识心田花开汇总
  16. 雷电、夜神、天天、逍遥等模拟器中找不到要下载的软件解决方法
  17. 多方位角极化SAR数据处理与信息提取方法
  18. 科比投篮预测——可视化与探索性数据分析(二)
  19. 3D打印技术新进展,正带来哪些产业新机会?
  20. CS5216 DP to hdmi 1080p转换器或者转接线设计原理

热门文章

  1. IC封装原理及功能特性汇总
  2. SimpleDateFormat 的使用及其 注意事项
  3. Android4.3 Bluetooth基本介绍
  4. pop3工作原理和命令
  5. 互斥锁(排它锁、独占锁、写锁、X锁)和共享锁(读锁、S锁) 自旋锁
  6. 三星刷机工具Odin图文刷机教程
  7. iframe滚动条样式
  8. LYNC 2010 RTM版正式公布
  9. 2021-08-28
  10. VS如何引入数据库模型(Model)