Storm入门(四)WordCount示例
Storm API文档网址如下:
http://storm.apache.org/releases/current/javadocs/index.html
一、关联代码
使用maven,代码如下。
pom.xml 和Storm入门(三)HelloWorld示例相同
RandomSentenceSpout.java
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package cn.ljh.storm.wordcount; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; import java.util.Random; public class RandomSentenceSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); SpoutOutputCollector _collector; Random _rand; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; final String sentence = sentences[_rand.nextInt(sentences.length)]; LOG.debug("Emitting tuple: {}", sentence); _collector.emit(new Values(sentence)); } protected String sentence(String input) { return input; } @Override public void ack(Object id) { } @Override public void fail(Object id) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } // Add unique identifier to each tuple, which is helpful for debugging public static class TimeStamped extends RandomSentenceSpout { private final String prefix; public TimeStamped() { this(""); } public TimeStamped(String prefix) { this.prefix = prefix; } protected String sentence(String input) { return prefix + currentDate() + " " + input; } private String currentDate() { return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); } } }
WordCountTopology.java
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package cn.ljh.storm.wordcount; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; public class WordCountTopology { public static class SplitSentence implements IRichBolt { private OutputCollector _collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } public Map<String, Object> getComponentConfiguration() { return null; } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple input) { String sentence = input.getStringByField("word"); String[] words = sentence.split(" "); for(String word : words){ this._collector.emit(new Values(word)); } } public void cleanup() { // TODO Auto-generated method stub } } public static class WordCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static class WordReport extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getStringByField("word"); Integer count = tuple.getIntegerByField("count"); this.counts.put(word, count); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public void cleanup() { System.out.println("-----------------FINAL COUNTS START-----------------------"); List<String> keys = new ArrayList<String>(); keys.addAll(this.counts.keySet()); Collections.sort(keys); for(String key : keys){ System.out.println(key + " : " + this.counts.get(key)); } System.out.println("-----------------FINAL COUNTS END-----------------------"); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); //ShuffleGrouping:随机选择一个Task来发送。 builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); //FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。 builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); //GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。 builder.setBolt("report", new WordReport(), 6).globalGrouping("count"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(20000); cluster.shutdown(); } } }
二、执行效果
转载于:https://www.cnblogs.com/xuxiuxiu/p/7020407.html
Storm入门(四)WordCount示例相关推荐
- Storm入门与实践(3)通过WordCount展开Storm的编程之旅
介绍 貌似WordCount已经成了大数据,分布式计算的入门标配程序,其实仔细想一下WordCount的例子,它还有很用应用的场景,例如统计过去一段时间网站中各个商品的浏览量,最近一段时间相同查询的数 ...
- 《Storm入门》中文版
本文翻译自<Getting Started With Storm>译者:吴京润 编辑:郭蕾 方腾飞 本书的译文仅限于学习和研究之用,没有原作者和译者的授权不能用于商业用途. 译者序 ...
- (转载)MFC入门(四) 作者 zhoujiamurong
关键字 MFC 原作者姓名 zhoujiamurong 介绍 这一节,介绍工具条和状态栏 读者评分 71 评分次数 18 正文 MFC入门(四) 工具条和状态栏 原创 作者:zhoujiamuron ...
- 大数据入门:WordCount程序解析
大数据入门:WordCount程序解析 文章目录 大数据入门:WordCount程序解析 一.输入 二.程序解析 三.输出 四.需要注意的地方 为一个顽固的家伙写的,都要考试了还不让我给详细讲一下,我 ...
- Hadoop入门(四)——模板虚拟机环境准备(图文详解步骤2021)
Hadoop入门(四)--模板虚拟机环境准备(图文详解步骤2021) 系列文章传送门 这个系列文章传送门: Hadoop入门(一)--CentOS7下载+VM上安装(手动分区)图文步骤详解(2021) ...
- mongoDB 入门指南、示例
http://www.cnblogs.com/hoojo/archive/2011/06/01/2066426.html mongoDB 入门指南.示例 上一篇:简单介绍mongoDB 一.准备工作 ...
- WPF入门(四)-线形区域Path内容填充之填充图(ImageBrush)
WPF入门(四)->线形区域Path内容填充之填充图(ImageBrush) 原文:WPF入门(四)->线形区域Path内容填充之填充图(ImageBrush) 前面我们提到了Linear ...
- Storm入门之第一章
原书下载地址 译者:吴京润 编辑:方腾飞 译者注:本文翻译自<Getting Started With Storm>,本书中所有Storm相关术语都用斜体英文表示. 这些术语的字面意义 ...
- 文本分类入门(四)训练Part 1
文本分类入门(四)训练Part 1 训练,顾名思义,就是training(汗,这解释),简单的说就是让计算机从给定的一堆文档中自己学习分类的规则(如果学不对的话,还要,打屁屁?). 开始训练之前,再多 ...
- Python爬虫入门四之Urllib库的高级用法
1.设置Headers 有些网站不会同意程序直接用上面的方式进行访问,如果识别有问题,那么站点根本不会响应,所以为了完全模拟浏览器的工作,我们需要设置一些Headers 的属性. 首先,打开我们的浏览 ...
最新文章
- kafka数据丢失的场景
- 【跃迁之路】【732天】程序员高效学习方法论探索系列(实验阶段489-2019.2.22)...
- 【RHCA翻译计划】EX436第一章:集群存储概论2
- 顶级风投First Round Capital对创业者的30个建议
- from .filename import class
- 新闻发布项目——实体类(categoryTB)
- HBase BlockCache系列 - 探求BlockCache实现机制
- 16位浮点 c语言,C语言中的16位浮点乘法
- 信息学奥赛一本通 1125:矩阵乘法 | OpenJudge NOI 1.8 08
- VMware安装系统时“无法创建新虚拟机: 不具备执行此操作的权限“的解决方案
- iOS语言中的代理模式
- 路孚特:300天350个版本,旗舰移动产品“0”到“1”的交付之路
- 禁忌搜索算法求解 TSP 问题的代码示例
- 简单MFC ActiveX插件例子
- 如何解决“Appstore无法下载软件”的问题
- vue插槽面试题_vue面试题总结
- cesium is not defined
- 数据结构目录树(严蔚敏王道)版
- GitChat 是一个怎样的产品?
- 程序员增加收入的实用之道