聊聊rocketmq的SequenceProducerImpl
为什么80%的码农都做不了架构师?>>>
序
本文主要研究一下rocketmq的SequenceProducerImpl
SequenceProducerImpl
io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {private BlockingQueue<Message> msgCacheQueue;public SequenceProducerImpl(final KeyValue properties) {super(properties);this.msgCacheQueue = new LinkedBlockingQueue<>();}@Overridepublic KeyValue properties() {return properties;}@Overridepublic void send(final Message message) {checkMessageType(message);org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);try {Validators.checkMessage(rmqMessage, this.rocketmqProducer);} catch (MQClientException e) {throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);}msgCacheQueue.add(message);}@Overridepublic void send(final Message message, final KeyValue properties) {send(message);}@Overridepublic synchronized void commit() {List<Message> messages = new ArrayList<>();msgCacheQueue.drainTo(messages);List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();for (Message message : messages) {rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));}if (rmqMessages.size() == 0) {return;}try {SendResult sendResult = this.rocketmqProducer.send(rmqMessages);String[] msgIdArray = sendResult.getMsgId().split(",");for (int i = 0; i < messages.size(); i++) {Message message = messages.get(i);message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);}} catch (Exception e) {throw checkProducerException("", "", e);}}@Overridepublic synchronized void rollback() {msgCacheQueue.clear();}
}
- 采用的是LinkedBlockingQueue,send方法实际调用的是添加到队列
- 另外提供了commit以及rollback方法,都加了synchronized保证对LinkedBlockingQueue操作的线程安全
- commit的时候,将queue的数据drainTo到list,然后批量发送;rollback的时候清空整个LinkedBlockingQueue
小结
rocketmq的SequenceProducerImpl在send方法的时候不是真正方法,而是添加到队列,只有在commit的时候才批量发送,rollback的时候清空队列。这里的send方法语义不是太好,可以改为pending之类的名称。
doc
- SequenceProducerImpl
转载于:https://my.oschina.net/go4it/blog/1919697
聊聊rocketmq的SequenceProducerImpl相关推荐
- 聊聊rocketmq的ProducerImpl
序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java pu ...
- 聊聊rocketmq的RemotingException
序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/Remoti ...
- 聊聊rocketmq的BrokerHousekeepingService
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下rocketmq的BrokerHousekeepingService BrokerHousekeepingServ ...
- 与顶级互联网公司技术大佬面对面聊聊RocketMQ
作为由阿里巴巴捐赠的Apache顶级云原生消息中间件,RocketMQ 立足于在线交易链路,帮助企业实现异步解耦和削峰填谷以及 IoT 边缘数据以及 C 端用户行为数据采集传输和集成等众多功能.我们可 ...
- 聊聊rocketmq的ConsumerManageProcessor
序 本文主要研究一下rocketmq的ConsumerManageProcessor NettyRequestProcessor rocketmq-all-4.6.0-source-release/r ...
- 聊聊rocketmq的ConsumerIdsChangeListener
序 本文主要研究一下rocketmq的ConsumerIdsChangeListener ConsumerGroupEvent rocketmq-all-4.6.0-source-release/br ...
- 聊聊rocketmq的FileAppender
序 本文主要研究一下rocketmq的FileAppender WriterAppender org/apache/rocketmq/logging/inner/LoggingBuilder.java ...
- 真香,聊聊 RocketMQ 5.0 的 POP 消费模式!
大家好,我是君哥. 大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式. 不过,RocketMQ 的 ...
- 深入源码聊聊RocketMQ刷盘机制
大家好,我是Leo. 今天聊一下RocketMQ的三种刷盘机制. 同步刷盘 异步刷盘(RocketMQ默认) 异步刷盘+缓冲区 出自微信公众号[欢少的成长之路] 本章概括 同步刷盘 整个同步刷盘策略由 ...
- 面试官:哥们,你们的系统架构中为什么要引入消息中间件?
点击上方"蓝字", 右上角选择"设为星标" 周一至五早11点半!精品文章准时送上! 本文来自石杉的架构笔记 这篇文章开始,我们把消息中间件这块高频的面试题给大家 ...
最新文章
- 提高开发效率之VS Code基础配置篇
- 数学建模学习笔记——模糊综合评价模型(评价类,发放问卷一般不用)
- Reflect.ownKeys
- JDK11的新特性:HTTP API和reactive streams
- 洛谷 p1434 滑雪【记忆化搜索】
- LintCode 51: Previous Permutation
- 论文阅读:A machine learning approach to medical image classification:Detecting age-related macular dege
- windows聚焦壁纸不更新_如何解决Win10聚焦锁屏壁纸不自动更新的问题
- html模拟终端,DomTerm:一款为Linux打造的终端模拟器
- 题解 [CQOI2017] 老 C 的方块
- 微商最低成本引流,学会这招日引精准粉1000+
- Windows 10电脑使用VMware虚拟机安装macOS苹果系统[一站式保姆级别教程]
- MATLAB交换图片红绿颜色通道,matlab的颜色映射colormap
- cad渐开线齿轮轮廓绘制_CAD渐开线齿轮怎么画?
- HBuilder 真机调试提示:手机上没有信任本计算机的授权,请在手机上信任该授权
- Jetson 相机编码
- 关于github的高级搜索技巧
- [转]转型后的BlackBerry“恋上”汽车市场,QNX拿什么与免费的安卓/Linux对抗?
- 点餐系统后台服务器部署,Java后台——点餐小程序在服务器上运行点餐系统供别人访问...
- ArcCatalog发布地图服务
热门文章
- 计算机学院篮球赛主题,计算机学院称雄中国科大2011年学生篮球赛
- ai伴侣2.4.7_人工智能:世界各地的活动(7月4日)
- 什么是AsHelper
- php系统变量有哪些,php预定义系统变量
- 感知机算法—推导收敛次数的上界
- 安装pytorch-metric-learning
- linux live usb下载,LinuxLive USB Creator
- c语言指针试题嵌入式,嵌入式面试C语言试题「」(2)
- h5页面自定义字体_H5页面中常见的字体有哪些
- 新版本安装包需求汇总