rocketMq实现顺序消费的原理

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

单个节点(Producer端1个、Consumer端1个)

1、Producer.java 

package order;  import java.util.List;  import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;  /** * Producer,发送顺序消息 */
public class Producer {  public static void main(String[] args) {  try {  DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  producer.start();  // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  // "TagE" };  for (int i = 1; i <= 5; i++) {  Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  Integer id = (Integer) arg;  int index = id % mqs.size();  return mqs.get(index);  }  }, 0);  System.out.println(sendResult);  }  producer.shutdown();  } catch (MQClientException e) {  e.printStackTrace();  } catch (RemotingException e) {  e.printStackTrace();  } catch (MQBrokerException e) {  e.printStackTrace();  } catch (InterruptedException e) {  e.printStackTrace();  }  }
}

2、Consumer.java   (有问题)

package order;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;  /** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */
public class Consumer1 {  public static void main(String[] args) throws MQClientException {  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  consumer.subscribe("TopicOrderTest", "*");          consumer.registerMessageListener(new MessageListenerOrderly() {  AtomicLong consumeTimes = new AtomicLong(0);  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  // 设置自动提交  context.setAutoCommit(true);  for (MessageExt msg : msgs) {  System.out.println(msg + ",内容:" + new String(msg.getBody()));  }  try {  TimeUnit.SECONDS.sleep(5L);  } catch (InterruptedException e) {  e.printStackTrace();  }  ;  return ConsumeOrderlyStatus.SUCCESS;  }  });  consumer.start();  System.out.println("Consumer1 Started.");  }  }

这个地方有一个大坑,注册监听类的时候,不能使用匿名内部类。不然的话,只会消费一次,然后消费者就 挂了……挂了……挂了…… 

监听类要单独写!!!

正确消费者写法:

自定义监听类:

MyMessageListener

public class MyMessageListener implements  MessageListenerOrderly {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 设置自动提交context.setAutoCommit(true);for (MessageExt msg : msgs) {System.out.println(msg + ",内容:" + new String(msg.getBody()));}try {TimeUnit.SECONDS.sleep(5L);} catch (InterruptedException e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}
}

Consumer.java

public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");consumer.setNamesrvAddr("101.200.33.225:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicOrderTest", "*");        MyMessageListener myMessageListener = new MyMessageListener();consumer.registerMessageListener(myMessageListener);consumer.start();System.out.println("Consumer1 Started.");}
}

参考:https://www.cnblogs.com/antain/p/rocketmq.html

http://www.cnblogs.com/520playboy/p/6750023.html

http://dbaplus.cn/news-21-1123-1.html

转载于:https://www.cnblogs.com/Jtianlin/p/8436024.html

RocketMQ 顺序消费只消费一次 坑相关推荐

  1. rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ

    RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...

  2. rocketmq 顺序消费_RocketMQ核心概念扫盲

    在正式进入RocketMQ的学习之前,我觉得有必要梳理一下RocketMQ核心概念,为大家学习RocketMQ打下牢固的基础. 1.RocketMQ部署架构 在RocketMQ主要的组件如下: Nam ...

  3. 源码分析RocketMQ顺序消息消费实现原理

    本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...

  4. RocketMQ——顺序消费(代码)

    关于rocketmq顺序消费的理解和图示可以查看该博文:RocketMQ--顺序消费和重复消费 本博客主要是以代码示例来了解顺序消费的相关内容,建议在此之前先了解下顺序消费的原理. 注:RocketM ...

  5. 一次 RocketMQ 顺序消费延迟的问题定位

    一次 RocketMQ 顺序消费延迟的问题定位 问题背景与现象 昨晚收到了应用报警,发现线上某个业务消费消息延迟了 54s 多(从消息发送到MQ 到被消费的间隔): 2021-06-30T23:12: ...

  6. 一文理清RocketMQ顺序消费、重复消费、消息丢失问题

    前言 在使用消息队列时不可避免的会遇到顺序消费.重复消费.消息丢失三个问题.在一次面试字节的时候,面试官问到如何保证顺序消费,当时回答不太准确,特意此文回顾如何解决顺序消费.重复消费.消息丢失三个问题 ...

  7. java rocketmq消费_rocketmq消费负载均衡--push消费详解

    前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析 ...

  8. 记录Rocketmq定时消息不消费问题的排查过程

    记录Rocketmq定时消息不消费问题的排查过程 写在前面 问题根源初步确认 问题根源再次确认. 了解rocketmq定时消息原理以及处理过程. 源码调试 结论求证 后记 写在前面 此本记录了一个项目 ...

  9. RocketMQ发送消息和消费消息

    RocketMQ发送消息和消费消息 一.使用前配置 二.启动命令 三.pom.xml文件配置 四.编码 4.1 先定义一个消息保存的载体: 4.2 定义消息的发送者: 4.3 定义消息的消费者: 五 ...

最新文章

  1. 微信小游戏创业,究竟是红海还是死海?
  2. Keras之DNN:基于Keras(sigmoid+binary_crossentropy+predict_proba)利用DNN实现分类预测概率——DIY二分类数据集预测新数据点
  3. TCP/IP模型如何分层?路由器、网卡分别属于哪一层?
  4. 【opencv 学习】使用tesseract-ocr机芯数字识别
  5. 推荐一款 IDEA 神器 ,人工智能帮你写代码,再也不用加班了!!
  6. win10下安装Cygwin配置gcc编译环境
  7. VsCode云端版本
  8. Windows安装numpy详细教程
  9. K近邻算法(k-Nearest Neighbour, KNN)
  10. kde下gwenview启动慢,甚至几十秒才能启动
  11. 2FSK频谱matlab,数字调制系统在Matlab下的分析
  12. 什么是应用宝统一链接服务器,applink
  13. 日本知名动画公司东映动画加入 The Sandbox 元宇宙
  14. php 在线选座,基于jquery实现在线选座订座之影院篇
  15. 通俗易懂物联网(1):什么是物联网?
  16. 利用PHP表单做一个简历模块
  17. 用ffmpeg来切割mp3
  18. 物流人必备网络货运实战指南
  19. 西南石油大学计算机考研资料汇总
  20. LL(1)预测分析程序C++实现

热门文章

  1. SharePoint2013升级SP1后,运行配置向导报错:未注册sharepoint服务
  2. 利用Tomcat的用户名和密码构建“永久”后门
  3. 国外虚拟机下linux及mysql常用命令
  4. 复旦大学提出《Meta-FDMixup》解决跨域小样本学习中的域偏移问题
  5. CLIP再创辉煌!西南交大MSRA提出CLIP4Clip,进行端到端的视频文本检索!
  6. CVPR2021开源项目,带你傲游宇宙!
  7. 第四届UG2研讨会和竞赛:弥合计算成像与视觉识别之间的鸿沟
  8. 测一测!中科视拓免费开放口罩人脸检测与识别技术
  9. 百度大脑公开课:快速定制、部署高精度深度学习模型!
  10. 旷视研究院张弛:行人重识别及其应用