之前我们已经极为简单的介绍了Storm。现在我们要对它做一个更详细的了解。Storm是一个复杂事件处理引擎(CEP),最初由Twitter实现。在实时计算与分析领域,Storm正在得到日益广泛的应用。Storm可以辅助基本的流式处理,例如聚合数据流,以及基于数据流的机器学习(译者注:原文是ML,根据上下文判断,此处应是指机器学习,下文相同不再缀述)。通常情况,数据分析(译者注:原文为prestorage analytics,意义应是保存分析结果之前的分析计算)在Storm之上进行,然后把结果保存在NOSQL或关系数据库管理系统(RDBMSs)。以气象频道为例,使用Storm以并行方式处理大数据集(译者注:原文用到munging,意义应是洗数据)并为离线计算持久化它们。

下面是一些公司使用Storm的有趣方式:

  • Storm用于持续计算,p并把处理过的数据传输给一个可视化引擎。Data Salt,一个先行者,使用Storm处理大容量数据源。Twitter采用相同的方式,将Storm作为它的发布者分析产品的基础。
  • Groupon也采用Storm实现了低延迟、高吞吐量的数据处理。
  • Yahoo采用Storm作为CEP每天处理数以亿计的事件。他们还把Storm整合进了0和Hadoop YARN,以便Storm能够弹性的使用集群资源,以及更易于使用HBase和Hadoop生态系统中的其它组件。
  • Infochimps采用Storm-Kafka加强他们的数据交付云服务。
  • Storm还被Cerner公司用于医疗领域,用来处理增量更新,并低延迟的把它们保存在HBase,有效的运用Storm作为流式处理引擎和Hadoop作为批处理引擎。
  • Impetus将Storm与Kafka结合,运行机器学习算法,探索制造业的故障模式。他们的客户是一家大型的电子一站式服务商。他们运行分类算法,依据日志实时探测故障,识别故障根源。这是一个更一般的用例:日志实时分析。
  • Impetus还利用Storm在一个分布式系统中构建实时索引。这个系统非常强大,因为它搜索过程几乎是瞬时的。

数据流

Storm的一个基本概念是数据流,它可以被定义为无级的无界序列。Storm只提供多种去中心化且容错的数据流转换方式。流的模式可以指定它的数据类型为以下几种之一:整型、布尔型、字符串、短整型、长整型、字节、字节数组等等。类OutputFieldsDeclarer用来指定流的模式。还可以使用用户自定义类型,这种情况下,用户可能需要提供自定义序列化程序。一旦声明了一个数据流,它就有一个ID,并有一个默认类型的默认值。

拓扑

在Storm内部,数据流的处理由Storm拓扑完成。拓扑包含一个spout,数据源;bolt,负责处理来自spout和其它bolt的数据。目前已经有各种spout,包括从Kafka读取数据的spout(LinkedIn贡献的分布式发布-订阅系统),Twitter API的spout,Kestrel队列的spout,甚至还有从像Oracle这样的关系数据库读取数据的spoutspout可以是可靠的,一旦数据处理失败,它会重新发送数据流。不可靠的spout不跟踪流的状态,不会在失败时重新发送数据。Spout的一个重要方法是nextTuple——它返回下一条待处理的元组。还有两个分别是ack和fail,分别在流被处理成功或处理不成功时调用。Storm的每个spout必须实现IRichSpout接口。Spout可能会分发多个数据流作为输出。

拓扑中的另一个重要的实体是boltbolt执行数据流转换,包括比如计算、过滤、聚合、连接。一个拓扑可以有多个bolt,用来完成复杂的转换和聚合。在声明一个bolt的输入流时,必须订阅其它组件(要么是spout要么是其它bolt)的特定数据流。通过InputDeclarer类和基于数据流组的适当方法完成订阅,这个方法针对数据流组做了简短说明。

execute方法是bolt的一个重要方法,通过调用它处理数据。它从参数接收一个新的数据流,通过OutputCollector分发新的元组。这个方法是线程安全的,这意味着bolt可以是多线程的。bolt必须实现IBasicBolt接口,这个接口提供了ack方法的声明,用来发送确认通知。

Storm集群

一个Storm集群由主节点和从节点构成。主节点通常运行着Nimbus守护进程。Storm已经实现了在Hadoop YARN之上运行——它可以请求YARN的资源管理器额外启动一个应用主节点的守护进程。Nimbus守护进程负责在集群中传输代码,分派任务,监控集群健康状态。在YARN之上实现的Storm可以与YARN的资源管理器配合完成监控及分派任务的工作。

