1,spout编写,读取文件内容:

package com.storm.test;

import java.io.BufferedReader;

import java.io.FileReader;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout{

private SpoutOutputCollector collector;

private FileReader fileReader;

private String filePath;

private boolean completed = false;

@Override

public void ack(Object msgId) {

System.out.println("msgId === "+msgId);

}

@Override

public void close() {

}

@Override

public void fail(Object msgId) {

System.out.println("fail === "+msgId);

}

@Override

public void open(Map conf, TopologyContext context,

SpoutOutputCollector collector) {

try {

this.fileReader = new FileReader(conf.get("wordFile").toString());

} catch (Exception e) {

e.printStackTrace();

}

this.filePath = conf.get("wordFile").toString();

this.collector = collector;

}

@Override

public void nextTuple() {

if(completed){

try {

Thread.sleep(1000);

} catch (Exception e) {

e.printStackTrace();

}

return ;

}

String str;

BufferedReader reader = new BufferedReader(fileReader);

try {

while ((str = reader.readLine()) != null){

System.out.println("read line = "+str);

this.collector.emit(new Values(str),str);

System.out.println("WordReader spout = "+str);

}

} catch (Exception e) {

e.printStackTrace();

}finally{

completed = true;

}

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("line"));

}

}

2,Bolt编写实现句子分割:

package com.storm.test;

import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt{

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String sentence = input.getString(0);

String[] words = sentence.split(",");

System.out.println("reader line = "+sentence);

for(String word : words){

if(!word.trim().isEmpty()){

collector.emit(new Values(word.trim().toUpperCase()));

}

}

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

}

3,bolt编写是单词统计:

package com.storm.test;

import java.util.HashMap;

import java.util.Map;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

public class WordCount extends BaseBasicBolt{

Integer id;

String name;

Map counters ;

@Override

public void cleanup() {

System.out.println("word counter :["+name+"-"+id);

for(Map.Entry entry : counters.entrySet()){

System.out.println(entry.getKey()+":"+entry.getValue());

}

System.out.println("counter finish!");

}

@Override

public void prepare(Map stormConf, TopologyContext context) {

this.counters = new HashMap();

this.name = context.getThisComponentId();

this.id = context.getThisTaskId();

}

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String str = input.getString(0);

System.out.println("wordCounter recever "+str);

if(!counters.containsKey(str)){

counters.put(str,1);

}else{

Integer c = counters.get(str) + 1;

counters.put(str,c);

}

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

}

4,main方法编写:

package com.storm.test;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

public class ToplogyTest {

public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word-reader",new WordReader());

builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");

builder.setBolt("word-count",new WordCount(),1).fieldsGrouping("word-normalizer",new Fields("word"));

Config conf = new Config();

conf.setDebug(true);

conf.setNumWorkers(2);

//word.txt:hello,world,hello,storm,hello,spark,hadoop,hadoop

conf.put("wordFile", "/home/lixun/word.txt");

//conf.setDebug(true);

conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("wordCounterTopology",conf,builder.createTopology());

Thread.sleep(4000);

cluster.killTopology("wordCounterTopology");

cluster.shutdown();

}

}

