关于storm的基础,参照我这篇文章:流式计算storm
关于并发和并行,参照我这篇文章:并发和并行
关于storm的并行度解释,参照我这篇文章:storm的并行度解释
关于storm的流分组策略,参照我这篇文章:storm的流分组策略
关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机制

storm简介

Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。

storm核心组件

1.Nimbus

相当于storm的master,负责资源分配和任务调度,一个普通的storm集群只有一个nimbus(京东是对nimbus做了集群,加入了选举等概念,防止nimbus突然挂掉)

2.Supervisor

相当于storm的slave,负责接收Nimbus分配的任务,管理和启动所有的Worker

3.Worker

一个Worker就是一个jvm进程,对应一个Topology程序,可以有多个Executor

4.Executor

一个Executor就是一个线程,默认对应一个task,也可以设置成对应多个task

5.Task

一个Task是一个实例(spot/bolt),有多少个task就会new多少个bolt,task是storm中进行计算的最小的运行单位

6.Topology

拓扑结构,一个计算任务场景对应一个拓扑结构,拓扑结构中对声明spout和bolt直接的关系

7.Spout

是拓扑结构中的数据来源,可以向多个bolt发送数据,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源

8.Bolt

真正的数据处理部分,一个bolt可以发给多个bolt,多个bolt也可以发给一个bolt

9.Component

Spout和Bolt都是Component,Storm定义了一个名叫IComponent的总接口
全家谱如下:绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的

spout和bolt的关系:

整体的topology结构:

storm使用zookeeper来协调集群中的多个节点,但并不是用zookeeper来传递消息
zookeeper可以看这个
Nimbus和Supervisor都是无状态的,他们的心跳都由zookeeper协调

storm优点

1.使用简单,容易上手
2.可扩展,可以调整正在运行的topologies的并行度
3.容错,可靠,当工作节点宕了,storm会尝试重启另一个,而且Nimbus和Supervisors都是无状态的,死掉重启都不影响
4.无数据丢失,Storm的抽象组件确保了数据至少处理一次,即使使用消息队列系统失败时,也能确保消息被处理
5.支持多种编程语言,Storm用Thrift定义和提交topologies.由于Thrift能被任何一种编程语言使用,因此,topologies也能被任何一种编程语言定义和使用。
6.容易部署和操作
7.高性能,低延迟

storm入门案例 ( 实时统计单次个数 )

首先导入maven依赖

     <dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.0.4</version></dependency>

1.先写一个Spout,确定数据源,实际应用中一般是接入kafka等消息,入门案例使用随机字符串代替