每个从节点运行一个叫做supervisor的守护进程。这是一个工人进程,负责执行拓扑的一部分工作。一个典型的拓扑由运行在多个集群节点中的进程组成。supervisor接受主节点分派的任务后启动工人进程处理。

主从节点之间的协调通讯由ZooKeeper集群完成。(ZooKeeper是一个apache的分布式协作项目,被广泛应用于诸如Storm,Hadoop YARN,以及Kafka等多个项目中。)集群状态由ZooKeeper集群维护,确保集群可恢复性,故障发生时可选举出新的主节点,并继续执行拓扑。

拓扑本身是由spoutsbolts,以及它们连接在一起的方式构成的图结构。它与Map-Reduce任务的主要区别在于,MR任务是短命的,而Storm拓扑一直运行。Storm提供了杀死与重启拓扑的方法。

简单的实时计算例子

一个Kafka spout就是下面展示的样子:

Kafka Spout的open()方法:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){_collector = collector;_rand = new Random();
}

Kafka Spout的nextTuple()方法:

public void nextTuple() {KafkaConsumer consumer = new KafkaConsumer(kafkaServerURL, kafkaTopic);ArrayList<String> input_data = consumer.getKafkaStreamData();while(true) {for(String inputTuple : input_data){_collector.emit(new Values(inputTuple));}}
}

KafkaConsumer类来自开源项目storm-kafka:https:// github.com/nathanmarz/ storm-contrib/tree/master/storm-kafka。

public void prepare(Map stormConf, TopologyContext context){
//创建输出日志文件,记录输出结果日志try{String logFileName = logFileLocation;//"file"与"outputFile"已作为类属性定义file = new FileWriter(logFileName);outputFile = new PrintWriter(file);outputFile.println("In the prepare() method of bolt");} catch (IOException e){System.out.println("an exception has occured");e.printStackTrace();}
}
public void execute(Tuple input, BasicOutputCollector collector){//从元组取得要处理的字符串String inputMsg=input.getString(0);inputMsg=inputMsg = "I am a bolt";outputFile.println("接收的消息:" + inputMsg);outputFile.flush();collector.emit(tuple(inputMsg));
}

前面创建的spout与这个bolt连接,这个bolt向数据流的字符串域添加一条消息:我是一个bolt。前文显示的就是这个bolt的代码。接下来的代码是构建拓扑的最后一步。它显示了spoutbolts连接在一起构成拓扑,并运行在集群中。

public static void main(String[] args){int numberOfWorkers = 2;//拜年中的工人进程数量int numberOfExecutorsSpout = 1;//spout 执行者数量int numberOfExecutorsBolt = 1;//bolt执行者数量String numbersHost = "192.168.0.0";// Storm集群中运行Nimbus的节点IPTopologyBuilder builder = new TopologyBuilder();Config conf = new Config();builder.setSpout("spout", new TestSpout(false), numberOfExecutorsSpout);//set the spout for the topologybuilder.setBolt("bolt",new TestBolt(), numberOfExecutorsBolt).shuffleGrouping("spout");//set the bolt for the topology//启动远程 Storm集群的配置conf.setNumWorkers(numberOfWorkers);conf.put(Config.NIMBUS_HOST,nimbusHost);conf.put(Config.NIMBUS_THRIFT_PORT, 6627L);//远程Storm集群配置try{StormSubmitter.submitTopology("testing_topology", conf, builder.createTopology());} catch (AlreadyAliveException e){System.out.println("Topology with the Same name is already running on the cluster.");e.printStackTrace();} catch (InvalidTopologyException e) {System.out.println("Topology seems to be invalid.");e.printStackTrace();}
}

数据流组

