Storm的acker消息确认机制...

ack/fail消息确认机制(确保一个tuple被完全处理)

在spout中发射tuple的时候需要同时发送messageid,这样才相当于开启了消息确认机制

如果你的topology里面的tuple比较多的话, 那么把acker的数量设置多一点,效率会高一点。

通过config.setNumAckers(num)来设置一个topology里面的acker的数量,默认值是1。

注意: acker用了特殊的算法,使得对于追踪每个spout tuple的状态所需要的内存量是恒定的(20 bytes)

注意:如果一个tuple在指定的timeout(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS默认值为30秒)时间内没有被成功处理,那么这个tuple会被认为处理失败了。

下面代码中Bolt的execute中模拟消息的正常和失败.

  1 import java.util.Map;
  2
  3 import backtype.storm.Config;
  4 import backtype.storm.LocalCluster;
  5 import backtype.storm.spout.SpoutOutputCollector;
  6 import backtype.storm.task.OutputCollector;
  7 import backtype.storm.task.TopologyContext;
  8 import backtype.storm.topology.OutputFieldsDeclarer;
  9 import backtype.storm.topology.TopologyBuilder;
 10 import backtype.storm.topology.base.BaseRichBolt;
 11 import backtype.storm.topology.base.BaseRichSpout;
 12 import backtype.storm.tuple.Fields;
 13 import backtype.storm.tuple.Tuple;
 14 import backtype.storm.tuple.Values;
 15 import backtype.storm.utils.Utils;
 16
 17 /**
 18  * 数字累加求和
 19  * 先添加storm依赖
 20  *
 21  * @author Administrator
 22  *
 23  */
 24 public class LocalTopologySumAcker {
 25
 26
 27     /**
 28      * spout需要继承baserichspout,实现未实现的方法
 29      * @author Administrator
 30      *
 31      */
 32     public static class MySpout extends BaseRichSpout{
 33         private Map conf;
 34         private TopologyContext context;
 35         private SpoutOutputCollector collector;
 36
 37         /**
 38          * 初始化方法,只会执行一次
 39          * 在这里面可以写一个初始化的代码
 40          * Map conf:其实里面保存的是topology的一些配置信息
 41          * TopologyContext context:topology的上下文,类似于servletcontext
 42          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
 43          */
 44         @Override
 45         public void open(Map conf, TopologyContext context,
 46                 SpoutOutputCollector collector) {
 47             this.conf = conf;
 48             this.context = context;
 49             this.collector = collector;
 50         }
 51
 52         int num = 1;
 53         /**
 54          * 这个方法是spout中最重要的方法,
 55          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
 56          * 每调用一次,会向外发射一条数据
 57          */
 58         @Override
 59         public void nextTuple() {
 60             System.out.println("spout发射:"+num);
 61             //把数据封装到values中,称为一个tuple,发射出去
 62             //messageid:和tuple需要是一一对应的,可以把messageid认为是数据的主键id,而tuple中的内容就是这个数据.
 63             //messageid和tuple中的消息是一一对应的. 它们之间的关系是需要我们程序员来维护的.
 64             //this.collector.emit(new Values(num++));
 65             this.collector.emit(new Values(num++),num-1);//传递messageid(num-1)参数就表示开启了消息确认机制.
 66             Utils.sleep(1000);
 67         }
 68
 69         @Override
 70         public void ack(Object msgId) {
 71             System.out.println("处理成功");
 72         }
 73
 74         @Override
 75         public void fail(Object msgId) {
 76             System.out.println("处理失败....."+msgId);
 77             //TODO--可以选择把失败的数据重发,或者单独存储后期进行分析
 78             //重发的方法...this.collector.emit(tuple);//这个tuple可以根据参数msgId来获得...
 79         }
 80
 81         /**
 82          * 声明输出字段
 83          */
 84         @Override
 85         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 86             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
 87             //fields中定义的参数和values中传递的数值是一一对应的
 88             declarer.declare(new Fields("num"));
 89         }
 90
 91     }
 92
 93
 94     /**
 95      * 自定义bolt需要实现baserichbolt
 96      * @author Administrator
 97      *
 98      */
 99     public static class MyBolt extends BaseRichBolt{
100         private Map stormConf;
101         private TopologyContext context;
102         private OutputCollector collector;
103
104         /**
105          * 和spout中的open方法意义一样
106          */
107         @Override
108         public void prepare(Map stormConf, TopologyContext context,
109                 OutputCollector collector) {
110             this.stormConf = stormConf;
111             this.context = context;
112             this.collector = collector;
113         }
114
115         int sum = 0;
116         /**
117          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
118          */
119         @Override
120         public void execute(Tuple input) {
121             try{
122                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
123                 Integer value = input.getIntegerByField("num");
124                 if(value == 3){
125                     throw new Exception("value=3异常.....");
126                 }
127                 sum+=value;
128                 System.out.println("和:"+sum);
129                 this.collector.ack(input);//这个表示确认消息处理成功,spout中的ack方法会被调用
130             }catch(Exception e) {
131                 this.collector.fail(input);//这个表示确认消息处理失败,spout中的fail方法会被调用
132                 e.printStackTrace();
133             }
134         }
135
136         /**
137          * 声明输出字段
138          */
139         @Override
140         public void declareOutputFields(OutputFieldsDeclarer declarer) {
141             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
142             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
143         }
144
145     }
146     /**
147      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
148      * @param args
149      */
150     public static void main(String[] args) {
151         //组装topology
152         TopologyBuilder topologyBuilder = new TopologyBuilder();
153         topologyBuilder.setSpout("spout1", new MySpout());
154         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
155         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
156
157         //创建本地storm集群
158         LocalCluster localCluster = new LocalCluster();
159         Config config = new Config();
160         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
161     }
162
163 }

