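Storm Deployment and Running
Deploying a Storm cluster.
Environment setup
Storm
Download
First download the Storm tarball from the official website. This walkthrough uses Storm 1.2.2, the latest release at the time of writing.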
Extract it to /usr/local:
sudo tar xzvf apache-storm-1.2.2.tar.gz -C /usr/local
Check the extracted files:
Rename the directory:
sudo mv /usr/local/apache-storm-1.2.2/ /usr/local/storm
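Change the owner (here the hadoop user):
sudo chown -R hadoop:hadoop /usr/local/storm
Add Storm to the environment variables: open ~/.bashrc, add the two export lines, then reload it so the storm command is on the PATH:
vim ~/.bashrc
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin
source ~/.bashrc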
Edit the configuration file
Go to the installation directory and edit storm.yaml:
cd /usr/local/storm/
sudo vim conf/storm.yaml
Configure it for a single local server:
storm.zookeeper.servers:
    - "localhost"
nimbus.seeds: ["localhost"]
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
storm.zookeeper.port: 2181
storm.local.dir: "/usr/local/storm/storm-local"
storm.health.check.dir: "healthchecks"
storm.health.check.timeout.ms: 5000
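Each port listed under supervisor.slots.ports is one worker slot, so this supervisor can run at most four worker processes. The following is a JDK-only sketch that holds the same settings in a plain key/value map and reads the slot count back. Storm's own Config class also stores settings this way, since it extends HashMap<String, Object>. The class name SlotConfigSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: the storm.yaml settings above as a plain map.
// Uses only the JDK, so it runs without the Storm jars.
class SlotConfigSketch {
    static Map<String, Object> localConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("storm.zookeeper.servers", Arrays.asList("localhost"));
        conf.put("nimbus.seeds", Arrays.asList("localhost"));
        conf.put("supervisor.slots.ports", Arrays.asList(6700, 6701, 6702, 6703));
        conf.put("storm.zookeeper.port", 2181);
        return conf;
    }

    // One port = one worker slot, so the slot count is the length of the port list.
    static int workerSlots(Map<String, Object> conf) {
        List<?> ports = (List<?>) conf.get("supervisor.slots.ports");
        return ports == null ? 0 : ports.size();
    }

    public static void main(String[] args) {
        System.out.println("worker slots: " + workerSlots(localConf()));
    }
}
```

To allow more (or fewer) concurrent workers on this machine, add or remove ports in the list.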
Python
- Check that Python 2.6 or later is installed:
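Starting Storm
Modify the launcher script
Modify the script as described in the referenced post. The main change is the path to the Python interpreter.
First find your Python path: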
ls /usr/bin/python*
Then open the launcher script ./bin/storm and change the Python path to the one found above.
Start
Run the following commands:
nohup storm nimbus >/dev/null 2>&1 &
nohup storm supervisor >/dev/null 2>&1 &
nohup storm ui >/dev/null 2>&1 &
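Here >/dev/null sends standard output (1) to /dev/null, and 2>&1 then redirects standard error (2) to the same place as standard output. Together they discard both streams. The order matters: stdout must be redirected first. The trailing & runs the process in the background.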
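For comparison, here is a minimal sketch of the same redirection from Java using ProcessBuilder. It requires Java 9+ for Redirect.DISCARD and List.of. The DetachedLauncher class and its methods are hypothetical names. In this sketch, redirectErrorStream(true) plays the role of 2>&1, and Redirect.DISCARD plays the role of >/dev/null. Nothing here corresponds to nohup, so this only illustrates the redirection, not a real daemon launcher.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical helper: Java analogue of "cmd >/dev/null 2>&1 &".
class DetachedLauncher {
    static Process startDiscardingOutput(List<String> command) throws IOException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true);                       // like 2>&1: stderr follows stdout
        pb.redirectOutput(ProcessBuilder.Redirect.DISCARD); // like >/dev/null (Java 9+)
        return pb.start();                                  // returns at once, like a trailing &
    }

    // Convenience for demos: start the command, wait for it, return its exit code.
    static int runDiscarding(List<String> command) {
        try {
            return startDiscardingOutput(command).waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Both "out" and "err" are swallowed; only the exit code is visible.
        int code = runDiscarding(List.of("sh", "-c", "echo out; echo err 1>&2"));
        System.out.println("exit code: " + code);
    }
}
```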
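Startup may take a few minutes. Running jps should then show the nimbus, supervisor and core processes; core is the UI daemon. Open localhost:8080 in a browser to view the cluster status. Success!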
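To script this check, you can use the following small JDK-only sketch. The PortCheck class is a hypothetical name. It tests whether anything accepts TCP connections on a given port, for example the UI on 8080 or ZooKeeper on 2181 from the storm.yaml above. A successful connection only shows that the port is open, not that the cluster is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: is something listening on host:port?
class PortCheck {
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;   // the TCP handshake succeeded
        } catch (IOException e) {
            return false;  // refused, timed out, or unresolvable host
        }
    }

    public static void main(String[] args) {
        System.out.println("Storm UI on 8080: " + isListening("localhost", 8080, 1000));
        System.out.println("ZooKeeper on 2181: " + isListening("localhost", 2181, 1000));
    }
}
```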
Test run
Examples
The storm-starter project under /usr/local/storm/examples contains many Storm examples, such as DRPC and Word Count. You can package it into a Storm jar with Maven from that directory. First locate the directory:
The project is built with Maven, so you can package it either from IDEA or directly with mvn:
mvn clean install -Dmaven.test.skip=true
mvn package
Simple examples: WordCount and Stream Join
WordCount
SentenceSpout.java
This spout simulates external input:
import java.util.Map;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector spoutOutputCollector;
    private String[] sentences = {"the cow jumped over the moon", "an apple a day keeps the doctor away",
            "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};

    public void open(Map map, TopologyContext topologycontext, SpoutOutputCollector spoutoutputcollector) {
        this.spoutOutputCollector = spoutoutputcollector;
    }

    public void nextTuple() {
        // Emit every sentence with a unique message id so Storm tracks it
        for (String sentence : sentences) {
            Values values = new Values(sentence);
            UUID msgId = UUID.randomUUID();
            this.spoutOutputCollector.emit(values, msgId);
        }
        // Pause one second between batches
        Utils.sleep(1000);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
        outputfieldsdeclarer.declare(new Fields("sentence"));
    }
}
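Because emit() receives a msgId, Storm tracks each sentence and later calls the spout's ack(msgId) or fail(msgId). BaseRichSpout implements both methods as no-ops, so SentenceSpout never replays a failed sentence. The following is a hedged, Storm-free sketch of the bookkeeping a reliable spout could keep. The class ReplayingSentenceSource and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: what a spout could track around the msgId it emits.
class ReplayingSentenceSource {
    private final Map<UUID, String> pending = new HashMap<>(); // emitted, awaiting ack/fail
    private final Deque<String> toReplay = new ArrayDeque<>();  // failed, to be emitted again

    // Where a spout would call collector.emit(values, msgId): remember the sentence.
    UUID emit(String sentence) {
        UUID msgId = UUID.randomUUID();
        pending.put(msgId, sentence);
        return msgId;
    }

    // Mirrors ISpout.ack(Object msgId): the tuple tree completed, forget the sentence.
    void ack(UUID msgId) {
        pending.remove(msgId);
    }

    // Mirrors ISpout.fail(Object msgId): queue the sentence for another attempt.
    void fail(UUID msgId) {
        String sentence = pending.remove(msgId);
        if (sentence != null) {
            toReplay.addLast(sentence);
        }
    }

    int pendingCount() { return pending.size(); }

    // Next sentence to re-emit, or null if nothing failed.
    String nextReplay() { return toReplay.pollFirst(); }

    public static void main(String[] args) {
        ReplayingSentenceSource source = new ReplayingSentenceSource();
        UUID first = source.emit("the cow jumped over the moon");
        UUID second = source.emit("i am at two with nature");
        source.ack(first);
        source.fail(second);
        System.out.println("pending=" + source.pendingCount() + ", replay=" + source.nextReplay());
    }
}
```

In a real spout, nextTuple() would check nextReplay() before emitting fresh sentences.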
SplitSentenceBolt.java
A bolt that splits each sentence into words:
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector outputCollector;

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // Anchor each word to the input sentence so downstream failures reach the spout
            this.outputCollector.emit(tuple, new Values(word));
        }
        // Ack the sentence; otherwise its tuple tree never completes and times out
        this.outputCollector.ack(tuple);
    }

    public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
        this.outputCollector = outputcollector;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
        outputfieldsdeclarer.declare(new Fields("word"));
    }
}
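Anchoring and acking are what let Storm know when a sentence has been fully processed. Anchoring means passing the input tuple to emit. The Storm documentation on guaranteed message processing describes the mechanism: an acker task keeps one 64-bit "ack val" per spout tuple. That value is the XOR of every tuple id created or acked in the tree, so it returns to zero exactly when the whole tree is done. The following is a simplified model of that idea in plain Java. XorAckerSketch and its methods are hypothetical names, not Storm's internal acker.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical model of XOR-based tuple-tree tracking.
class XorAckerSketch {
    private final Map<Long, Long> ackVal = new HashMap<>(); // root msg -> running XOR
    private final Random random = new Random(42);           // seeded for reproducibility

    long newEdgeId() { return random.nextLong(); }

    // The spout emits a root tuple whose edge id is rootEdge.
    void init(long root, long rootEdge) { ackVal.put(root, rootEdge); }

    // A bolt acks inputEdge after emitting childEdges anchored to it.
    void ack(long root, long inputEdge, long... childEdges) {
        long x = inputEdge;
        for (long c : childEdges) x ^= c;
        ackVal.merge(root, x, (a, b) -> a ^ b);
    }

    // Zero means every edge was XORed in twice: once created, once acked.
    boolean isComplete(long root) {
        Long v = ackVal.get(root);
        return v != null && v == 0L;
    }

    public static void main(String[] args) {
        XorAckerSketch acker = new XorAckerSketch();
        long root = 1L;                          // stands in for the spout's msgId
        long sentenceEdge = acker.newEdgeId();
        acker.init(root, sentenceEdge);
        long w1 = acker.newEdgeId(), w2 = acker.newEdgeId();
        acker.ack(root, sentenceEdge, w1, w2);   // split bolt: two anchored words, then ack
        acker.ack(root, w1);                     // count bolt acks each word
        acker.ack(root, w2);
        System.out.println("tree complete: " + acker.isComplete(root));
    }
}
```

A bolt that never acks leaves a nonzero value behind, which is why such trees end in a timeout.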
WordCountBolt.java
A bolt that counts word occurrences:
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector outputCollector;
    private HashMap<String, Integer> counts = null;

    public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
        this.outputCollector = outputcollector;
        this.counts = new HashMap<String, Integer>();
    }

    public void execute(Tuple tuple) {
        // Increment the running count for this word (per task)
        String word = tuple.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        this.counts.put(word, count);
        this.outputCollector.emit(new Values(word, count));
        this.outputCollector.ack(tuple);
        System.out.println(word + ": " + count);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
        outputfieldsdeclarer.declare(new Fields("word", "count"));
    }
}
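Stripped of Storm, the two bolts together compute an ordinary word count. The following single-threaded sketch applies the same split(" ") and counting to a batch of sentences. The class WordCountSketch is a hypothetical name. Unlike the topology, it sees the whole batch at once rather than updating counts tuple by tuple.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java equivalent of SplitSentenceBolt + WordCountBolt.
class WordCountSketch {
    static Map<String, Integer> count(String... sentences) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for readable output
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {   // what SplitSentenceBolt does
                counts.merge(word, 1, Integer::sum);    // what WordCountBolt does
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("the cow jumped over the moon", "snow white and the seven dwarfs"));
    }
}
```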
WordCountTopology.java
The main class:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        SentenceSpout sentenceSpout = new SentenceSpout();
        SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
        WordCountBolt wordCountBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentenceSpout-1", sentenceSpout);
        // Sentences may go to any split task
        builder.setBolt("splitSentenceBolt-1", splitSentenceBolt).shuffleGrouping("sentenceSpout-1");
        // The same word always goes to the same count task
        builder.setBolt("wordCountBolt-1", wordCountBolt).fieldsGrouping("splitSentenceBolt-1", new Fields("word"));

        Config config = new Config();
        // Runs in-process with LocalCluster; command-line args are not used
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordCountTopology-1", config, builder.createTopology());
        Thread.sleep(999999999);
        cluster.shutdown();
    }
}
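When wordCountBolt-1 runs with several tasks, fieldsGrouping on "word" is what keeps the per-task HashMap in WordCountBolt correct: every tuple with the same word reaches the same task. The idea can be sketched as hashing the grouping values modulo the task count. This is a simplification for illustration, not Storm's exact routing code. FieldsGroupingSketch and the task count of 4 are hypothetical.

```java
import java.util.List;

// Hypothetical sketch of the idea behind fieldsGrouping.
class FieldsGroupingSketch {
    // Same grouping values -> same hash -> same task index in [0, numTasks).
    static int chooseTask(List<?> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // hypothetical parallelism for wordCountBolt-1
        for (String word : "the cow jumped over the moon".split(" ")) {
            // "the" appears twice and is routed to the same task both times
            System.out.println(word + " -> task " + chooseTask(List.of(word), numTasks));
        }
    }
}
```

shuffleGrouping, by contrast, ignores tuple contents and only spreads load, which is fine for the stateless split bolt.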
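Running
Submit the job with the compiled WordCount jar:
storm jar wordcount.jar WordCountTopology wc
Here WordCountTopology is the main class and wc is the topology name. Note that the main method above runs the topology in an in-process LocalCluster under the name wordCountTopology-1, and it does not read this argument.
As the output shows, the word counts keep changing: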
Stream Join
SimpleJoinBolt.java
This bolt implements the join itself:
import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TimeCacheMap;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SimpleJoinBolt extends BaseRichBolt {
    private OutputCollector _collector;
    private Fields _outFields;
    private Fields _idFields;
    int _numSources;
    Map<String, GlobalStreamId> _fieldLocations;
    // Keeps recently active entries in memory.
    // The two streams rarely deliver tuples with the same id at the same moment, so tuples are cached
    // until every source has delivered that id; after the join the entry is removed from the cache.
    TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;

    // outFields: the fields emitted after the join
    public SimpleJoinBolt(Fields outFields) {
        this._outFields = outFields;
    }

    public void execute(Tuple tuple) {
        // Read the join key (_idFields) from the tuple
        List<Object> id = tuple.select(_idFields);
        GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
        // Print the source component of the current tuple
        System.out.println("Tuple source: " + tuple.getSourceComponent());
        // Print the current tuple
        System.out.println("Received tuple: " + tuple.getFields().get(0) + " = " + tuple.getValues().get(0) + " , "
                + tuple.getFields().get(1) + " = " + tuple.getValues().get(1));
        // If _pending has no entry for this join key yet, add one
        if (!_pending.containsKey(id)) {
            _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
        }
        // The tuples received so far for this id, keyed by source stream
        Map<GlobalStreamId, Tuple> parts = _pending.get(id);
        // The same source sent this id twice: fail loudly; otherwise record the tuple
        if (parts.containsKey(streamId)) {
            throw new RuntimeException("Received same side of single join twice");
        }
        parts.put(streamId, tuple);
        // Once every source has delivered this id, remove it from _pending and join
        if (parts.size() == _numSources) {
            _pending.remove(id);
            List<Object> joinResult = new ArrayList<Object>();
            for (String outField : _outFields) {
                GlobalStreamId loc = _fieldLocations.get(outField);
                joinResult.add(parts.get(loc).getValueByField(outField));
            }
            // Print the join result
            System.out.print("Tuples with id " + id.get(0) + " received from both streams, join result: ");
            for (Object obj : joinResult) {
                System.out.print(obj + " ");
            }
            System.out.println();
            // Multi-anchoring: the joined tuple is anchored to all input tuples
            _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
            for (Tuple part : parts.values()) {
                _collector.ack(part);
            }
        } else {
            System.out.println("Tuple with id " + id + " received from only one stream so far; cannot join yet");
        }
    }

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _fieldLocations = new HashMap<String, GlobalStreamId>();
        this._collector = collector;
        // Create the TimeCacheMap with an expiry callback that fails tuples whose partner never arrived
        int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
        _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
        // Number of input sources
        _numSources = context.getThisSources().size();
        Set<String> idFields = null;
        // Iterate over this bolt's sources: the gender and age spouts
        System.out.println(context.getThisSources().keySet());
        for (GlobalStreamId source : context.getThisSources().keySet()) {
            // Find the fields common to all sources (id) and keep them as _idFields
            Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
            // fields: [id, gender], [id, age]
            Set<String> setFields = new HashSet<String>(fields.toList());
            if (idFields == null) {
                idFields = setFields;
            } else {
                // Intersection
                idFields.retainAll(setFields);
                System.out.println(idFields);
            }
            // Record which source provides each output field in _fieldLocations,
            // so the values can be looked up after the join
            for (String outfield : _outFields) {
                for (String sourcefield : fields) {
                    if (outfield.equals(sourcefield)) {
                        _fieldLocations.put(outfield, source);
                    }
                }
            }
            // Ends up mapping gender -> the "gender" spout's stream and age -> the "age" spout's stream
            System.out.println(_fieldLocations);
        }
        _idFields = new Fields(new ArrayList<String>(idFields));
        if (_fieldLocations.size() != _outFields.size()) {
            throw new RuntimeException("Cannot find all outfields among sources");
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(_outFields);
    }

    private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
        public void expire(List<Object> key, Map<GlobalStreamId, Tuple> tuples) {
            // The join for this id timed out: fail every tuple that was waiting
            for (Tuple tuple : tuples.values()) {
                _collector.fail(tuple);
            }
        }
    }
}
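The _pending map relies on TimeCacheMap. TimeCacheMap keeps its entries in a few buckets that a background thread rotates periodically. On each rotation it calls the expiry callback for everything in the bucket that falls off the end. The following simplified, single-threaded sketch shows that rotation. RotatingCacheSketch is a hypothetical name, and rotate() is called by hand instead of by a timer. In SimpleJoinBolt, only the first tuple for an id calls put(), so the wait is measured from the first arrival.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified model of bucket-rotation expiry (no background thread).
class RotatingCacheSketch<K, V> {
    private final LinkedList<Map<K, V>> buckets = new LinkedList<>();
    private final BiConsumer<K, V> onExpire;

    RotatingCacheSketch(int numBuckets, BiConsumer<K, V> onExpire) {
        if (numBuckets < 2) throw new IllegalArgumentException("numBuckets must be >= 2");
        for (int i = 0; i < numBuckets; i++) buckets.add(new HashMap<>());
        this.onExpire = onExpire;
    }

    // New entries go into the newest bucket; stale copies elsewhere are dropped.
    void put(K key, V value) {
        for (Map<K, V> b : buckets) b.remove(key);
        buckets.getFirst().put(key, value);
    }

    V get(K key) {
        for (Map<K, V> b : buckets) {
            V v = b.get(key);
            if (v != null) return v;
        }
        return null;
    }

    V remove(K key) {
        V result = null;
        for (Map<K, V> b : buckets) {
            V v = b.remove(key);
            if (v != null) result = v;
        }
        return result;
    }

    // Drop the oldest bucket and report everything in it as expired.
    void rotate() {
        Map<K, V> dead = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        dead.forEach(onExpire);
    }

    public static void main(String[] args) {
        RotatingCacheSketch<Integer, String> pending = new RotatingCacheSketch<>(3,
                (id, v) -> System.out.println("expired id " + id + " (the bolt would call fail())"));
        pending.put(100, "age tuple only");
        for (int i = 0; i < 3; i++) pending.rotate();
    }
}
```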
SingleJoinExample.java
This class sets up the two spouts and the join bolt (setBolt):
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.testing.FeederSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class SingleJoinExample {
    public static void main(String[] args) {
        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("gender", genderSpout);
        builder.setSpout("age", ageSpout);
        builder.setBolt("join", new SimpleJoinBolt(new Fields("gender", "age")))
                .fieldsGrouping("gender", new Fields("id"))
                .fieldsGrouping("age", new Fields("id"));

        Config conf = new Config();
        conf.setMaxTaskParallelism(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("join-bolt", conf, builder.createTopology());

        for (int i = 0; i < 100; i++) {
            String gender;
            if (i % 2 == 0) {
                gender = "male";
            } else {
                gender = "female";
            }
            genderSpout.feed(new Values(i, gender));
        }

        for (int i = 100; i >= 0; i--) {
            ageSpout.feed(new Values(i, i + 20));
        }

        Utils.sleep(2000);
        cluster.shutdown();
    }
}
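Running
First build the JAR, then run:
storm jar ds.jar SingleJoinExample join
The output is:
As the output shows, when a received tuple has no matching id yet, the bolt waits for the tuple from the other stream to arrive and then joins them.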
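You can reproduce the waiting behaviour without Storm. The following single-threaded sketch applies the same pending-map logic, with no timeouts, and feeds it in the same order as SingleJoinExample. The class PendingJoinSketch is a hypothetical name. Ids 0 to 99 join once their age tuple arrives. Id 100 only ever receives an age tuple and stays pending. In the real bolt, the TimeCacheMap callback would eventually fail that tuple, assuming the topology runs longer than the message timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of SimpleJoinBolt's pending/join logic for two sources.
class PendingJoinSketch {
    private final Map<Integer, Map<String, Object>> pending = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    // Returns true if this tuple completed the join for its id.
    boolean receive(String source, int id, Object value) {
        Map<String, Object> parts = pending.computeIfAbsent(id, k -> new HashMap<>());
        if (parts.containsKey(source)) {
            throw new IllegalStateException("Received same side of single join twice");
        }
        parts.put(source, value);
        if (parts.size() < 2) {
            return false; // wait for the other stream
        }
        pending.remove(id);
        joined.add(id + ": " + parts.get("gender") + " " + parts.get("age"));
        return true;
    }

    int pendingCount() { return pending.size(); }
    boolean isPending(int id) { return pending.containsKey(id); }
    List<String> joined() { return joined; }

    public static void main(String[] args) {
        PendingJoinSketch join = new PendingJoinSketch();
        // Same feed order as SingleJoinExample: genders 0..99, then ages 100..0.
        for (int i = 0; i < 100; i++) join.receive("gender", i, i % 2 == 0 ? "male" : "female");
        for (int i = 100; i >= 0; i--) join.receive("age", i, i + 20);
        System.out.println(join.joined().size() + " joined, " + join.pendingCount() + " still waiting");
    }
}
```

In the real topology the two spouts interleave, so the printed order differs, but the set of joined ids is the same.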
Reference
- Async-loop-died
- A simple example of Stream Join processing in Storm (Storm处理Stream Join的简单实例)