有序消息

消息有序指的是可以按照消息的发送顺序来消费。
RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。

顺序消息生产者

public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("localhost:9876");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest2", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息时,需要实现MessageQueueSelector , 用来选择合适的queueSendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;// int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}

上面实现的顺序消息时,通过orderId来进行顺序消息,同一个订单ID的消息,发送到同一个Queue里面

顺序消息消费者

public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");// 设置NameServer地址consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}

需要注意,registerMessageListener 注册的消息监听器 , 需要使用MessageListenerOrderly , ConsumeOrderlyContext , 不可以使用

MessageListenerConcurrently , ConsumeConcurrentlyContext , 否则消费的顺序无法保证。

源码分析

/*** @param msg 消息* @param selector 消息队列选择器* @param arg 分片值 (类似分库分表里面的分片键)*/
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg, selector, arg);
}

实际发送

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);}private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);// 1. 获取topic信息,TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {// 2. 获取当前topic的内部队列信息List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());// 复制一个消息Message userMessage = MessageAccessor.cloneMessage(msg);// topic信息String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);//3. 获取消息队列mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue throwed exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 获取到队列了,执行发送消息, 跟普通消息的发送一样的return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}

步骤说明:

  1. 获取当前topic的信息,内部包含消息队列
  2. 获取topic内部的队列信息
  3. 获取消息队列,这个其实就是顺序消息实现的核心点selector.select(messageQueueList, userMessage, arg) ,通过自定义的消息队列选择器,返回相应的队列。 内部完全自定义
  4. 获取到了消息队列之后,执行发送消息,跟普通消息一样,这里就没有什么重试的说法了。

总结: 顺序消息的核心就是将你希望按照顺序的消息,通过某种特定的条件,计算发送到对应的队列里面去。

顺序消息的缺点:

  1. 送顺序消息无法利用集群的Failover特性,因为不能更换broker,MessageQueue进行重试
  2. 存在队列热点问题,当一个场景下消息非常多的情况,会导致个别队列非常繁忙
  3. 消费失败时无法跳过, 会导致消费停止
  4. 消息的并行度依赖于对列数量,不过可以增加队列数量,动态调整

思考: 通过上面那种顺序消息的模式,在broker发生宕机 , 队列数量发生变化时,会造成消费乱序

比如在多master集群的情况下 ,

topic: TP_TEST  总共8个队列MASTER-1 : 1,2,3,4
MASTER-2 : 5,6,7,8

一个topic分别在多个master上面有队列, 如果其中一个master宕机了,那么队列数会变成4个,那么顺序消息通过 orderId % queueSize 的这种方式,会造成原来往一个队列里面发送的,会发送到另外一个队列里面去,造成消费乱序。

所以如果是要严格的顺序消息,则不要使用rocketMq, 在极端情况下会造成消费乱序。

http://weixin.qq.com/r/eC-YwJDE7s2RrdSj93pq (二维码自动识别)

producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)相关推荐

  1. RocketMQ:Producer启动流程与消息发送源码分析

    文章目录 Producer 1.方法和属性 2.启动流程 3.消息发送 3.1验证消息 3.2查找路由 3.3选择队列 3.4发送消息 3.5发送批量消息 Producer 在RocketMQ中,消息 ...

  2. 跟我学RocketMQ之批量消息发送源码解析

    上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送.本文中,我们就一起来集中分析一下批量消息的发送是怎样的 ...

  3. 《SpringBoot2.0 实战》系列-整合kafka实现消息发送、消费

    之前写过一篇关于ActiveMq的博文,有兴趣的小伙伴可以点击查看.但是ActiveMq总体性能表现一般,如果对消息队列性能要求较高,业务量较大,那我们需要重新考量.本文就介绍一款性能高的消息队列- ...

  4. [Zookeeper-3.6.2源码解析系列]-14-Zookeeper使用到的Reactor网络模型原理分析

    目录 14-启服务端网络监听连接NIOServerCnxnFactory 14.1 简介 14.2 主从Reactor网络IO模型main-sub reactor 14.3 NIOServerCnxn ...

  5. 【SpringBoot实战系列】RabbitMQ实现消息发送并实现邮箱发送异常监控报警实战

    大家好,我是工藤学编程

  6. TiKV 源码解析系列 - Raft 的优化

    这篇文章转载TiDB大牛 唐刘 的博客:https://mp.weixin.qq.com/s?__biz=MzI3NDIxNTQyOQ==&mid=2247484544&idx=1&a ...

  7. 安卓系统源码编译系列(一)——下载安卓系统源码教程

    最近需要编译安卓系统,咨询了一个编译过安卓系统的朋友,说是下载源码就得下载两天,于是做好了长期抗战的准备,开始了下载安卓源码的旅程.在刚开始下载时,可以参照的内容只有官方教程,于是跟着官方教程一步一步 ...

  8. Java并发包源码学习系列:同步组件CountDownLatch源码解析

    文章目录 CountDownLatch概述 使用案例与基本思路 类图与基本结构 void await() boolean await(long timeout, TimeUnit unit) void ...

  9. nonebot发送CQ码

    nonebot2发送CQ码 使用MessageSegment发送CQ码 更一般的方法 使用MessageSegment发送CQ码 经常在群里看到有人问nonebot怎么发图片,发语音等等,其实都可以通 ...

最新文章

  1. JQuery设置checkbox的值,取checkbox的值,设置radio的值,取radio的值,设置下拉选select的值,取select的值...
  2. 一行代码快速搞定Flowable断点下载(中)
  3. 实例教程:1小时学会Python
  4. 提问:访问服务器时提示system.componentmodel.win32exception: 拒绝访问
  5. Java特性-动态代理
  6. UVA-572-搜索基础题
  7. MATLAB中好用的快捷键
  8. stringWithUTF8String return null (返回null)的解决办法
  9. hibernate延迟加载,LazyInitializationException session失效问题。多数据源配置
  10. heartbeat如何监控程序_一文看懂MyCAT 命令行监控命令,监控调优必备
  11. nginx根据域名做http,https分发
  12. lgg8各个版本_LG正式推出G8SThinQ 搭载骁龙855
  13. php短信报警直到响应,Cacti实现短信报警
  14. 清华校友、香港科技大学准博士ICCV顶会论文被爆公然抄袭!去年CVPR也是抄的...
  15. 殷书数据结构5.8——堆
  16. 微信小游戏 - Canvas/WebGL Demo 移植
  17. Ural 2037. Richness of binary words (打表+构造)
  18. pythondocker——外部无法访问,报错:该网页无法正常运作
  19. 【等保小知识】安全等保是什么意思?是ccrc吗?
  20. Unity开发基础——使用字符串学习笔记

热门文章

  1. 罗马数字转阿拉伯数字
  2. 操作系统——CPU、计算机的构成
  3. postsharp初体验
  4. 排错之网络映射缓存凭证记录导致备份计划任务失败
  5. 使用p3p跨域设置Cookie
  6. Web API应用架构设计分析(2)
  7. WebService事务处理
  8. android 文字路径,Android自定义控件:路径及文字
  9. android alertdialog 背景透明,Android Alertdialog弹出框设置半透明背景
  10. java+jtextfield+取值_[求助]JTextfield 取值问题!