Week 2: Storm Concepts and How It Works in Depth; topology, spout, and bolt details and the API, Part 2
Clear out the old example code under the package directory, then create the five source files for the word-count topology:

[root@localhost storm]# cd /root/soft/code/teststorm/src/main/java/cn/dataguru/storm
[root@localhost storm]# rm -rf *
[root@localhost storm]# ls
ReportBolt.java  SentenceSpout.java  SplitSentenceBolt.java  WordCountBolt.java  WordCountTopology.java
Build and run the topology in local mode. Because storm-core is declared with provided scope, it is not on the exec:java runtime classpath by default, so pass -Dexec.classpathScope=compile:

[root@localhost teststorm]# mvn install
[root@localhost teststorm]# mvn compile exec:java -Dexec.mainClass=cn.dataguru.storm.WordCountTopology -Dexec.classpathScope=compile
pom.xml fragments: the Storm dependency (provided scope keeps storm-core out of the packaged jar, since a real cluster supplies it at runtime) and the compiler plugin:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.1-incubating</version>
    <scope>provided</scope>
</dependency>

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>2.3.2</version>
    <configuration>
        <source>${jdk.version}</source>
        <target>${jdk.version}</target>
        <encoding>${project.build.sourceEncoding}</encoding>
    </configuration>
</plugin>
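The `${jdk.version}` and `${project.build.sourceEncoding}` placeholders above are not defined by Maven itself; they must be declared in the pom's `<properties>` section. A minimal sketch (the values here are illustrative, not from the original article):

```xml
<properties>
    <jdk.version>1.6</jdk.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
```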
package cn.dataguru.storm;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt {
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        // Keep only the latest running total for each word.
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink; it emits nothing downstream.
    }

    public void cleanup() {
        // Called on shutdown (only reliable in local mode):
        // print the final tallies in sorted order.
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}
package cn.dataguru.storm;

import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] sentences = { "my dog has fleas", "i like cold beverages",
            "the dog ate my homework", "don't have a cow man",
            "i don't think i like fleas" };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        // Emit the fixed sentences in a round-robin loop.
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        // Slow emission down slightly so the local run stays readable.
        Utils.sleep(1);
    }
}
package cn.dataguru.storm;

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // Split each incoming sentence on spaces and emit one tuple per word.
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
package cn.dataguru.storm;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        // Increment the running total for this word and emit the new count.
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
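The rolling-count logic inside WordCountBolt.execute() can be exercised outside Storm. A minimal plain-Java sketch (the class name `RollingCount` is ours, not part of the example):

```java
import java.util.HashMap;

public class RollingCount {
    private final HashMap<String, Long> counts = new HashMap<String, Long>();

    // Mirrors WordCountBolt.execute(): increment and return the running total.
    public long add(String word) {
        Long count = counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        counts.put(word, count);
        return count;
    }

    public static void main(String[] args) {
        RollingCount rc = new RollingCount();
        for (String w : "the dog ate my homework the dog".split(" ")) {
            System.out.println(w + " : " + rc.add(w));
        }
    }
}
```

Note that, like the bolt, each call emits the updated total for that word rather than waiting for a final pass; the sorted final report is ReportBolt's job.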
package cn.dataguru.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // SentenceSpout --> SplitSentenceBolt: distribute sentences randomly.
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(
                SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt: route each word to the same task.
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID,
                new Fields("word"));
        // WordCountBolt --> ReportBolt: funnel everything to a single reporter.
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(
                COUNT_BOLT_ID);

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        // Let the topology run for ten seconds; without this pause it would
        // be killed before processing anything and the report would be empty.
        Utils.sleep(10000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
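Without a Storm installation at hand, the end-to-end dataflow (spout sentences, split into words, tally, sorted report) can be simulated in plain Java. `PipelineSim` below is a hypothetical helper of ours, not part of the topology:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PipelineSim {
    // Split every sentence on spaces and tally the words, like
    // SplitSentenceBolt feeding WordCountBolt.
    public static Map<String, Long> count(String[] sentences) {
        Map<String, Long> counts = new HashMap<String, Long>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                Long c = counts.get(word);
                counts.put(word, c == null ? 1L : c + 1);
            }
        }
        return counts;
    }

    // Print the final tallies in sorted order, like ReportBolt.cleanup().
    public static void main(String[] args) {
        String[] sentences = { "my dog has fleas", "i like cold beverages",
                "the dog ate my homework", "don't have a cow man",
                "i don't think i like fleas" };
        Map<String, Long> counts = count(sentences);
        List<String> keys = new ArrayList<String>(counts.keySet());
        Collections.sort(keys);
        System.out.println("--- FINAL COUNTS ---");
        for (String key : keys) {
            System.out.println(key + " : " + counts.get(key));
        }
    }
}
```

The real topology differs in one important way: the spout loops over the sentences forever, so the counts keep growing until the topology is killed, whereas this sketch makes a single pass.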
Reposted from: https://my.oschina.net/goudingcheng/blog/614301