1、背景:topology程序提交集群模式运行试验,验证在同一文件输入源情况下,worker之间是否会重复输入处理,以及数据变量能否在不同worker之间共享,如果文件新增数据,topology会不会获取最新数据处理。

2、试验代码:

package cn.wc;import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;public class TopologyMain {public static void main(String[] args) throws InterruptedException {//ConfigurationConfig conf = new Config();conf.setNumWorkers(3);//设置3个进程conf.put("wordsFile", args[0]);conf.put("output", args[1]);//Topology definitionTopologyBuilder builder = new TopologyBuilder();builder.setSpout("word-reader",new WordReader(),3);builder.setBolt("word-normalizer",new WordNormalizer(),3).setNumTasks(6).shuffleGrouping("word-reader");builder.setBolt("word-counter", new WordCounter(),3).fieldsGrouping("word-normalizer", new Fields("word"));//集群模式try {StormSubmitter.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());} catch (AlreadyAliveException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InvalidTopologyException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (AuthorizationException e) {// TODO Auto-generated catch blocke.printStackTrace();}  }
}
package cn.wc;import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;//Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。
public class WordReader extends BaseRichSpout {private SpoutOutputCollector collector;private FileReader fileReader;private boolean completed = false;public void ack(Object msgId) {System.out.println("OK:"+msgId);}public void close() {}public void fail(Object msgId) {System.out.println("FAIL:"+msgId);}/*** The only thing that the methods will do It is emit each * file line* spout最主要的方法,读取文本文件,并把它的每一行发射出去(给bolt) * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下 */public void nextTuple() {/*** The nextuple it is called forever, so if we have been readed the file* we will wait and then return*/if(completed){try {Thread.sleep(1000);} catch (InterruptedException e) {//Do nothing}return;}String str;//Open the readerBufferedReader reader = new BufferedReader(fileReader);try{//Read all lineswhile((str = reader.readLine()) != null){/*** By each line emmit a new value with the line as a their* 发射每一行,Values是一个ArrayList的实现 */this.collector.emit(new Values(str),str);}}catch(Exception e){throw new RuntimeException("Error reading tuple",e);}finally{completed = true;}}/*** We will create the file and get the collector object* 三个参数,第一个是创建Topology时的配置,第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt *  */public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {//获取创建Topology时指定的要读取的文件路径  this.fileReader = new FileReader(conf.get("wordsFile").toString());} catch (FileNotFoundException e) {throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");}//初始化发射器this.collector = collector;}/*** Declare the output field "word"*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("line"));}
}
package cn.wc;import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;//Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。
//Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用。
public class WordNormalizer extends BaseBasicBolt {public void cleanup() {}/*** The bolt will receive the line from the* words file and process it to Normalize this line* * The normalize will be put the words in lower case* and split the line to get all words in this * bolt中最重要的方法,每当接收到一个tuple时,此方法便被调用 * 这个方法的作用就是把文本文件中的每一行切分成一个个单词,并把这些单词发射出去(给下一个bolt处理) */public void execute(Tuple input, BasicOutputCollector collector) {String sentence = input.getString(0);String[] words = sentence.split(" ");for(String word : words){word = word.trim();if(!word.isEmpty()){word = word.toLowerCase();collector.emit(new Values(word));}}}/*** The bolt will only emit the field "word" */public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}
}
package cn.wc;import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;public class WordCounter extends BaseBasicBolt {Integer id;String name;Map<String, Integer> counters;String output=null;/*** At the end of the spout (when the cluster is shutdown* We will show the word counters* Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里 */@Overridepublic void cleanup() {/*System.out.println("-- Word Counter ["+name+"-"+id+"] --");for(Map.Entry<String, Integer> entry : counters.entrySet()){System.out.println(entry.getKey()+": "+entry.getValue());}*/}/*** On create */@Overridepublic void prepare(Map stormConf, TopologyContext context) {this.counters = new HashMap<String, Integer>();this.name = context.getThisComponentId();this.id = context.getThisTaskId();output=stormConf.get("output").toString();}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String str = input.getString(0);/*** If the word dosn't exist in the map we will create* this, if not We will add 1 */if(!counters.containsKey(str)){counters.put(str, 1);}else{Integer c = counters.get(str) + 1;counters.put(str, c);}//写入文件try{File file=new File(output);if(!file.exists())file.createNewFile();FileOutputStream out=new FileOutputStream(file,true);        for(Map.Entry<String, Integer> entry : counters.entrySet()){StringBuffer sb=new StringBuffer();sb.append(entry.getKey()+": "+entry.getValue());sb.append("\r\n");out.write(sb.toString().getBytes("utf-8"));}  }catch (IOException e){e.printStackTrace();}}
}

3、结果分析:

集群环境下执行:storm jar /mnt/wc.jar cn.wc.TopologyMain /mnt/words.txt /tmp/topo.log
/*并行和通信试验:
 * 设置worker为3,启动3个进程来服务这个topology
 * spout/bolt的线程线程设置为3,默认对应一个task,就是一个进程跑一个task,总共有9个;
 * 现在对word-normalizer这个bolt设置任务6个,那就是每个进程分2个,现在总共12个task;
 * 总的来说:worker进程有3个,executor线程有9个,task任务有12个;
 * 输入:/mnt/words.txt 输出:/tmp/topo.log
*/

1)storm list发现task是15个,不是12个,怎么算就有点疑惑了;

2)输入的词汇,明显被重复统计3次,也就是说3个executor在同一文件输入源下,不会自动去协调输入记录从而排斥;

3)topology程序中设置的变量,无法再executor之间共享;

