Storm的acker确认机制
Storm的acker消息确认机制...
ack/fail消息确认机制(确保一个tuple被完全处理)
在spout中发射tuple的时候需要同时发送messageid,这样才相当于开启了消息确认机制
如果你的topology里面的tuple比较多的话, 那么把acker的数量设置多一点,效率会高一点。
通过config.setNumAckers(num)来设置一个topology里面的acker的数量,默认值是1。
注意: acker用了特殊的算法,使得对于追踪每个spout tuple的状态所需要的内存量是恒定的(20 bytes)
注意:如果一个tuple在指定的timeout(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS默认值为30秒)时间内没有被成功处理,那么这个tuple会被认为处理失败了。
下面代码中Bolt的execute中模拟消息的正常和失败.
1 import java.util.Map; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.spout.SpoutOutputCollector; 6 import backtype.storm.task.OutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.OutputFieldsDeclarer; 9 import backtype.storm.topology.TopologyBuilder; 10 import backtype.storm.topology.base.BaseRichBolt; 11 import backtype.storm.topology.base.BaseRichSpout; 12 import backtype.storm.tuple.Fields; 13 import backtype.storm.tuple.Tuple; 14 import backtype.storm.tuple.Values; 15 import backtype.storm.utils.Utils; 16 17 /** 18 * 数字累加求和 19 * 先添加storm依赖 20 * 21 * @author Administrator 22 * 23 */ 24 public class LocalTopologySumAcker { 25 26 27 /** 28 * spout需要继承baserichspout,实现未实现的方法 29 * @author Administrator 30 * 31 */ 32 public static class MySpout extends BaseRichSpout{ 33 private Map conf; 34 private TopologyContext context; 35 private SpoutOutputCollector collector; 36 37 /** 38 * 初始化方法,只会执行一次 39 * 在这里面可以写一个初始化的代码 40 * Map conf:其实里面保存的是topology的一些配置信息 41 * TopologyContext context:topology的上下文,类似于servletcontext 42 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple) 43 */ 44 @Override 45 public void open(Map conf, TopologyContext context, 46 SpoutOutputCollector collector) { 47 this.conf = conf; 48 this.context = context; 49 this.collector = collector; 50 } 51 52 int num = 1; 53 /** 54 * 这个方法是spout中最重要的方法, 55 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内 56 * 每调用一次,会向外发射一条数据 57 */ 58 @Override 59 public void nextTuple() { 60 System.out.println("spout发射:"+num); 61 //把数据封装到values中,称为一个tuple,发射出去 62 //messageid:和tuple需要是一一对应的,可以把messageid认为是数据的主键id,而tuple中的内容就是这个数据. 63 //messageid和tuple中的消息是一一对应的. 它们之间的关系是需要我们程序员来维护的. 64 //this.collector.emit(new Values(num++)); 65 this.collector.emit(new Values(num++),num-1);//传递messageid(num-1)参数就表示开启了消息确认机制. 66 Utils.sleep(1000); 67 } 68 69 @Override 70 public void ack(Object msgId) { 71 System.out.println("处理成功"); 72 } 73 74 @Override 75 public void fail(Object msgId) { 76 System.out.println("处理失败....."+msgId); 77 //TODO--可以选择把失败的数据重发,或者单独存储后期进行分析 78 //重发的方法...this.collector.emit(tuple);//这个tuple可以根据参数msgId来获得... 79 } 80 81 /** 82 * 声明输出字段 83 */ 84 @Override 85 public void declareOutputFields(OutputFieldsDeclarer declarer) { 86 //给values中的数据起个名字,方便后面的bolt从这个values中取数据 87 //fields中定义的参数和values中传递的数值是一一对应的 88 declarer.declare(new Fields("num")); 89 } 90 91 } 92 93 94 /** 95 * 自定义bolt需要实现baserichbolt 96 * @author Administrator 97 * 98 */ 99 public static class MyBolt extends BaseRichBolt{ 100 private Map stormConf; 101 private TopologyContext context; 102 private OutputCollector collector; 103 104 /** 105 * 和spout中的open方法意义一样 106 */ 107 @Override 108 public void prepare(Map stormConf, TopologyContext context, 109 OutputCollector collector) { 110 this.stormConf = stormConf; 111 this.context = context; 112 this.collector = collector; 113 } 114 115 int sum = 0; 116 /** 117 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理 118 */ 119 @Override 120 public void execute(Tuple input) { 121 try{ 122 //input.getInteger(0);//也可以根据角标获取tuple中的数据 123 Integer value = input.getIntegerByField("num"); 124 if(value == 3){ 125 throw new Exception("value=3异常....."); 126 } 127 sum+=value; 128 System.out.println("和:"+sum); 129 this.collector.ack(input);//这个表示确认消息处理成功,spout中的ack方法会被调用 130 }catch(Exception e) { 131 this.collector.fail(input);//这个表示确认消息处理失败,spout中的fail方法会被调用 132 e.printStackTrace(); 133 } 134 } 135 136 /** 137 * 声明输出字段 138 */ 139 @Override 140 public void declareOutputFields(OutputFieldsDeclarer declarer) { 141 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。 142 //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明 143 } 144 145 } 146 /** 147 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的 148 * @param args 149 */ 150 public static void main(String[] args) { 151 //组装topology 152 TopologyBuilder topologyBuilder = new TopologyBuilder(); 153 topologyBuilder.setSpout("spout1", new MySpout()); 154 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple 155 topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1"); 156 157 //创建本地storm集群 158 LocalCluster localCluster = new LocalCluster(); 159 Config config = new Config(); 160 localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology()); 161 } 162 163 }
运行结果:
从结果可以看到Bolt1执行execute成功了通过ack 调用Spout1中的ack方法.... 失败了就通过fail 调用Spout1中的fail 方法 来达到对消息处理成功与否的追踪.
//=============================================================================================
上面的例子是一个Spout 和一个Bolt.....如果对应有1个Spout和2个Bolt 会是什么情况.....
改造上面的代码.....
1 /** 2 * 数字累加求和 3 * 先添加storm依赖 4 * 5 * @author Administrator 6 * 7 */ 8 public class LocalTopologySumAcker2 { 9 10 11 /** 12 * spout需要继承baserichspout,实现未实现的方法 13 * @author Administrator 14 * 15 */ 16 public static class MySpout extends BaseRichSpout{ 17 private Map conf; 18 private TopologyContext context; 19 private SpoutOutputCollector collector; 20 21 /** 22 * 初始化方法,只会执行一次 23 * 在这里面可以写一个初始化的代码 24 * Map conf:其实里面保存的是topology的一些配置信息 25 * TopologyContext context:topology的上下文,类似于servletcontext 26 * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple) 27 */ 28 @Override 29 public void open(Map conf, TopologyContext context, 30 SpoutOutputCollector collector) { 31 this.conf = conf; 32 this.context = context; 33 this.collector = collector; 34 } 35 36 int num = 1; 37 /** 38 * 这个方法是spout中最重要的方法, 39 * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内 40 * 每调用一次,会向外发射一条数据 41 */ 42 @Override 43 public void nextTuple() { 44 System.out.println("spout发射:"+num); 45 //把数据封装到values中,称为一个tuple,发射出去 46 //messageid:和tuple需要是一一对应的,可以把messageid认为是数据的主键id,而tuple中的内容就是这个数据 47 //messageid和tuple之间的关系是需要我们程序员维护的 48 this.collector.emit(new Values(num++),num-1);//传递messageid参数就表示开启了消息确认机制 49 Utils.sleep(1000); 50 } 51 52 /** 53 * 声明输出字段 54 */ 55 @Override 56 public void declareOutputFields(OutputFieldsDeclarer declarer) { 57 //给values中的数据起个名字,方便后面的bolt从这个values中取数据 58 //fields中定义的参数和values中传递的数值是一一对应的 59 declarer.declare(new Fields("num")); 60 } 61 62 @Override 63 public void ack(Object msgId) { 64 System.out.println("处理成功"); 65 } 66 67 @Override 68 public void fail(Object msgId) { 69 System.out.println("处理失败。。"+msgId); 70 //TODO--可以选择吧失败的数据重发,或者单独存储后期分析 71 } 72 } 73 74 75 /** 76 * 自定义bolt需要实现baserichbolt 77 * @author Administrator 78 * 79 */ 80 public static class MyBolt1 extends BaseRichBolt{ 81 private Map stormConf; 82 private TopologyContext context; 83 private OutputCollector collector; 84 85 /** 86 * 和spout中的open方法意义一样 87 */ 88 @Override 89 public void prepare(Map stormConf, TopologyContext context, 90 OutputCollector collector) { 91 this.stormConf = stormConf; 92 this.context = context; 93 this.collector = collector; 94 } 95 96 int sum = 0; 97 /** 98 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理 99 */ 100 @Override 101 public void execute(Tuple input) { 102 try { 103 //input.getInteger(0);//也可以根据角标获取tuple中的数据 104 Integer value = input.getIntegerByField("num"); 105 this.collector.emit(new Values(value+"_1")); 106 //this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1") 老的tuple是input 107 this.collector.ack(input);//确认数据处理成功,spout中的ack方法会被调用 108 } catch (Exception e) { 109 this.collector.fail(input);//确认数据处理失败,spout中的fail方法会被调用 110 e.printStackTrace(); 111 } 112 } 113 114 /** 115 * 声明输出字段 116 */ 117 @Override 118 public void declareOutputFields(OutputFieldsDeclarer declarer) { 119 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。 120 //如果nextT|uple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明 121 declarer.declare(new Fields("num_1")); 122 } 123 124 } 125 126 public static class MyBolt2 extends BaseRichBolt{ 127 private Map stormConf; 128 private TopologyContext context; 129 private OutputCollector collector; 130 131 /** 132 * 和spout中的open方法意义一样 133 */ 134 @Override 135 public void prepare(Map stormConf, TopologyContext context, 136 OutputCollector collector) { 137 this.stormConf = stormConf; 138 this.context = context; 139 this.collector = collector; 140 } 141 142 int sum = 0; 143 /** 144 * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理 145 */ 146 @Override 147 public void execute(Tuple input) { 148 try { 149 //input.getInteger(0);//也可以根据角标获取tuple中的数据 150 String value = input.getStringByField("num_1"); 151 System.out.println(value); 152 this.collector.fail(input);//确认数据处理成功,spout中的ack方法会被调用 153 //this.collector.ack(input);//确认数据处理成功,spout中的ack方法会被调用 154 } catch (Exception e) { 155 //this.collector.fail(input);//确认数据处理失败,spout中的fail方法会被调用 156 e.printStackTrace(); 157 } 158 } 159 160 /** 161 * 声明输出字段 162 */ 163 @Override 164 public void declareOutputFields(OutputFieldsDeclarer declarer) { 165 //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。 166 //如果nextT|uple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明 167 declarer.declare(new Fields("num_1")); 168 } 169 170 } 171 172 /** 173 * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的 174 * @param args 175 */ 176 public static void main(String[] args) { 177 //组装topology 178 TopologyBuilder topologyBuilder = new TopologyBuilder(); 179 topologyBuilder.setSpout("spout1", new MySpout()); 180 //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple 181 topologyBuilder.setBolt("bolt1", new MyBolt1()).shuffleGrouping("spout1"); 182 topologyBuilder.setBolt("bolt2", new MyBolt2()).shuffleGrouping("bolt1"); 183 184 //创建本地storm集群 185 LocalCluster localCluster = new LocalCluster(); 186 localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology()); 187 } 188 }
上面的代码的大体意思是 Bolt1接收Spout1的输出,接收之后在数据后面加上"_1",然后发送给Bolt2,Bolt2接收到之后直接打印.
在spout2中的execute()方法不管成功还是失败 都调用 this.collector.fail(input); 方法....也就是Spout1发射的数据在Bolt1中处理都成功了,在Bolt2中的处理都失败了.
看Spout1中的哪个方法会被执行.....也就是Spout2中调用的ack或者是fail对tuple的处理状态结果是否有影响.
运行看结果:
可以看出都是成功的...这就说明tuple的处理状态和Bolt2中ack或者是fail是没有任何的关系的......只要Bolt1中处理tuple成功了,我们就认为是处理成功了...
如果Bolt1处理失败了就认为是处理失败了.. ...现在Bolt1中发射出去的tuple是无法追踪的.....
能不能在Bolt1发射的数据中也加上一个messageid...这个在Bolt中的 this.collector.emit(new Values(value+"_1")); emit方法中是不支持传入一个messageid的.
但是这样有一种场景是有问题的. 单词计数的例子:
这个Spout后面有两个Bolt 一个SplitBolt 一个CountBolt SplitBolt 切割成一个个的单词 然后再CountBolt中进行汇总....
按照上面在SplitBolt中切割成功了,就算处理成功了...但是有可能切割之后 在CountBolt中有一些Bolt没有收到. 这样最后其实是没有成功的...
而且SpiltBolt中处理的tuple和CountBolt中的tuple之间是有关联的. 后者是在前者之上切割出来的小tuple....
我们想达到两个Bolt都处理成功了才认为是处理成功的...如何做?
上面的代码中已经包括......这里再说明一下:
Spout1中 的 this.collector.emit(input,new Values(value+"_1")); ----> this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1") 老的tuple是input在Spout2中还是不管是否异常都调用.. this.collector.fail(input); 看运行结果:
运行都失败了........
这样就达到了上面的"完全处理"的要求....
完全处理:保证一个tuple以及这个tuple衍生的所有tuple都被成功处理.
在storm里面一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所衍生的所有的tuple都被成功处理。
如果把Bolt2的正常对应改为 this.collector.ack(input); 失败对应 this.collector.fail(input);就回复正常了.....
如果Spout2后面还有Spout3 同样把老的tuple在emit上带上.........
Storm的acker确认机制相关推荐
- Storm编程入门API系列之Storm的可靠性的ACK消息确认机制
概念,见博客 Storm概念学习系列之storm的可靠性 什么业务场景需要storm可靠性的ACK确认机制? 答:想要保住数据不丢,或者保住数据总是被处理.即若没被处理的,得让我们知道. publi ...
- ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制
1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Me ...
- activemq 消息阻塞优化和消息确认机制优化
一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值 ...
- springboot + rabbitmq 用了消息确认机制,感觉掉坑里了
最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...
- RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)
说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...
- RabbitMQ之消息确认机制(事务+Confirm)
概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...
- TCP的ACK原理和延迟确认机制
原文地址:https://blog.csdn.net/gamekit/article/details/53898802 一.ACK定义 TCP协议中,接收方成功接收到数据后,会回复一个ACK数据包,表 ...
- java确认rabbitmq_RabbitMQ 消息确认机制
生产端 Confirm 消息确认机制 消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答.生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这 ...
- 最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制。 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订
最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制. 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订阅队列 3 ...
最新文章
- jieba分词工具的使用方法
- vb mysql数据导入到mssql,[请教]怎样把*.txt文本的数据导入sql数据库中?
- keyshot分辨率多少合适_惠普打印机型号有哪些 惠普打印机多少钱【详解】
- 跳出数据计算拯救人工智能之分布式逻辑
- TCP/IP 网络数据封包和解包
- 动态规划经典算法--最大子段和
- linux下怎样看设备的中断号,Linux设备驱动的中断处理
- thinkphp5.0自定义验证器
- 【Janino】Janino框架初识与使用教程
- Practice:Demonstrating the Key TCP/IP Protocols
- ValueError: Shapes () and (1, 1) are incompatible
- Springboot实现remember-me记住我功能
- matlab 图片黑白图片,MATLAB读取黑白图像显示却是黑色,24位深转8位深黑白图像解决方法(示例代码)...
- debug基本命令及全称
- 1 什么是末端柔顺控制?
- 报错:The server time zone value ‘�й���ʱ��‘ is unrecognied
- linux l7,GitHub - windslinux/l7detect: Network application protocol detection software
- HTML CSS实现 轮播图 遮罩层
- 短视频源码仿抖音短视频APP源码短视频平台源码短视频源码
- 常州一院有全消化道的机器人的_高清裸眼3D,常州一院完成第四代“达芬奇”机器人食管癌根治手术...