运行结果:

从结果可以看到Bolt1执行execute成功了通过ack  调用Spout1中的ack方法....   失败了就通过fail 调用Spout1中的fail 方法 来达到对消息处理成功与否的追踪.

//=============================================================================================

上面的例子是一个Spout 和一个Bolt.....如果对应有1个Spout和2个Bolt 会是什么情况.....

改造上面的代码.....

  1 /**
  2  * 数字累加求和
  3  * 先添加storm依赖
  4  *
  5  * @author Administrator
  6  *
  7  */
  8 public class LocalTopologySumAcker2 {
  9
 10
 11     /**
 12      * spout需要继承baserichspout,实现未实现的方法
 13      * @author Administrator
 14      *
 15      */
 16     public static class MySpout extends BaseRichSpout{
 17         private Map conf;
 18         private TopologyContext context;
 19         private SpoutOutputCollector collector;
 20
 21         /**
 22          * 初始化方法,只会执行一次
 23          * 在这里面可以写一个初始化的代码
 24          * Map conf:其实里面保存的是topology的一些配置信息
 25          * TopologyContext context:topology的上下文,类似于servletcontext
 26          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
 27          */
 28         @Override
 29         public void open(Map conf, TopologyContext context,
 30                 SpoutOutputCollector collector) {
 31             this.conf = conf;
 32             this.context = context;
 33             this.collector = collector;
 34         }
 35
 36         int num = 1;
 37         /**
 38          * 这个方法是spout中最重要的方法,
 39          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
 40          * 每调用一次,会向外发射一条数据
 41          */
 42         @Override
 43         public void nextTuple() {
 44             System.out.println("spout发射:"+num);
 45             //把数据封装到values中,称为一个tuple,发射出去
 46             //messageid:和tuple需要是一一对应的,可以把messageid认为是数据的主键id,而tuple中的内容就是这个数据
 47             //messageid和tuple之间的关系是需要我们程序员维护的
 48             this.collector.emit(new Values(num++),num-1);//传递messageid参数就表示开启了消息确认机制
 49             Utils.sleep(1000);
 50         }
 51
 52         /**
 53          * 声明输出字段
 54          */
 55         @Override
 56         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 57             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
 58             //fields中定义的参数和values中传递的数值是一一对应的
 59             declarer.declare(new Fields("num"));
 60         }
 61
 62         @Override
 63         public void ack(Object msgId) {
 64             System.out.println("处理成功");
 65         }
 66
 67         @Override
 68         public void fail(Object msgId) {
 69             System.out.println("处理失败。。"+msgId);
 70             //TODO--可以选择吧失败的数据重发,或者单独存储后期分析
 71         }
 72     }
 73
 74
 75     /**
 76      * 自定义bolt需要实现baserichbolt
 77      * @author Administrator
 78      *
 79      */
 80     public static class MyBolt1 extends BaseRichBolt{
 81         private Map stormConf;
 82         private TopologyContext context;
 83         private OutputCollector collector;
 84
 85         /**
 86          * 和spout中的open方法意义一样
 87          */
 88         @Override
 89         public void prepare(Map stormConf, TopologyContext context,
 90                 OutputCollector collector) {
 91             this.stormConf = stormConf;
 92             this.context = context;
 93             this.collector = collector;
 94         }
 95
 96         int sum = 0;
 97         /**
 98          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
 99          */
100         @Override
101         public void execute(Tuple input) {
102             try {
103                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
104                 Integer value = input.getIntegerByField("num");
105                 this.collector.emit(new Values(value+"_1"));
106                 //this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1")  老的tuple是input
107                 this.collector.ack(input);//确认数据处理成功,spout中的ack方法会被调用
108             } catch (Exception e) {
109                 this.collector.fail(input);//确认数据处理失败,spout中的fail方法会被调用
110                 e.printStackTrace();
111             }
112         }
113
114         /**
115          * 声明输出字段
116          */
117         @Override
118         public void declareOutputFields(OutputFieldsDeclarer declarer) {
119             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
120             //如果nextT|uple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
121             declarer.declare(new Fields("num_1"));
122         }
123
124     }
125
126     public static class MyBolt2 extends BaseRichBolt{
127         private Map stormConf;
128         private TopologyContext context;
129         private OutputCollector collector;
130
131         /**
132          * 和spout中的open方法意义一样
133          */
134         @Override
135         public void prepare(Map stormConf, TopologyContext context,
136                 OutputCollector collector) {
137             this.stormConf = stormConf;
138             this.context = context;
139             this.collector = collector;
140         }
141
142         int sum = 0;
143         /**
144          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
145          */
146         @Override
147         public void execute(Tuple input) {
148             try {
149                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
150                 String value = input.getStringByField("num_1");
151                 System.out.println(value);
152                 this.collector.fail(input);//确认数据处理成功,spout中的ack方法会被调用
153                 //this.collector.ack(input);//确认数据处理成功,spout中的ack方法会被调用
154             } catch (Exception e) {
155                 //this.collector.fail(input);//确认数据处理失败,spout中的fail方法会被调用
156                 e.printStackTrace();
157             }
158         }
159
160         /**
161          * 声明输出字段
162          */
163         @Override
164         public void declareOutputFields(OutputFieldsDeclarer declarer) {
165             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
166             //如果nextT|uple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
167             declarer.declare(new Fields("num_1"));
168         }
169
170     }
171
172     /**
173      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
174      * @param args
175      */
176     public static void main(String[] args) {
177         //组装topology
178         TopologyBuilder topologyBuilder = new TopologyBuilder();
179         topologyBuilder.setSpout("spout1", new MySpout());
180         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
181         topologyBuilder.setBolt("bolt1", new MyBolt1()).shuffleGrouping("spout1");
182         topologyBuilder.setBolt("bolt2", new MyBolt2()).shuffleGrouping("bolt1");
183
184         //创建本地storm集群
185         LocalCluster localCluster = new LocalCluster();
186         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
187     }
188 }