4)输入的文件新增词汇,topology没有及时去获取统计,当然topology仍然在集群中运行

4、总结:

1)一个topology被提交到不同节点的不同worker(进程)分布执行,要按照独立进程来看;

2)worker内要有自己唯一的输入源,同时要确保输入源是持续提供;

3)要在worker之间共享数据变量,只能通过其他办法,如redis来存储;

也就是说:topology被提交到集群分布式执行,不同worker之间是独立进程运作。

在线实时大数据平台Storm输入源共享试验相关推荐

  1. 在线实时大数据平台Storm集群组件学习

    Hadoop常用于离线的复杂的大数据处理,Spark常用于离线的快速(轻量级)的大数据处理, Storm常用于在线的实时的大数据处理:这句话一定程度上反应了三套大数据平台的鲜明特征.Storm是一套实 ...

  2. 在线实时大数据平台Storm并行度试验

    集群模式试验:同一文件输入数据如何处理,数据变量共享 1)集群模式一个worker内一个spout一个Bolt jps:1个worker storm list:1个wokers,4个tasks 2)集 ...

  3. 在线实时大数据平台Storm并行和通信机制理解

    1.storm系统角色和应用组件基本理解: 和Hadoop一起理解,清晰点. 1)物理节点Nimubus,负责资源分配和任务调度: 2)物理节点Supervisor负责接受nimbus分配的任务,启动 ...

  4. 在线实时大数据平台Storm集成redis开发(分布锁)

    1.需求场景:spout从ftp列表中拿到未读取的文件读取并发射行到Bolt,bolt进行业务处理后提交下一Bolt入库.用redis主要是:保存文件列表对象,使用分布锁来同步互斥访问共享对象,使文件 ...

  5. 在线实时大数据平台Storm开发之wordcount

    可以在Eclipse下通过Maven引入storm-starter项目,这里直接将storm目录下lib中的jar包引入到工程中. 由于storm-core-1.0.1.jar中带有default.y ...

  6. 在线实时大数据平台Storm本地模式运行的一个小发现

    1.现象:生产中分别部署了两台服务器,独立运行storm,然后拓扑程序提交是本地模式,发现不用启动storm和zookeeper也可以运行: #jps  没有下面进程  QuorumPeerMain ...

  7. 在线实时大数据平台Storm版本兼容的问题

    部署了storm1.0.1最新版,但原来生产的程序是storm0.8.2版本并在该版本环境中运行,直接将程序放到1.0.1环境中storm jar运行失败. 重构程序,引入storm-core-1.0 ...

  8. 在线实时大数据平台Storm单机部署

    centos单机下部署storm,主要用于开发测试用.部署的IP地址用zoo1代替,要注意/etc/hosts下对应ip和主机名. 1.Zookeeper部署 Apache官网http://zooke ...

  9. 实时大数据平台的设计与实现

    实时大数据平台的设计与实现 什么是实时大数据平台 实时大数据平台和离线大数据平台还是有区别的,更强调数据的实时性.具体的架构,具体的代码该怎么写,模块怎么去构建,各个系统之间怎么去组织协调,都需要根据 ...

最新文章

  1. performActionForShortcutItem方法未触发
  2. web前端-移动端HTML5微商城项目实战分享案例
  3. 一文看懂Python(三)-----字典篇
  4. 浏览器内存不足导致页面崩溃_深度精读:浏览器渲染原理 [8000字图文并茂]
  5. python函数自定义教程_Python中自定义函数的教程
  6. AV1生态系统更新:2019年6月
  7. 3台廉价机器每秒写入2百万!Kafka为什么那么快?
  8. CVE-2019-0708复现
  9. 系统集成资质培训 - 2013下半年系统集成资质申报及集成资质考试
  10. 东大OJ-1544: GG的战争法则
  11. Matlab中图像函数大全
  12. 10款概念手机,哪款是你的最爱
  13. 黑苹果 ACPI Error:method parse/execut failed SB.AC.ADJP
  14. Java 复制Excel工作表
  15. H3C模拟器安装及解决各种兼容性问题方法
  16. win32 应用程序更换icon图标
  17. C# 读取网卡、设置网上、自动连接Wifi
  18. 常微分方程和偏微分方程
  19. java fastmethod_Java FastMath.cbrt方法代码示例
  20. 电脑读取不U盘,在磁盘管理器中显示“无媒体”解决方法

热门文章

  1. 深入理解JVM虚拟机(四):Class类文件结构(二)
  2. eve模拟器华为镜像_EVE-NG简单入门介绍
  3. kali linux 里vim如何使用_Kali Linux的vi编辑器/vim编辑器使用方法
  4. yolo v3制作自己的数据_小白也能弄懂的目标检测之YOLO系列 第一期
  5. 【BZOJ4873】[六省联考2017]寿司餐厅(网络流)
  6. 获取C#中方法的执行时间及其代码注入
  7. 【设计模式之单例模式InJava】
  8. xunsearch: 开启后台服务,索引……随笔记录
  9. Meditation Guide
  10. WINCE设备开机灰屏问题(很怪异)