storm 开发实例
实时统计当前接收的单词出现的频率。
主要分为四个部分:
- 数据源 Spout 持续的从这 5 条句子中随机发射某条句子,通过类 SentenceSpout 继承BaseRichSpout 抽象类来实现。
- 将接收到句子以空格分割,并将分割得到的每个单词发射出去,通过类 SplitBolt 继承BaseRichBolt 抽象类来实现。
- 统计当前接收到的单词出现的频率,并将单词跟出现的频率发射出去,通过类 CountBolt 继承BaseRichBolt 抽象类来实现。
- main 方法提供建立一个 StormTopology,并提交到 Storm Nimbus 节点。
1、 随机发射数据Spout
数据源 Spout 持续的从 5条句子中随机发射某条句子,通过类 SentenceSpout 继承BaseRichSpout 抽象类来实现。
public class SentenceSpout extends BaseRichSpout { SpoutOutputCollector _collector; Random _rand; //产生随机数工具类 public void open(Map conf, TopologyContext context, SpoutOutputCollector outputCollector) { _collector = outputCollector; _rand = new Random(); } // 在 SpoutTracker 类中被调用,每调用一次就可以向 storm 集群中发射一条数据(一个 tuple 元组),该方法会被不停的调用 public void nextTuple() { Utils.sleep(100); String[] sentences = { "the cow jumped over the moon","an apple a day keeps the doctor away","four score and seven years ago","snow white and the seven dwarfs","i am at two with nature"}; String sentence = sentences[_rand.nextInt(sentences.length)]; _collector.emit(new Values(sentence)); } //定义字段id,该id 在简单模式下没有用处,但在按照字段分组的模式下有很大的用处 public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
}
2、Bolt消息处理者
SplitBolt
public class SplitBolt extends BaseRichBolt { //发送数据的工具类 private OutputCollector collector; /*** 初始化 collector */ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } /** *处理接收到的消息,并发射新的消息 * @ param tuple */ public void execute(Tuple tuple) { String message = tuple.getString(0); System.out.println("msg=" +message); for(String word: message.split(" " )){ collector.emit(new Values(word)); } } /** * 定义字段 id,该 id 在简单模式下没有用处,但在按照字段分组的模式下有很大的用处 */ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); }
}
CountBolt
public class CountBolt extends BaseRichBolt { //用来保存单词出现的频率 Map<String, Integer> counts = new HashMap<String, Integer>(); //发送数据的工具类 private OutputCollector collector; /** * 初始化 collector */ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } /** *处理接收到的消息,并发射新的消息 * @param tuple */ public void execute(Tuple tuple) { String word = tuple.get String(0); Integer count = counts.get(word); i f (count == null) count = 0; count++; counts.put(word, count); System.out.println(word+" " +count); collector.emit(new Values(word, count)); } /** * 定义字段 id,该 id 在简单模式下没有用处,但在按照字段分组的模式下有很大的用处 */ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "count")); }
}
3、topology
public class WordCountTopology { public static void main(String[] args) throws Exception { // 实例化 TopologyBuilder 类。 Topology Builder builder = new TopologyBuilder(); // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 builder.setSpout("spout", new SentenceSpout(), 5); // 设置接收 id 为“spout”的 Spout 发射过来的数据的处理 Bolt 并分配并发数。//指定该节点接收喷发节点的策略为随机方式。 builder.setBolt("split", new SplitBolt(), 8).shuffleGrouping("spout"); // 设置接收 id 为“split”的 Bolt 发射过来的数据的处理 Bolt 并分配并发数。// 指定该节点接收喷发节点的策略为按字段分组方式。 builder.setBolt(" count", new CountBolt(), 12).fieldsGrouping("split", new Fields("word")); //Topology 运行配置 Config conf = new Config(); //当它被设置成 true 的话, storm 会记录下每个组件所发射的每条消息。// 这在本地环境调试 topology 很有用, 但是在线上这么做的话会影响性能的。 // conf.setDebug(true); if (args != null && args.length > 0) { //使用集群模式 //定义你希望集群分配多少个工作进程给你来执行这个 topology. conf.setNumWorkers(3); //args[0]为 topology 的名字是用来唯一区别一个 topology 的,这样你然后可以用这个名字来杀死这个 topology 的。
// 前面已经说过了, 你必须显式的杀掉一个 topology, 否则它会一直运行。StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); }else { //使用本地模式,仅限于调试代码 conf.setMaxTaskParallelism(3); Local Cluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
}
storm 开发实例相关推荐
- JBOSS+EJB3之Entity 开发实例
我用的是 mysql 数据库, EJB3.0 相对于2.0 变化蛮大的,真的是迫于 Spring+Hibernate 的逼式,ejb 3.0 已经出现几年了,自从她轻装上阵,也不知道现在的应用情况如何 ...
- 《Unity 3.x游戏开发实例》一1.5 欢迎来到Unity 3D
本节书摘来异步社区<Unity 3.x游戏开发实例>一书中的第1章,第1.5节,作者: [加]Ryan Henson Creighton 译者: 师蓉 责编: 陈冀康,更多章节内容可以访问 ...
- JDBC - 开发实例 - MVC模式
JDBC - 开发实例 - MVC模式 1. 在web.xml中配置连接数据库的信息 web.xml: <context-param> <param-name>server& ...
- solidworks api二次开发实例详解_Solidworks开发语言对比及分析
很多初学Solidworks二次开发的同学,也许都会纠结使用何种语言进行二次开发.对于Solidworks二次开发的语言,官方有VBA,VB.NET,C#以及C++,四种语言. 用户通常会有如下疑问, ...
- AgileEAS.NET平台开发实例-药店系统-准备开发环境(上)
开篇 上一篇我们主要是讲述了如何根据数据库原型设计器提供的相关功能来构建ORM提供的相关信息,例如根据数据库与实体对象之间通过数据原型来进行双向的映射.本篇 我们将会根据上篇<AgileEAS. ...
- 安卓音乐播放器开发实例
本文将引导大家做一个音乐播放器,在做这个Android开发实例的过程中,能够帮助大家进一步熟悉和掌握学过的ListView和其他一些组件.为了有更好的学习效果,其中很多功能我们手动实现,例如音乐播放的 ...
- python简单编程例子-中文方便就用中文编程!Python图形界面开发实例
原标题:中文方便就用中文编程!Python图形界面开发实例 之前做的一个Python小程序,功能很简单,面对用户群也很窄,是五笔编码编.校人员使用的五笔编码编辑器. 这样的"周末" ...
- RESTLET开发实例(二)使用Component、Application的REST服务
2019独角兽企业重金招聘Python工程师标准>>> 上一篇文章,我们介绍了基于JAX-RS的REST服务,本篇文章我们介绍不基于JAX-RS的模式.JAX-RS其实就是一个简单的 ...
- QT中树控件QTreeView开发实例
转自:http://mobile.51cto.com/symbian-268700.htm 本文讲解了QT中树控件QTreeView开发实例,对于QTreeView没有过多的讲解,那么不说废话了,看代 ...
最新文章
- 程序运行背后的那些事 ~ 【程序的编译(预处理操作)+链接】
- 普鲁克分析(Procrustes Analysis)评估物种-环境/功能关联度的一个示例
- 影视中渐隐渐现特点作用_影视配音的技巧
- 清华裴丹:AIOps 落地路线图
- 【转】ubuntu 12.04 LTS将关闭最大化最小化移动到右上角
- 计算机如何学会自动构图?
- 从入门到入土(三)RocketMQ 怎么保证的消息不丢失?
- iOS - Swift NSRect 位置和尺寸
- maven依赖的作用域\<scope>
- MATLAB中施密特正交化的实现
- 洪恩机器人课程提示_【重要提示】全国儿童机器人等级考试报名启动!
- CAD入门教程,基本设置,使用技巧
- android媒体--stagefright概述【一】
- ibm主机安装服务器系统安装系统安装系统安装方法,IBM系列服务器安装操作系统安装方法.ppt...
- PHP5.5 ~ PHP7.2新特性总结
- java批量图片下载+打包成zip格式
- 地磅系统连不上云端服务器,LiteCMS云称重管理系统
- 梅林安装opkg后安装iperf3_路由器最高速度/性能测试 - Windows 安装 IPerf3 及 使用方法...
- kong mysql_konga 安装
- 实用技巧 | Chrome浏览器如何对标签页进行分组整理?
热门文章
- 只需5行代码! LSTM时间序列建模以及预测
- 佳博打印机接入(java)
- python绘制饼状图图例_Python图表绘制很简单,一文带你学会如何生成带图例的饼图...
- 软件开发项目保密协议
- 字节跳动前端实习生一面总结与反思
- centos7 安装中文字体库
- Ruby学习-安装、升级Ruby菜鸟教程(Linux环境下)
- iOS系统最好的拨号app_我是亲民_新浪博客
- java静态函数_(基础)java中的静态变量与静态函数Static
- Pygame入门-游戏代码结构及背景轮播、声音音效及图像动态效果