统计文本中的单词出现的频率,其中文本内容如下:

创建项目

项目结构如下:

创建pom.xml,代码如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.strom</groupId><artifactId>wordCountStromDemo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><!--<scope>provided</scope>--><version>1.1.0</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka</artifactId><version>1.1.0</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.8.2</artifactId><version>0.8.1</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!--告诉运行的主类是哪个,注意根据自己的情况,下面的包名做相应的修改--><mainClass>cn.toto.strom.wordcount.StormTopologyDriver</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build>
</project>

注意其中的mainClass配置,根据自己的项目情况,包名要做相应的变化


使用spout读取数据,其中MyLocalFileSpout的代码如下:

package cn.toto.strom.wordcount;import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** Created by maoxiangyi on 2016/8/16.*/
public class MyLocalFileSpout extends BaseRichSpout {private SpoutOutputCollector collector;private BufferedReader bufferedReader;//初始化方法public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;try {this.bufferedReader = new BufferedReader(new FileReader(new File("/home/tuzq/software/stormInstallPath/workdir/aaa.txt")));} catch (FileNotFoundException e) {e.printStackTrace();}}//Storm实时计算的特性就是对数据一条一条的处理//while(true){// this.nextTuple()// }public void nextTuple() {//每被调用一次就会发送一条数据出去try {String line = bufferedReader.readLine();if (StringUtils.isNotBlank(line)){List<Object> arrayList = new ArrayList<Object>();arrayList.add(line);collector.emit(arrayList);}} catch (IOException e) {e.printStackTrace();}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("juzi"));}
}

使用bolt对单词进行分割,MySplitBolt的代码如下:

package cn.toto.strom.wordcount;import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;public class MySplitBolt  extends BaseBasicBolt {public void execute(Tuple input, BasicOutputCollector collector) {//1、数据如何获取String juzi = (String)input.getValueByField("juzi");//2、进行切割String[] strings = juzi.split(" ");//3、发送数据for (String word : strings){//Values 对象帮我们生成一个listcollector.emit(new Values(word,1));}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word","num"));}
}

使用Bolt对单词进行统计,MyWordCountAndPrintBolt的代码如下:

package cn.toto.strom.wordcount;import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;import java.util.HashMap;
import java.util.Map;/*** 代码说明** @author tuzq* @create 2017-06-20 16:50*/
public class MyWordCountAndPrintBolt extends BaseBasicBolt {private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();public void execute(Tuple input, BasicOutputCollector collector) {String word = (String) input.getValueByField("word");Integer num = (Integer) input.getValueByField("num");//1、查看单词对应的value是否存在Integer integer = wordCountMap.get(word);if (integer == null || integer.intValue() == 0) {wordCountMap.put(word,num);}else {wordCountMap.put(word,integer.intValue() + num);}//2、打印数据System.out.println(wordCountMap);}public void declareOutputFields(OutputFieldsDeclarer declarer) {//todo 不需要定义输出的字段}
}

使用TopologyDriver串联spout和bolt进行运行,代码如下:

package cn.toto.strom.wordcount;/*** Created by toto on 2017/6/20.*/import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;/*** 代码说明** @author tuzq* @create 2017-06-20 16:57*/
public class StormTopologyDriver {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {//1、准备任务信息TopologyBuilder topologyBuilder = new TopologyBuilder();topologyBuilder.setSpout("mySpout", new MyLocalFileSpout(),1);topologyBuilder.setBolt("bolt1", new MySplitBolt(),4).shuffleGrouping("mySpout");topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(),2).shuffleGrouping("bolt1");//2、任务提交//提交给谁?提交什么内容?Config config = new Config();config.setNumWorkers(2);StormTopology stormTopology = topologyBuilder.createTopology();//本地模式LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("wordcount", config, stormTopology);//集群模式//StormSubmitter.submitTopology("wordcount1", config, stormTopology);}
}

如果是集群模式运行,StormTopologyDriver的代码是:

package cn.toto.strom.wordcount;import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;/** @author tuzq* @create 2017-06-20 16:57*/
public class StormTopologyDriver {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {//1、准备任务信息TopologyBuilder topologyBuilder = new TopologyBuilder();//使用2个线程来运行topologyBuilder.setSpout("mySpout", new MyLocalFileSpout(),2);//使用4个线程来运行topologyBuilder.setBolt("bolt1", new MySplitBolt(),4).shuffleGrouping("mySpout");//使用2个线程来运行topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(),2).shuffleGrouping("bolt1");//2、任务提交//提交给谁?提交什么内容?Config config = new Config();config.setNumWorkers(2);StormTopology stormTopology = topologyBuilder.createTopology();//本地模式//LocalCluster localCluster = new LocalCluster();//localCluster.submitTopology("wordcount", config, stormTopology);//集群模式StormSubmitter.submitTopology("wordcount1", config, stormTopology);}
}

StormTopologyDriver 的代码说明:
1.上面有2个worker
2.spout的两个并行度平均分配在两个worker上。每个组件的task数量会被平均分配到worker
3.bolt1的4个并行度平均分配在两个worker上。
4.bolt2的2个并行度平均分配在两个worker上。

一般将多个并行度中的实例,叫做task,默认情况下,一个bolt的并行度是4,代表了4个task.

本地模式运行
可以直接右键Run运行,最终运行的结果如下:

集群模式运行
在idean中对maven项目打包:
由于集群模式下已经有了strom-core-1.1.0XXX.jar,所以在package之前,要修改pom文件,修改storm-core的依赖为(也就是说加上provided,如果是本地模式需要注释这个):

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><scope>provided</scope><version>1.1.0</version>
</dependency>

如果不修改,将会报如下的错误:

Exception in thread "main" java.lang.ExceptionInInitializerErrorat org.apache.storm.config$read_storm_config.invoke(config.clj:78)at org.apache.storm.config$fn__908.invoke(config.clj:100)at org.apache.storm.config__init.load(Unknown Source)at org.apache.storm.config__init.<clinit>(Unknown Source)at java.lang.Class.forName0(Native Method)at java.lang.Class.forName(Class.java:348)at clojure.lang.RT.classForName(RT.java:2154)at clojure.lang.RT.classForName(RT.java:2163)at clojure.lang.RT.loadClassForName(RT.java:2182)at clojure.lang.RT.load(RT.java:436)at clojure.lang.RT.load(RT.java:412)at clojure.core$load$fn__5448.invoke(core.clj:5866)at clojure.core$load.doInvoke(core.clj:5865)at clojure.lang.RestFn.invoke(RestFn.java:408)at clojure.core$load_one.invoke(core.clj:5671)at clojure.core$load_lib$fn__5397.invoke(core.clj:5711)at clojure.core$load_lib.doInvoke(core.clj:5710)at clojure.lang.RestFn.applyTo(RestFn.java:142)at clojure.core$apply.invoke(core.clj:632)at clojure.core$load_libs.doInvoke(core.clj:5753)at clojure.lang.RestFn.applyTo(RestFn.java:137)at clojure.core$apply.invoke(core.clj:634)at clojure.core$use.doInvoke(core.clj:5843)at clojure.lang.RestFn.invoke(RestFn.java:408)at org.apache.storm.command.config_value$loading__5340__auto____12276.invoke(config_value.clj:16)at org.apache.storm.command.config_value__init.load(Unknown Source)at org.apache.storm.command.config_value__init.<clinit>(Unknown Source)at java.lang.Class.forName0(Native Method)at java.lang.Class.forName(Class.java:348)at clojure.lang.RT.classForName(RT.java:2154)at clojure.lang.RT.classForName(RT.java:2163)at clojure.lang.RT.loadClassForName(RT.java:2182)at clojure.lang.RT.load(RT.java:436)at clojure.lang.RT.load(RT.java:412)at clojure.core$load$fn__5448.invoke(core.clj:5866)at clojure.core$load.doInvoke(core.clj:5865)at clojure.lang.RestFn.invoke(RestFn.java:408)at clojure.lang.Var.invoke(Var.java:379)at org.apache.storm.command.config_value.<clinit>(Unknown Source)
Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/tuzq/software/stormInstallPath/servers/apache-storm-1.1.0/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/home/tuzq/software/stormInstallPath/workdir/wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:383)at org.apache.storm.utils.Utils.readDefaultConfig(Utils.java:427)at org.apache.storm.utils.Utils.readStormConfig(Utils.java:463)at org.apache.storm.utils.Utils.<clinit>(Utils.java:177)... 39 more
Caused by: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/home/tuzq/software/stormInstallPath/servers/apache-storm-1.1.0/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/home/tuzq/software/stormInstallPath/workdir/wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]at org.apache.storm.utils.Utils.getConfigFileInputStream(Utils.java:409)at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:362)... 42 more

注意,如果引入的storm-core的jar包要和集群中的jar包版本是一样
如果本地部署和集群部署的storm-core版本不一样,还需要修改代码中的包名结构,否则将会报错

接着执行如下:

接着执行下图的:

进入项目目录,比如我的:

进入target目录:

红框中的jar是带有其它jar包依赖的jar,上面一个jar是不带依赖的jar,集群模式运行的时候使用wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar来运行

将wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar上传到:/home/tuzq/software/stormInstallPath/workdir,执行以下命令:

[root@hadoop1 workdir]# storm jar wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar cn.toto.strom.wordcount.StormTopologyDriver

命令说明:
表示通过storm运行wordCountStromDemo-1.0-SNAPSHOT-jar-with-dependencies.jar中的cn.toto.strom.wordcount.StormTopologyDriver

运行效果:

通过UI界面查看一下程序在哪儿运行:浏览器地址是http://hadoop1:8080/

点击进入,查看效果:

查看最后结果打印位置
点击UI界面中的Blot2


进入日志目录,查看日志结果:

strom-1.1.0模拟单词统计功能,Spout编写,Bolt编写,TopologyDriver编写,本地模式运行,集群模式运行,集群模式下看输出结果相关推荐

  1. php 英文单词 数,php实现单词统计功能

    php如何实现单词统计功能?本文主要为大家详细介绍了php英文单词统计器的实现代码,希望对大家有所帮助. 具体内容如下 程序开始运行, 按"浏览"钮选择一个英文文档, 再按&quo ...

  2. 《编程珠玑》代码之路21:设计一个比C++库函数快一个数量级的《圣经》单词统计功能

    给出这么一个问题,统计<圣经>出现的所有单词以及其出现次数,单词定义为两个空格之间的词语,当然开头结尾这种自然也是单词. 容易想到的是用C++库函数的map,但这其实并不是最优的选择,根据 ...

  3. php mysql 站内消息_php如何开发网易云信消息抄送功能之聊天室消息保存到本地数据库...

    在开发APP时,需要使用到即时通讯功能.这里选择了网易云信.我们要将开发消息抄送功能,并且将聊天室的消息保存到本地的数据库中.其他的比如P2P聊天消息,群组聊天消息,群组操作,好友操作等消息,我这里就 ...

  4. MapReduce之单词统计

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 MapReduce单词统计 一.搭建环境 二.编写WordCount程序 1.在IDEA中引入所需的jar包,IDEA支持文件夹方式引 ...

  5. hadoop集群-单词统计

    1.在用Hadoop进行单词统计前,要做好Hadoop的集群部署 输入上述命令,就能在浏览器中分别访问namenode:50070(namenode指的是你主节点的名字,这里我的主节点名字是namen ...

  6. Hadoop经典案例——单词统计

    通过 Hadoop经典案例--单词统计,来演示 Hadoop集群的简单使用. (1)打开 HDFS 的 UI,选择Utilities→Browse the file system查看分布式文件系统里的 ...

  7. WordCount单词统计笔记

    1.在本机的/root目录下,依次创建文件夹data,文本文件word.txt. mkdir -p /root/data vim /root/data/word.txt 键入i,进入编辑模式,输入如下 ...

  8. python统计文章单词次数_Python实现的统计文章单词次数功能示例

    本文实例讲述了Python实现的统计文章单词次数功能.分享给大家供大家参考,具体如下: 题目是这样的:你有一个目录,放了你一个月的日记,都是 txt,为了避免分词的问题,假设内容都是英文,请统计出你认 ...

  9. python统计英文文章中单词出现的次数并排序_Python实现的统计文章单词次数功能示例...

    本文实例讲述了Python实现的统计文章单词次数功能.分享给大家供大家参考,具体如下: 题目是这样的:你有一个目录,放了你一个月的日记,都是 txt,为了避免分词的问题,假设内容都是英文,请统计出你认 ...

最新文章

  1. Centos 7 解压文件
  2. 参加维基链超级节点竞选有什么好处呢?
  3. 多线程断点续传及下载
  4. POJ 1679 判断最小树是否唯一
  5. super关键字和final关键字
  6. 检索数据_22_根据数据项的值排序
  7. 找二叉树中给定元素的的左孩子结点_LeetCode高频题:二叉树(五)
  8. Vim配置文件(全平台可用)2012-05-01版
  9. ldap2.8.2_Spring LDAP 2.0.0发布
  10. ubuntu使用python读串口_ubuntu16.04上Python串口编程学习1
  11. AD中如何查看快捷键
  12. 最长回文字串--动态规划
  13. 清华大学-操作系统实验-Lab1
  14. html做一个课程表
  15. 电视机顶盒CM311-1A-YST基于openwrt搭建pptpipsec服务器
  16. 电脑从硬盘启动计算机,BIOS怎么设置成从硬盘启动?开启计算机或重新启动
  17. 二级分销商城模式开发
  18. 建行浙江分行总部【等。。。】
  19. 因为文件目录存在空格导致kafka运行错误:提示找不到或者无法加载主类错误
  20. 全国计算机考试一的书,《全国计算机等级考试全能教程》—甲虎网一站式图书批发平台...

热门文章

  1. MySQL之单表查询、多表查询(一)
  2. 最短路径Dijkstra讲解,工具包使用 python
  3. VTK:Cell Edge Neighbors用法实战
  4. JavaScript实现permutate With Repetitions重复排列算法(附完整源码)
  5. OpenCASCADE:建模算法之隐藏线去除
  6. wxWidgets:TextFrame 类的完整实现
  7. wxWidgets:wxRichTextHTMLHandler 类用法
  8. boost::mpi模块实现传输数据类型的骨架和内容的通信器的测试
  9. boost::mpi模块is_mpi_op 功能的测试
  10. boost::graph::isomorphism用法的测试程序