RocketMQ的拉(Pull)模式详解
文章目录
- 一、RocketMQ的Pull模式
- 1.1 Pull模式的使用特点
- 1.2 Java中PullConsumer的几种实现
- 1.2.1 DefaultMQPullConsumer
- 1.2.1.1 DefaultMQPullConsumer的重要API
- 获取消息队列
- 同步拉去消息
- 异步拉取消息
- 同步阻塞拉取消息
- 获取队列的消费Offset
- 更新消费组Offset
- 1.2.1.2 DefaultMQPullConsumer的使用样例
- 1.2.2 MQPullConsumerScheduleService
- 1.2.2.1 MQPullConsumerScheduleService的实现原理
- 1.2.2.2 MQPullConsumerScheduleService的重要API
- 注册拉取任务回调函数
- 回调函数接口
- 拉取任务上下文
- 1.2.2.3 MQPullConsumerScheduleService的使用样例
- 二、使用Python对RocketMQ进行拉模式消费(PullConsumer)
- 2.1 环境准备
- 2.1.1 安装依赖
- 2.1.2 设置Pycharm使用远程环境
- 2.2 Python中的PullConsumer
- 2.2.1 最简单的样例
- 2.2.2 MQPullConsumerScheduleService的Python初步实现
一、RocketMQ的Pull模式
1.1 Pull模式的使用特点
- 自己维护OffsetStore
用户需要自己保存消费组的offset,比如存入Redis,或调用MQ接口将其保存在Broker端。 - 自主选择Message Queue和offset进行消息拉取
用户拉去消息时,需要用户自己来决定拉去哪个队列从哪个offset开始,拉去多少消息。
1.2 Java中PullConsumer的几种实现
1.2.1 DefaultMQPullConsumer
1.2.1.1 DefaultMQPullConsumer的重要API
获取消息队列
/*** 根据Topic获取该Topic的所有消息队列,用于遍历消息队列,从每个消息队列中获取消息** @param topic Topic名称* @return 该Topic所有的消息队列*/
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
}
同步拉去消息
/*** 同步拉取消息** @param mq 消息队列* @param subExpression 消息tag过滤表达式* @param offset 消费组offset(从哪里开始拉去)* @param maxNums 一次最大拉去消息数量* @param timeout 超时时间* @return 存储了拉取状态以及消息*/
@Override
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout);
}
异步拉取消息
/*** 异步拉取消息* @param mq 消息队列* @param subExpression 消息tag过滤表达式* @param offset 消费组offset(从哪里开始拉去)* @param maxNums 一次最大拉去消息数量* @param timeout 超时时间* @param pullCallback 异步回调函数* @param timeout * @throws MQClientException* @throws RemotingException* @throws InterruptedException*/
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,long timeout)throws MQClientException, RemotingException, InterruptedException {this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout);
}
同步阻塞拉取消息
/*** 拉取消息,若没有找到消息,则阻塞一段时间** @param mq 消息队列* @param subExpression tag过滤* @param offset 消费组offset* @param maxNums 一次最大拉取数量* @return*/
@Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums);
}
pullBlockIfNotFound 和 pull区别是: 前者在没有找到消息的时候会阻塞一段时间以便等待后续消息进入,后者则会直接返回 NOT_FOUND 。
获取队列的消费Offset
/*** 获取队列的消费Offset* @param mq 队列* @param fromStore 是否从存储获取,true: 从当前内存中获取,false:从远程broker获取* @return 消费offset*/
@Override
public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore);
}
更新消费组Offset
/*** 更新消费组的Offset,注意:只会在本地内存中更新,并不会同步到远程Broker,至少现在版本不会(4.4)* @param mq 消息队列* @param offset 消费进度*/
@Override
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
}
1.2.1.2 DefaultMQPullConsumer的使用样例
在源码样例中存在该样例:
org.apache.rocketmq.example.simple.PullConsumer
样例代码如下:
import java.util.HashMap;
import java.util.Map;
import java.util.Set;import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;public class PullConsumer {// 记录每个队列的消费进度private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {// 1. 创建DefaultMQPullConsumer实例DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");// 2. 设置NameServerconsumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();// 3. 获取Topic的所有队列Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");// 4. 遍历所有队列for (MessageQueue mq : mqs) {System.out.printf("Consume from the queue: %s%n", mq);SINGLE_MQ:while (true) {try {// 5. 拉取消息,arg1=消息队列,arg2=tag消息过滤,arg3=消息队列,arg4=一次最大拉去消息数量PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.printf("%s%n", pullResult);// 6. 将消息放入hash表中,存储该队列的消费进度putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND: // 找到消息,输出System.out.println(pullResult.getMsgFoundList().get(0));break;case NO_MATCHED_MSG: // 没有匹配tag的消息System.out.println("无匹配消息");break;case NO_NEW_MSG: // 该队列没有新消息,消费offset=最大offsetSystem.out.println("没有新消息");break SINGLE_MQ; // 跳出该队列遍历case OFFSET_ILLEGAL: // offset不合法System.out.println("Offset不合法");break;default:break;}} catch (Exception e) {e.printStackTrace();}}}// 7. 关闭Consumerconsumer.shutdown();}/*** 从Hash表中获取当前队列的消费offset* @param mq 消息队列* @return long类型 offset*/private static long getMessageQueueOffset(MessageQueue mq) {Long offset = OFFSE_TABLE.get(mq);if (offset != null)return offset;return 0;}/*** 将消费进度更新到Hash表* @param mq 消息队列* @param offset offset*/private static void putMessageQueueOffset(MessageQueue mq, long offset) {OFFSE_TABLE.put(mq, offset);}
}
增加了详细注释,比较简单,就不解释了
1.2.2 MQPullConsumerScheduleService
1.2.2.1 MQPullConsumerScheduleService的实现原理
这图不是严格按照源码来的,中间省略了很多步骤,只画了大致原理。
- 用户使用registerPullTaskCallback对Topic进行注册
- MQPullConsumerScheduleService 会将Topic的每个队列以及相应的 doPullTask() 实现放入名为 taskTable 的Hash表中。
- 线程池 scheduledThreadPoolExecutor 会不断的调用每个队列的 doPullTask() 函数。
- 用户在 doPullTask() 完成自己的拉取消息逻辑,和DefaultMQPullConsumer是一样的。
- 用户设置下次调用间隔时间
- scheduledThreadPoolExecutor 等待该间隔时间后,再次调用 doPullTask() 方法。
1.2.2.2 MQPullConsumerScheduleService的重要API
注册拉取任务回调函数
/*** 注册拉取任务回调函数* @param topic topic名称* @param callback 回调函数*/
public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {this.callbackTable.put(topic, callback);this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
}
回调函数接口
public interface PullTaskCallback {/*** * @param mq 消息队列* @param context 任务上下文*/void doPullTask(final MessageQueue mq, final PullTaskContext context);
}
拉取任务上下文
public class PullTaskContext {private int pullNextDelayTimeMillis = 200;// 使用该接口进行消息拉取,默认实现是DefaultMQPullConsumerprivate MQPullConsumer pullConsumer;public int getPullNextDelayTimeMillis() {return pullNextDelayTimeMillis;}/*** 设置下次调用doPullTask()的间隔时间,默认毫秒*/public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) {this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;}public MQPullConsumer getPullConsumer() {return pullConsumer;}public void setPullConsumer(MQPullConsumer pullConsumer) {this.pullConsumer = pullConsumer;}
}
1.2.2.3 MQPullConsumerScheduleService的使用样例
该样例来源于源码:
org.apache.rocketmq.example.simple.PullScheduleService
源码样例:
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;public class PullScheduleService {public static void main(String[] args) throws MQClientException {// 1. 实例化对象final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");// 2. 设置NameServerscheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");// 3. 设置消费组为集群模式scheduleService.setMessageModel(MessageModel.CLUSTERING);// 4. 注册拉取回调函数scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {@Overridepublic void doPullTask(MessageQueue mq, PullTaskContext context) {// 5.从上下文中获取MQPullConsumer对象,此处其实就是DefaultMQPullConsumer。MQPullConsumer consumer = context.getPullConsumer();try {// 6.获取该消费组的该队列的消费进度long offset = consumer.fetchConsumeOffset(mq, false);if (offset < 0)offset = 0;// 7.拉取消息,pull()方法在DefaultMQPullConsumer有具体介绍PullResult pullResult = consumer.pull(mq, "*", offset, 32);System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);switch (pullResult.getPullStatus()) {case FOUND:break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:case OFFSET_ILLEGAL:break;default:break;}// 8.更新消费组该队列消费进度consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());// 9.设置下次拉取消息时间间隔,单位毫秒context.setPullNextDelayTimeMillis(100);} catch (Exception e) {e.printStackTrace();}}});scheduleService.start();}
}
二、使用Python对RocketMQ进行拉模式消费(PullConsumer)
2.1 环境准备
python中使用 rocket-python 库
https://github.com/messense/rocketmq-python
该库依赖于 rocketmq-client-cpp 库
https://github.com/apache/rocketmq-client-cpp
2.1.1 安装依赖
该库不直接支持windows,windows的玩家需要先安装 rocketmq-client-cpp ,比较麻烦,可以参考2.1.2,使用远程Python环境。
pip install rocketmq -i https://pypi.doubanio.com/simple
2.1.2 设置Pycharm使用远程环境
配置过后,就可以在windows的pycharm中直接使用linux的python环境
网上教程很多
https://www.cnblogs.com/zhishifenzi/p/9565404.html
2.2 Python中的PullConsumer
2.2.1 最简单的样例
from rocketmq.client import PullConsumerconsumer = PullConsumer('CID_XXX') # 实例化对象
consumer.set_namesrv_addr('192.168.188.1:9876') # 设置IP:port
consumer.start()for msg in consumer.pull('TestTopic'): # 拉取一批消息print(msg.id, msg.body) # 打印消息体consumer.shutdown() # 关闭
Python中只对PullConsumer进行了简单的封装,并没有提供向Java那样 MQPullConsumerScheduleService 的实现
2.2.2 MQPullConsumerScheduleService的Python初步实现
import ctypes
import time
from _ctypes import POINTER
from ctypes import c_intfrom rocketmq.client import PullConsumer, _to_bytes, RecvMessage
import threadingfrom rocketmq.exceptions import ffi_check
from rocketmq.ffi import _CMessageQueue, dll, _CPullStatusclass PullTaskContext(object):pull_next_delay_time_millis = 200 # 拉取消息之后,延迟{pull_next_delay_time_millis}毫秒后,再次拉取pull_consumer = Noneclass MQPullConsumerScheduleService(object):pull_consumer = Nonedo_pull_task = Nonetopic = Nonepull_thread_nums = 1 # 每个队列对应的线程数,默认每个队列一个线程def __init__(self, group_id: str):self.pull_consumer = PullConsumer(group_id)def register_pull_task_callback(self, topic: str, do_pull_task):"""为Topic注册回调函数:param topic::param do_pull_task: 回调函数"""self.do_pull_task = do_pull_taskself.topic = topicdef set_namesrv_addr(self, addr):self.pull_consumer.set_namesrv_addr(addr)def set_namesrv_domain(self, domain):self.pull_consumer.set_namesrv_domain(domain)def start(self):self.pull_consumer.start() # 启动PullConsumermessage_queue = POINTER(_CMessageQueue)() # 获取所有队列信息queue_size = c_int() # 获取队列长度ffi_check(dll.FetchSubscriptionMessageQueues(self.pull_consumer._handle,_to_bytes(self.topic),ctypes.pointer(message_queue),ctypes.pointer(queue_size))) # 获取队列信息for i in range(int(queue_size.value)):# 遍历所有队列for item in range(self.pull_thread_nums):# 为每个队列开启{pull_thread_nums}个线程self.PullTaskImpl(self, message_queue[i]).start()class PullTaskImpl(threading.Thread):"""内部类线程类"""message_queue = None # 队列is_cancelled = False # 是否取消def __init__(self, outer, message_queue):super().__init__()self.outer = outer # 外部类MQPullConsumerScheduleServiceself.message_queue = message_queue # 消息队列def run(self):while True:if self.outer.do_pull_task:context = PullTaskContext() # 新建上下文信息context.pull_consumer = self.outer.pull_consumertry:# 调用回调函数self.outer.do_pull_task(self.message_queue, context)except Exception as e:context.pull_next_delay_time_millis = 1000# import traceback# traceback.print_exc()print("doPullTask Exception: %s" % str(e))# 等待{context.pull_next_delay_time_millis}毫秒后,再次调用time.sleep(context.pull_next_delay_time_millis / 1000)else:print("Pull Task Callback not exist , %s" % self.outer.topic)if self.is_cancelled:breakdef synchronized(func):"""函数锁,防止并发问题"""func.__lock__ = threading.Lock()def synced_func(*args, **kws):with func.__lock__:return func(*args, **kws)return synced_func@synchronized
def pull_message(consumer: PullConsumer, message_queue, expression, offset, max_num):"""拉取消息:param consumer: PullConsumer对象:param message_queue: 消息队列:param expression: tag过滤表达式:param offset: 消费组的该队列offset:param max_num: 一次拉取最大消息数:return:"""pull_res = dll.Pull(consumer._handle,ctypes.pointer(message_queue),_to_bytes(expression),offset,max_num,)return pull_resif __name__ == '__main__':# 1. 实例化对象pull_consumer = MQPullConsumerScheduleService("ConsumerGroupTest")# 2. 设置nameserver地址pull_consumer.set_namesrv_addr("192.168.188.1:9876")# 3. 设置每个队列的线程数pull_consumer.pull_thread_nums = 2# 4. 实现回调函数def do_pull_task(message_queue: _CMessageQueue, context: PullTaskContext):consumer = context.pull_consumer # 从上下文中获取PullConsumer()实例# 获取该consumer中该队列的offset,可以把offset保存在其他地方(比如redis),从其他地方读取# consumer实例中的offset是存储在内存中的,重启之后就会归零tmp_offset = ctypes.c_longlong(consumer.get_message_queue_offset(message_queue))has_new_msg = Truewhile has_new_msg:pull_res = pull_message(consumer, message_queue, "*", tmp_offset, 1)if not pull_res:continueif pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:tmp_offset = pull_res.nextBeginOffsetconsumer.set_message_queue_offset(message_queue, tmp_offset)if pull_res.pullStatus == _CPullStatus.FOUND:for i in range(int(pull_res.size)):print(message_queue.queueId, message_queue.brokerName, RecvMessage(pull_res.msgFoundList[i]))elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG:passelif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG:has_new_msg = Falseelif pull_res.pullStatus == _CPullStatus.OFFSET_ILLEGAL:passelse:pass# dll.ReleasePullResult(pull_res) # NOTE: No need to check ffi return code heretime.sleep(5) # todo 处理业务逻辑# 5.注册Topic对应的回调函数pull_consumer.register_pull_task_callback("TopicTest2", do_pull_task)# 6.启动pull_consumer.start()
RocketMQ的拉(Pull)模式详解相关推荐
- (十八)享元模式详解(都市异能版) - 转
作者:zuoxiaolong8810(左潇龙),转载请注明出处. 魔都. 自从越狱风波过去以后,小左的生活便又回到了之前的节奏,依旧是每日徘徊在魔都某天桥,继续着自己的算命之旅. 说起这次越狱风波,着 ...
- ST MCU_GPIO的八种工作模式详解
GPIO的八种工作模式详解 浮空输入_IN_FLOATING 带上拉输入_IPU 带下拉输入_IPD 模拟输入_AIN 开漏输出_OUT_OD 推挽输出_OUT_PP 开漏复用输出_AF_OD 推挽复 ...
- Android M新特性Doze and App Standby模式详解
转载请标明出处:http://blog.csdn.net/xx326664162/article/details/52312122 文章出自:薛瑄的博客 你也可以查看我的其他同类文章,也会让你有一定的 ...
- getinstance方法详解_二、设计模式总览及工厂模式详解
二.架构师内功心法之设计模式 2.架构师内功心法之设计模式 2.1.课程目标 1.通过对本章内容的学习,了解设计模式的由来. 2.介绍设计模式能帮我们解决哪些问题. 3.剖析工厂模式的历史由来及应用场 ...
- Spotify敏捷模式详解三部曲第二篇:研发过程
本文转自:Scrum 中文网 引言 在本系列文章的第一篇,我们介绍了Spotify的敏捷研发团队,以及它独特的组织架构.Spotify的研发团队采用的是一种非常独特的组织架构,如下图所示: 整个研发组 ...
- Spotify敏捷模式详解三部曲第一篇:研发团队
本文转自:Scrum中文网 引言 2018年4月,来自北欧瑞典的音乐流媒体公司.百亿美元独角兽Spotify创造了历史,它成为了当代上市公司当中,第一家通过"直接上市"的方式在美国 ...
- Docker(十四):Docker:网络模式详解
Docker作为目前最火的轻量级容器技术,牛逼的功能,如Docker的镜像管理,不足的地方网络方面. Docker自身的4种网络工作方式,和一些自定义网络模式 安装Docker时,它会自动创建三个网络 ...
- linux apache两种工作模式详解
apache两种工作模式详解 刚接触这两个配置时很迷糊,全部开启或全部注释没有几多变化.今天搜索到这么一篇讲得还不错的文章,看了几篇,还是不能完全记住,做一个收藏. 空闲子进程:是指没有正在处理请求的 ...
- cglib动态代理jar包_代理模式详解:静态代理+JDK/CGLIB 动态代理实战
1. 代理模式 代理模式是一种比较好的理解的设计模式.简单来说就是 我们使用代理对象来代替对真实对象(real object)的访问,这样就可以在不修改原目标对象的前提下,提供额外的功能操作,扩展目标 ...
- STM32开发 -- 低功耗模式详解(2)
如需转载请注明出处:https://juyou.blog.csdn.net/article/details/98631012 上一篇文章 STM32开发 – 低功耗模式详解(1) 简单的总结了一下低功 ...
最新文章
- SLAM:SLAM相机简介、SLAM五步流程简介(VO+BEO+LCD+M)之详细攻略
- windows安装MongoDB进度条卡住,window安装mongo系统错误 2,系统错误5的解决办法(转载)...
- C++ 经典面试题
- 专有云传统HSF升级Pandora Boot开发
- Bash脚本教程之循环
- Windows进程系列(2) -- Svchost进程
- 1万亿次、10亿人、10亿张,科技给生活带来多少改变?
- C#对多态性的两种支持
- RocketMQ 分布式事务
- python 元组与数组的区别_python – NumPy – 从元组到数组的高效转换?
- mingw w64 v8.0.0_R 4.0发布,配套编译工具Rtools 40发布
- ORACLE 10g创建单实例 ASM
- MySQL-第N篇一些经验
- html5都有神马岗位,HTML5的优势和岗位要求你都知道吗
- 抖音怎么去除水印方法及小工具
- 【快应用】十大手机厂商共推快应用标准
- React+Antd兼容ie浏览器,360安全浏览器兼容模式
- 从小米5看雷军式“互联网思维”的黄昏
- 数学词汇的英译,写文章,读文献必备
- Xmake v2.7.1 发布,更好的 C++ Modules 支持
热门文章
- 中国科学院843遥感概论-目录
- MongoDB与亚马逊云科技扩大全球合作
- 领扣问题1. 两数之和领python解决方案
- java计算机毕业设计汽车客运站票务管理系统源码+系统+数据库+lw文档+mybatis+运行部署
- easypoi导出Excel根据内容如何自动换行和自动调整行高
- 四川大学计算机考研考点,四川大学计算机技术考研
- python集合怎么表示_Python 集合(Set)、字典(Dictionary)
- 魏晋南北朝书法家钟繇、王羲之、王献之
- 微信 SHA1 签名_个性签名设计制作,多种样式免费一键设计
- 通过dns轮询实现负载均衡