上面的代码的大体意思是 Bolt1接收Spout1的输出,接收之后在数据后面加上"_1",然后发送给Bolt2,Bolt2接收到之后直接打印.

在spout2中的execute()方法不管成功还是失败 都调用   this.collector.fail(input);  方法....也就是Spout1发射的数据在Bolt1中处理都成功了,在Bolt2中的处理都失败了.

看Spout1中的哪个方法会被执行.....也就是Spout2中调用的ack或者是fail对tuple的处理状态结果是否有影响.

运行看结果:

可以看出都是成功的...这就说明tuple的处理状态和Bolt2中ack或者是fail是没有任何的关系的......只要Bolt1中处理tuple成功了,我们就认为是处理成功了...

如果Bolt1处理失败了就认为是处理失败了.. ...现在Bolt1中发射出去的tuple是无法追踪的.....

能不能在Bolt1发射的数据中也加上一个messageid...这个在Bolt中的   this.collector.emit(new Values(value+"_1"));  emit方法中是不支持传入一个messageid的.

但是这样有一种场景是有问题的.  单词计数的例子:

这个Spout后面有两个Bolt  一个SplitBolt 一个CountBolt    SplitBolt 切割成一个个的单词  然后再CountBolt中进行汇总....

按照上面在SplitBolt中切割成功了,就算处理成功了...但是有可能切割之后 在CountBolt中有一些Bolt没有收到. 这样最后其实是没有成功的...

