部署Storm集群。

环境配置

Strom

下载

  • 首先从官网下载Strom压缩包,这里以最新的Strom1.2.2作为演示。

  • 解压到/usr/local:

    • sudo tar xzvf apache-storm-1.2.2.tar.gz -C /usr/local
      
  • 查看解压文件:

  • 需要重命名:

    • sudo mv /usr/local/apache-storm-1.2.2/ /usr/local/storm
      
  • 修改拥有者:

    • sudo chown -R hadoop:hadoop storm
      
  • 添加到环境变量

    • vim ~/.bashrc
      export STROM=/usr/local/storm
      source ~/.bashrc
      

修改配置文件

  • 进入软件目录,修改文件storm.yaml

    • cd /usr/local/storm/
      sudo vim conf/storm.yaml
      
  • 修改为本地server:

    • storm.zookeeper.servers:- "localhost"nimbus.seeds: ["localhost"]supervisor.slots.ports:- 6700- 6701- 6702- 6703storm.zookeeper.port: 2181storm.local.dir: "/usr/local/storm/storm-local"storm.health.check.dir: "healthchecks"storm.health.check.timeout.ms: 5000
      

Python

  • 检查是否有高于2.6版本的python:

启动storm

修改执行脚本

  • 参考这里修改脚本,主要是修改python的路径

  • 先查找自己的python路径:

    • ls /usr/bin/python*
      
  • 找到执行脚本,在./bin/storm中,根据之前的python路径修改

启动

  • 执行命令:

    • nohup storm nimbus >/dev/null 2>&1 &
      nohup storm supervisor >/dev/null 2>&1 &
      nohup storm ui >/dev/null 2>&1 &
      
    • 这里的2>&1表示就是把所有标准输出(&1)和标准出错(2)都扔到垃圾桶里面,最后的&表示后台执行

  • 这里可能需要几分钟才能完成启动,使用jps命令查看可以看到nimbus, supervisor, core:

  • 在浏览器中localhost:8080 上查看集群情况:

  • 成功!

测试运行

示例

  • /usr/local/storm/example目录下的storm-starter里有很多Storm的项目,比如DRPCWord Count

  • 在该目录路径下使用maven来打包Stormjar包

  • 首先找到该目录:

  • 我们发现该目录是用maven进行打包的:

  • 因此,我们可以使用IDEA来打包,也可以直接使用mvn:

    • mvn clean install -Dmaven.test.skip=true
      mvn package
      

Stream Join的简单实例

WordCount

SentenceSpout.java

该文件为模拟外部输入

