文章目录

  • 一、RocketMQ的Pull模式
    • 1.1 Pull模式的使用特点
    • 1.2 Java中PullConsumer的几种实现
      • 1.2.1 DefaultMQPullConsumer
        • 1.2.1.1 DefaultMQPullConsumer的重要API
          • 获取消息队列
          • 同步拉去消息
          • 异步拉取消息
          • 同步阻塞拉取消息
          • 获取队列的消费Offset
          • 更新消费组Offset
        • 1.2.1.2 DefaultMQPullConsumer的使用样例
      • 1.2.2 MQPullConsumerScheduleService
        • 1.2.2.1 MQPullConsumerScheduleService的实现原理
        • 1.2.2.2 MQPullConsumerScheduleService的重要API
          • 注册拉取任务回调函数
          • 回调函数接口
          • 拉取任务上下文
        • 1.2.2.3 MQPullConsumerScheduleService的使用样例
  • 二、使用Python对RocketMQ进行拉模式消费(PullConsumer)
    • 2.1 环境准备
      • 2.1.1 安装依赖
      • 2.1.2 设置Pycharm使用远程环境
    • 2.2 Python中的PullConsumer
      • 2.2.1 最简单的样例
      • 2.2.2 MQPullConsumerScheduleService的Python初步实现

一、RocketMQ的Pull模式

1.1 Pull模式的使用特点

  • 自己维护OffsetStore
    用户需要自己保存消费组的offset,比如存入Redis,或调用MQ接口将其保存在Broker端。
  • 自主选择Message Queue和offset进行消息拉取
    用户拉去消息时,需要用户自己来决定拉去哪个队列哪个offset开始,拉去多少消息

1.2 Java中PullConsumer的几种实现

1.2.1 DefaultMQPullConsumer

1.2.1.1 DefaultMQPullConsumer的重要API

获取消息队列
/*** 根据Topic获取该Topic的所有消息队列,用于遍历消息队列,从每个消息队列中获取消息** @param topic Topic名称* @return 该Topic所有的消息队列*/
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
}
同步拉去消息
/*** 同步拉取消息** @param mq            消息队列* @param subExpression 消息tag过滤表达式* @param offset        消费组offset(从哪里开始拉去)* @param maxNums       一次最大拉去消息数量* @param timeout       超时时间* @return 存储了拉取状态以及消息*/
@Override
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout);
}
异步拉取消息
/*** 异步拉取消息* @param mq            消息队列* @param subExpression 消息tag过滤表达式* @param offset        消费组offset(从哪里开始拉去)* @param maxNums       一次最大拉去消息数量* @param timeout       超时时间* @param pullCallback  异步回调函数* @param timeout       * @throws MQClientException* @throws RemotingException* @throws InterruptedException*/
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,long timeout)throws MQClientException, RemotingException, InterruptedException {this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout);
}
同步阻塞拉取消息
/*** 拉取消息,若没有找到消息,则阻塞一段时间** @param mq            消息队列* @param subExpression tag过滤* @param offset        消费组offset* @param maxNums       一次最大拉取数量* @return*/
@Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums);
}

pullBlockIfNotFoundpull区别是: 前者在没有找到消息的时候会阻塞一段时间以便等待后续消息进入,后者则会直接返回 NOT_FOUND

获取队列的消费Offset
/*** 获取队列的消费Offset* @param mq 队列* @param fromStore 是否从存储获取,true: 从当前内存中获取,false:从远程broker获取* @return 消费offset*/
@Override
public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore);
}
更新消费组Offset
/*** 更新消费组的Offset,注意:只会在本地内存中更新,并不会同步到远程Broker,至少现在版本不会(4.4)* @param mq 消息队列* @param offset 消费进度*/
@Override
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
}

1.2.1.2 DefaultMQPullConsumer的使用样例

在源码样例中存在该样例:

org.apache.rocketmq.example.simple.PullConsumer

