1.建立Java工程

使用idea,添加lib库,拷贝storm中lib到工程中

2.拷贝wordcount代码

下载src包,解压找到

apache-storm-0.9.4-src\apache-storm-0.9.4\examples\storm-starter\src\jvm\storm\starter目录下

拷贝WordCountTopology.java内容;

修改python处理方式;

1 importbacktype.storm.Config;2 importbacktype.storm.LocalCluster;3 importbacktype.storm.StormSubmitter;4 importbacktype.storm.task.ShellBolt;5 importbacktype.storm.topology.BasicOutputCollector;6 importbacktype.storm.topology.IRichBolt;7 importbacktype.storm.topology.OutputFieldsDeclarer;8 importbacktype.storm.topology.TopologyBuilder;9 importbacktype.storm.topology.base.BaseBasicBolt;10 importbacktype.storm.tuple.Fields;11 importbacktype.storm.tuple.Tuple;12 importbacktype.storm.tuple.Values;13 import com.bigdata.storm.spout.*;14

15 importjava.util.HashMap;16 importjava.util.Map;17 /**

18 * Created by Edward on 2016/8/17.19 */

20 public classMyTest {21

22 public static class SplitSentence extendsBaseBasicBolt{23

24 @Override25 public voidexecute(Tuple tuple, BasicOutputCollector basicOutputCollector) {26 String word = tuple.getString(0);27 String str[] = word.split(" ");28

29 System.out.println("Split Sentence:" +tuple.getSourceStreamId());30 for(int i=0; i

36 @Override37 public voiddeclareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {38 outputFieldsDeclarer.declare(new Fields("word"));39 }40 }41

42 public static class WordCount extendsBaseBasicBolt {43 Map counts = new HashMap();44

45 @Override46 public voidexecute(Tuple tuple, BasicOutputCollector collector) {47 String word = tuple.getString(0);48 Integer count =counts.get(word);49 if (count == null)50 count = 0;51 count++;52 counts.put(word, count);53 System.out.println("Word Count:" +tuple.getSourceStreamId());54 collector.emit(newValues(word, count));55

56 }57

58 @Override59 public voiddeclareOutputFields(OutputFieldsDeclarer declarer) {60 declarer.declare(new Fields("word", "count"));61 }62 }63

64 public static void main(String[] args) throwsException {65

66 TopologyBuilder builder = newTopologyBuilder();67

68 builder.setSpout("spout", new RandomSentenceSpout(), 5);69

70 builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");71 builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));72

73 Config conf = newConfig();74 conf.setDebug(true);75

76

77 if (args != null && args.length > 0) {78 conf.setNumWorkers(3);79

80 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());81 }82 else{83 conf.setMaxTaskParallelism(3);84

85 LocalCluster cluster = newLocalCluster();86 cluster.submitTopology("word-count", conf, builder.createTopology());87

88 Thread.sleep(50000);89

90 cluster.shutdown();91 }92 }93 }

3.拷贝随机生成spout代码

找到 apache-storm-0.9.4-src\apache-storm-0.9.4\examples\storm-starter\src\jvm\storm\starter\spout

拷贝RandomSentenceSpout.java到工程中

1 importbacktype.storm.spout.SpoutOutputCollector;2 importbacktype.storm.task.TopologyContext;3 importbacktype.storm.topology.OutputFieldsDeclarer;4 importbacktype.storm.topology.base.BaseRichSpout;5 importbacktype.storm.tuple.Fields;6 importbacktype.storm.tuple.Values;7 importbacktype.storm.utils.Utils;8

9 importjava.util.Map;10 importjava.util.Random;11

12 public class RandomSentenceSpout extendsBaseRichSpout {13 SpoutOutputCollector _collector;14 Random _rand;15

16 @Override17 public voidopen(Map conf, TopologyContext context, SpoutOutputCollector collector) {18 _collector =collector;19 _rand = newRandom();20 }21

22 @Override23 public voidnextTuple() {24 Utils.sleep(10000);25 String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",26 "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};27 String sentence =sentences[_rand.nextInt(sentences.length)];28 Object id = newObject();29 System.out.println(id);30 //id message ID 用来保证可靠性的,如果失败fail 会返回 message id 信息

31 _collector.emit(newValues(sentence), id);32 }33

34 @Override35 public voidack(Object id) {36 System.out.println("storm spout ack id = "+id);37 }38

39 @Override40 public voidfail(Object id) {41 }42

43 @Override44 public voiddeclareOutputFields(OutputFieldsDeclarer declarer) {45 declarer.declare(new Fields("word"));46 }47

48 }

4.本地运行

在idea中直接点击运行,观察运行过程

5.集群运行

将程序打包成jar,然后放到集群中运行。