/*** 向后端发射tuple数据流* @author soul**/
public class SentenceSpout extends BaseRichSpout {//BaseRichSpout是ISpout接口和IComponent接口的简单实现,接口对用不到的方法提供了默认的实现private SpoutOutputCollector collector;private String[] sentences = {"my name is soul","im a boy","i have a dog","my dog has fleas","my girl friend is beautiful"};private int index=0;/*** open()方法中是ISpout接口中定义,在Spout组件初始化时被调用。* open()接受三个参数:一个包含Storm配置的Map,一个TopologyContext对象,提供了topology中组件的信息,SpoutOutputCollector对象提供发射tuple的方法。* 在这个例子中,我们不需要执行初始化,只是简单的存储在一个SpoutOutputCollector实例变量。*/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector;}/*** nextTuple()方法是任何Spout实现的核心。* Storm调用这个方法,向输出的collector发出tuple。* 在这里,我们只是发出当前索引的句子,并增加该索引准备发射下一个句子。*/@Overridepublic void nextTuple() {//collector.emit(new Values("hello world this is a test"));// TODO Auto-generated method stubthis.collector.emit(new Values(sentences[index]));index++;if (index>=sentences.length) {index=0;}Utils.sleep(1000);}/*** declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口* 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("sentence"));//告诉组件发出数据流包含sentence字段}}

2.写第一个bolt,将Spout传过来的Tuple拆成一个个的单次,循环发给下一个bolt

/*** 订阅sentence spout发射的tuple流,实现分割单词* @author soul**/
public class SplitSentenceBolt extends BaseRichBolt {//BaseRichBolt是IComponent和IBolt接口的实现//继承这个类,就不用去实现本例不关心的方法private OutputCollector collector;/*** prepare()方法类似于ISpout 的open()方法。* 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接。* 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化,* 所以prepare()方法只保存OutputCollector对象的引用。*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector=collector;}/*** SplitSentenceBolt核心功能是在类IBolt定义execute()方法,这个方法是IBolt接口中定义。* 每次Bolt从流接收一个订阅的tuple,都会调用这个方法。* 本例中,收到的元组中查找“sentence”的值,* 并将该值拆分成单个的词,然后按单词发出新的tuple。*/@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString sentence = input.getStringByField("sentence");String[] words = sentence.split(" ");for (String word : words) {this.collector.emit(new Values(word));//向下一个bolt发射数据}}/*** plitSentenceBolt类定义一个元组流,每个包含一个字段(“word”)。*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("word"));}}

3.再写一个bolt,一方面接收上个bolt传过来的单次,另一方面将相同单次出现的次数记录下来,并将现在的结果传给下个bolt

/*** 订阅 split sentence bolt的输出流,实现单词计数,并发送当前计数给下一个bolt* @author soul**/
public class WordCountBolt extends BaseRichBolt {private OutputCollector collector;//存储单词和对应的计数private HashMap<String, Long> counts = null;//注:不可序列化对象需在prepare中实例化/*** 大部分实例变量通常是在prepare()中进行实例化,这个设计模式是由topology的部署方式决定的* 因为在部署拓扑时,组件spout和bolt是在网络上发送的序列化的实例变量。* 如果spout或bolt有任何non-serializable实例变量在序列化之前被实例化(例如,在构造函数中创建)* 会抛出NotSerializableException并且拓扑将无法发布。* 本例中因为HashMap 是可序列化的,所以可以安全地在构造函数中实例化。* 但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行复制和实例化* 而在prepare()方法中对不可序列化的对象进行实例化。*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector;this.counts = new HashMap<String, Long>();}/*** 在execute()方法中,我们查找的收到的单词的计数(如果不存在,初始化为0)* 然后增加计数并存储,发出一个新的词和当前计数组成的二元组。* 发射计数作为流允许拓扑的其他bolt订阅和执行额外的处理。*/@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word = input.getStringByField("word");Long count = this.counts.get(word);if (count == null) {count = 0L;//如果不存在,初始化为0}count++;//增加计数this.counts.put(word, count);//存储计数this.collector.emit(new Values(word,count));}/****/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//声明一个输出流,其中tuple包括了单词和对应的计数,向后发射//其他bolt可以订阅这个数据流进一步处理declarer.declare(new Fields("word","count"));}}

4.再写一个bolt,接收上个bolt传过来的单次统计结果,在控制台打印.实际最后一步一般会将数据结果存在非关系型数据库中,比如存入HBase或者Redis中

/*** 生成一份报告* @author soul**/
public class ReportBolt extends BaseRichBolt {private HashMap<String, Long> counts = null;//保存单词和对应的计数@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.counts = new HashMap<String, Long>();}@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word = input.getStringByField("word");Long count = input.getLongByField("count");this.counts.put(word, count);//实时输出System.out.println("结果:"+this.counts);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//这里是末端bolt,不需要发射数据流,这里无需定义}/*** cleanup是IBolt接口中定义* Storm在终止一个bolt之前会调用这个方法* 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果* 通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或数据库连接* 但是当Storm拓扑在一个集群上运行,IBolt.cleanup()方法不能保证执行(这里是开发模式,生产环境不要这样做)。*/@Overridepublic void cleanup(){System.out.println("---------- FINAL COUNTS -----------");ArrayList<String> keys = new ArrayList<String>();keys.addAll(this.counts.keySet());Collections.sort(keys);for(String key : keys){System.out.println(key + " : " + this.counts.get(key));}System.out.println("----------------------------");}}

5.写拓扑结构,将前面四步的Spout和Bolt组成一个拓扑结构,直接运行主方法就能看到结果,这个是Storm的本地模式,将提交的方法稍作修改,就可以变成集群模式,实际都是集群模式,将这些代码打成jar包传到Nimbus上,运行在集群中

/*** 实现单词计数topology**/
public class App
{private static final String SENTENCE_SPOUT_ID = "sentence-spout";private static final String SPLIT_BOLT_ID = "split-bolt";private static final String COUNT_BOLT_ID = "count-bolt";private static final String REPORT_BOLT_ID = "report-bolt";private static final String TOPOLOGY_NAME = "word-count-topology";public static void main( String[] args ) //throws Exception{//System.out.println( "Hello World!" );//实例化spout和boltSentenceSpout spout = new SentenceSpout();SplitSentenceBolt splitBolt = new SplitSentenceBolt();WordCountBolt countBolt = new WordCountBolt();ReportBolt reportBolt = new ReportBolt();TopologyBuilder builder = new TopologyBuilder();//创建了一个TopologyBuilder实例//TopologyBuilder提供流式风格的API来定义topology组件之间的数据流//builder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个sentence spout//设置两个Executeor(线程),默认一个builder.setSpout(SENTENCE_SPOUT_ID, spout,2);// SentenceSpout --> SplitSentenceBolt//注册一个bolt并订阅sentence发射出的数据流,shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例//builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);//SplitSentenceBolt单词分割器设置4个Task,2个Executeor(线程)builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);// SplitSentenceBolt --> WordCountBolt//fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中//这里fieldsGrouping()方法保证所有“word”字段相同的tuuple会被路由到同一个WordCountBolt实例中//builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));//WordCountBolt单词计数器设置4个Executeor(线程)builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));// WordCountBolt --> ReportBolt//globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBoltbuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);Config config = new Config();//Config类是一个HashMap<String,Object>的子类,用来配置topology运行时的行为//设置worker数量//config.setNumWorkers(2);LocalCluster cluster = new LocalCluster();//本地提交cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());Utils.sleep(10000);cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();}
}

storm与其他流式计算框架的对比

1.Spark Streaming
在处理前按时间间隔预先将其切分为一段一段的批处理作业.
Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),
一个DStream是一个微批处理(micro-batching)的RDD(弹性分布式数据集),
而RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。

2.Flink
针对流数据和批数据的分布式处理引擎
原生的流处理系统,
其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已
Flink 会把所有任务当成流来处理

3.Storm
原生的流处理系统,可以做到毫秒级处理

流式计算storm核心组件介绍以及入门案例---跟着就能在本地跑起来的storm项目相关推荐

