RocketMQ 顺序消费只消费一次 坑
rocketMq实现顺序消费的原理
produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息
注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue
单个节点(Producer端1个、Consumer端1个)
1、Producer.java
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,发送顺序消息 */ public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 0); System.out.println(sendResult); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
2、Consumer.java (有问题)
package order; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */ public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 设置自动提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",内容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer1 Started."); } }
这个地方有一个大坑,注册监听类的时候,不能使用匿名内部类。不然的话,只会消费一次,然后消费者就 挂了……挂了……挂了……
监听类要单独写!!!
正确消费者写法:
自定义监听类:
MyMessageListener
public class MyMessageListener implements MessageListenerOrderly {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 设置自动提交context.setAutoCommit(true);for (MessageExt msg : msgs) {System.out.println(msg + ",内容:" + new String(msg.getBody()));}try {TimeUnit.SECONDS.sleep(5L);} catch (InterruptedException e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;} }
Consumer.java
public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");consumer.setNamesrvAddr("101.200.33.225:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicOrderTest", "*"); MyMessageListener myMessageListener = new MyMessageListener();consumer.registerMessageListener(myMessageListener);consumer.start();System.out.println("Consumer1 Started.");} }
参考:https://www.cnblogs.com/antain/p/rocketmq.html
http://www.cnblogs.com/520playboy/p/6750023.html
http://dbaplus.cn/news-21-1123-1.html
转载于:https://www.cnblogs.com/Jtianlin/p/8436024.html
RocketMQ 顺序消费只消费一次 坑相关推荐
- rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...
- rocketmq 顺序消费_RocketMQ核心概念扫盲
在正式进入RocketMQ的学习之前,我觉得有必要梳理一下RocketMQ核心概念,为大家学习RocketMQ打下牢固的基础. 1.RocketMQ部署架构 在RocketMQ主要的组件如下: Nam ...
- 源码分析RocketMQ顺序消息消费实现原理
本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...
- RocketMQ——顺序消费(代码)
关于rocketmq顺序消费的理解和图示可以查看该博文:RocketMQ--顺序消费和重复消费 本博客主要是以代码示例来了解顺序消费的相关内容,建议在此之前先了解下顺序消费的原理. 注:RocketM ...
- 一次 RocketMQ 顺序消费延迟的问题定位
一次 RocketMQ 顺序消费延迟的问题定位 问题背景与现象 昨晚收到了应用报警,发现线上某个业务消费消息延迟了 54s 多(从消息发送到MQ 到被消费的间隔): 2021-06-30T23:12: ...
- 一文理清RocketMQ顺序消费、重复消费、消息丢失问题
前言 在使用消息队列时不可避免的会遇到顺序消费.重复消费.消息丢失三个问题.在一次面试字节的时候,面试官问到如何保证顺序消费,当时回答不太准确,特意此文回顾如何解决顺序消费.重复消费.消息丢失三个问题 ...
- java rocketmq消费_rocketmq消费负载均衡--push消费详解
前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析 ...
- 记录Rocketmq定时消息不消费问题的排查过程
记录Rocketmq定时消息不消费问题的排查过程 写在前面 问题根源初步确认 问题根源再次确认. 了解rocketmq定时消息原理以及处理过程. 源码调试 结论求证 后记 写在前面 此本记录了一个项目 ...
- RocketMQ发送消息和消费消息
RocketMQ发送消息和消费消息 一.使用前配置 二.启动命令 三.pom.xml文件配置 四.编码 4.1 先定义一个消息保存的载体: 4.2 定义消息的发送者: 4.3 定义消息的消费者: 五 ...
最新文章
- 微信小游戏创业,究竟是红海还是死海?
- Keras之DNN:基于Keras(sigmoid+binary_crossentropy+predict_proba)利用DNN实现分类预测概率——DIY二分类数据集预测新数据点
- TCP/IP模型如何分层?路由器、网卡分别属于哪一层?
- 【opencv 学习】使用tesseract-ocr机芯数字识别
- 推荐一款 IDEA 神器 ,人工智能帮你写代码,再也不用加班了!!
- win10下安装Cygwin配置gcc编译环境
- VsCode云端版本
- Windows安装numpy详细教程
- K近邻算法(k-Nearest Neighbour, KNN)
- kde下gwenview启动慢,甚至几十秒才能启动
- 2FSK频谱matlab,数字调制系统在Matlab下的分析
- 什么是应用宝统一链接服务器,applink
- 日本知名动画公司东映动画加入 The Sandbox 元宇宙
- php 在线选座,基于jquery实现在线选座订座之影院篇
- 通俗易懂物联网(1):什么是物联网?
- 利用PHP表单做一个简历模块
- 用ffmpeg来切割mp3
- 物流人必备网络货运实战指南
- 西南石油大学计算机考研资料汇总
- LL(1)预测分析程序C++实现
热门文章
- SharePoint2013升级SP1后,运行配置向导报错:未注册sharepoint服务
- 利用Tomcat的用户名和密码构建“永久”后门
- 国外虚拟机下linux及mysql常用命令
- 复旦大学提出《Meta-FDMixup》解决跨域小样本学习中的域偏移问题
- CLIP再创辉煌!西南交大MSRA提出CLIP4Clip,进行端到端的视频文本检索!
- CVPR2021开源项目,带你傲游宇宙!
- 第四届UG2研讨会和竞赛:弥合计算成像与视觉识别之间的鸿沟
- 测一测!中科视拓免费开放口罩人脸检测与识别技术
- 百度大脑公开课:快速定制、部署高精度深度学习模型!
- 旷视研究院张弛:行人重识别及其应用