现在要用storm做一个计数器,我的方案是:不断地输入一串字符串,然后统计每个单词的频数.

这篇博客从以下几个方面进行阐述:

  1. 基本配置
  2. 流程分析与类的确定
  3. 奉上代码(含注释)
一:基本配置
这里注意,导包的时候要注意,否则可能会出现神奇的强制类型转换或是提示你在使用一个不存在的方法
  • 三台搭建好storm集群的linux虚拟机centos7
  • 一台用于编程的window8.1虚拟机
  • 这三台虚拟机使用桥接模式,即与宿主机在同一个网络中
二.流程分析与类的确定
  • TopologyWordCount ------用于构建整个逻辑拓扑,是整个storm的核心
  • CreateSpout ------源源不断的创建原始字符串
  • SplitBolt ------把原始字符串分割为单词后,把每个单词发送出去
  • CountBolt ------对传来的单词进行频数记录
  • PrintBolt ------把所有结果进行打印
注:既然可以源源不断的创建字符串,那么PrintBolt要打印结果就需要有一个时间限制,在这里,设定10s打印一次.
三.奉上代码
先奉上pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>storm</groupId><artifactId>storm</artifactId><version>1.0-SNAPSHOT</version><!--上面的按照你自己的项目来就好,修改下面的内容就可以了--><properties><jstorm.version>2.1.1</jstorm.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><slf4j.version>1.7.12</slf4j.version><joad-time.version>2.9.4</joad-time.version><storm-kafka.version>0.9.4</storm-kafka.version><kafka.version>0.9.0.0</kafka.version><esper.version>5.4.0</esper.version></properties><dependencies><!-- https://mvnrepository.com/artifact/com.espertech/esper --><!-- https://mvnrepository.com/artifact/joda-time/joda-time --><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>${joad-time.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>${kafka.version}</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba.jstorm</groupId><artifactId>jstorm-core</artifactId><version>${jstorm.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka --><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka</artifactId><version>${storm-kafka.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-jdk14</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>${slf4j.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.7</source><target>1.7</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><addClasspath>true</addClasspath><mainClass>TopologyWordCount.java</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-my-jar-with-dependencies</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
  • TopologyWordCount 用于构建整个逻辑拓扑,是整个storm的核心
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;public class TopologyWordCount {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {TopologyBuilder builder = new TopologyBuilder();
//设置数据源builder.setSpout("spout", new CreateSpout(), 1);//读取spout的数据源,完成切分字符串的操作builder.setBolt("split", new SplitBolt(), 1).shuffleGrouping("spout");//读取split后的数据,进行count(tick周期10秒)builder.setBolt("count", new CountBolt(), 1).fieldsGrouping("split", new Fields("word"));//读取show之后的缓冲后的数据,进行最终的打印builder.setBolt("final", new PrintBolt(), 1).shuffleGrouping("count");
/*---------------套路--------------------*/Config config = new Config();config.setDebug(false);//集群模式if (args != null && args.length > 0) {config.setNumWorkers(2);StormSubmitter.submitTopology(args[0], config, builder.createTopology());//单机模式} else {config.setMaxTaskParallelism(1);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", config, builder.createTopology());Thread.sleep(3000000);cluster.shutdown();}}
}

这里定义了整个拓扑结构,在学习时可能 .shuffleGrouping() 这种函数不太明白,这个叫做storm的分组策略,现在讲解太细致不太好,因此我用下面的这个图简单的告诉大家大致的含义:

在这里,那个.shuffleGrouping()传入的参数就表明它要接收的数据是来自哪个Bolt 或是Spout
  • CreateSpout 源源不断的创建原始字符串
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.joda.time.DateTime;import java.util.Map;/*** 创建数据*/
public class CreateSpout extends BaseRichSpout {private SpoutOutputCollector collector;private String[] sentences = null; //用来存放数据@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector;sentences = new String[]{"Hahahaha! I am coming ~~"}; // "Hahaha...这个字符串即为要源源不断发送的信息"}@Overridepublic void nextTuple() {/*storm会循环的调用这个方法*//*线程进行休眠,10s发送一次数据,在这10s内,让其余工作进行*/Utils.sleep(10000);//获得数据源System.out.println(new DateTime().toString("HH:mm:ss") + "--------------CreateSpout 开始发送数据----------");this.collector.emit(new Values(sentences)); //发送出去}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//发送时设置接收方的Tuple实例中的keyoutputFieldsDeclarer.declare(new Fields("sentence"));}}

这里为什么要休眠10s,就是为了让整个流程每10s执行一遍,否则很难看清楚整个流程是如何执行的.
  • SplitBolt 把原始字符串分割为单词后,把每个单词发送出去
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.joda.time.DateTime;import java.util.Map;/*** 按照空格切分字符串,然后推出去*/
public class SplitBolt extends BaseRichBolt {private OutputCollector outputCollector;private int countTime = 0;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.outputCollector = collector;}@Overridepublic void execute(Tuple tuple) {/*这是官方文档中 tuple.getString(int i)的解释:Returns the String at position i in the tuple. If that field is not a String, you will get a runtime error.public String getString ( int i);*/String sentence = tuple.getString(0);for (String word : sentence.split(" ")) {System.out.println(new DateTime().toString("HH:mm:ss") + "--------------------SplitBolt 开始运行--------------------\n" + "> > > >  第"+count() +"次发送数据,这次发送的是:" + word);outputCollector.emit(new Values(word));}}//这是为了得到当前一共发送了多少个单词了,加深理解private int count() {return ++countTime;}/*在发射的时候,将接收方的tuple中的 key 设置为"word"*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}
}

  • CountBolt 对传来的单词进行频数记录
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.TupleHelpers;
import org.joda.time.DateTime;import java.util.HashMap;
import java.util.Map;public class CountBolt extends BaseRichBolt {private Map<String, Integer> counts = new HashMap<>();private OutputCollector outputCollector;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.outputCollector = collector;}@Overridepublic Map<String, Object> getComponentConfiguration() {Map<String, Object> conf = new HashMap<String, Object>();/*加入Tick时间窗口, 统计*//*------------------?????????????????????????---------------------------*/conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);return conf;}@Overridepublic void execute(Tuple tuple) {/*时间窗口定义为10s内的统计数据,统计完毕后,发射到下一阶段的bolt进行处理*///发射完成后return结束,开始新一轮的事件窗口计数操作if (TupleHelpers.isTickTuple(tuple)) {/*来判断是否应该发射当前窗口数据*/System.out.println((new DateTime().toString("HH:mm:ss")) + "--------------------sumWordBolt 开始运行--------------------\n发送的数据内容是" + counts);outputCollector.emit(new Values(counts));return;}/*如果没有到发送时间,就继续统计wordcount*/String word = tuple.getStringByField("word");Integer count = counts.get(word);if (count == null) {count = 0;}count++;counts.put(word, count);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word_map"));}
}

  • PrintBolt 把所有结果进行打印
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.joda.time.DateTime;import java.util.Map;public class PrintBolt extends BaseRichBolt {@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}@Overridepublic void execute(Tuple input) {System.out.println(new DateTime().toString("HH:mm:ss") + "--------------------final bolt 开始运行--------------------");/*----------???????????????????????---------------------------*/Map<String, Integer> counts = (Map<String, Integer>) input.getValue(0);/*最后一个阶段,将最后的结果打印出来*/System.out.println(justForm(20-8)+"key"+justForm(20-8)+"      "+"value");for (Map.Entry<String, Integer> kv : counts.entrySet()) {/*这里的justForm()函数是为了保证格式一致*/System.out.println(kv.getKey() + justForm(kv.getKey().length()) + " 频数 : " + kv.getValue());}}//保证格式一致的私有方法private String justForm(int length) {for (int i = 0; i < 20 - length; i++) {System.out.print(" ");}return "";}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

