2019独角兽企业重金招聘Python工程师标准>>>

在上一次单词计数的基础上做如下改动：使用自定义分组策略，将首字母相同的单词发送给同一个 task 计数

自定义 CustomStreamGrouping

package com.zhch.v4;import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;public class ModuleGrouping implements CustomStreamGrouping, Serializable {private List<Integer> tasks;@Overridepublic void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List<Integer> targetTasks) {this.tasks = targetTasks;}@Overridepublic List<Integer> chooseTasks(int taskId, List<Object> values) {List<Integer> taskIds = new ArrayList<Integer>();if (values.size() > 0) {String str = values.get(0).toString();if (str.isEmpty()) {taskIds.add(0);} else {Integer index = str.charAt(0) % tasks.size();taskIds.add(tasks.get(index));}}return taskIds;}
}

数据源spout

package com.zhch.v4;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;public class SentenceSpout extends BaseRichSpout {private FileReader fileReader = null;private boolean completed = false;private ConcurrentHashMap<UUID, Values> pending;private SpoutOutputCollector collector;@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("sentence"));}@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector;this.pending = new ConcurrentHashMap<UUID, Values>();try {this.fileReader = new FileReader(map.get("wordsFile").toString());} catch (Exception e) {throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");}}@Overridepublic void nextTuple() {if (completed) {try {Thread.sleep(1000);} catch (InterruptedException e) {}}String line;BufferedReader reader = new BufferedReader(fileReader);try {while ((line = reader.readLine()) != null) {Values values = new Values(line);UUID msgId = UUID.randomUUID();this.pending.put(msgId, values);this.collector.emit(values, msgId);}} catch (Exception e) {throw new RuntimeException("Error reading tuple", e);} finally {completed = true;}}@Overridepublic void ack(Object msgId) {this.pending.remove(msgId);}@Overridepublic void fail(Object msgId) {this.collector.emit(this.pending.get(msgId), msgId);}
}

实现语句分割bolt

package com.zhch.v4;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;import java.util.Map;public class SplitSentenceBolt extends BaseRichBolt {private OutputCollector collector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}@Overridepublic void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for (String word : words) {collector.emit(tuple, new Values(word));}this.collector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word"));}
}

实现单词计数bolt

package com.zhch.v4;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class WordCountBolt extends BaseRichBolt {private OutputCollector collector;private HashMap<String, Long> counts = null;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;this.counts = new HashMap<String, Long>();}@Overridepublic void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = this.counts.get(word);if (count == null) {count = 0L;}count++;this.counts.put(word, count);BufferedWriter writer = null;try {writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));List<String> keys = new ArrayList<String>();keys.addAll(this.counts.keySet());Collections.sort(keys);for (String key : keys) {Long c = this.counts.get(key);writer.write(key + " : " + c);writer.newLine();writer.flush();}} catch (Exception e) {e.printStackTrace();} finally {if (writer != null) {try {writer.close();} catch (Exception e) {e.printStackTrace();}writer = null;}}this.collector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word", "count"));}
}

实现单词计数topology

package com.zhch.v4;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;public class WordCountTopology {public static final String SENTENCE_SPOUT_ID = "sentence-spout";public static final String SPLIT_BOLT_ID = "split-bolt";public static final String COUNT_BOLT_ID = "count-bolt";public static final String TOPOLOGY_NAME = "word-count-topology-v4";public static void main(String[] args) throws Exception {SentenceSpout spout = new SentenceSpout();SplitSentenceBolt spiltBolt = new SplitSentenceBolt();WordCountBolt countBolt = new WordCountBolt();TopologyBuilder builder = new TopologyBuilder();builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);builder.setBolt(COUNT_BOLT_ID, countBolt, 2).customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); //使用 自定义 分组策略Config config = new Config();config.put("wordsFile", args[0]);if (args != null && args.length > 1) {config.setNumWorkers(2);//集群模式启动StormSubmitter.submitTopology(args[1], config, builder.createTopology());} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());try {Thread.sleep(5 * 1000);} catch (InterruptedException e) {}cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();}}
}

提交到Storm集群

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4

运行结果:

[grid@hadoop5 stormData]$ cat result.txt
Apache : 1
ETL : 1
It : 1
Storm : 4
a : 4
analytics : 1
and : 5
any : 1
at : 1
can : 1
cases: : 1
clocked : 1
computation : 2
continuous : 1
easy : 2
guarantees : 1
is : 6
it : 2
machine : 1
makes : 1
many : 1
million : 1
more : 1
of : 2
online : 1
open : 1
operate : 1
over : 1
scalable : 1
second : 1
set : 1
simple : 1
source : 1
streams : 1
system : 1
unbounded : 1
up : 1
use : 2
used : 1
what : 1
will : 1
with : 1
your : 1
[grid@hadoop6 stormData]$ cat result.txt
Hadoop : 1
RPC : 1
batch : 1
be : 2
benchmark : 1
data : 2
did : 1
distributed : 2
doing : 1
fast: : 1
fault-tolerant : 1
for : 2
free : 1
fun : 1
has : 1
language : 1
learning : 1
lot : 1
node : 1
per : 2
process : 1
processed : 2
processing : 2
programming : 1
realtime : 3
reliably : 1
to : 3
torm : 1
tuples : 1

