在线实时大数据平台Storm并行度试验
集群模式试验:同一文件输入数据如何处理,数据变量共享
1)集群模式一个worker内一个spout一个Bolt
jps:1个worker
storm list:1个wokers,4个tasks
2)集群模式一个worker内一个spout 两个Bolt
jps:1个worker
storm list:1个wokers,6个tasks
不同bolt线程之间对变量counter是互斥读写的。试验证明多bolt可以同时对同一变量进行操作。
3)集群模式一个worker内两个spout两个Bolt
jps:1个worker
storm list:1个wokers,7个tasks
试验证明多spout对同一文件输入源会重复处理数据,spout线程间对输入源不会互斥读取。
基于以上三个试验,说明在同一进程内(worker),spout线程间对输入不能互斥(会重复处理数据,只能每个spout提供不同输入源),bolt线程间对变量是互斥的。可以理解,进程内部对多线程共享变量是有互斥控制,但对外部数据是不控制(spout是获取外部数据的)。
4)集群模式两个worker内一个spout一个Bolt
jps:2个worker
storm list:2个wokers,5个tasks
这个试验意义不大,主要是观察worker和task数。只有一个spout不会重复处理数据。
5)集群模式两个worker内一个spout两个Bolt
jps:2个worker
storm list:2个wokers,7个tasks
这个试验意义不大,主要是观察worker和task数。只有一个spout不会重复处理数据,多bolt间可以互斥访问变量。
6)集群模式两个worker内两个spout两个Bolt
jps:2个worker
storm list:2个wokers,8个tasks
多spout会重复读取同一输入源的数据。跨进程不能共享变量。
通过上面试验可以得出:
1)进程间(worker)是不能共享互斥访问变量;
2)线程间(spout)是不能共享互斥读取同一文件;
3)线程间(bolt)是可以共享互斥访问变量;
通过上面这个图,更好理解:
1)多个spout要提供不同输入源,同一文件会重复处理;
2)多个bolt间可以汇聚统计不同spout发射过来的同主题数据;
通过上面试验,实际上,对spout/bolt框架还是不能够全面了解,那些代码是storm框架控制,那些是自己控制,搞明白这个,就是在变量定义以及数据是否存储到磁盘来共享设计topology。
代码如下:
package cn.wc;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;public class TopologyMain {public static void main(String[] args) throws InterruptedException {//Configuration Config conf = new Config(); conf.setNumWorkers(1);//设置2个进程 conf.put("inpath", args[0]); //输入文件路径conf.put("outpath", args[1]); //输出结果路径//Topology definition TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader",new WordReader(),2); builder.setBolt("word-normalizer",new WordNormalizer(),2).shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word")); //集群模式 try { //storm jar /mnt/wc.jar cn.wc.TopologyMain /tmp/topoin.txt /tmp/topoout.logStormSubmitter.submitTopology("topoword", conf, builder.createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (AuthorizationException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
package cn.wc;import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;//Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。
public class WordReader extends BaseRichSpout {private SpoutOutputCollector collector;private FileReader fileReader;private boolean completed = false;boolean ass=false;public void ack(Object msgId) {System.out.println("OK:"+msgId);}public void close() {}public void fail(Object msgId) {System.out.println("FAIL:"+msgId);}/*** The only thing that the methods will do It is emit each * file line* spout最主要的方法,读取文本文件,并把它的每一行发射出去(给bolt) * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下 */public void nextTuple() {/*** The nextuple it is called forever, so if we have been readed the file* we will wait and then return*/if(completed){try {Thread.sleep(1000);} catch (InterruptedException e) {//Do nothing}return;}String str;//Open the readerBufferedReader reader = new BufferedReader(fileReader);try{//Read all lineswhile((str = reader.readLine()) != null){/*** By each line emmit a new value with the line as a their* 发射每一行,Values是一个ArrayList的实现 */if(str=="a" && ass) return;//如果对行值为a已经处理,就返回if(str=="a" && !ass) {//用于判断跨进程是否可以共享变量ass = true;this.collector.emit(new Values(str),str); }elsethis.collector.emit(new Values(str),str);}}catch(Exception e){throw new RuntimeException("Error reading tuple",e);}finally{completed = true;}}/*** We will create the file and get the collector object* 三个参数,第一个是创建Topology时的配置,第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt * */public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {//获取创建Topology时指定的要读取的文件路径 this.fileReader = new FileReader(conf.get("inpath").toString());} catch (FileNotFoundException e) {throw new RuntimeException("Error reading file ["+conf.get("inpath")+"]");}//初始化发射器this.collector = collector;}/*** Declare the output field "line"*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("line"));}
}
package cn.wc;import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;//Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。
//Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用。
public class WordNormalizer extends BaseBasicBolt {public void cleanup() {}/*** The bolt will receive the line from the* words file and process it to Normalize this line* * The normalize will be put the words in lower case* and split the line to get all words in this * bolt中最重要的方法,每当接收到一个tuple时,此方法便被调用 * 这个方法的作用就是把文本文件中的每一行切分成一个个单词,并把这些单词发射出去(给下一个bolt处理) */public void execute(Tuple input, BasicOutputCollector collector) {String sentence = input.getString(0);String[] words = sentence.split(" ");for(String word : words){word = word.trim();if(!word.isEmpty()){word = word.toLowerCase();collector.emit(new Values(word));}}}/*** The bolt will only emit the field "word" */public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}
}
package cn.wc;import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;public class WordCounter extends BaseBasicBolt {Integer id;String name;Map<String, Integer> counters;String outpath=null;/*** At the end of the spout (when the cluster is shutdown* We will show the word counters* Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里 */@Overridepublic void cleanup() {/*System.out.println("-- Word Counter ["+name+"-"+id+"] --");for(Map.Entry<String, Integer> entry : counters.entrySet()){System.out.println(entry.getKey()+": "+entry.getValue());}*/}/*** On create */@Overridepublic void prepare(Map stormConf, TopologyContext context) {this.counters = new HashMap<String, Integer>();this.name = context.getThisComponentId();this.id = context.getThisTaskId();outpath=stormConf.get("outpath").toString();}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String str = input.getString(0);Integer c=1;/*** If the word dosn't exist in the map we will create* this, if not We will add 1 */if(!counters.containsKey(str)){counters.put(str, 1);}else{c = counters.get(str) + 1;counters.put(str, c);}//写入文件try{File file=new File(outpath);if(!file.exists())file.createNewFile();FileOutputStream out=new FileOutputStream(file,true); StringBuffer sb=new StringBuffer();sb.append(str+": "+c);sb.append("\r\n");out.write(sb.toString().getBytes("utf-8")); }catch (IOException e){e.printStackTrace();}}
}
在线实时大数据平台Storm并行度试验相关推荐
- 在线实时大数据平台Storm集群组件学习
Hadoop常用于离线的复杂的大数据处理,Spark常用于离线的快速(轻量级)的大数据处理, Storm常用于在线的实时的大数据处理:这句话一定程度上反应了三套大数据平台的鲜明特征.Storm是一套实 ...
- 在线实时大数据平台Storm输入源共享试验
1.背景:topology程序提交集群模式运行试验,验证在同一文件输入源情况下,worker之间是否会重复输入处理,以及数据变量能否在不同worker之间共享,如果文件新增数据,topology会不会 ...
- 在线实时大数据平台Storm并行和通信机制理解
1.storm系统角色和应用组件基本理解: 和Hadoop一起理解,清晰点. 1)物理节点Nimubus,负责资源分配和任务调度: 2)物理节点Supervisor负责接受nimbus分配的任务,启动 ...
- 在线实时大数据平台Storm集成redis开发(分布锁)
1.需求场景:spout从ftp列表中拿到未读取的文件读取并发射行到Bolt,bolt进行业务处理后提交下一Bolt入库.用redis主要是:保存文件列表对象,使用分布锁来同步互斥访问共享对象,使文件 ...
- 在线实时大数据平台Storm版本兼容的问题
部署了storm1.0.1最新版,但原来生产的程序是storm0.8.2版本并在该版本环境中运行,直接将程序放到1.0.1环境中storm jar运行失败. 重构程序,引入storm-core-1.0 ...
- 在线实时大数据平台Storm开发之wordcount
可以在Eclipse下通过Maven引入storm-starter项目,这里直接将storm目录下lib中的jar包引入到工程中. 由于storm-core-1.0.1.jar中带有default.y ...
- 在线实时大数据平台Storm本地模式运行的一个小发现
1.现象:生产中分别部署了两台服务器,独立运行storm,然后拓扑程序提交是本地模式,发现不用启动storm和zookeeper也可以运行: #jps 没有下面进程 QuorumPeerMain ...
- 在线实时大数据平台Storm单机部署
centos单机下部署storm,主要用于开发测试用.部署的IP地址用zoo1代替,要注意/etc/hosts下对应ip和主机名. 1.Zookeeper部署 Apache官网http://zooke ...
- 实时大数据平台的设计与实现
实时大数据平台的设计与实现 什么是实时大数据平台 实时大数据平台和离线大数据平台还是有区别的,更强调数据的实时性.具体的架构,具体的代码该怎么写,模块怎么去构建,各个系统之间怎么去组织协调,都需要根据 ...
最新文章
- AI和机器学习如何改善用户体验?
- 原生js实现ajax的文件异步提交功能、图片预览功能.实例
- 注意区分Mb(Mbps)与MB(million bit和million Byte)
- IOS 输入框 placeholder字体的颜色
- git 每次都要输入用户名密码_Git向GitHub提供代码
- AJAX学习笔记(基本使用,请求参数传递,获取服务端响应,错误处理,低版本IE浏览器缓存问题及解决)
- ubuntu15.04在安装完vmware11后打开提示 VMware Kernel Module Updater
- 服装设计与工程_百度百科
- 中级软件测试笔试题100精讲_软件测试工程师笔试题目(含答案)
- win 10 arm iso 文件下载
- 2019 live tex 发行版_下载和安装Texlive2019
- java iv不是内部命令_java – 解密错误:“no iv set when one expected”
- Delphi控件-复合控件
- 【硬见小百科】高速PCB设计中的阻抗匹配
- 一个服务器启动两个mysql实例
- 数据中心服务在资源整合过程的实践
- fp函数式编程_全面了解函数式编程(FP)
- 软件工程课程团队项目——团队日志
- 如何在 Android 手机上实现抓包?
- css3遮盖,关于css3的阴影遮盖问题的小研究
热门文章
- python判断sqlite连接状态_python3 自动识别usb连接状态,即对usb重连的判断方法
- Linux 如何快速找到运行中的进程
- 无法找到脚本文件adsutil.vbs的解决方法
- python学习之散学
- C语言复习2_运算符
- .NET Core2.1下采用EFCore比较原生IOC、AspectCore、AutoFac之间的性能
- 移动端H5混合开发设置复盘与总结
- spring boot / cloud (二) 规范响应格式以及统一异常处理
- iOS - UITableViewCell Custom Selection Style Color
- Linux/Android Kconfig Makefile defconfig 和 .config关系