在这里,如果有谜一般的强制类型转换,或是方法上的报错,估计是导包的时候错了,认真检查一下是不是导包导错了

运行之后的效果图可以帮助你理解整个storm的流程:


storm计数器(小白看懂系列)相关推荐

  1. python3 线程池源码解析_5分钟看懂系列:Python 线程池原理及实现

    概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器 ...

  2. dict实现原理 python_5分钟看懂系列:Python 线程池原理及实现

    本文的文字及图片来源于网络,仅供学习.交流使用,不具有任何商业用途,如有问题请及时联系我们以作处理 概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创 ...

  3. ( 保证能看懂系列)SVM系列(一)hard-margin SVM 详细原理

    感谢youtube上的视频对SVM的详细推导 https://www.youtube.com/watch?v=ZF2QR7nSUhg&list=PLOxMGJ_8X74Z1N3OcacUaCx ...

  4. 【Leetcode】Python实现字符串转整数 (atoi) - 详细备注,保证小白看懂

    ''' 6 字符串转整数(atoi) 实现 atoi,将字符串转为整数.在找到第一个非空字符之前,需要移除掉字符串中的空格字符.如果第一个非空字符是正号或负号,选取该符号,并将其与后面尽可能多的连续的 ...

  5. 【我奶奶都能看懂系列005】☀️python基础语法——容器,小学生也可以学!

  6. 【我奶奶都能看懂系列016】Python进程和线程的使用

  7. Java实现双链表(数据结构.小白看懂版)

    双向链表有前后两个指针,一个指向直接前驱节点,另一个指向直接后继节点.完整的双向链表如下图(原谅手画比较粗糙)所示: 在实现代码时,需要定义一个节点类,并且初始化一下他们的指针,如图所示:  节点类实 ...

  8. 《小学生都能看懂的生成函数从入门到升天教程》《生成函数全家桶》

    整理的算法模板合集: ACM模板 点我看算法全家桶系列!!! 实际上是一个全新的精炼模板整合计划 小学生都能看懂系列 目录 0x00 生成函数 0x10 例题引入 0x11 ExampleA\tt E ...

  9. 《小学生都能看懂的群论从入门到升天教程》 《群论全家桶》

    整理的算法模板合集: ACM模板 点我看算法全家桶系列!!! 实际上是一个全新的精炼模板整合计划 小学生都能看懂系列,小学生:我太难了   群论.置换.Bunrnside引理.Pόlya定理等概念是群 ...

最新文章

  1. matlab减,matlab-线性代数 矩阵的加、减、乘、除
  2. keyset(),entryset() 遍历 (转)
  3. SQLSTATE[HY000] [2002] 乱码解决方法
  4. android模糊后面视频,在安卓手机上怎么制作中间是横视频上下是模糊效果的竖视频?手机视频短片制作...
  5. Aspx页面生命周期(转)
  6. anaconda学习python_python深度学习笔记1-Anaconda软件安装
  7. leetcode面试题 08.03. 魔术索引(二分)
  8. 【今日CV 计算机视觉论文速览 91期】Mon, 1 Apr 2019
  9. python列表元组_Python列表元组操作
  10. c语言大作业 模拟泊松分布,C语言下泊松分布以及指数分布随机数生成器实现
  11. 7-5 全量复制和部分复制
  12. python自动化开发-[第十四天]-javascript(续)
  13. vsftp虚拟帐户配置
  14. My BlackBerry
  15. 南京全栈python培训
  16. 三级等保成标配,互联网医院安全架构报告发布
  17. 日文输入键盘罗马字对应表
  18. excel中插入图表改变横纵坐标问题
  19. Vue之filters传参问题
  20. RFM模型—零售数据实战

热门文章

  1. php生成标准excel表格,php导出生成excel表格几种方法介绍
  2. Unity3D核心类型一览
  3. OpenGL ES 简单教程
  4. Facebook 数据的横向扩展
  5. POJ 2348 Euclid's Game(博弈)题解
  6. JavaScript--函数
  7. 1月4日云栖精选夜读:阿里工程师如何叫外卖?99%的人猜不到
  8. php中::双冒号有什么作用
  9. 理解和使用SQL Server中的并行
  10. mysql mysqlslap压力测试用例