storm计数器(小白看懂系列)
现在要用storm做一个计数器,我的方案是:不断地输入一串字符串,然后统计每个单词的频数.
这篇博客从以下几个方面进行阐述:
- 基本配置
- 流程分析与类的确定
- 奉上代码(含注释)
- 三台搭建好storm集群的linux虚拟机centos7
- 一台用于编程的window8.1虚拟机
- 这三台虚拟机使用桥接模式,即与宿主机在同一个网络中
- TopologyWordCount ------用于构建整个逻辑拓扑,是整个storm的核心
- CreateSpout ------源源不断的创建原始字符串
- SplitBolt ------把原始字符串分割为单词后,把每个单词发送出去
- CountBolt ------对传来的单词进行频数记录
- PrintBolt ------把所有结果进行打印
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>storm</groupId><artifactId>storm</artifactId><version>1.0-SNAPSHOT</version><!--上面的按照你自己的项目来就好,修改下面的内容就可以了--><properties><jstorm.version>2.1.1</jstorm.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><slf4j.version>1.7.12</slf4j.version><joad-time.version>2.9.4</joad-time.version><storm-kafka.version>0.9.4</storm-kafka.version><kafka.version>0.9.0.0</kafka.version><esper.version>5.4.0</esper.version></properties><dependencies><!-- https://mvnrepository.com/artifact/com.espertech/esper --><!-- https://mvnrepository.com/artifact/joda-time/joda-time --><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>${joad-time.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>${kafka.version}</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba.jstorm</groupId><artifactId>jstorm-core</artifactId><version>${jstorm.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka --><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka</artifactId><version>${storm-kafka.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-jdk14</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>${slf4j.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.7</source><target>1.7</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><addClasspath>true</addClasspath><mainClass>TopologyWordCount.java</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-my-jar-with-dependencies</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
- TopologyWordCount 用于构建整个逻辑拓扑,是整个storm的核心
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;public class TopologyWordCount {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {TopologyBuilder builder = new TopologyBuilder();
//设置数据源builder.setSpout("spout", new CreateSpout(), 1);//读取spout的数据源,完成切分字符串的操作builder.setBolt("split", new SplitBolt(), 1).shuffleGrouping("spout");//读取split后的数据,进行count(tick周期10秒)builder.setBolt("count", new CountBolt(), 1).fieldsGrouping("split", new Fields("word"));//读取show之后的缓冲后的数据,进行最终的打印builder.setBolt("final", new PrintBolt(), 1).shuffleGrouping("count");
/*---------------套路--------------------*/Config config = new Config();config.setDebug(false);//集群模式if (args != null && args.length > 0) {config.setNumWorkers(2);StormSubmitter.submitTopology(args[0], config, builder.createTopology());//单机模式} else {config.setMaxTaskParallelism(1);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", config, builder.createTopology());Thread.sleep(3000000);cluster.shutdown();}}
}
这里定义了整个拓扑结构,在学习时可能 .shuffleGrouping() 这种函数不太明白,这个叫做storm的分组策略,现在讲解太细致不太好,因此我用下面的这个图简单的告诉大家大致的含义:
- CreateSpout 源源不断的创建原始字符串
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.joda.time.DateTime;import java.util.Map;/*** 创建数据*/
public class CreateSpout extends BaseRichSpout {private SpoutOutputCollector collector;private String[] sentences = null; //用来存放数据@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector;sentences = new String[]{"Hahahaha! I am coming ~~"}; // "Hahaha...这个字符串即为要源源不断发送的信息"}@Overridepublic void nextTuple() {/*storm会循环的调用这个方法*//*线程进行休眠,10s发送一次数据,在这10s内,让其余工作进行*/Utils.sleep(10000);//获得数据源System.out.println(new DateTime().toString("HH:mm:ss") + "--------------CreateSpout 开始发送数据----------");this.collector.emit(new Values(sentences)); //发送出去}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//发送时设置接收方的Tuple实例中的keyoutputFieldsDeclarer.declare(new Fields("sentence"));}}
- SplitBolt 把原始字符串分割为单词后,把每个单词发送出去
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.joda.time.DateTime;import java.util.Map;/*** 按照空格切分字符串,然后推出去*/
public class SplitBolt extends BaseRichBolt {private OutputCollector outputCollector;private int countTime = 0;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.outputCollector = collector;}@Overridepublic void execute(Tuple tuple) {/*这是官方文档中 tuple.getString(int i)的解释:Returns the String at position i in the tuple. If that field is not a String, you will get a runtime error.public String getString ( int i);*/String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {System.out.println(new DateTime().toString("HH:mm:ss") + "--------------------SplitBolt 开始运行--------------------\n" + "> > > > 第"+count() +"次发送数据,这次发送的是:" + word);outputCollector.emit(new Values(word));}}//这是为了得到当前一共发送了多少个单词了,加深理解private int count() {return ++countTime;}/*在发射的时候,将接收方的tuple中的 key 设置为"word"*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}
}
- CountBolt 对传来的单词进行频数记录
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.TupleHelpers;
import org.joda.time.DateTime;import java.util.HashMap;
import java.util.Map;public class CountBolt extends BaseRichBolt {private Map<String, Integer> counts = new HashMap<>();private OutputCollector outputCollector;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.outputCollector = collector;}@Overridepublic Map<String, Object> getComponentConfiguration() {Map<String, Object> conf = new HashMap<String, Object>();/*加入Tick时间窗口, 统计*//*------------------?????????????????????????---------------------------*/conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);return conf;}@Overridepublic void execute(Tuple tuple) {/*时间窗口定义为10s内的统计数据,统计完毕后,发射到下一阶段的bolt进行处理*///发射完成后return结束,开始新一轮的事件窗口计数操作if (TupleHelpers.isTickTuple(tuple)) {/*来判断是否应该发射当前窗口数据*/System.out.println((new DateTime().toString("HH:mm:ss")) + "--------------------sumWordBolt 开始运行--------------------\n发送的数据内容是" + counts);outputCollector.emit(new Values(counts));return;}/*如果没有到发送时间,就继续统计wordcount*/String word = tuple.getStringByField("word");Integer count = counts.get(word);if (count == null) {count = 0;}count++;counts.put(word, count);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word_map"));}
}
- PrintBolt 把所有结果进行打印
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.joda.time.DateTime;import java.util.Map;public class PrintBolt extends BaseRichBolt {@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {System.out.println(new DateTime().toString("HH:mm:ss") + "--------------------final bolt 开始运行--------------------");/*----------???????????????????????---------------------------*/Map<String, Integer> counts = (Map<String, Integer>) input.getValue(0);/*最后一个阶段,将最后的结果打印出来*/System.out.println(justForm(20-8)+"key"+justForm(20-8)+" "+"value");for (Map.Entry<String, Integer> kv : counts.entrySet()) {/*这里的justForm()函数是为了保证格式一致*/System.out.println(kv.getKey() + justForm(kv.getKey().length()) + " 频数 : " + kv.getValue());}}//保证格式一致的私有方法private String justForm(int length) {for (int i = 0; i < 20 - length; i++) {System.out.print(" ");}return "";}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
运行之后的效果图可以帮助你理解整个storm的流程:
storm计数器(小白看懂系列)相关推荐
- python3 线程池源码解析_5分钟看懂系列:Python 线程池原理及实现
概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器 ...
- dict实现原理 python_5分钟看懂系列:Python 线程池原理及实现
本文的文字及图片来源于网络,仅供学习.交流使用,不具有任何商业用途,如有问题请及时联系我们以作处理 概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创 ...
- ( 保证能看懂系列)SVM系列(一)hard-margin SVM 详细原理
感谢youtube上的视频对SVM的详细推导 https://www.youtube.com/watch?v=ZF2QR7nSUhg&list=PLOxMGJ_8X74Z1N3OcacUaCx ...
- 【Leetcode】Python实现字符串转整数 (atoi) - 详细备注,保证小白看懂
''' 6 字符串转整数(atoi) 实现 atoi,将字符串转为整数.在找到第一个非空字符之前,需要移除掉字符串中的空格字符.如果第一个非空字符是正号或负号,选取该符号,并将其与后面尽可能多的连续的 ...
- 【我奶奶都能看懂系列005】☀️python基础语法——容器,小学生也可以学!
- 【我奶奶都能看懂系列016】Python进程和线程的使用
- Java实现双链表(数据结构.小白看懂版)
双向链表有前后两个指针,一个指向直接前驱节点,另一个指向直接后继节点.完整的双向链表如下图(原谅手画比较粗糙)所示: 在实现代码时,需要定义一个节点类,并且初始化一下他们的指针,如图所示: 节点类实 ...
- 《小学生都能看懂的生成函数从入门到升天教程》《生成函数全家桶》
整理的算法模板合集: ACM模板 点我看算法全家桶系列!!! 实际上是一个全新的精炼模板整合计划 小学生都能看懂系列 目录 0x00 生成函数 0x10 例题引入 0x11 ExampleA\tt E ...
- 《小学生都能看懂的群论从入门到升天教程》 《群论全家桶》
整理的算法模板合集: ACM模板 点我看算法全家桶系列!!! 实际上是一个全新的精炼模板整合计划 小学生都能看懂系列,小学生:我太难了 群论.置换.Bunrnside引理.Pόlya定理等概念是群 ...
最新文章
- matlab减,matlab-线性代数 矩阵的加、减、乘、除
- keyset(),entryset() 遍历 (转)
- SQLSTATE[HY000] [2002] 乱码解决方法
- android模糊后面视频,在安卓手机上怎么制作中间是横视频上下是模糊效果的竖视频?手机视频短片制作...
- Aspx页面生命周期(转)
- anaconda学习python_python深度学习笔记1-Anaconda软件安装
- leetcode面试题 08.03. 魔术索引(二分)
- 【今日CV 计算机视觉论文速览 91期】Mon, 1 Apr 2019
- python列表元组_Python列表元组操作
- c语言大作业 模拟泊松分布,C语言下泊松分布以及指数分布随机数生成器实现
- 7-5 全量复制和部分复制
- python自动化开发-[第十四天]-javascript(续)
- vsftp虚拟帐户配置
- My BlackBerry
- 南京全栈python培训
- 三级等保成标配,互联网医院安全架构报告发布
- 日文输入键盘罗马字对应表
- excel中插入图表改变横纵坐标问题
- Vue之filters传参问题
- RFM模型—零售数据实战