场景举例

用户下单,付款,发货,完成,每一个步骤发送完成,发送mq消息到下游系统处理(消费客户端),同一个订单要保证顺序消费。

mq实现思路

选择mq顺序消息,实现思路,

生产者:发送消息的时候保证同一个订单的业务消息,发送到同一个队列;订单号对队列总和取模,找到指定队列

消费者:每一个消费线程可以保证只消费一个队列的消息,怎么保证的...可进一步探究

代码

生产者

/*** 顺序消息生产者*/
@Slf4j
public class OrderedProducer {private static List<Order> buildMsg() {List<Order> orders = new ArrayList<>();Order order1 = new Order();order1.setOrderNo("1");order1.setNote("下单");order1.setCustomer("麦迪");Order order7 = new Order();order7.setOrderNo("3");order7.setNote("下单");order7.setCustomer("库里");Order order2 = new Order();order2.setOrderNo("1");order2.setNote("付款");order2.setCustomer("麦迪");Order order3 = new Order();order3.setOrderNo("1");order3.setNote("通知");order3.setCustomer("麦迪");Order order4 = new Order();order4.setOrderNo("2");order4.setNote("下单");order4.setCustomer("科比");Order order5 = new Order();order5.setOrderNo("2");order5.setNote("付款");order5.setCustomer("科比");Order order8 = new Order();order8.setOrderNo("3");order8.setNote("付款");order8.setCustomer("库里");Order order6 = new Order();order6.setOrderNo("2");order6.setNote("通知");order6.setCustomer("科比");Order order9 = new Order();order9.setOrderNo("3");order9.setNote("通知");order9.setCustomer("库里");orders.add(order1);orders.add(order7);orders.add(order2);orders.add(order3);orders.add(order4);orders.add(order5);orders.add(order8);orders.add(order6);orders.add(order9);return orders;}public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("example_group_name");producer.setNamesrvAddr("47.103.143.191:8201;47.103.143.191:8202");producer.start();List<Order> orders = buildMsg();orders.forEach(x -> {Message msg = new Message("TopicOrder", "TagOrder4", x.getOrderNo(), JSON.toJSONBytes(x));try {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> queues, Message message, Object arg) {Integer orderNo = Integer.valueOf((String) arg);int index = orderNo % queues.size();//          log.info("order: {}, queueTotal: {}, index: {}, queue: {}", JSON.toJSONString(x), queues.size(),//                    index, queues.get(index));log.info("客户: {}, 发送到的队列: {}", x.getCustomer(), index);return queues.get(index);}}, x.getOrderNo());log.info("sendResult: {}", JSON.toJSONString(sendResult));} catch (MQClientException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}});producer.shutdown();}}

消费者

@Slf4j
public class OrderedConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setNamesrvAddr("47.103.143.191:8201;47.103.143.191:8202");consumer.subscribe("TopicOrder", "*");consumer.registerMessageListener(new MessageListenerOrderly() {@SneakyThrows@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);log.info("线程:{}, 队列:{}, 消费消息:{}", Thread.currentThread().getName(), msgs.get(0).getQueueId(),new String(msgs.get(0).getBody(), "UTF-8"));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

结果验证

生产者发送日志,可以看出,同一个人的业务发送到了同一个队列,总共9条消息

消费者

当生产者第一次发送9条消息时,看下图红字框中,没问题,3个线程,分别消费3个队列中的消息,但是当我再次发送消息时,消费者消费就乱了,每一个消息都由一个线程消费,不断的新建线程,4-13...再发送消息,继续创建新的线程,直到总量20,再从线程1开始消费

如果从新启动消费者,就是正常的3个线程123消费3个队列,如果不从新启动消费者,生产者不断发送,消费端就无法按顺序正常消费,而是不断创建新线程消费

疑问

1.消费者线程为什么是20个

2.消费者为什么创建新线程只消费一个,不是一个线程,消费一个队列

3.消费者多线程在哪里实现,真实的消费端多线程消费场景就是这样吗,不用管,自动会有多个线程消费,还是怎么实现多个消费线程消费

有知晓的同志请解答,谢谢

思考

队列与broker

每个broker默认有4个队列,双主双从,就是4*4 16个队列,所以稍微需要考虑一个消息均衡的发送到指定队列

rocketmq顺序消费问题相关推荐

  1. RocketMQ——顺序消费(代码)

    关于rocketmq顺序消费的理解和图示可以查看该博文:RocketMQ--顺序消费和重复消费 本博客主要是以代码示例来了解顺序消费的相关内容,建议在此之前先了解下顺序消费的原理. 注:RocketM ...

  2. 一次 RocketMQ 顺序消费延迟的问题定位

    一次 RocketMQ 顺序消费延迟的问题定位 问题背景与现象 昨晚收到了应用报警,发现线上某个业务消费消息延迟了 54s 多(从消息发送到MQ 到被消费的间隔): 2021-06-30T23:12: ...

  3. rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ

    RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...

  4. 一文理清RocketMQ顺序消费、重复消费、消息丢失问题

    前言 在使用消息队列时不可避免的会遇到顺序消费.重复消费.消息丢失三个问题.在一次面试字节的时候,面试官问到如何保证顺序消费,当时回答不太准确,特意此文回顾如何解决顺序消费.重复消费.消息丢失三个问题 ...

  5. RocketMQ 顺序消费只消费一次 坑

    rocketMq实现顺序消费的原理 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一 ...

  6. rocketMq 顺序消费

    什么是顺序消费? 消息有序指的是一类消息消费时,能按照发送的顺序来消费.例如:一个订单产生了 3 条消息,分别是订单创建.订单付款.订单完成.消费时,要按照这个顺序消费才有意义.与此同时多笔订单之间又 ...

  7. rocketmq 顺序消费_RocketMQ核心概念扫盲

    在正式进入RocketMQ的学习之前,我觉得有必要梳理一下RocketMQ核心概念,为大家学习RocketMQ打下牢固的基础. 1.RocketMQ部署架构 在RocketMQ主要的组件如下: Nam ...

  8. rocketmq 顺序消费_必须先理解的RocketMQ入门手册,才能再次深入解读

    推荐阅读一下下 2020年后想跳槽?MQ.ZK.Nginx.Kafk等分布式技术你都掌握了? 阿里架构师推荐学习的<RabbitMQ实战指南>,渣渣的你都看过吗? RocketMQ入门手册 ...

  9. 顺序消费可没你想的这么简单,队列数量的变更往往无法保证同一个账号的消息发送到同一个分区,怎么解决?

    掌握一到两门java主流中间件,是敲开BAT等大厂必备的技能,送给大家一个Java中间件学习路线,助力大家实现职场的蜕变. Java进阶之梯,成长路线与学习资料,助力突破中间件领域 在金融行业中,如果 ...

  10. rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

    一.RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成. ...

最新文章

  1. SQL中获取刚插入记录时对应的自增列的值
  2. ipvsadm+keepalived安装错误解决
  3. 计算机科学与技术 单片机,单片机-兰州交通大学计算机科学与技术实验教学中心...
  4. 升级Hbase,解决bug问题
  5. 2021已去,2022未来
  6. druid不能close mysql连接_druid长时间无操作无法保持连接!!
  7. 软件生存周期文档系列 之 6.用户操作手册
  8. 用这开源小书学 Docker,香!
  9. 【实习笔试面试题】2013网易互联网实习笔试算法题-找出最大连续自然数个数
  10. 为WebBrowser的WEB页的Document注册事件的问题
  11. 胡明浩 160809313 (我就会三个)
  12. 自考计算机软件基础真题,2019年4月自考计算机软件基础考试真题试卷
  13. 应用添加分享至微信、QQ和微博
  14. 如何绘制四线3格拼音
  15. Magic Data上榜互联网周刊2022数字经济100强
  16. golang连接FTP服务器并下载
  17. MiCT: Mixed 3D/2D Convolutional Tube for Human Action Recognition论文笔记
  18. 使用.NET Reflector
  19. java virt res_top命令里内存参数 VIRT, RES 和 SHR 分别是什么意思
  20. 如何合理安排测试团队人员分工的问题?

热门文章

  1. btrfs文件系统学习总结
  2. IO OutputStreamWriter和InputStreamReader
  3. 元素命名空间中的“MvcBuildViews”无效
  4. STM32笔记记录2
  5. linux内核双向链表学习
  6. linux日志级别的正确使用(printk)
  7. 线程的创建以及线程的本质
  8. Linux音频驱动-声音采集过程
  9. Camera ISP流程概述
  10. Android下基于UVC的UsbCam的开发