  1. 分布式流式计算框架Storm

    Storm用于实时处理,就好比 Hadoop 用于批处理.         --> 离线计算:批量获取数据,批量传输数据,周期性比量计算数据,数据展示(Sqoop-->HDFS--> ...

  2. Storm 流式计算框架介绍

    文章目录 1.Storm简介 1.1 DAG(有向无环图) 1.2 Storm介绍 1.2.1 Storm 简介 1.2.2 Storm的优点 1.2.3 Storm的特性 1.3 Storm与Had ...

  3. 流式计算-Storm基本介绍

    1.离线计算是什么? 离线计算:批量获取数据.批量传输数据.周期性批量计算数据.数据展示 代表技术:Sqoop批量导入数据.HDFS批量存储数据.MapReduce批量计算数据.Hive批量计算数据. ...

  4. Storm流式计算入门

    流式计算 实时获取数据,实时数据储存,实时数据计算,实时结果缓存,持久化存储(mysql) 代表技术: Flume:实时获取数据 Kafka:实时数据存储 Storm/jstorm:实时数据计算 Re ...

  5. 高大上的介绍实时流式计算!

    实时流式计算,也就是RealTime,Streaming,Analyse,在不同的领域有不同的定义,这里我们说的是大数据领域的实时流式计算. 实时流式计算,或者是实时计算,流式计算,在大数据领域都是差 ...

  6. Flink流式计算从入门到实战 一

    文章目录 一.理解Flink与流计算 1.初识Flink 2.Flink的适用场景 3.流式计算梳理 二.Flink安装部署 1.Flink的部署方式 2.获取Flink 3.实验环境与前置软件 4. ...

  7. spark 流式计算_流式传输大数据:Storm,Spark和Samza

    spark 流式计算 有许多分布式计算系统可以实时或近实时处理大数据. 本文将从对三个Apache框架的简短描述开始,并试图对它们之间的某些相似之处和不同之处提供一个快速的高级概述. 阿帕奇风暴 在风 ...

  8. python 流式计算框架_流式计算的三种框架:Storm、Spark和Flink

    我们知道,大数据的计算模式主要分为批量计算(batch computing).流式计算(stream computing).交互计算(interactive computing).图计算(graph ...

  9. 流式计算的代表:Storm、Flink、Spark Streaming

    learn from 从0开始学大数据(极客时间) 文章目录 1. Storm 2. Spark Streaming 3. Flink 对存储在磁盘上的数据进行大规模计算处理,大数据批处理 对实时产生 ...

最新文章

  1. 在Ubuntu 14.04 64bit上安装python-pyqt5软件包(python 2.7)
  2. Eclipse配置C++时的三个关键环境变量
  3. oracle-备份工具exp-imp
  4. Java实现点击导出excel页面遮罩屏蔽,下载完成后解除遮罩
  5. Prism详解【转】
  6. ==和equals()比较
  7. 在Ubuntu1404的64bit版本下安装caffe
  8. Robin负载均衡策略存在问题及CSE解决方案
  9. [转]CMake 生成makefile 步骤
  10. Jquery Div居中
  11. Https的数据请求的证书设置
  12. Android DialogFragment
  13. 比较好的电脑系统_如何重装平板电脑系统?各种Windows系统平板win8升级win10方法...
  14. oracle 中的角色
  15. 40 多套 Java 完整实战项目,各个精品!
  16. R语言manova函数稳健多元方差分析(Robust one-way MANOVA)、rrcov包中的wilks.test函数稳健单向MANOVA、vegan包的adonis函数非参数Manova等效
  17. 《拆掉思维里的墙》的读后感作文900字
  18. Oracle什么情况使用omf,ORACLE OMF介绍
  19. 湖大计算机学院博士后李晓灿,谢鲲-湖大信息科学与工程学院
  20. 关于芯片寄存器地址的理解

热门文章

  1. PowerDesigner15在win7-64位系统下对MySQL 进行反向工程以及建立物理模型产生SQL语句步骤图文傻瓜式详解...
  2. CodeForces 869E The Untended Antiquity 二维树状数组,随机hash
  3. 用css、html编写一个两列布局的网页,名称为css.html ,要求左侧宽度为200px ,右侧自动扩展...
  4. Atitit.js this错误指向window的解决方案
  5. Ionic发布成android
  6. 脱裤子放屁,多此一举
  7. 鼠标关机后仍然发光的实用解决方法
  8. 牛客16589 机器翻译
  9. JDBC--Java Database Connectivity
  10. Leetcode--85. 最大矩形