样例代码如下:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;public class PullConsumer {// 记录每个队列的消费进度private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {// 1. 创建DefaultMQPullConsumer实例DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");// 2. 设置NameServerconsumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();// 3. 获取Topic的所有队列Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");// 4. 遍历所有队列for (MessageQueue mq : mqs) {System.out.printf("Consume from the queue: %s%n", mq);SINGLE_MQ:while (true) {try {// 5. 拉取消息,arg1=消息队列,arg2=tag消息过滤,arg3=消息队列,arg4=一次最大拉去消息数量PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.printf("%s%n", pullResult);// 6. 将消息放入hash表中,存储该队列的消费进度putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:  // 找到消息,输出System.out.println(pullResult.getMsgFoundList().get(0));break;case NO_MATCHED_MSG:  // 没有匹配tag的消息System.out.println("无匹配消息");break;case NO_NEW_MSG:  // 该队列没有新消息,消费offset=最大offsetSystem.out.println("没有新消息");break SINGLE_MQ;  // 跳出该队列遍历case OFFSET_ILLEGAL:  // offset不合法System.out.println("Offset不合法");break;default:break;}} catch (Exception e) {e.printStackTrace();}}}// 7. 关闭Consumerconsumer.shutdown();}/*** 从Hash表中获取当前队列的消费offset* @param mq 消息队列* @return long类型 offset*/private static long getMessageQueueOffset(MessageQueue mq) {Long offset = OFFSE_TABLE.get(mq);if (offset != null)return offset;return 0;}/*** 将消费进度更新到Hash表* @param mq 消息队列* @param offset offset*/private static void putMessageQueueOffset(MessageQueue mq, long offset) {OFFSE_TABLE.put(mq, offset);}
}

增加了详细注释,比较简单,就不解释了

1.2.2 MQPullConsumerScheduleService

1.2.2.1 MQPullConsumerScheduleService的实现原理

这图不是严格按照源码来的,中间省略了很多步骤,只画了大致原理。

  1. 用户使用registerPullTaskCallbackTopic进行注册
  2. MQPullConsumerScheduleService 会将Topic的每个队列以及相应的 doPullTask() 实现放入名为 taskTable 的Hash表中。
  3. 线程池 scheduledThreadPoolExecutor 会不断的调用每个队列的 doPullTask() 函数。
  4. 用户在 doPullTask() 完成自己的拉取消息逻辑,和DefaultMQPullConsumer是一样的。
  5. 用户设置下次调用间隔时间
  6. scheduledThreadPoolExecutor 等待该间隔时间后,再次调用 doPullTask() 方法。

1.2.2.2 MQPullConsumerScheduleService的重要API

注册拉取任务回调函数
/*** 注册拉取任务回调函数* @param topic topic名称* @param callback 回调函数*/
public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {this.callbackTable.put(topic, callback);this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
}
回调函数接口
public interface PullTaskCallback {/*** * @param mq 消息队列* @param context 任务上下文*/void doPullTask(final MessageQueue mq, final PullTaskContext context);
}
拉取任务上下文
public class PullTaskContext {private int pullNextDelayTimeMillis = 200;// 使用该接口进行消息拉取,默认实现是DefaultMQPullConsumerprivate MQPullConsumer pullConsumer;public int getPullNextDelayTimeMillis() {return pullNextDelayTimeMillis;}/*** 设置下次调用doPullTask()的间隔时间,默认毫秒*/public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) {this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;}public MQPullConsumer getPullConsumer() {return pullConsumer;}public void setPullConsumer(MQPullConsumer pullConsumer) {this.pullConsumer = pullConsumer;}
}

1.2.2.3 MQPullConsumerScheduleService的使用样例

该样例来源于源码:

org.apache.rocketmq.example.simple.PullScheduleService

源码样例:

import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;public class PullScheduleService {public static void main(String[] args) throws MQClientException {// 1. 实例化对象final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");// 2. 设置NameServerscheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");// 3. 设置消费组为集群模式scheduleService.setMessageModel(MessageModel.CLUSTERING);// 4. 注册拉取回调函数scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {@Overridepublic void doPullTask(MessageQueue mq, PullTaskContext context) {// 5.从上下文中获取MQPullConsumer对象,此处其实就是DefaultMQPullConsumer。MQPullConsumer consumer = context.getPullConsumer();try {// 6.获取该消费组的该队列的消费进度long offset = consumer.fetchConsumeOffset(mq, false);if (offset < 0)offset = 0;// 7.拉取消息,pull()方法在DefaultMQPullConsumer有具体介绍PullResult pullResult = consumer.pull(mq, "*", offset, 32);System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);switch (pullResult.getPullStatus()) {case FOUND:break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:case OFFSET_ILLEGAL:break;default:break;}// 8.更新消费组该队列消费进度consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());// 9.设置下次拉取消息时间间隔,单位毫秒context.setPullNextDelayTimeMillis(100);} catch (Exception e) {e.printStackTrace();}}});scheduleService.start();}
}

二、使用Python对RocketMQ进行拉模式消费(PullConsumer)

2.1 环境准备

python中使用 rocket-python

https://github.com/messense/rocketmq-python

该库依赖于 rocketmq-client-cpp

https://github.com/apache/rocketmq-client-cpp

2.1.1 安装依赖

该库不直接支持windows,windows的玩家需要先安装 rocketmq-client-cpp ,比较麻烦,可以参考2.1.2,使用远程Python环境。

pip install rocketmq -i https://pypi.doubanio.com/simple

2.1.2 设置Pycharm使用远程环境

配置过后,就可以在windows的pycharm中直接使用linux的python环境

网上教程很多

https://www.cnblogs.com/zhishifenzi/p/9565404.html

2.2 Python中的PullConsumer

2.2.1 最简单的样例

from rocketmq.client import PullConsumerconsumer = PullConsumer('CID_XXX')  # 实例化对象
consumer.set_namesrv_addr('192.168.188.1:9876')  # 设置IP:port
consumer.start()for msg in consumer.pull('TestTopic'):  # 拉取一批消息print(msg.id, msg.body)  # 打印消息体consumer.shutdown()  # 关闭

Python中只对PullConsumer进行了简单的封装,并没有提供向Java那样 MQPullConsumerScheduleService 的实现

2.2.2 MQPullConsumerScheduleService的Python初步实现

import ctypes
import time
from _ctypes import POINTER
from ctypes import c_intfrom rocketmq.client import PullConsumer, _to_bytes, RecvMessage
import threadingfrom rocketmq.exceptions import ffi_check
from rocketmq.ffi import _CMessageQueue, dll, _CPullStatusclass PullTaskContext(object):pull_next_delay_time_millis = 200  # 拉取消息之后,延迟{pull_next_delay_time_millis}毫秒后,再次拉取pull_consumer = Noneclass MQPullConsumerScheduleService(object):pull_consumer = Nonedo_pull_task = Nonetopic = Nonepull_thread_nums = 1  # 每个队列对应的线程数,默认每个队列一个线程def __init__(self, group_id: str):self.pull_consumer = PullConsumer(group_id)def register_pull_task_callback(self, topic: str, do_pull_task):"""为Topic注册回调函数:param topic::param do_pull_task: 回调函数"""self.do_pull_task = do_pull_taskself.topic = topicdef set_namesrv_addr(self, addr):self.pull_consumer.set_namesrv_addr(addr)def set_namesrv_domain(self, domain):self.pull_consumer.set_namesrv_domain(domain)def start(self):self.pull_consumer.start()  # 启动PullConsumermessage_queue = POINTER(_CMessageQueue)()  # 获取所有队列信息queue_size = c_int()  # 获取队列长度ffi_check(dll.FetchSubscriptionMessageQueues(self.pull_consumer._handle,_to_bytes(self.topic),ctypes.pointer(message_queue),ctypes.pointer(queue_size)))  # 获取队列信息for i in range(int(queue_size.value)):# 遍历所有队列for item in range(self.pull_thread_nums):# 为每个队列开启{pull_thread_nums}个线程self.PullTaskImpl(self, message_queue[i]).start()class PullTaskImpl(threading.Thread):"""内部类线程类"""message_queue = None  # 队列is_cancelled = False  # 是否取消def __init__(self, outer, message_queue):super().__init__()self.outer = outer  # 外部类MQPullConsumerScheduleServiceself.message_queue = message_queue # 消息队列def run(self):while True:if self.outer.do_pull_task:context = PullTaskContext()  # 新建上下文信息context.pull_consumer = self.outer.pull_consumertry:# 调用回调函数self.outer.do_pull_task(self.message_queue, context)except Exception as e:context.pull_next_delay_time_millis = 1000# import traceback# traceback.print_exc()print("doPullTask Exception: %s" % str(e))# 等待{context.pull_next_delay_time_millis}毫秒后,再次调用time.sleep(context.pull_next_delay_time_millis / 1000)else:print("Pull Task Callback not exist , %s" % self.outer.topic)if self.is_cancelled:breakdef synchronized(func):"""函数锁,防止并发问题"""func.__lock__ = threading.Lock()def synced_func(*args, **kws):with func.__lock__:return func(*args, **kws)return synced_func@synchronized
def pull_message(consumer: PullConsumer, message_queue, expression, offset, max_num):"""拉取消息:param consumer: PullConsumer对象:param message_queue: 消息队列:param expression: tag过滤表达式:param offset: 消费组的该队列offset:param max_num: 一次拉取最大消息数:return:"""pull_res = dll.Pull(consumer._handle,ctypes.pointer(message_queue),_to_bytes(expression),offset,max_num,)return pull_resif __name__ == '__main__':# 1. 实例化对象pull_consumer = MQPullConsumerScheduleService("ConsumerGroupTest")# 2. 设置nameserver地址pull_consumer.set_namesrv_addr("192.168.188.1:9876")# 3. 设置每个队列的线程数pull_consumer.pull_thread_nums = 2# 4. 实现回调函数def do_pull_task(message_queue: _CMessageQueue, context: PullTaskContext):consumer = context.pull_consumer  # 从上下文中获取PullConsumer()实例# 获取该consumer中该队列的offset,可以把offset保存在其他地方(比如redis),从其他地方读取# consumer实例中的offset是存储在内存中的,重启之后就会归零tmp_offset = ctypes.c_longlong(consumer.get_message_queue_offset(message_queue))has_new_msg = Truewhile has_new_msg:pull_res = pull_message(consumer, message_queue, "*", tmp_offset, 1)if not pull_res:continueif pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:tmp_offset = pull_res.nextBeginOffsetconsumer.set_message_queue_offset(message_queue, tmp_offset)if pull_res.pullStatus == _CPullStatus.FOUND:for i in range(int(pull_res.size)):print(message_queue.queueId, message_queue.brokerName, RecvMessage(pull_res.msgFoundList[i]))elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG:passelif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG:has_new_msg = Falseelif pull_res.pullStatus == _CPullStatus.OFFSET_ILLEGAL:passelse:pass# dll.ReleasePullResult(pull_res)  # NOTE: No need to check ffi return code heretime.sleep(5)  # todo 处理业务逻辑# 5.注册Topic对应的回调函数pull_consumer.register_pull_task_callback("TopicTest2", do_pull_task)# 6.启动pull_consumer.start()

RocketMQ的拉(Pull)模式详解相关推荐

  1. (十八)享元模式详解(都市异能版) - 转

    作者:zuoxiaolong8810(左潇龙),转载请注明出处. 魔都. 自从越狱风波过去以后,小左的生活便又回到了之前的节奏,依旧是每日徘徊在魔都某天桥,继续着自己的算命之旅. 说起这次越狱风波,着 ...

  2. ST MCU_GPIO的八种工作模式详解

    GPIO的八种工作模式详解 浮空输入_IN_FLOATING 带上拉输入_IPU 带下拉输入_IPD 模拟输入_AIN 开漏输出_OUT_OD 推挽输出_OUT_PP 开漏复用输出_AF_OD 推挽复 ...

  3. Android M新特性Doze and App Standby模式详解

    转载请标明出处:http://blog.csdn.net/xx326664162/article/details/52312122 文章出自:薛瑄的博客 你也可以查看我的其他同类文章,也会让你有一定的 ...

  4. getinstance方法详解_二、设计模式总览及工厂模式详解

    二.架构师内功心法之设计模式 2.架构师内功心法之设计模式 2.1.课程目标 1.通过对本章内容的学习,了解设计模式的由来. 2.介绍设计模式能帮我们解决哪些问题. 3.剖析工厂模式的历史由来及应用场 ...

  5. Spotify敏捷模式详解三部曲第二篇:研发过程

    本文转自:Scrum 中文网 引言 在本系列文章的第一篇,我们介绍了Spotify的敏捷研发团队,以及它独特的组织架构.Spotify的研发团队采用的是一种非常独特的组织架构,如下图所示: 整个研发组 ...

  6. Spotify敏捷模式详解三部曲第一篇:研发团队

    本文转自:Scrum中文网 引言 2018年4月,来自北欧瑞典的音乐流媒体公司.百亿美元独角兽Spotify创造了历史,它成为了当代上市公司当中,第一家通过"直接上市"的方式在美国 ...

  7. Docker(十四):Docker:网络模式详解

    Docker作为目前最火的轻量级容器技术,牛逼的功能,如Docker的镜像管理,不足的地方网络方面. Docker自身的4种网络工作方式,和一些自定义网络模式 安装Docker时,它会自动创建三个网络 ...

  8. linux apache两种工作模式详解

    apache两种工作模式详解 刚接触这两个配置时很迷糊,全部开启或全部注释没有几多变化.今天搜索到这么一篇讲得还不错的文章,看了几篇,还是不能完全记住,做一个收藏. 空闲子进程:是指没有正在处理请求的 ...

  9. cglib动态代理jar包_代理模式详解:静态代理+JDK/CGLIB 动态代理实战

    1. 代理模式 代理模式是一种比较好的理解的设计模式.简单来说就是 我们使用代理对象来代替对真实对象(real object)的访问,这样就可以在不修改原目标对象的前提下,提供额外的功能操作,扩展目标 ...

  10. STM32开发 -- 低功耗模式详解(2)

    如需转载请注明出处:https://juyou.blog.csdn.net/article/details/98631012 上一篇文章 STM32开发 – 低功耗模式详解(1) 简单的总结了一下低功 ...

最新文章

  1. SLAM:SLAM相机简介、SLAM五步流程简介(VO+BEO+LCD+M)之详细攻略
  2. windows安装MongoDB进度条卡住,window安装mongo系统错误 2,系统错误5的解决办法(转载)...
  3. C++ 经典面试题
  4. 专有云传统HSF升级Pandora Boot开发
  5. Bash脚本教程之循环
  6. Windows进程系列(2) -- Svchost进程
  7. 1万亿次、10亿人、10亿张,科技给生活带来多少改变?
  8. C#对多态性的两种支持
  9. RocketMQ 分布式事务
  10. python 元组与数组的区别_python – NumPy – 从元组到数组的高效转换?
  11. mingw w64 v8.0.0_R 4.0发布,配套编译工具Rtools 40发布
  12. ORACLE 10g创建单实例 ASM
  13. MySQL-第N篇一些经验
  14. html5都有神马岗位,HTML5的优势和岗位要求你都知道吗
  15. 抖音怎么去除水印方法及小工具
  16. 【快应用】十大手机厂商共推快应用标准
  17. React+Antd兼容ie浏览器,360安全浏览器兼容模式
  18. 从小米5看雷军式“互联网思维”的黄昏
  19. 数学词汇的英译,写文章,读文献必备
  20. Xmake v2.7.1 发布,更好的 C++ Modules 支持

热门文章

  1. 中国科学院843遥感概论-目录
  2. MongoDB与亚马逊云科技扩大全球合作
  3. 领扣问题1. 两数之和领python解决方案
  4. java计算机毕业设计汽车客运站票务管理系统源码+系统+数据库+lw文档+mybatis+运行部署
  5. easypoi导出Excel根据内容如何自动换行和自动调整行高
  6. 四川大学计算机考研考点,四川大学计算机技术考研
  7. python集合怎么表示_Python 集合(Set)、字典(Dictionary)
  8. 魏晋南北朝书法家钟繇、王羲之、王献之
  9. 微信 SHA1 签名_个性签名设计制作,多种样式免费一键设计
  10. 通过dns轮询实现负载均衡