而且SpiltBolt中处理的tuple和CountBolt中的tuple之间是有关联的. 后者是在前者之上切割出来的小tuple....

我们想达到两个Bolt都处理成功了才认为是处理成功的...如何做?

上面的代码中已经包括......这里再说明一下:

Spout1中 的   this.collector.emit(input,new Values(value+"_1"));   ---->    this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1")  老的tuple是input在Spout2中还是不管是否异常都调用..    this.collector.fail(input);

看运行结果:

运行都失败了........

这样就达到了上面的"完全处理"的要求....

完全处理:保证一个tuple以及这个tuple衍生的所有tuple都被成功处理.

在storm里面一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所衍生的所有的tuple都被成功处理。

如果把Bolt2的正常对应改为  this.collector.ack(input);  失败对应 this.collector.fail(input);就回复正常了.....

如果Spout2后面还有Spout3  同样把老的tuple在emit上带上.........

Storm的acker确认机制相关推荐

  1. Storm编程入门API系列之Storm的可靠性的ACK消息确认机制

    概念,见博客 Storm概念学习系列之storm的可靠性  什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...

  2. ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

    1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Me ...

  3. activemq 消息阻塞优化和消息确认机制优化

    一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值 ...

  4. springboot + rabbitmq 用了消息确认机制,感觉掉坑里了

    最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...

  5. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  6. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

  7. TCP的ACK原理和延迟确认机制

    原文地址:https://blog.csdn.net/gamekit/article/details/53898802 一.ACK定义 TCP协议中,接收方成功接收到数据后,会回复一个ACK数据包,表 ...

  8. java确认rabbitmq_RabbitMQ 消息确认机制

    生产端 Confirm 消息确认机制 消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答.生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这 ...

  9. 最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制。 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订

    最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制. 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订阅队列 3 ...

最新文章

  1. jieba分词工具的使用方法
  2. vb mysql数据导入到mssql,[请教]怎样把*.txt文本的数据导入sql数据库中?
  3. keyshot分辨率多少合适_惠普打印机型号有哪些 惠普打印机多少钱【详解】
  4. 跳出数据计算拯救人工智能之分布式逻辑
  5. TCP/IP 网络数据封包和解包
  6. 动态规划经典算法--最大子段和
  7. linux下怎样看设备的中断号,Linux设备驱动的中断处理
  8. thinkphp5.0自定义验证器
  9. 【Janino】Janino框架初识与使用教程
  10. Practice:Demonstrating the Key TCP/IP Protocols
  11. ValueError: Shapes () and (1, 1) are incompatible
  12. Springboot实现remember-me记住我功能
  13. matlab 图片黑白图片,MATLAB读取黑白图像显示却是黑色,24位深转8位深黑白图像解决方法(示例代码)...
  14. debug基本命令及全称
  15. 1 什么是末端柔顺控制?
  16. 报错:The server time zone value ‘�й���׼ʱ��‘ is unrecognied
  17. linux l7,GitHub - windslinux/l7detect: Network application protocol detection software
  18. HTML CSS实现 轮播图 遮罩层
  19. 短视频源码仿抖音短视频APP源码短视频平台源码短视频源码
  20. 常州一院有全消化道的机器人的_高清裸眼3D,常州一院完成第四代“达芬奇”机器人食管癌根治手术...

热门文章

  1. ROS探索总结(一)(二)(三):ROS总体框架 ROS总体框架 ROS新手教程
  2. 在Github上搭建Jekyll博客和创建主题
  3. 编程之美-最短摘要的生成方法整理
  4. ES-PHP向ES批量添加文档报No alive nodes found in your cluster
  5. 《数据库原理与应用(第3版)》——1.4 数据库系统的组成
  6. 限制mysql服务为本地访问
  7. Struts快速入门
  8. IOS学习之多线程(9)--NSOperation简单介绍
  9. Oracle官方文档网址收录
  10. 使用各种方法加速大型矩阵运算的效率对比