把jar包放到storm/test目录下,执行:

bin/storm jar test/storm-sample.jar com.bigdata.storm.MyTest WordCount

storm java 例子_Storm 运行例子相关推荐

  1. fasttext的基本使用 java 、python为例子

    fasttext的基本使用 java .python为例子 今天早上在地铁上看到知乎上看到有人使用fasttext进行文本分类,到公司试了下情况在GitHub上找了下,最开始是c++版本的实现,不过有 ...

  2. [转]jackson json字符串、map、java对象的转换例子

    ackson框架 json字符串.map.java对象的转换例子 先下载框架jar包: 下面是一些例子: package jackson; import java.io.File; import ja ...

  3. VPB测试 使用Osgdem运行例子

    1.Osgdem运行例子所需数据下载地址: http://www.cc.gatech.edu/projects/large_models/ps.html Download Elevation Map: ...

  4. java实现死锁简单例子,Java死锁的简单例子

    Java死锁的简单例子 两个线程互相占有对方需要的资源而不释放,便形成了死锁. 代码如下: Program.java /** * 程序类 * @author michael * */ public c ...

  5. [Java] webservice soap,wsdl 例子

    java 调用webservice的各种方法总结 现在webservice加xml技术已经逐渐成熟,但要真正要用起来还需时日!! 由于毕业设计缘故,我看了很多关于webservice方面的知识,今天和 ...

  6. 借助开源工具高效完成Java应用的运行分析

    不止一次,我们都萌发过想对运行中程序的底层状况一探究竟的念头.产生这种需求的原因可能是运行缓慢的服务.Java虚拟机(JVM)崩溃.挂起.死锁.频繁的JVM暂停.突然或持续的高CPU使用率.甚至于可怕 ...

  7. Java开发和运行环境的搭建(详细教程)

    对JDK.JRE.Java 的认识 JDK 是Java Development Kit的缩写,即Java开发工具集.JDK是整个Java的核心,包括了Java运行环境(JRE).Java开发工具和Ja ...

  8. java main函数_一行JAVA代码如何运行起来?

    在程序员的世界中,你总会听到一句"PHP是世界上最好的语言"的调侃.然而在你进入软件程序开发之后,你会发现即使开发语言千千万,最盛行的还是JAVA.从淘宝的技术变迁中我们可以见一些 ...

  9. java 一个线程运行_Java并发(基础知识)—— 创建、运行以及停止一个线程

    在计算机世界,当人们谈到并发时,它的意思是一系列的任务在计算机中同时执行.如果计算机有多个处理器或者多核处理器,那么这个同时性是真实发生的:如果计算机只有一个核心处理器那么就只是表面现象. 现代所有的 ...

最新文章

  1. 未能加载文件或程序集“XXX”或它的一个依赖项,试图加载格式不正确的程序...
  2. 【鉴轻尘】你说你炒的了币,开发得了项目,然而这些你都知道吗?
  3. 怎么显示全部背景图片_Windows 聚焦图片在锁屏界面和登陆界面没有显示
  4. Python基础教程:获取list中指定元素的索引
  5. IsWindow,findwindow
  6. VS编译时使用/去除NuGet管理库
  7. 知乎高赞:看懂这个颠覆世界观的认知,远比做1000道题更有用!
  8. mysql完整性约束命名_第5章--MySQL索引与完整性约束.ppt
  9. QQ邮箱发送邮件,出现mail from address must be same as authorization user错误
  10. python工资高还是java-Python工资高还是Java工资高?Python和Java学哪个?
  11. 楼宇智能化工程设计、施工、验收规范目录
  12. ubuntu18.04+语音识别
  13. mysql扩容方案_MySQL分库分表:扩容方案
  14. 异常0x0000005
  15. 网页中文转英文(国际化)
  16. 耳朵后神经疼是怎么回事,耳朵引起的神经疼痛
  17. 实现strStr()
  18. 如何通adb命令删除安卓设备上指定的文件和apk
  19. 页式地址变换(虚地址转换为内存地址的计算方法)
  20. ECNU计科复试机试(2021)

热门文章

  1. 十三个提高远程办公效率的工具
  2. AttributeError: module cv2.face has no attribute 'createEigenFaceRecognizer'
  3. 基于微信小程序的ARPainting项目简述
  4. Android本地文件存储,机身和外置sd卡
  5. 黑马程序员武汉2019新版前端与移动开发学习路线图(视频+工具+书籍+资源)
  6. 百度地图SDK for Android【覆盖物】
  7. mac 破解安装 navicat
  8. Excel报错“不能使用对象链接和嵌入”并无法粘贴数据可能和输入法有关
  9. 完数什么意思_数学上角的定义是什么
  10. 面试技巧、专面、HR面、群面