spoutbolt都可能并行执行多个任务。必须有一种方法指定哪个数据流路由到哪个spout/bolt。数据流组用来指定一个拓扑内必须遵守的路由过程。下面是Storm内建数据流组:

  • 随机数据流组:随机分发数据流,不过它确保所有任务都可得到相同数量的数据流。
  • 域数据流组:基于元组中域的数据流组。比如,有一个machine_id域,拥有相同machine_id域的元组由相同的任务处理。
  • 全部数据流组:它向所有任务分发元组——它可能导致处理冲突。
  • 直接数据流组:一种特殊的数据流组,实现动态路由。元组生产者决定哪个消费者应该接收这个元组。可能是基于运行时的任务ID。bolt可以通过TopologyContext类得到消费者的任务ID,或OutputCollector的emit方法也可使用直接直接数据流组。
  • 本地数据流组:如果目标bolt在相同进程中有一个以上的任务,元组将被随机分配(就像随机数据流组),但是只分配相同进程中的那些任务。
  • 全局数据流组:所有元组到达拥有最小ID的bolt
  • 不分组:目前与随机数据流组一样。

Storm的消息处理担保

spout生成的元组能够触发进一步的元组分发,基于拓扑和所应用的转换。这意味着可能是整个消息树。Storm担保每个元组被完整的处理了——树上的每个节点已被处理过了。这一担保不能没有程序员的支持。每当消息树中创建了一个新的节点或者一个节点被处理了,程序员都必须向Storm指明。第一点通过锚定实现,也就是将处理完成的元组作为OutputCollector的emit方法的第一个参数。这就保证了消息被锚定到了合适的元组。消息也可以锚定到多个元组,这样就构成了一个消息的非循环有向图(DAG),而不只是一棵树。即使在消息的循环有向图存在的情况下,Storm也可以担保消息处理。

在每条消息被处理后,程序员可通过调用ack或fail方法,告诉Storm这条消息已被成功处理或处理失败。Storm会在失败时重新发送数据流——这里满足至少处理一次的语义。Storm也会在发送数据流时采用超时机制——这是一个storm.yaml的配置参数(config.TOPOLOGY_MESSAGE_TIMEOUT_ SECS)。

在Storm内部,有一组“ackeer”任务持续追踪来自每条元组消息的DAG。这些任务的数量可通过storm.yaml中的TOPOLOGY_ACKERS参数设定。在处理大量消息时,可能将不得不增大这个数字。每个消息元组得到一个64-bit ID,用于ackers追踪。元组的DAG状态由一个叫做ack val的64-bit值维护,只是简单的把树中每个确认过(译者注:原谅是acked)的ID执行异或运算。当ack val成为0时,acker任务就认为这棵元组树被完全处理了。

在某些情况下,当性能至关重要,而可靠性又不是问题时,可靠性也可以关闭。在这些情况下,程序员可以指定TOPOLOGY_ACKERS为0,并在分发新元组时,不指定输入元组的非锚定消息(译者注:原文为unanchor messages)。这样就跳过了确认消息,节省了带宽,提高了吞吐量。到目前为止我们已经讨论且只讨论了至少处理一次数据流的语义(译者注:原文为at-least-once stream semantics)。

仅处理一次数据流的语义可以采用事务性拓扑实现。Storm通过为每条元组提供相关联的事务ID为数据流处理提供事务性语义(仅一次,不完全等同于关系数据库的ACID语义)。对于重新发送数据流来说,相同的事务ID也会被发送并担保这个元组不会被重复处理。这方面牵涉到对于消息处理的严格顺序,就像是在处理一个元组。由于这样做的低效率,Storm允许批量处理由一个事务ID关联的元组。不像早先的情况 ,程序不得不将消息锚定到输入元组,事务性拓扑对程序员是透明的。Storm内部将元组的处理分为两阶段——第一阶段为处理阶段,可以并行处理多个批次,第二阶段为提交阶段,强制严格按照批次ID提交。

事务性拓扑已经过时了——它已被整合进了一个叫做Trident的更大的框架。Trident允许对流数据进行查询,包括聚合、连接、分组函数,还有过滤器。Trident构建于事务性拓扑之上并提供了一致的一次性语义。更多关于Trident的细节请参考

  • 转载自 并发编程网 - ifeve.com

