kafkaspot在ack机制下如何保证内存不溢
storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送;如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送。
但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发,那么我们是会在我们的spout类里面定义一个map集合,并以msgId作为key。
public class MySpout extends BaseRichSpout {private static final long serialVersionUID = 5028304756439810609L;// key:messageId,Dataprivate HashMap<String, String> waitAck = new HashMap<String, String>();private SpoutOutputCollector collector;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));}public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}public void nextTuple() {String sentence = "the cow jumped over the moon";String messageId = UUID.randomUUID().toString().replaceAll("-", "");waitAck.put(messageId, sentence);//指定messageId,开启ackfail机制collector.emit(new Values(sentence), messageId);}@Overridepublic void ack(Object msgId) {System.out.println("消息处理成功:" + msgId);System.out.println("删除缓存中的数据...");waitAck.remove(msgId);}@Overridepublic void fail(Object msgId) {System.out.println("消息处理失败:" + msgId);System.out.println("重新发送失败的信息...");//重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的,而且下游collector.emit(new Values(waitAck.get(msgId)),msgId);}
}
那么kafkaspout会不会也是这样还保存这已发送未收到bolt响应的消息呢?如果这样,如果消息处理不断失败,不断重发,消息不断积累在kafkaspout节点上,kafkaspout端会不就会出现内存溢出?
其实并没有,回想kafka的原理,Kafka会为每一个consumergroup保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。也就是说,kafkaspot在消费kafka的数据是,通过offset读取到消息并发送给bolt后,kafkaspot只是保存者当前的offset值。
当失败或成功根据msgId查询offset值,然后再去kafka消费该数据来确保消息的重新发送。
那么虽然offset数据小,但是当offset的数据量上去了还是会内存溢出的?
其实并没有,kafkaspout发现缓存的数据超过限制了,会把某端的数据清理掉的。
kafkaspot中发送数据的代码
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
可以看到msgID里面包装了offset参数。
它不缓存已经发送出去的数据信息。
当他接收到来至bolt的响应后,会从接收到的msgId中得到offset。以下是从源码中折取的关键代码:
public void ack(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;PartitionManager m = _coordinator.getManager(id.partition);if (m != null) {m.ack(id.offset);}}m.ack(id.offset);public void ack(Long offset) {_pending.remove(offset);//处理成功移除offsetnumberAcked++;}public void fail(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;PartitionManager m = _coordinator.getManager(id.partition);if (m != null) {m.fail(id.offset);}}m.fail(id.offset);public void fail(Long offset) {failed.add(offset);//处理失败添加offsetnumberFailed++;}SortedSet<Long> _pending = new TreeSet<Long>();SortedSet<Long> failed = new TreeSet<Long>();
关于kafkaspot的源码解析大家可以看这边博客:http://www.cnblogs.com/cruze/p/4241181.html
源码解析中涉及了很多kafka的概念,所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的,如果不理解kafka概念,那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。
转发:http://www.cnblogs.com/intsmaze/p/5947078.html
kafkaspot在ack机制下如何保证内存不溢相关推荐
- RocketMQ——ack机制保证消费成功
ACK简介 在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功,可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢? Rock ...
- ack strom 保证只有一次_Storm容错机制(一):ACK机制
前言 好久没有写文章了,然后一连就写了三篇, 前两篇文章 Storm入门(一):编程模型 Storm入门(二):架构模型和集群部署 都是一些比较简单的入门教程,这一篇我们来聊一聊稍微高级点的话题, 关 ...
- ios keychain 不被清理_手机资讯:iPhone 如何在小内存下依然保证流畅的速度为什么不增加内存...
如今使用IT数码设备的小伙伴们是越来越多了,那么IT数码设备当中是有很多知识的,这些知识很多小伙伴一般都是不知道的,就好比最近就有很多小伙伴们想要知道iPhone 如何在小内存下依然保证流畅的速度为什 ...
- Apache Storm 实时流处理系统ACK机制以及源码分析
1.ACK机制简介 Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理.完全处理的意思是该MessageId绑定的源Tuple以及由该源Tupl ...
- RabbitMQ的消息确认ACK机制
1.什么是消息确认ACK. 答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失.为了确保数据不会丢失,RabbitMQ支持消 ...
- Storm的ack机制在项目应用中的坑
正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tup ...
- Rabbitmq- 消费者ack机制与发布者消息确认
Rabbitmq消息确认机制 https://blog.csdn.net/yorsola/article/details/108436276 官网:https://www.rabbitmq.com/c ...
- Redis 九种数据结构及其底层实现 持久化 缓存机制 过期键与内存淘汰 集群等相关知识
参考内容: B站尚硅谷Redis视频教程 <Redis 6 入门到精通 超详细 教程> B张黑马程序员Redis视频教程 <黑马程序员Redis入门到实战教程,全面透析redis底层 ...
- mysql jstorm_jstorm进阶-ack机制及KafkaSpout
安装部署使用 ack机制 ack机制原理 这里不讲什么是ack机制,可以参考官网的文档Ack 机制 我们只要知道它是使用异或xor的原理即可: A xor A = 0 A xor B xor B xo ...
最新文章
- 分享下自己写的一个微信小程序请求远程数据加载到页面的代码
- java 字符串大小比较
- excel函数FREQUENCY、COUNTIFS、COUNTIF
- oracle java连接配置
- golang install/build 生成的文件命名和路径
- hdu2222(看一些单词哪些在模式串中出现过)
- spark-jar冲突解决方案
- python自动答题免费_直播答题?Python助你自动搜题之新手篇!
- python哪个专业开这个课程-深圳python课程
- 总算知道怎样从ImageMagick生成的数据转换成HICON: MagickGetImageBlob LookupIconIdFromDirectoryEx...
- c语言中用指针倒序输出,菜鸟求助-如何用指针法将一串字符按单词的倒序输出?如:i love yo...
- iRecognizer号码扫描开发实录
- zktime 协议_中控考勤机对接信呼帮助(中控ZKTime5.0系统软件版)
- Three.js加载OBJ模型或FBX模型
- python输入姓名输出欢迎你某某同学_2018-03-24 python 练习
- 【springboot】SMS短信通实现手机验证码
- 密西西比河谷州立大学:Android应用程序开发(二)
- 《慢慢来,一切都来得及》语录
- logit回归怎么看显著性_请教用SPSS做两分类逻辑回归时自变量的显著性问题
- 四级英语口语模拟测试软件,2018年大学英语四级口语考试模拟
热门文章
- 课外阅读(XHTML和XML简介)
- Python time asctime()方法
- IEEE1459功率理论计算方法
- CSS之Multi-columns的跨列
- 8s nfs 挂载文件_把你的树莓派家庭实验室变成一个网络文件系统 | Linux 中国
- linux可以不用grub吗,既然不用Win了,那么GrubDOS也不用了。linux grub求指导
- php 5.4 aws,使用 Amazon EC2 管理 AWS SDK for PHP 实例 - 适用于 PHP 的 AWS 开发工具包
- First Steps with TensorFlow代码解析
- angular安装记录
- 【UVA 437】The Tower of Babylon(拓扑排序+DP,做法)