import java.util.Map;
import java.util.UUID;import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;public class SentenceSpout extends BaseRichSpout {private SpoutOutputCollector spoutOutputCollector;private String[] sentences = {"the cow jumped over the moon", "an apple a day keeps the doctor away","four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};public void open(Map map, TopologyContext topologycontext, SpoutOutputCollector spoutoutputcollector) {this.spoutOutputCollector = spoutoutputcollector;}public void nextTuple() {for (String sentence : sentences) {Values values = new Values(sentence);UUID msgId = UUID.randomUUID();this.spoutOutputCollector.emit(values, msgId);}Utils.sleep(1000);}public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {outputfieldsdeclarer.declare(new Fields("sentence"));}}

SplitSentenceBolt.java

分割字符的bolt

import java.util.Map;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;public class SplitSentenceBolt extends BaseRichBolt {private OutputCollector outputCollector;public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for (String word : words) {this.outputCollector.emit(new Values(word));}}public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {this.outputCollector = outputcollector;}public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {outputfieldsdeclarer.declare(new Fields("word"));}}

WordCountBolt.java

统计次数的bolt

import java.util.HashMap;
import java.util.Map;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;public class WordCountBolt extends BaseRichBolt {private OutputCollector outputCollector;private HashMap<String, Integer> counts = null;public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {this.outputCollector = outputcollector;this.counts = new HashMap<String, Integer>();}public void execute(Tuple tuple) {String word = tuple.getStringByField("word");Integer count = counts.get(word);if (count == null) {count = 0;}count++;this.counts.put(word, count);this.outputCollector.emit(new Values(word, count));this.outputCollector.ack(tuple);System.out.println(word + ": " + count);}public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {outputfieldsdeclarer.declare(new Fields("word", "count"));}}

WordCountTopology.java

主函数

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;public class WordCountTopology {public static void main(String[] args) throws Exception {SentenceSpout sentenceSpout = new SentenceSpout();SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();WordCountBolt wordCountBolt = new WordCountBolt();TopologyBuilder builder = new TopologyBuilder();builder.setSpout("sentenceSpout-1", sentenceSpout);builder.setBolt("splitSentenceBolt-1", splitSentenceBolt).shuffleGrouping("sentenceSpout-1");builder.setBolt("wordCountBolt-1", wordCountBolt).fieldsGrouping("splitSentenceBolt-1", new Fields("word"));Config config = new Config();LocalCluster cluster = new LocalCluster();cluster.submitTopology("wordCountTopology-1", config, builder.createTopology());Thread.sleep(999999999);cluster.shutdown();}}

运行

  • 使用已经编译好的WordCountjar包,提交任务:

    • storm jar wordcount.jar WordCountTopology wc
      
    • 这里WordCountTopoloy为主class,wc为任务别名

  • 可以看见,WordCount的统计是不断变化的:

Stream Join

SimpleJoinBolt.java

这里主要处理join的过程

import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TimeCacheMap;import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;public class SimpleJoinBolt extends BaseRichBolt {private OutputCollector _collector;private Fields _outFields;private Fields _idFields;int _numSources;Map<String, GlobalStreamId> _fieldLocations;//在内存中保留近期活跃的对象//由于bolt在接收两个数据源的流数据时,同一id两个数据流很可能不会再一个时间点同时发出,因此需要对数据流先进行缓存,直到所有//id相同的数据源都被后被聚合处理,做完聚合处理后再将对应的tuple信息从缓存中删除。在TimeCacheMap<List<Object>,Map<GlobalStreamId, Tuple>> _pending;//传进的Fields是聚合后将被输出的字段public SimpleJoinBolt(Fields outFields){this._outFields=outFields;}public void execute(Tuple tuple) {// TODO Auto-generated method stub//从tuple中获取_idFields字段,如果不存在于等待被处理的队列_pending中,则加入一行List<Object> id=tuple.select(_idFields);GlobalStreamId streamId=new GlobalStreamId(tuple.getSourceComponent(),tuple.getSourceStreamId());//打印当前处理元组的来源SpoutSystem.out.println("元组来源:"+tuple.getSourceComponent());//打印当前元组System.out.println("接收的元组:"+tuple.getFields().get(0)+" = "+tuple.getValues().get(0)+" , "+tuple.getFields().get(1)+" = "+tuple.getValues().get(1));//如果当前pending中还不存在join key为此id的元组,则将该条记录加入if(!_pending.containsKey(id)){_pending.put(id, new HashMap<GlobalStreamId,Tuple>());}//从_pending队列中获取当前GlobalStreamId对应的HashMap对象Map<GlobalStreamId,Tuple> parts=_pending.get(id);//如果streamId已经包含其中,则抛出异常,接收到同一个spout中的两条一样id的tuple,否则将该streamId加入parts中if(parts.containsKey(streamId)){throw new RuntimeException("Received same side of single join twice");}parts.put(streamId, tuple);//如果parts中已经包含了聚合数据源的个数,则从_pending队列中移除这条记录if(parts.size()==_numSources){_pending.remove(id);List<Object> joinResult=new ArrayList<Object>();for(String outField:_outFields){GlobalStreamId loc=_fieldLocations.get(outField);joinResult.add(parts.get(loc).getValueByField(outField));}//输出聚合结果System.out.print("两条关系流中id值为"+id.get(0)+"的元组均已收到,聚合结果为:");for(Object obj:joinResult){System.out.print(obj+" ");}System.out.println();//多锚定_collector.emit(new ArrayList<Tuple>(parts.values()),joinResult);for (Tuple part : parts.values()) {_collector.ack(part);}}else{System.out.println("只从一个关系流中收取到id值为"+id+"的元组,不可进行join操作");}}public void prepare(Map conf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stub_fieldLocations = new HashMap<String, GlobalStreamId>();this._collector=collector;//创建TimeCacheMap对象,设置超时回调接口,用于tuple处理失败时fail消息int timeout=((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();_pending=new TimeCacheMap<List<Object>,Map<GlobalStreamId,Tuple>>(timeout,new ExpireCallback());//记录数据源的数据个数_numSources=context.getThisSources().size();Set<String> idFields=null;//遍历TopologyContext中不同的数据源:genderSpout和ageSpoutSystem.out.println(context.getThisSources().keySet());for(GlobalStreamId source:context.getThisSources().keySet()){//得到公共的Fields字段id,保存到_idFields中Fields fields=context.getComponentOutputFields(source.get_componentId(),source.get_streamId());//fields:[id,gender],[id,age]Set<String> setFields=new HashSet<String>(fields.toList());if(idFields==null){idFields=setFields;}else{//求交集idFields.retainAll(setFields);System.out.println(idFields);}//同时将_outFields中字段所在数据源记录下来,保存到一张HashMap _fieldLocations中,以便聚合后获取对应的字段值for(String outfield:_outFields){for(String sourcefield:fields){if(outfield.equals(sourcefield)){_fieldLocations.put(outfield, source);}}}//打印结果:gender=GlobalStreamId(componentId=gender-spout,streamId=default)//age=GlobalStreamId(componentId=age-spout,streamId=default)System.out.println(_fieldLocations);}_idFields=new Fields(new ArrayList<String>(idFields));if(_fieldLocations.size()!=_outFields.size()){throw new RuntimeException("Cannot find all outfields among sources");}}public void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(_outFields);}private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId,Tuple>>{public void expire(List<Object> key, Map<GlobalStreamId, Tuple> tuples) {// TODO Auto-generated method stubfor(Tuple tuple:tuples.values()){_collector.fail(tuple);}}}}

SingleJoinExample.java

这里处理两个spout和setBolt过程

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.testing.FeederSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;public class SingleJoinExample {public static void main(String[] args) {FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));TopologyBuilder builder = new TopologyBuilder();builder.setSpout("gender", genderSpout);builder.setSpout("age", ageSpout);builder.setBolt("join", new SimpleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")).fieldsGrouping("age", new Fields("id"));Config conf = new Config();conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("join-bolt", conf, builder.createTopology());for (int i = 0; i < 100; i++) {String gender;if (i % 2 == 0) {gender = "male";}else {gender = "female";}genderSpout.feed(new Values(i, gender));}for (int i = 100; i >= 0; i--) {ageSpout.feed(new Values(i, i + 20));}Utils.sleep(2000);cluster.shutdown();}
}

运行

首先生成JAR包,然后执行:

storm jar ds.jar SingleJoinExample join

运行结果为:

可以看见,当接收到的元祖没有匹配的id时,会等待另一个元祖到来,然后再聚合。

Reference

  • Async-loop-died
  • Storm处理Stream Join的简单实例

Storm部署与运行相关推荐

  1. 亿级流量电商详情页系统实战-34.Storm部署

    1.环境准备 1.1 安装Java 7 1.2 Python 2.6.6 2.部署 2.1 下载storm安装包 下载apache-storm-1.1.0.tar.gz上传至 解压缩 # cd /us ...

  2. Visual Studio 2013开发 mini-filter driver step by step (2) - 编译,部署,运行

    编译driver 一个基本的mini filter项目创建好了以后,就可以编译,部署和运行了,在部署之前,我们要先确定在什么样的系统上运行,我有一台windows 2008 r2拟机,所以我的运行的目 ...

  3. 在SAP云平台上部署和运行Docker应用

    容器技术,Docker,虚拟化,这些名词诞生尽管有很长一段时间了,但是在云原生开发领域仍旧热度不减.甚至连SAP赖以成名的ABAP Netweaver,如今也踏上了容器化的上云探索之路,比如下面这张来 ...

  4. Xamarin 跨移动端开发系列(01) -- 搭建环境、编译、调试、部署、运行

    (本文是基于老版本的VS和Xamarin,而VS2017已经集成了Xamarin,所以,本文已经过时,最新的Xamarin开发介绍请参见 使用 Xamarin开发手机聊天程序 .) 如果是.NET开发 ...

  5. 项目 11 部署与运行

    项目 11 部署与运行 #为什么要运行多个tornado实例 #网页响应不是特别的计算密集型处理,对于计算机来说不会消耗太多的CPU,运行多个实例可以充分利用CPU#多实例又会有多端口的问题,在下面的 ...

  6. zk-03-Zookeeper部署和运行

    环境准备: 准备Java运行环境,确保你已经安装了 java7 或者更高的版本: 下载 ZooKeeper 安装包:https://zookeeper.apache.org/releases.html ...

  7. 华为Atlas200DK的环境部署与运行demo(人脸识别)

    文章目录 前言 一.部署准备 1.基本准备 2.安全清空sd卡 3.安装摄像头 二.环境部署 1.运行环境与开发环境合设 1.烧录dd镜像 2.开发板启动 3.开发板连接PC机 1.USB端口连接 2 ...

  8. IDEA从零到精通06之创建web项目及部署tomcat运行

    文章目录 作者简介 引言 导航 热门专栏推荐 视频讲解 概述 一.创建web项目 二.修改部署信息 三.启动服务 四.关闭服务 五.创建Servlet并访问 六.引入第三方jar包 小结 导航 热门专 ...

  9. 以太坊部署_从以太坊开始-部署和运行合同

    以太坊部署 在过去的几周中,我们设置了工具并编写了合同(并对其进行了测试). 绝对是时候"真正"部署合同并运行它了. 这是第3 次后在开始复仇重点series.Other职位包括: ...

最新文章

  1. 密度聚类算法DBSCAN实战及可视化分析
  2. C6000系列DSP的内联函数
  3. base64/32/16编码
  4. VTK:Filtering之ImplicitBooleanDemo
  5. VS一直停留在“正在还原nuget程序包”
  6. strrchr php,php strstr() strrchr() strpos() strrpos()函数_PHP教程
  7. Spring Boot的事务管理注解@EnableTransactionManagement的使用
  8. 一个网页设计师应该考虑的9件事
  9. 最小环 floyd java_Floyd最小环
  10. flex 内嵌js文件
  11. 外星人双系统ubuntu18.04安装killer E3100网卡驱动
  12. 计算机视觉教程7-3:Openpose配置与实践
  13. ios原生条形码扫描 效率低下原因
  14. 计算机科学本质源自于数学思维,计算思维的特点、特征:形式化、程序化、机械化...
  15. [轉貼]奋斗5年从月薪3500到700万!
  16. 发布下今天学习内容--CentOS7安装Oracle 11gR2 图文详解
  17. js 实现时分秒的转换
  18. 做了一个网页版的串口调试助手
  19. 维基百科推荐算法阅读总结
  20. dotnet 从入门到放弃的 500 篇文章合集

热门文章

  1. js 读取图片路径并预览图片
  2. uniapp中调用震动、系统铃声以及自定义铃声
  3. 关于国密HTTPS的那些事(一)
  4. SVN mac 破解版
  5. 【计蒜客】蒜头君的旅游计划
  6. 【HMS core】【Wallet Kit】【解决方案】华为钱包的客户端示例代码为何无法运行
  7. 小练习 通过csv模块读取csv文件
  8. 一年303个漏洞,Chrome被评为“最脆弱”浏览器,Opera 最安全!网友:Opera 还有人用?...
  9. linux最大的账户,Linux系统账户安全
  10. 男人买鞋有点难——中国十大皮鞋