Storm入门(七)可靠性机制代码示例
一、关联代码
使用maven,代码如下。
pom.xml 参考 http://www.cnblogs.com/hd3013779515/p/6970551.html
MessageTopology.java
package cn.ljh.storm.reliability;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils;public class MessageTopology {public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("MessageSpout", new MessageSpout(), 1);builder.setBolt("SpilterBolt", new SpliterBolt(), 5).shuffleGrouping("MessageSpout");builder.setBolt("WriterBolt", new WriterBolt(), 1).shuffleGrouping("SpilterBolt");Config conf = new Config();conf.setDebug(false);LocalCluster cluster = new LocalCluster();cluster.submitTopology("messagetest", conf, builder.createTopology());Utils.sleep(20000);cluster.killTopology("messagetest");cluster.shutdown();} }
MessageSpou.java
package cn.ljh.storm.reliability;import org.apache.storm.topology.OutputFieldsDeclarer;import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class MessageSpout extends BaseRichSpout {public static Logger LOG = LoggerFactory.getLogger(MessageSpout.class);private SpoutOutputCollector _collector;private int index = 0;private String[] subjects = new String[]{"Java,Python","Storm,Kafka","Spring,Solr","Zookeeper,FastDFS","Dubbox,Redis"};public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;}public void nextTuple() {if(index < subjects.length){String sub = subjects[index];//使用messageid参数,使可靠性机制生效_collector.emit(new Values(sub), index);index++;}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("subjects"));}@Overridepublic void ack(Object msgId) {LOG.info("【消息发送成功!】(msgId = " + msgId + ")");}@Overridepublic void fail(Object msgId) {LOG.info("【消息发送失败!】(msgId = " + msgId + ")");LOG.info("【重发进行中。。。】");_collector.emit(new Values(subjects[(Integer)msgId]), msgId);LOG.info("【重发成功!】");}}
SpliterBolt.java
package cn.ljh.storm.reliability;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class SpliterBolt extends BaseRichBolt {OutputCollector _collector;private boolean flag = false;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;}public void execute(Tuple tuple) {try{String subjects = tuple.getStringByField("subjects");// if(!flag && subjects.equals("Spring,Solr")){ // flag = true; // int a = 1/0; // } String[] words = subjects.split(",");for(String word : words){//注意:要携带tuple对象,用于处理异常时重发策略。_collector.emit(tuple, new Values(word));}//对tuple进行ack _collector.ack(tuple);}catch(Exception ex){ex.printStackTrace();//对tuple进行fail,使重发。 _collector.fail(tuple);}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}
WriterBolt.java
package cn.ljh.storm.reliability;import java.io.FileWriter; import java.io.IOException; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class WriterBolt extends BaseRichBolt {private static Logger LOG = LoggerFactory.getLogger(WriterBolt.class);OutputCollector _collector;private FileWriter fileWriter;private boolean flag = false;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;if(fileWriter == null){try {fileWriter = new FileWriter("D:\\test\\"+"words.txt");} catch (IOException e) {e.printStackTrace();}}}public void execute(Tuple tuple) {try {String word = tuple.getStringByField("word");// if(!flag && word.equals("Kafka")){ // flag = true; // int a = 1/0; // }fileWriter.write(word + "\r\n");fileWriter.flush();} catch (Exception e) {e.printStackTrace();//对tuple进行fail,使重发。 _collector.fail(tuple);}//对tuple进行ack _collector.ack(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {} }
二、执行效果
1、代码要点说明
MessageSpout.java
(1)发射tuple时要设置messageId来使可靠性机制生效
_collector.emit(new Values(sub), index);
(2)重写ack和fail方法
@Overridepublic void ack(Object msgId) {LOG.info("【消息发送成功!】(msgId = " + msgId + ")");}@Overridepublic void fail(Object msgId) {LOG.info("【消息发送失败!】(msgId = " + msgId + ")");LOG.info("【重发进行中。。。】");_collector.emit(new Values(subjects[(Integer)msgId]), msgId);LOG.info("【重发成功!】");}
SpliterBolt.java
(1)发射新tuple时设置输入tuple参数,以使新tuple和输入tuple为一个整体
_collector.emit(tuple, new Values(word));
(2)完成处理后进行ack,失败时进行fail
_collector.ack(tuple);_collector.fail(tuple);
WriterBolt.java
(1)完成处理后进行ack,失败时进行fail
_collector.ack(tuple);_collector.fail(tuple);
2、正常处理结果
3、放开SpliterBolt 的错误代码
结果显示能够正确的重发。
4、放开SpliterBolt 的错误代码
能够正确进行重发,但是文件中storm字符串出现了两次。
5、总结
通过以上测试,如果在第一个bolt处理时出现异常,可以让整个数据进行重发,如果第二个bolt处理时出现异常,也可以让整个数据进行重发,但是同时出现了重复处理的事务性问题,需要进行特殊的处理。
(1)如果数据入库的话,可以把messageId也进行入库保存。此messageId可以用来判断是否重复处理。
(2)事务性tuple尽量不要拆分。
(3)使用storm的Trident框架。
转载于:https://www.cnblogs.com/hd3013779515/p/6972525.html
Storm入门(七)可靠性机制代码示例相关推荐
- Storm入门(三)HelloWorld示例
一.配置开发环境 storm有两种操作模式: 本地模式和远程模式.使用本地模式的时候,你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 用远程模式的时候你提交的t ...
- 【机器学习基础】乡村F4带你入门线性回归 带python代码示例(一)
标注: 1.概率密度没有学好,所以涉及到密度函数部分会有不少地方感觉非常难懂吃力 如何理解回归? 个人理解:回归是指 量化 {变量} 之间"关系" 线性回归的英文regressio ...
- 《C#零基础入门之百识百例》(九十一)预处理器指令 -- 代码示例
C#零基础入门 预处理器指令 -- 代码示例 前言 一,预处理器指令 1.1 概念 1.2 常用预处理指令 1.3 语法规则 二,声明指令 2.1 声明指令概念 2.2 代码示例 三,条件编译指令 3 ...
- python入门代码示例-总算知道python入门代码示例
Python是一种解释型.面向对象.动态数据类型的高级程序设计语言.作为今年来越来越流行的语言,我们该如何学习或者转行学习Python呢,这里小迹为大家介绍如何入门学习Python.以下是小编为你整理 ...
- 中文代码示例之Vuejs入门教程(一)
为了检验中文命名在主流框架中的支持程度, 在vuejs官方入门教程第一部分的示例代码中尽量使用了中文命名. 所有演示都在本地测试通过, 源码在这里. 下面省略了很多原教程的说明内容, 而着重于代码示例 ...
- BIO,Socket网络编程入门代码示例,NIO网络编程入门代码示例,AIO 网络编程
BIO,Socket网络编程入门代码示例 1.BIO服务器端程序 package cn.itcast.bio;import java.io.InputStream; import java.io.Ou ...
- python代码示例-总算知道python入门代码示例
Python是一种解释型.面向对象.动态数据类型的高级程序设计语言.作为今年来越来越流行的语言,我们该如何学习或者转行学习Python呢,这里小迹为大家介绍如何入门学习Python.以下是小编为你整理 ...
- 【Vue】Vue入门 -(本地篇+网络篇)代码示例及运行效果
一.Vue 可参考:Vue官方文档 Vue 将 DOM 元素看作是对象,可以将元素与Vue实例绑定,实现通过操作数据改变元素内容.不需要用 jQuery 手写各种各样的选择器了. 二.本地篇:Vue基 ...
- RabbitMq(七) Topic模式介绍及代码示例
概述: 在上一文章中我们介绍了路由模式(Routing),routing模式是不同的消息队列绑定了不同的路由key,但是我们看出路由key为固定的字符串标记.而本章中的Topic模式则为在路由模式下, ...
最新文章
- Spring中@Value用法收集
- mysql子查询复杂操作_MySQL 子查询操作
- EditPlus 中添加 Win32 ASM 语法支持
- 化工设备与反应器 第二章 直杆的拉伸与压缩
- 当AI成为基础资源,360OS 发力在线教育的重心——专访360OS张焰
- 防止mysql拷贝_转载:mysql复制优化
- G++编译Note Pad++
- linux服务器,ping没问题,http请求经常超时、时好时坏的解决办法
- rails3 Route用法
- matlabadftest_adf检验matlab程序
- GreenSock Animation Platform
- Java包装类相关知识点
- MySQL练习题(4)
- Win-TortoiseGit-使用之-合并代码
- 0x0000007B:A problem has been detected and windows has been shut down to prevent damage to your Comp
- 删除Word文档空白页的方法,日常必备!
- html5/css实现字体上划线
- 今晚 8 点,开发者赏金计划正式开启
- 深度长文:智能手机的社会学研究
- MATLAB筛选数据