本文介绍介绍使用storm开发项目代码,介绍spout和bolt的相关接口等。storm的业务开发主要包括spout的开发,bolt的开发以及topology的创建等。

代码框架

spout
下述spout主要实现,随机的从给定的语句中发送一个sentence。

//FastRandomSentenceSpout.java
package spout.fastWordCount;import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;public class FastRandomSentenceSpout extends BaseRichSpout {SpoutOutputCollector _collector;Random _rand;private static final String[] CHOICES = {"marry had a little lamb whos fleese was white as snow","and every where that marry went the lamb was sure to go","one two three four five six seven eight nine ten","this is a test of the emergency broadcast system this is only a test","peter piper picked a peck of pickeled peppers"};@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_rand = ThreadLocalRandom.current();}@Overridepublic void nextTuple() {String sentence = CHOICES[_rand.nextInt(CHOICES.length)];_collector.emit(new Values(sentence), sentence); //_collector.emit(List<Object> tuple,messageId);//当spout发射元组时,可以使用messageId来标记该元组。messageId可以是任何类型,用于跟踪消息的处理情况。}@Overridepublic void ack(Object id) {//Ignored}@Overridepublic void fail(Object id) {_collector.emit(new Values(id), id);//失败时发送消息的ID}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));}
} FastRandomSentenceSpout继承并重写了BaseRichSpout类
/**open()方法会在进程的初始化时被调用*conf:storm中关于spout的配置*context:可用来获取关于这个任务在拓扑中的位置信息,包括任务的ID,任务的组件ID,输入/输出信息等*collector:收集器,用于元组的发送,线程安全*/
open(Map conf, TopologyContext context, SpoutOutputCollector collector);
/*
*用于元组的发送,storm框架会不断的调用该函数,如果没有元组可供发送,则简单的返回;
*在实践中如果没有元组可供发送,可调用sleep()睡眠短暂的时间,以减少CPU的负载
*/
nextTuple();
/**当storm确定标识符为id的消息已经被处理时,会调用ack()方法,并将消息移除发送队列*该方法应该是非阻塞的,其和ack,fail等方法在一个线程里面被调用,如果发生阻塞,会导致其他方法调用也被阻塞,从引起各种异常*/
ack(Object id);
/**当storm确定标识符为id的消息未被完全处理时,会调用该方法,并将消息重新放回发送队列*/
fail(Object id);
/**提供会发送字段的声明*/
declareOutputFields(OutputFieldsDeclarer declarer)
另外BaseRichSpout()还提供了待重写的方法close(),activate(),deactivate()
close():当一个spout被关闭时调用,但是并不能保证一定被调用,因为实践中可以随时kill一个进程
activate():当spout从失效模式中被激活时调用
deactivate():当spout失效时被调用

Bolt
Bolt->SplitSentence:实现将一句话以切分成单词,并发送到下一个bolt进行处理。

 //SplitSentence.java
package bolt.fastWordCount;import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;public class SplitSentence extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String sentence = tuple.getString(0);for (String word: sentence.split("\\s+")) {collector.emit(new Values(word, 1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}

Bolt->WordCount:从流中取出单词,并进行计数,并通过新增的日志文件进行输出。

//WordCount.java
这里增加了日志输出,关于日志的配置需要在/home/storm/log4j2/worker.xml增加
/*
<RollingFile name="stat"fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.stat"filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.stat.%i.gz"><PatternLayout><pattern>${pattern}</pattern></PatternLayout><Policies><SizeBasedTriggeringPolicy size="100 MB"/></Policies><DefaultRolloverStrategy max="3"/>
</RollingFile><logger name="stat" level="INFO"><appender-ref ref="stat"/>
</logger>
*/package bolt.fastWordCount;import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.HashMap;
import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>();Logger log = LoggerFactory.getLogger("stat");@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);Integer count = counts.get(word);if (count == null)count = 0;count++;counts.put(word, count);collector.emit(new Values(word, count));log.info("word:{}, count:{}",word,count);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}BaseBasicBolt封装并提供了一套可靠的消息处理机制,会在execute()方法后根据处理情况自动的调用ack()或者fail()方法。
/**storm会不断的调用execute()方法,来处理一个输入元组,*tuple:输入的元组*collector:采集器,用于元组的发送*/
execute(Tuple tuple, BasicOutputCollector collector);
如果WordCount继承自其它的类,例如IRichBolt或BaseRichBolt
那么还必须重写,参数的含义类似于spout的open()方法
public void prepare(Map conf, TopologyContext context, OutputCollector collector);
另外execute()方法也略有不同,在execute()中为了保证消息的可靠处理(并非必须的),必须自己调用ack()或者fail()方法,
public void execute(Tuple tuple);

Topology
创建拓扑

