java strom实例_strom wordcount java 实现案例
1,spout编写,读取文件内容:
package com.storm.test;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout{
private SpoutOutputCollector collector;
private FileReader fileReader;
private String filePath;
private boolean completed = false;
@Override
public void ack(Object msgId) {
System.out.println("msgId === "+msgId);
}
@Override
public void close() {
}
@Override
public void fail(Object msgId) {
System.out.println("fail === "+msgId);
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.fileReader = new FileReader(conf.get("wordFile").toString());
} catch (Exception e) {
e.printStackTrace();
}
this.filePath = conf.get("wordFile").toString();
this.collector = collector;
}
@Override
public void nextTuple() {
if(completed){
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return ;
}
String str;
BufferedReader reader = new BufferedReader(fileReader);
try {
while ((str = reader.readLine()) != null){
System.out.println("read line = "+str);
this.collector.emit(new Values(str),str);
System.out.println("WordReader spout = "+str);
}
} catch (Exception e) {
e.printStackTrace();
}finally{
completed = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
2,Bolt编写实现句子分割:
package com.storm.test;
import java.util.Map;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt{
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String sentence = input.getString(0);
String[] words = sentence.split(",");
System.out.println("reader line = "+sentence);
for(String word : words){
if(!word.trim().isEmpty()){
collector.emit(new Values(word.trim().toUpperCase()));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
3,bolt编写是单词统计:
package com.storm.test;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
public class WordCount extends BaseBasicBolt{
Integer id;
String name;
Map counters ;
@Override
public void cleanup() {
System.out.println("word counter :["+name+"-"+id);
for(Map.Entry entry : counters.entrySet()){
System.out.println(entry.getKey()+":"+entry.getValue());
}
System.out.println("counter finish!");
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new HashMap();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String str = input.getString(0);
System.out.println("wordCounter recever "+str);
if(!counters.containsKey(str)){
counters.put(str,1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str,c);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
4,main方法编写:
package com.storm.test;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class ToplogyTest {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-count",new WordCount(),1).fieldsGrouping("word-normalizer",new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
//word.txt:hello,world,hello,storm,hello,spark,hadoop,hadoop
conf.put("wordFile", "/home/lixun/word.txt");
//conf.setDebug(true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounterTopology",conf,builder.createTopology());
Thread.sleep(4000);
cluster.killTopology("wordCounterTopology");
cluster.shutdown();
}
}
java strom实例_strom wordcount java 实现案例相关推荐
- 【Java 代码实例 13】Java操作pdf的工具类itext
目录 一.什么是iText? 二.引入jar 1.项目要使用iText,必须引入jar包 2.输出中文,还要引入下面```itext-asian.jar```包 3.设置pdf文件密码,还要引入下面` ...
- java反射实例_关于java反射的一个案例
案例: *需求:"写一个框架",可以帮我们创建任意类的对象,并且执行其中任意的方法. *实现: 1.配置文件 2.反射 *步骤: 1.将需要创建的对象的全类名和需要执行的方法定义在 ...
- java 经典 实例_5个JAVA入门必看的经典实例
入门必看的5个java经典实例,供大家参考,具体内容如下 1.一个饲养员给动物喂食物的例子体现java中的面向对象思想,接口(抽象类)的用处 package com.softeem.demo; /** ...
- java 数据结构实例_数据结构(Java)——栈的实例
惟大英雄能本色,是真名士自风流 --易中天(百家讲坛) 1.表达式的转换 1.1 中缀表达式转前缀表达式 中缀表达式转前缀表达式有许多的方式,有加括号去除法.语法树遍历法.堆栈处理法1. 测试程序的实 ...
- java爬虫实例_关于java爬虫以及一些实例
首先是工具介绍 Jsoup jsoup 是一款Java 的HTML解析器,可直接解析某个URL地址.HTML文本内容.它提供了一套非常省力的API,可通过DOM,CSS以及类似于jQuery的操作方法 ...
- java httppost 实例_实战演练java 调用http接口 post 例子
发起请求: import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; ...
- 事无巨细说Java之---Java 程序 | Java 编程实例--1)Java中的斐波那契数列--Fibonacci series
在斐波那契数列中,下一个数是前两个数的和,例如 0.1.1.2.3.5.8.13.21.34.55 等.斐波那契数列的前两个数是 0 和 1. java中写斐波那契数列程序有两种方式: 不使用递归的斐 ...
- 【Java 代码实例 14】BeanUtils用法详解,附源码分析
目录 一.org.apache.commons.beanutils.BeanUtils简介 二.使用的前置条件 三.添加pom 四.org.apache.commons.beanutils.BeanU ...
- Java程序应用实例:“你好 Java”
编写Java源程序 Java 源程序可以使用任何一个文本编辑器来编写,这里以 Windows 下的记事本为例. 例 1 (1) 新建一个空白记事本,然后如实地输入下列内容. /* *第一个java程序 ...
- 【Stream流学习】Java 8 新特性|Collectors.joining() 案例详解
[辰兮要努力]:hello你好我是辰兮,很高兴你能来阅读,昵称是希望自己能不断精进,向着优秀程序员前行! 博客来源于项目以及编程中遇到的问题总结,偶尔会有读书分享,我会陆续更新Java前端.后台.数据 ...
最新文章
- arcgis 投影变换与坐标转换研究
- R语言基于forestplot包可视化森林图实战详解:美化的森林图:自定义字体设置、置信区间、坐标轴(刻度、标签、范围)、无效线去除、水平线、辅助线、box形状、色彩等
- [存储过程]中的事务(rollback)回滚
- 计算机用于数据管理经历了,管理系统中计算机应用--期中测验答案
- 异构系统间Web Service通讯框架小结(补完企划)
- Newcoder lxh裁木棍 (不开long double见祖宗 ceil前不加long long也去
- 批量同时创建邮箱和AD账户
- mysql语句占位符_sql语句中的占位符?有什么作用
- 三菱plc 与 计算机 通讯,三菱FX系列PLC和PC的通讯联接
- unity开发xbox手柄 驱动坑
- 【Linux】ps -ef 和ps aux 有什么不同呢?
- SQL优化中索引列使用函数之灵异事件
- spring事务管理配置
- kubectl cp命令
- backupexec mysql_backup-mysql.sh
- I/O无线控制器(综科)-4G通讯直接上云平台(阿里云)
- hpe 服务器 稳定性6,低功耗易管理 惠普DL380 G6服务器评测
- 扫描网段找出树莓派IP
- Leetcode——回旋镖
- 欧姆龙 PLC CP1E 与电子称重仪表“柯力XK3101”Modbus RTU通信,稍微更改下Modbus通信地址可以跟其他Modbus设备进行通信
热门文章
- li怎么让文字在图片下面_div+css(ul li)实现图片上文字下列表布局
- Windows视频桌面壁纸实现(libvlc)(类似于wall paper engine效果)
- 虚无缥缈的代码到底是如何控制硬件工作的?
- 超越YOLOv4-tiny!YOLObile:移动设备上的实时目标检测 [左侧有码]
- 团队管理之—— 定目标:让你的方向与公司的方向保持一致
- Pure Strategy Game
- 医疗对话摘要论文阅读笔记
- Cisco Aironet WLAN系列AP的瘦胖模式转换
- 再谈微软复兴,纳德拉与库克、马斯克、皮查伊在管理上有什么不同
- 【Unity2D入门教程氵篇】简单制作一个弹珠游戏之制作场景④(设置不可破坏砖块,发布游戏设置)