转载于:https://my.oschina.net/zc741520/blog/410179

Storm实验 -- 单词计数4相关推荐

  1. [Storm]分布式单词计数(一)一个简单的storm demo

    目录 前言: 基本概念: 1.1 Spout 1.1 业务 SentenceSpout 1.2 SentenceSpout引用的部分类源码 BaseRichSpout源码 Values源码 ISpou ...

  2. 【Storm】storm安装、配置、使用以及Storm单词计数程序的实例分析

    前言:阅读笔记 storm和hadoop集群非常像.hadoop执行mr.storm执行topologies. mr和topologies最关键的不同点是:mr执行终于会结束,而topologies永 ...

  3. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  4. MIT自然语言处理第二讲:单词计数(第三、四部分)

    MIT自然语言处理第二讲:单词计数(第三部分) 自然语言处理:单词计数 Natural Language Processing: (Simple) Word Counting 作者:Regina Ba ...

  5. MIT自然语言处理第二讲:单词计数(第一、二部分)

    MIT自然语言处理第二讲:单词计数(第一部分) 自然语言处理:单词计数 Natural Language Processing: (Simple) Word Counting 作者:Regina Ba ...

  6. c语言字符统计2sdut,C语言实验——单词统计(字符串) SDUT

    C语言实验--单词统计 Problem Description 从键盘输入一行字符(长度小于100),统计其中单词的个数,各单词以空格分隔,且空格数可以是多个. Input 输入只有一行句子.仅有空格 ...

  7. Spark Core:Scala单词计数

    Spark Core:Scala单词计数 文章目录 Spark Core:Scala单词计数 1.实验描述 2.实验环境 3.相关技能 4.知识点 5.实验效果 6.实验步骤 8.总结 1.实验描述 ...

  8. 软件工程小组项目——单词计数

    软件工程小组项目--单词计数 项目地址 PSP表格 解题思路 实现过程 1.字符数 2.单词数 3.行数 4.各段行数统计 5.对子目录文件进行操作 6.图形界面 性能优化 实验总结 项目地址 Git ...

  9. 自然语言处理第二讲:单词计数

    自然语言处理:单词计数 这一讲主要内容(Today): 1.语料库及其性质: 2.Zipf 法则: 3.标注语料库例子: 4.分词算法: 一. 语料库及其性质: a) 什么是语料库(Corpora) ...

最新文章

  1. 互联网产品mysql数据库设计总结
  2. java取余时前者前者小于后者_Java基本语法
  3. 动态调整线程池_调整线程池的重要性
  4. rosserial_java_[学习笔记]Rosserial实现Windows-ROS交互操作(1)
  5. JDK8下maven使用maven-javadoc-plugin插件报错
  6. 软件项目管理和软件工程的区别
  7. 李南江 html链接,HTML5教程-多媒体标签-李南江
  8. 路由模式和桥接模式的区别
  9. JS淘宝商品广告效果
  10. 《剑指offer》-二叉树的下一个节点
  11. 内网渗透建立代理通道(如何攻击目标内网机器?)-Socks代理(゚益゚メ) 渗透测试
  12. 这届勒索病毒,其实很有上进心呀
  13. ES6的Array.from方法创建长度为N的undefined数组
  14. 盘点营销策划案例之2019年沙雕广告!
  15. 办公用计算机安全使用常识,办公电脑使用注意事项
  16. ORA-12560:TNS:协议适配器错误的解决方案
  17. iOS逆向学习之 Mac 登录到 iPhone
  18. 高新技术企业认定评分标准
  19. 教你自制五星级大酒店用的调味品
  20. 面试笔记 如何3秒钟看出一个人的实力?| 奸商行走江湖7年的经验分享

热门文章

  1. 《python接口自动化测试》笔记
  2. 前端白屏问题_记一次白屏统计与修理
  3. python requests.get无法取出网页_Python requests获取网页常用方法解析
  4. java php rsa加密解密算法_PHP rsa加密解密算法原理解析
  5. igllib 203 Curvature directions
  6. SQLite Tutorial 1 在ubuntu上安装SQLite 3.8.2
  7. Lec 15 Projections onto subspaces
  8. 3.7.2 - Escape Sequences
  9. 基于深度学习的植物病害表征新视角
  10. gridlayout布局单元格宽度设置_安卓界面布局之线性布局