java strom实例_strom wordcount java 实现案例相关推荐

  1. 【Java 代码实例 13】Java操作pdf的工具类itext

    目录 一.什么是iText? 二.引入jar 1.项目要使用iText,必须引入jar包 2.输出中文,还要引入下面```itext-asian.jar```包 3.设置pdf文件密码,还要引入下面` ...

  2. java反射实例_关于java反射的一个案例

    案例: *需求:"写一个框架",可以帮我们创建任意类的对象,并且执行其中任意的方法. *实现: 1.配置文件 2.反射 *步骤: 1.将需要创建的对象的全类名和需要执行的方法定义在 ...

  3. java 经典 实例_5个JAVA入门必看的经典实例

    入门必看的5个java经典实例,供大家参考,具体内容如下 1.一个饲养员给动物喂食物的例子体现java中的面向对象思想,接口(抽象类)的用处 package com.softeem.demo; /** ...

  4. java 数据结构实例_数据结构(Java)——栈的实例

    惟大英雄能本色,是真名士自风流 --易中天(百家讲坛) 1.表达式的转换 1.1 中缀表达式转前缀表达式 中缀表达式转前缀表达式有许多的方式,有加括号去除法.语法树遍历法.堆栈处理法1. 测试程序的实 ...

  5. java爬虫实例_关于java爬虫以及一些实例

    首先是工具介绍 Jsoup jsoup 是一款Java 的HTML解析器,可直接解析某个URL地址.HTML文本内容.它提供了一套非常省力的API,可通过DOM,CSS以及类似于jQuery的操作方法 ...

  6. java httppost 实例_实战演练java 调用http接口 post 例子

    发起请求: import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; ...

  7. 事无巨细说Java之---Java 程序 | Java 编程实例--1)Java中的斐波那契数列--Fibonacci series

    在斐波那契数列中,下一个数是前两个数的和,例如 0.1.1.2.3.5.8.13.21.34.55 等.斐波那契数列的前两个数是 0 和 1. java中写斐波那契数列程序有两种方式: 不使用递归的斐 ...

  8. 【Java 代码实例 14】BeanUtils用法详解,附源码分析

    目录 一.org.apache.commons.beanutils.BeanUtils简介 二.使用的前置条件 三.添加pom 四.org.apache.commons.beanutils.BeanU ...

  9. Java程序应用实例:“你好 Java”

    编写Java源程序 Java 源程序可以使用任何一个文本编辑器来编写,这里以 Windows 下的记事本为例. 例 1 (1) 新建一个空白记事本,然后如实地输入下列内容. /* *第一个java程序 ...

  10. 【Stream流学习】Java 8 新特性|Collectors.joining() 案例详解

    [辰兮要努力]:hello你好我是辰兮,很高兴你能来阅读,昵称是希望自己能不断精进,向着优秀程序员前行! 博客来源于项目以及编程中遇到的问题总结,偶尔会有读书分享,我会陆续更新Java前端.后台.数据 ...

最新文章

  1. arcgis 投影变换与坐标转换研究
  2. R语言基于forestplot包可视化森林图实战详解:美化的森林图:自定义字体设置、置信区间、坐标轴(刻度、标签、范围)、无效线去除、水平线、辅助线、box形状、色彩等
  3. [存储过程]中的事务(rollback)回滚
  4. 计算机用于数据管理经历了,管理系统中计算机应用--期中测验答案
  5. 异构系统间Web Service通讯框架小结(补完企划)
  6. Newcoder lxh裁木棍 (不开long double见祖宗 ceil前不加long long也去
  7. 批量同时创建邮箱和AD账户
  8. mysql语句占位符_sql语句中的占位符?有什么作用
  9. 三菱plc 与 计算机 通讯,三菱FX系列PLC和PC的通讯联接
  10. unity开发xbox手柄 驱动坑
  11. 【Linux】ps -ef 和ps aux 有什么不同呢?
  12. SQL优化中索引列使用函数之灵异事件
  13. spring事务管理配置
  14. kubectl cp命令
  15. backupexec mysql_backup-mysql.sh
  16. I/O无线控制器(综科)-4G通讯直接上云平台(阿里云)
  17. hpe 服务器 稳定性6,低功耗易管理 惠普DL380 G6服务器评测
  18. 扫描网段找出树莓派IP
  19. Leetcode——回旋镖
  20. 欧姆龙 PLC CP1E 与电子称重仪表“柯力XK3101”Modbus RTU通信,稍微更改下Modbus通信地址可以跟其他Modbus设备进行通信

热门文章

  1. li怎么让文字在图片下面_div+css(ul li)实现图片上文字下列表布局
  2. Windows视频桌面壁纸实现(libvlc)(类似于wall paper engine效果)
  3. 虚无缥缈的代码到底是如何控制硬件工作的?
  4. 超越YOLOv4-tiny!YOLObile:移动设备上的实时目标检测 [左侧有码]
  5. 团队管理之—— 定目标:让你的方向与公司的方向保持一致
  6. Pure Strategy Game
  7. 医疗对话摘要论文阅读笔记
  8. Cisco Aironet WLAN系列AP的瘦胖模式转换
  9. 再谈微软复兴,纳德拉与库克、马斯克、皮查伊在管理上有什么不同
  10. 【Unity2D入门教程氵篇】简单制作一个弹珠游戏之制作场景④(设置不可破坏砖块,发布游戏设置)