Storm API文档网址如下:

http://storm.apache.org/releases/current/javadocs/index.html

一、关联代码

使用maven,代码如下。

pom.xml  和Storm入门(三)HelloWorld示例相同

RandomSentenceSpout.java

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package cn.ljh.storm.wordcount;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

/**
 * Spout that emits one sentence, chosen at random from a fixed list, on every
 * call to {@link #nextTuple()} (after a 100 ms pause).
 *
 * <p>Tuples are emitted without a message id, and {@link #ack(Object)} /
 * {@link #fail(Object)} are no-ops.
 */
public class RandomSentenceSpout extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);

    /** Raw sentences to choose from; {@link #sentence(String)} decorates the chosen one. */
    private static final String[] SENTENCES = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    SpoutOutputCollector _collector;
    Random _rand;

    /**
     * Stores the collector and creates the random generator used to pick sentences.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used by {@link #nextTuple()} to emit sentences
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /**
     * Sleeps 100 ms, then emits a single randomly chosen sentence.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        // Decorate only the sentence that is actually emitted; building all
        // five decorated variants per call would be wasted work.
        final String sentence = sentence(SENTENCES[_rand.nextInt(SENTENCES.length)]);

        LOG.debug("Emitting tuple: {}", sentence);

        _collector.emit(new Values(sentence));
    }

    /**
     * Hook that lets subclasses decorate a sentence before it is emitted.
     *
     * @param input the raw sentence
     * @return the sentence to emit; this implementation returns {@code input} unchanged
     */
    protected String sentence(String input) {
        return input;
    }

    /** No-op: nothing to do when a tuple is acknowledged. */
    @Override
    public void ack(Object id) {
    }

    /** No-op: failed tuples are not replayed. */
    @Override
    public void fail(Object id) {
    }

    /**
     * Declares a single output field. It is named {@code "word"} although it
     * carries a whole sentence; {@code WordCountTopology.SplitSentence} reads
     * the value by that field name, so the name must stay in sync.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /**
     * Variant that prefixes every sentence with an optional prefix and the
     * current timestamp, giving each tuple an identifier that is helpful for
     * debugging.
     */
    public static class TimeStamped extends RandomSentenceSpout {
        private final String prefix;

        /** Creates a time-stamping spout with an empty prefix. */
        public TimeStamped() {
            this("");
        }

        /**
         * @param prefix text placed before the timestamp of every sentence
         */
        public TimeStamped(String prefix) {
            this.prefix = prefix;
        }

        /**
         * @param input the raw sentence
         * @return {@code prefix + timestamp + " " + input}
         */
        @Override
        protected String sentence(String input) {
            return prefix + currentDate() + " " + input;
        }

        /**
         * Formats the current time.
         *
         * <p>NOTE(review): in {@link SimpleDateFormat} the letter {@code S} is
         * milliseconds, so the nine-letter {@code SSSSSSSSS} is the millisecond
         * value zero-padded to nine digits, not nanoseconds. The pattern is
         * kept as-is so emitted identifiers do not change format.
         */
        private String currentDate() {
            // A new formatter per call because SimpleDateFormat is not thread-safe.
            return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date());
        }
    }
}

WordCountTopology.java

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package cn.ljh.storm.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Word-count topology: a sentence spout feeds a splitter bolt, a counting
 * bolt and a reporting bolt that prints the final counts.
 *
 * <p>Run with a topology name as the first argument to submit to a cluster;
 * run without arguments to execute for 20 seconds in a local cluster.
 */
public class WordCountTopology {

    /**
     * Splits each incoming sentence on single spaces and emits one tuple per word.
     */
    public static class SplitSentence implements IRichBolt {
        private OutputCollector _collector;

        /** Declares the single output field {@code "word"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        /** @return {@code null}: no component-specific configuration */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        /** Stores the collector used by {@link #execute(Tuple)}. */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        /**
         * Reads the sentence from the {@code "word"} field of the input (the
         * name the spout declares for its sentence field) and emits each word.
         *
         * @param input tuple carrying a sentence in its {@code "word"} field
         */
        @Override
        public void execute(Tuple input) {
            String sentence = input.getStringByField("word");
            for (String word : sentence.split(" ")) {
                // Anchor every word to the input tuple so it belongs to the
                // input's tuple tree.
                _collector.emit(input, new Values(word));
            }
            // An IRichBolt has to ack its input explicitly; unlike
            // BaseBasicBolt, nothing does it on our behalf.
            _collector.ack(input);
        }

        /** Nothing to release. */
        @Override
        public void cleanup() {
        }
    }

