一、关联代码

使用maven,代码如下。

pom.xml  参考 http://www.cnblogs.com/hd3013779515/p/6970551.html

MessageTopology.java

package cn.ljh.storm.reliability;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;public class MessageTopology {public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("MessageSpout", new MessageSpout(), 1);builder.setBolt("SpilterBolt", new SpliterBolt(), 5).shuffleGrouping("MessageSpout");builder.setBolt("WriterBolt", new WriterBolt(), 1).shuffleGrouping("SpilterBolt");Config conf = new Config();conf.setDebug(false);LocalCluster cluster = new LocalCluster();cluster.submitTopology("messagetest", conf, builder.createTopology());Utils.sleep(20000);cluster.killTopology("messagetest");cluster.shutdown();}
}

MessageSpou.java

package cn.ljh.storm.reliability;import org.apache.storm.topology.OutputFieldsDeclarer;import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MessageSpout extends BaseRichSpout {public static Logger LOG = LoggerFactory.getLogger(MessageSpout.class);private SpoutOutputCollector _collector;private int index = 0;private String[] subjects = new String[]{"Java,Python","Storm,Kafka","Spring,Solr","Zookeeper,FastDFS","Dubbox,Redis"};public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;}public void nextTuple() {if(index < subjects.length){String sub = subjects[index];//使用messageid参数,使可靠性机制生效_collector.emit(new Values(sub), index);index++;}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("subjects"));}@Overridepublic void ack(Object msgId) {LOG.info("【消息发送成功!】(msgId = " + msgId + ")");}@Overridepublic void fail(Object msgId) {LOG.info("【消息发送失败!】(msgId = " + msgId + ")");LOG.info("【重发进行中。。。】");_collector.emit(new Values(subjects[(Integer)msgId]), msgId);LOG.info("【重发成功!】");}}

SpliterBolt.java

package cn.ljh.storm.reliability;import java.util.Map;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;public class SpliterBolt extends BaseRichBolt {OutputCollector _collector;private boolean flag = false;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;}public void execute(Tuple tuple) {try{String subjects = tuple.getStringByField("subjects");//            if(!flag && subjects.equals("Spring,Solr")){
//                flag = true;
//                int a = 1/0;
//            }
            String[] words = subjects.split(",");for(String word : words){//注意:要携带tuple对象,用于处理异常时重发策略。_collector.emit(tuple, new Values(word));}//对tuple进行ack
            _collector.ack(tuple);}catch(Exception ex){ex.printStackTrace();//对tuple进行fail,使重发。
            _collector.fail(tuple);}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

WriterBolt.java

package cn.ljh.storm.reliability;import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class WriterBolt extends BaseRichBolt {private static Logger LOG = LoggerFactory.getLogger(WriterBolt.class);OutputCollector _collector;private FileWriter fileWriter;private boolean flag = false;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;if(fileWriter == null){try {fileWriter = new FileWriter("D:\\test\\"+"words.txt");} catch (IOException e) {e.printStackTrace();}}}public void execute(Tuple tuple) {try {String word = tuple.getStringByField("word");//                if(!flag && word.equals("Kafka")){
//                    flag = true;
//                    int a = 1/0;
//                }fileWriter.write(word + "\r\n");fileWriter.flush();} catch (Exception e) {e.printStackTrace();//对tuple进行fail,使重发。
                _collector.fail(tuple);}//对tuple进行ack
            _collector.ack(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

二、执行效果

1、代码要点说明

MessageSpout.java

(1)发射tuple时要设置messageId来使可靠性机制生效

_collector.emit(new Values(sub), index);

(2)重写ack和fail方法

@Overridepublic void ack(Object msgId) {LOG.info("【消息发送成功!】(msgId = " + msgId + ")");}@Overridepublic void fail(Object msgId) {LOG.info("【消息发送失败!】(msgId = " + msgId + ")");LOG.info("【重发进行中。。。】");_collector.emit(new Values(subjects[(Integer)msgId]), msgId);LOG.info("【重发成功!】");}

SpliterBolt.java

(1)发射新tuple时设置输入tuple参数,以使新tuple和输入tuple为一个整体

_collector.emit(tuple, new Values(word));

(2)完成处理后进行ack,失败时进行fail

_collector.ack(tuple);_collector.fail(tuple);

WriterBolt.java

(1)完成处理后进行ack,失败时进行fail

_collector.ack(tuple);_collector.fail(tuple);

2、正常处理结果

3、放开SpliterBolt 的错误代码

结果显示能够正确的重发。

4、放开SpliterBolt 的错误代码

能够正确进行重发,但是文件中storm字符串出现了两次。

5、总结

通过以上测试,如果在第一个bolt处理时出现异常,可以让整个数据进行重发,如果第二个bolt处理时出现异常,也可以让整个数据进行重发,但是同时出现了重复处理的事务性问题,需要进行特殊的处理。

(1)如果数据入库的话,可以把messageId也进行入库保存。此messageId可以用来判断是否重复处理。

(2)事务性tuple尽量不要拆分。

(3)使用storm的Trident框架。

转载于:https://www.cnblogs.com/hd3013779515/p/6972525.html

Storm入门(七)可靠性机制代码示例相关推荐

  1. Storm入门(三)HelloWorld示例

    一.配置开发环境 storm有两种操作模式: 本地模式和远程模式.使用本地模式的时候,你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 用远程模式的时候你提交的t ...

  2. 【机器学习基础】乡村F4带你入门线性回归 带python代码示例(一)

    标注: 1.概率密度没有学好,所以涉及到密度函数部分会有不少地方感觉非常难懂吃力 如何理解回归? 个人理解:回归是指 量化 {变量} 之间"关系" 线性回归的英文regressio ...

  3. 《C#零基础入门之百识百例》(九十一)预处理器指令 -- 代码示例

    C#零基础入门 预处理器指令 -- 代码示例 前言 一,预处理器指令 1.1 概念 1.2 常用预处理指令 1.3 语法规则 二,声明指令 2.1 声明指令概念 2.2 代码示例 三,条件编译指令 3 ...

  4. python入门代码示例-总算知道python入门代码示例

    Python是一种解释型.面向对象.动态数据类型的高级程序设计语言.作为今年来越来越流行的语言,我们该如何学习或者转行学习Python呢,这里小迹为大家介绍如何入门学习Python.以下是小编为你整理 ...

  5. 中文代码示例之Vuejs入门教程(一)

    为了检验中文命名在主流框架中的支持程度, 在vuejs官方入门教程第一部分的示例代码中尽量使用了中文命名. 所有演示都在本地测试通过, 源码在这里. 下面省略了很多原教程的说明内容, 而着重于代码示例 ...

  6. BIO,Socket网络编程入门代码示例,NIO网络编程入门代码示例,AIO 网络编程

    BIO,Socket网络编程入门代码示例 1.BIO服务器端程序 package cn.itcast.bio;import java.io.InputStream; import java.io.Ou ...

  7. python代码示例-总算知道python入门代码示例

    Python是一种解释型.面向对象.动态数据类型的高级程序设计语言.作为今年来越来越流行的语言,我们该如何学习或者转行学习Python呢,这里小迹为大家介绍如何入门学习Python.以下是小编为你整理 ...

  8. 【Vue】Vue入门 -(本地篇+网络篇)代码示例及运行效果

    一.Vue 可参考:Vue官方文档 Vue 将 DOM 元素看作是对象,可以将元素与Vue实例绑定,实现通过操作数据改变元素内容.不需要用 jQuery 手写各种各样的选择器了. 二.本地篇:Vue基 ...

  9. RabbitMq(七) Topic模式介绍及代码示例

    概述: 在上一文章中我们介绍了路由模式(Routing),routing模式是不同的消息队列绑定了不同的路由key,但是我们看出路由key为固定的字符串标记.而本章中的Topic模式则为在路由模式下, ...

最新文章

  1. Spring中@Value用法收集
  2. mysql子查询复杂操作_MySQL 子查询操作
  3. EditPlus 中添加 Win32 ASM 语法支持
  4. 化工设备与反应器 第二章 直杆的拉伸与压缩
  5. 当AI成为基础资源,360OS 发力在线教育的重心——专访360OS张焰
  6. 防止mysql拷贝_转载:mysql复制优化
  7. G++编译Note Pad++
  8. linux服务器,ping没问题,http请求经常超时、时好时坏的解决办法
  9. rails3 Route用法
  10. matlabadftest_adf检验matlab程序
  11. GreenSock Animation Platform
  12. Java包装类相关知识点
  13. MySQL练习题(4)
  14. Win-TortoiseGit-使用之-合并代码
  15. 0x0000007B:A problem has been detected and windows has been shut down to prevent damage to your Comp
  16. 删除Word文档空白页的方法,日常必备!
  17. html5/css实现字体上划线
  18. 今晚 8 点,开发者赏金计划正式开启
  19. 深度长文:智能手机的社会学研究
  20. MATLAB筛选数据

热门文章

  1. js 快速集成开发:easyui 时间控件格式化
  2. ZOJ3944People Counting暴力/枚举
  3. cocos2dx-lua 笔记 方向控制 v2
  4. 2013下半年(11月)信息系统项目管理师考试题型分析(综合知识、案例分析、论文)...
  5. android preferenceActivity的用法
  6. SQL SERVER数据库开发之触发器的应用
  7. 计算机网络---计算机网络分层结构
  8. 想辞职专心做自媒体可以吗?有哪些建议吗?
  9. 迈腾车能进2.10米宽的车库吗?
  10. 品牌直播启动的三个关键点