颠覆大数据分析之Storm简介相关推荐

  1. 流数据分析平台Storm简介

    流数据分析平台Storm简介 Storm是一个分布式的.容错的实时流计算系统,可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理.Storm保证 ...

  2. 机器学习第一章之大数据分析与机器学习简介

    大数据分析与机器学习简介 1.1 大数据分析与机器学习概述 1.1.1 大数据分析与机器学习的应用领域 1.1.2 机器学习的基本概念 1.1.3 Python在数据科学中的作用 1.2 Python ...

  3. 颠覆大数据分析之Spark弹性分布式数据集

    Spark中迭代式机器学习算法的数据流可以通过图2.3来进行理解.将它和图2.1中Hadoop MR的迭代式机器学习的数据流比较一下.你会发现在Hadoop MR中每次迭代都会涉及HDFS的读写,而在 ...

  4. 数据分析师需要具备什么能力,大数据分析书单

    数据分析师到底在做什么? 数据分析师需要具备什么能力? 快速学习能力应该是每位数据分析师必备的.大数据环境下催生了很多新的数据分析工具和方法,分析师们比拼的就是学习速度.快速掌握很重要. 如何快速成为 ...

  5. SaaSBase:推荐七款超好用的大数据分析工具

    如今,大小企业都可以利用商业智能工具来理解复杂的大数据.通过收集和分析这些数据,并将其转化成易于理解的报告,这些解决方案可以为企业提供有价值的洞察力,从而提高企业利润.SaaSBase(saasbas ...

  6. storm简介(大数据技术)

    Apache Storm简介 由 chalex 创建,小路依依 最后一次修改 2016-12-12 什么是Apache Storm? Apache Storm是一个分布式实时大数据处理系统.Storm ...

  7. 财务大数据比赛有python吗-【教改实验班简介】财务大数据分析班

    原标题:[教改实验班简介]财务大数据分析班 西京学院会计学院欢迎你 财务大数据分析班简介 1. 项目简介 会计学院财务大数据分析教改实验班从2019级学生开始招生. 近年来,随着大数据,人工智能,区块 ...

  8. 使用Storm实现实时大数据分析!

    随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战.Shruthi Kumar和Siddharth Patankar在Dr.Dobb's上结合了汽车超速监视,为我们演示了使用Storm进 ...

  9. 大数据分析处理框架——离线分析(hive,pig,spark)、近似实时分析(Impala)和实时分析(storm、spark streaming)...

    大数据分析处理架构图 数据源: 除该种方法之外,还可以分为离线数据.近似实时数据和实时数据.按照图中的分类其实就是说明了数据存储的结构,而特别要说的是流数据,它的核心就是数据的连续性和快速分析性: 计 ...

  10. 大数据简介与大数据分析

    最近几年,大数据热得像烫手山芋!什么是大数据?通过查阅资料,整理一番,博文将给您带来福利了! 大数据概念 "大数据"是一个体量特别大,数据类别特别大的数据集,并且这样的数据集无法用 ...

最新文章

  1. [UE4蓝图教程]蓝图入门之蓝图通信机制入门
  2. ubyntu 链接mysql_ubuntu mysql 的安装、配置、简单使用,navicat 连接
  3. C++矩阵库 Eigen 快速入门
  4. C语言求素数个数及素数之和
  5. python画小树_如何用Python画一颗小树?
  6. The ALTER TABLE statement conflicted with the FOREIGN KEY constraint FK_SortId.
  7. 矿泉水瓶勿重复使用易得癌病
  8. IM互通新方案-GTalk to VoIP回拨服务
  9. odoo 邮件自动发送相关知识
  10. 5G时代对IDC数据中心提出了怎样的要求?
  11. Docker内时区查询和修改方法
  12. 6D姿态估计算法汇总(下)
  13. 2009年三季度上市公司报表1
  14. VSCode编辑器中对PHP语言的支持
  15. Attempted to read or write protected memory. This is often an indication that other memory is corrup
  16. 「军民链智合创」科技美学出海 BitCEO比特维度全球CEO发展大会参展台北
  17. 《富爸爸、穷爸爸1》-01
  18. 【云原生利器之Cilium】什么是Cilium
  19. 施耐德 U.motion Builder软件被爆20多个0Day漏洞
  20. linux下怎么根据端口号杀死进程

热门文章

  1. 有k个list列表, 各个list列表的元素是有序的,将这k个列表元素进行排序( 基于堆排序的K路归并排序)...
  2. jQuery源码分析系列:Deferred延迟队列
  3. 强烈推荐深入浅出jBPM
  4. eclipse 无法启动选择的项,最近未进行任何启动
  5. python 3 导入 迭代判断
  6. AcWing 166. 数独
  7. Redis--位图BitMap
  8. 06.Android之消息机制问题
  9. 数据结构(十七)数组和矩阵
  10. C# 如何遍历删除某个控件上的所有子控件