为什么80%的码农都做不了架构师?>>>   

本文主要研究一下rocketmq的SequenceProducerImpl

SequenceProducerImpl

io/openmessaging/rocketmq/producer/SequenceProducerImpl.java

public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {private BlockingQueue<Message> msgCacheQueue;public SequenceProducerImpl(final KeyValue properties) {super(properties);this.msgCacheQueue = new LinkedBlockingQueue<>();}@Overridepublic KeyValue properties() {return properties;}@Overridepublic void send(final Message message) {checkMessageType(message);org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);try {Validators.checkMessage(rmqMessage, this.rocketmqProducer);} catch (MQClientException e) {throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);}msgCacheQueue.add(message);}@Overridepublic void send(final Message message, final KeyValue properties) {send(message);}@Overridepublic synchronized void commit() {List<Message> messages = new ArrayList<>();msgCacheQueue.drainTo(messages);List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();for (Message message : messages) {rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));}if (rmqMessages.size() == 0) {return;}try {SendResult sendResult = this.rocketmqProducer.send(rmqMessages);String[] msgIdArray = sendResult.getMsgId().split(",");for (int i = 0; i < messages.size(); i++) {Message message = messages.get(i);message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);}} catch (Exception e) {throw checkProducerException("", "", e);}}@Overridepublic synchronized void rollback() {msgCacheQueue.clear();}
}
  • 采用的是LinkedBlockingQueue,send方法实际调用的是添加到队列
  • 另外提供了commit以及rollback方法,都加了synchronized保证对LinkedBlockingQueue操作的线程安全
  • commit的时候,将queue的数据drainTo到list,然后批量发送;rollback的时候清空整个LinkedBlockingQueue

小结

rocketmq的SequenceProducerImpl在send方法的时候不是真正方法,而是添加到队列,只有在commit的时候才批量发送,rollback的时候清空队列。这里的send方法语义不是太好,可以改为pending之类的名称。

doc

  • SequenceProducerImpl

转载于:https://my.oschina.net/go4it/blog/1919697

聊聊rocketmq的SequenceProducerImpl相关推荐

  1. 聊聊rocketmq的ProducerImpl

    序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java pu ...

  2. 聊聊rocketmq的RemotingException

    序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/Remoti ...

  3. 聊聊rocketmq的BrokerHousekeepingService

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下rocketmq的BrokerHousekeepingService BrokerHousekeepingServ ...

  4. 与顶级互联网公司技术大佬面对面聊聊RocketMQ

    作为由阿里巴巴捐赠的Apache顶级云原生消息中间件,RocketMQ 立足于在线交易链路,帮助企业实现异步解耦和削峰填谷以及 IoT 边缘数据以及 C 端用户行为数据采集传输和集成等众多功能.我们可 ...

  5. 聊聊rocketmq的ConsumerManageProcessor

    序 本文主要研究一下rocketmq的ConsumerManageProcessor NettyRequestProcessor rocketmq-all-4.6.0-source-release/r ...

  6. 聊聊rocketmq的ConsumerIdsChangeListener

    序 本文主要研究一下rocketmq的ConsumerIdsChangeListener ConsumerGroupEvent rocketmq-all-4.6.0-source-release/br ...

  7. 聊聊rocketmq的FileAppender

    序 本文主要研究一下rocketmq的FileAppender WriterAppender org/apache/rocketmq/logging/inner/LoggingBuilder.java ...

  8. 真香,聊聊 RocketMQ 5.0 的 POP 消费模式!

    大家好,我是君哥. 大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式. 不过,RocketMQ 的 ...

  9. 深入源码聊聊RocketMQ刷盘机制

    大家好,我是Leo. 今天聊一下RocketMQ的三种刷盘机制. 同步刷盘 异步刷盘(RocketMQ默认) 异步刷盘+缓冲区 出自微信公众号[欢少的成长之路] 本章概括 同步刷盘 整个同步刷盘策略由 ...

  10. 面试官:哥们,你们的系统架构中为什么要引入消息中间件?

    点击上方"蓝字", 右上角选择"设为星标" 周一至五早11点半!精品文章准时送上! 本文来自石杉的架构笔记 这篇文章开始,我们把消息中间件这块高频的面试题给大家 ...

最新文章

  1. 提高开发效率之VS Code基础配置篇
  2. 数学建模学习笔记——模糊综合评价模型(评价类,发放问卷一般不用)
  3. Reflect.ownKeys
  4. JDK11的新特性:HTTP API和reactive streams
  5. 洛谷 p1434 滑雪【记忆化搜索】
  6. LintCode 51: Previous Permutation
  7. 论文阅读:A machine learning approach to medical image classification:Detecting age-related macular dege
  8. windows聚焦壁纸不更新_如何解决Win10聚焦锁屏壁纸不自动更新的问题
  9. html模拟终端,DomTerm:一款为Linux打造的终端模拟器
  10. 题解 [CQOI2017] 老 C 的方块
  11. 微商最低成本引流,学会这招日引精准粉1000+
  12. Windows 10电脑使用VMware虚拟机安装macOS苹果系统[一站式保姆级别教程]
  13. MATLAB交换图片红绿颜色通道,matlab的颜色映射colormap
  14. cad渐开线齿轮轮廓绘制_CAD渐开线齿轮怎么画?
  15. HBuilder 真机调试提示:手机上没有信任本计算机的授权,请在手机上信任该授权
  16. Jetson 相机编码
  17. 关于github的高级搜索技巧
  18. [转]转型后的BlackBerry“恋上”汽车市场,QNX拿什么与免费的安卓/Linux对抗?
  19. 点餐系统后台服务器部署,Java后台——点餐小程序在服务器上运行点餐系统供别人访问...
  20. ArcCatalog发布地图服务

热门文章

  1. 计算机学院篮球赛主题,计算机学院称雄中国科大2011年学生篮球赛
  2. ai伴侣2.4.7_人工智能:世界各地的活动(7月4日)
  3. 什么是AsHelper
  4. php系统变量有哪些,php预定义系统变量
  5. 感知机算法—推导收敛次数的上界
  6. 安装pytorch-metric-learning
  7. linux live usb下载,LinuxLive USB Creator
  8. c语言指针试题嵌入式,嵌入式面试C语言试题「」(2)
  9. h5页面自定义字体_H5页面中常见的字体有哪些
  10. 新版本安装包需求汇总