    /**
     * Keeps a running count per word and emits {@code (word, count)} after
     * every update.
     */
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        /**
         * Increments the count of the word in position 0 of the tuple and
         * emits the word together with its new count.
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        /** Declares the output fields {@code "word"} and {@code "count"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Terminal bolt: remembers the latest count seen for every word and
     * prints all of them, sorted by word, from {@link #cleanup()}.
     */
    public static class WordReport extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        /** Records the latest count for the word, replacing any earlier value. */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getStringByField("word");
            Integer count = tuple.getIntegerByField("count");
            this.counts.put(word, count);
        }

        /** This bolt emits nothing, so it declares no output fields. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        /**
         * Prints the recorded counts in alphabetical order of the words.
         *
         * <p>NOTE(review): presumably only reliable in local mode; confirm
         * against the Storm docs whether cleanup() is guaranteed on a cluster.
         */
        @Override
        public void cleanup() {
            System.out.println("-----------------FINAL COUNTS START-----------------------");
            List<String> keys = new ArrayList<String>();
            keys.addAll(this.counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                System.out.println(key + " : " + this.counts.get(key));
            }
            System.out.println("-----------------FINAL COUNTS END-----------------------");
        }
    }

    /**
     * Builds the topology and runs it.
     *
     * @param args if non-empty, {@code args[0]} is the topology name used for
     *             cluster submission; otherwise the topology runs locally
     * @throws Exception if submission or the local run fails
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

        // Shuffle grouping: each tuple goes to a randomly chosen task.
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // Fields grouping: tuples are hashed on the given fields, so tuples
        // with the same "word" always reach the same task.
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
        // Global grouping: every tuple goes to the task with the lowest id of
        // the target bolt. Extra executors would never receive a tuple and
        // would only print empty reports, so a parallelism of 1 is enough.
        builder.setBolt("report", new WordReport(), 1).globalGrouping("count");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);

            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("word-count", conf, builder.createTopology());
                // Let the topology run for 20 seconds before stopping it.
                Thread.sleep(20000);
            } finally {
                // Shut the local cluster down even if submission or the sleep fails.
                cluster.shutdown();
            }
        }
    }
}

二、执行效果

转载于:https://www.cnblogs.com/xuxiuxiu/p/7020407.html

Storm入门(四)WordCount示例相关推荐

  1. Storm入门与实践(3)通过WordCount展开Storm的编程之旅

    介绍 貌似WordCount已经成了大数据,分布式计算的入门标配程序,其实仔细想一下WordCount的例子,它还有很多应用的场景,例如统计过去一段时间网站中各个商品的浏览量,最近一段时间相同查询的数 ...

  2. 《Storm入门》中文版

    本文翻译自《Getting Started With Storm》译者:吴京润    编辑:郭蕾 方腾飞 本书的译文仅限于学习和研究之用,没有原作者和译者的授权不能用于商业用途. 译者序 ...

  3. (转载)MFC入门(四)  作者 zhoujiamurong

    关键字 MFC 原作者姓名 zhoujiamurong 介绍 这一节,介绍工具条和状态栏 读者评分 71 评分次数 18 正文 MFC入门(四) 工具条和状态栏 原创  作者:zhoujiamuron ...

  4. 大数据入门:WordCount程序解析

    大数据入门:WordCount程序解析 文章目录 大数据入门:WordCount程序解析 一.输入 二.程序解析 三.输出 四.需要注意的地方 为一个顽固的家伙写的,都要考试了还不让我给详细讲一下,我 ...

  5. Hadoop入门(四)——模板虚拟机环境准备(图文详解步骤2021)

    Hadoop入门(四)--模板虚拟机环境准备(图文详解步骤2021) 系列文章传送门 这个系列文章传送门: Hadoop入门(一)--CentOS7下载+VM上安装(手动分区)图文步骤详解(2021) ...

  6. mongoDB 入门指南、示例

    http://www.cnblogs.com/hoojo/archive/2011/06/01/2066426.html mongoDB 入门指南.示例 上一篇:简单介绍mongoDB 一.准备工作 ...

  7. WPF入门(四)-线形区域Path内容填充之填充图(ImageBrush)

    WPF入门(四)->线形区域Path内容填充之填充图(ImageBrush) 原文:WPF入门(四)->线形区域Path内容填充之填充图(ImageBrush) 前面我们提到了Linear ...

  8. Storm入门之第一章

    原书下载地址 译者:吴京润   编辑:方腾飞 译者注:本文翻译自《Getting Started With Storm》,本书中所有Storm相关术语都用斜体英文表示. 这些术语的字面意义 ...

  9. 文本分类入门(四)训练Part 1

    文本分类入门(四)训练Part 1 训练,顾名思义,就是training(汗,这解释),简单的说就是让计算机从给定的一堆文档中自己学习分类的规则(如果学不对的话,还要,打屁屁?). 开始训练之前,再多 ...

  10. Python爬虫入门四之Urllib库的高级用法

    1.设置Headers 有些网站不会同意程序直接用上面的方式进行访问,如果识别有问题,那么站点根本不会响应,所以为了完全模拟浏览器的工作,我们需要设置一些Headers 的属性. 首先,打开我们的浏览 ...

最新文章

  1. kafka数据丢失的场景
  2. 【跃迁之路】【732天】程序员高效学习方法论探索系列(实验阶段489-2019.2.22)...
  3. 【RHCA翻译计划】EX436第一章:集群存储概论2
  4. 顶级风投First Round Capital对创业者的30个建议
  5. from .filename import class
  6. 新闻发布项目——实体类(categoryTB)
  7. HBase BlockCache系列 - 探求BlockCache实现机制
  8. 16位浮点 c语言,C语言中的16位浮点乘法
  9. 信息学奥赛一本通 1125:矩阵乘法 | OpenJudge NOI 1.8 08
  10. VMware安装系统时“无法创建新虚拟机: 不具备执行此操作的权限“的解决方案
  11. iOS语言中的代理模式
  12. 路孚特:300天350个版本,旗舰移动产品“0”到“1”的交付之路
  13. 禁忌搜索算法求解 TSP 问题的代码示例
  14. 简单MFC ActiveX插件例子
  15. 如何解决“Appstore无法下载软件”的问题
  16. vue插槽面试题_vue面试题总结
  17. cesium is not defined
  18. 数据结构目录树(严蔚敏王道)版
  19. GitChat 是一个怎样的产品?
  20. 程序员增加收入的实用之道

热门文章

  1. django传递临时数据
  2. eclipse sdk 无法更新
  3. 触发器 索引视图 游标 事务
  4. 配置Spring.NET
  5. synchronized关键字的4种用法
  6. oracle rpad()函数
  7. ASP实例讲解:用分页符实现长文章分页显示
  8. Python学习:3.Python学习基础
  9. RXJAVA之Subject
  10. Servlet JSP - 转发与重定向的区别