package topology.fastWordCount;import bolt.fastWordCount.*;
import spout.fastWordCount.*;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;import java.util.Map;public class FastWordCountTopology {public static void kill(Nimbus.Client client, String name) throws Exception {KillOptions opts = new KillOptions();opts.set_wait_secs(0);client.killTopologyWithOpts(name, opts);} public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new FastRandomSentenceSpout(), 4);builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");//随机分组,语句切分后的单词会被随机Bolt上的任务,这样能保证每个任务得到相同数量的元组builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); //按字段进行分组,保证相同的字段被同一个Config conf = new Config();String name = "wordcount";if (args != null && args.length > 0) {//生产环境name = args[0];conf.setNumWorkers(2); //设置工作进程数为2个StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());//以下代码展示了如何在生产环境中kill拓扑Map clusterConf = Utils.readStormConfig();clusterConf.putAll(Utils.readCommandLineOpts());Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();Thread.sleep(5 * 60 * 1000);kill(client, name);} else {//本地开发环境LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000);cluster.killTopology("test");cluster.shutdown();}}}StormSubmitter类提供了提交拓扑的方法,常用的方法主要有
/**name:待提交拓扑的名称,需唯一*stormConf:提交拓扑的配置*topology:拓扑对象*opts:提交时的附加参数*/
import org.apache.storm.StormSubmitter;submitTopology(String name, Map stormConf, StormTopology topology);
submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts);
submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology);
submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts);LocalCluster类提供了本地开发环境的拓扑。
import org.apache.storm.LocalCluster;LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
//cluster.submitTopologyWithOpts(String arg0, Map arg1, StormTopology arg2, SubmitOptions arg3);
cluster.killTopology("test"); //kill拓扑
cluster.shutdown();//关闭本地集群

将上述代码编译后组成成jar包,提交到storm即可运行。

Storm示例剖析-fastWordCount相关推荐

  1. Linux Make(Makefile)由浅入深的学习与示例剖析

    经过长时间学习和研究linux GNU make工程管理器 ,现在把学习心得与大家分享一下,希望本文能教会您一些有用的东西. make工具,是所有想在Linux/Unix系统上编程的用户都需要且必须掌 ...

  2. storm apache java_Apache Storm 示例 Java 拓扑 - Azure HDInsight | Microsoft Docs

    您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn. 以 Java 语言创建 ...

  3. storm示例之DRPC

    DRPC(分布式远程过程调用,Distributed Remote Procedure Call)是storm整合流(stream).Spout.Bolt.Topology而形成的一种模式.引入DRP ...

  4. STIX 2.0 示例剖析

    什么是STIX? STIX[1],Structured Threat Information Expression,结构化的威胁信息表达,是一种用于交换网络空间威胁情报的语言和序列化格式.STIX是开 ...

  5. Gavin老师Transformer直播课感悟 - Rasa对话机器人项目实战之教育领域Education Bot项目Slots解析、Slot Validation Action剖析(七十四)

    本文继续围绕工业级业务对话平台和框架Rasa,对Rasa对话机器人项目实战之教育领域Education Bot项目Slots的各种类型及运用实践.如何通过Slot Validation Action来 ...

  6. .net web开发经典图书总结

    这些年读.net关于web开发的方方面面的书很多,如下从历史的书单中选取一些比较经典的图书供参考. C#基础 语言基础和工具使用非常重要,如下是基本比较经典的图书 书名 概要 CLR via C#(第 ...

  7. Rasa课程、Rasa培训、Rasa面试系列之 Rasa幕后英雄系列-机器学习研究员 Johannes

    Rasa课程.Rasa培训.Rasa面试系列之 Rasa幕后英雄系列-机器学习研究员 Johannes Mosig 博士 Dialogue Transformers第二作者 在 Rasa,我们的团队正 ...

  8. android系统构建系统_构建系统简介

    android系统构建系统 Jan. 21. 2016 2016年1月21日 Roughly speadking, build in software development is the proce ...

  9. (二)改掉这些坏习惯,还怕写不出优雅的代码?

    Code Review 是一场苦涩但有意思的修行. 上期分享,通过示例剖析编码中一些经常触犯的性能点,以及编码时常犯的一些小毛病,来告诉新手程序员如何写出健壮的代码. 咱们书接上篇,本次一起来探讨一下 ...

最新文章

  1. Kubernetes 集群无损升级实践
  2. mysql主备数据库配置_MySQL双主互备配置
  3. 构建简单的微服务架构
  4. JavaFX将会留下来!
  5. java 产生无重复的随机数,Java创建无重复的随机数
  6. vscode配置js环境_VS Code配置Python开发环境
  7. LINUX系统管理与应用
  8. P2016 战略游戏[树形dp]
  9. springboot sessionfactory_Spring Boot从入门到精通(五)多数据源配置实现及源码分析...
  10. HTML5新增视频标签(HTML5)
  11. 分享几个Python小技巧函数里的4个小花招 1
  12. DosBox装Windows98
  13. 从入门到精通 - Fayson带你玩转CDH
  14. 关于电脑前置耳机插孔没声音的问题
  15. IntelliJ IDEA 自动导包设置以及idea import导包顺序Java
  16. 各大手机厂商快应用入口
  17. 选取一段代码块一下子都回退或前进空一格的快捷键
  18. 棉花异性纤维图像分割算法matlab程序,基于可见光机器视觉的棉花伪异性纤维识别方法-农业机械学报.PDF...
  19. CISCO 640-875 认证题库 更新
  20. LiteOS设备开发(1)——导读

热门文章

  1. Linux管道命令grep 和 wc
  2. 图灵机器人php调用案例,使用httpclient实现图灵机器人web api调用实例
  3. 常见的WebShell管理工具
  4. Pillow透视变换进行图片纠偏
  5. python的接口和抽象类
  6. 变分(Calculus of variations)的概念及运算规则(二)
  7. 最美的时候你遇见了谁
  8. 遇见你是我最美的意外
  9. pdf转换成html后打印不清晰,图片转换成pdf后很模糊不清晰怎么办?
  10. nrf51822代码流程(从main展开)