博客地址:朝·闻·道​www.wuwenliang.net

本文我将继续讲解如何使用DefaultMQPushConsumer对RocketMQ中的消息进行消费,同时在文章的第二部分将继续带领读者朋友对DefaultMQPushConsumer进行薄封装,让我们在Spring中更容易对消息进行消费。DefaultMQPushConsumer不区分普通消息和事务消息,即我们能够利用DefaultMQPushConsumer实现对普通消息和事务消息的消费。

通过DefaultMQProducer消费消息

首先,声明一个DefaultMQPushConsumer客户端,并通过构造器初始化,构造参数为消费者组。官方建议消费者组以“CID_”开头。

DefaultMQPushConsumer consumer =

new DefaultMQPushConsumer("CID_SNOWALKER");

设置NameServer地址

defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

设置Consumer第一次启动从队列头部开始消费

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

设置消费模式为集群方式,CLUSTERING模式下每条消息只会被一个Consumer消费一次,如果设置为BROADCASTING则为广播模式,每个消费者都会将消息消费至少一次。一般我们使用的均为CLUSTERING模式。

defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);

注册消息监听器,这里需要实现MessageListenerConcurrently接口,并实现consumeMessage(List msgs, ConsumeConcurrentlyContext context) 方法,我这里的demo是lambda形式,实际上是一样的。如果你不喜欢lambda形式,可以继续使用匿名内部类或者自行定义一个类实现该接口。

defaultMQPushConsumer.registerMessageListener(

(msgs, context) -> {

for (MessageExt msg : msgs) {

String realMessage = new String(msg.getBody());

LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",

Thread.currentThread().getName(),

msg.getMsgId(),

realMessage);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

)

这里注意,当消费逻辑执行成功,则返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,后续将不再对该消息进行消费。如果消费逻辑失败,则需要设置为ConsumeConcurrentlyStatus.RECONSUME_LATER, RocketMQ会对消息进行重新推送,默认推送16次,目的是尽量保证消息消费成功,如果达到最大重试次数,还是失败则进入死信队列,等待人工干预。

调用start()方法,启动对队列的监听,开始进行消息的消费。

defaultMQPushConsumer.start();

我们尝试运行一下,这里我已经有了对应的消费者,可以看下运行的日志:

2019-01-23 09:55:25.022 INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher :

消息id=AC1E5356496018B4AAC2736D06CF0002, 发送结果=SEND_OK

2019-01-23 09:55:27.519 INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer :

当前消费线程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息为={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"测试消息简单发送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}

可以看到broker推送消息至消费端,并且被成功消费。

Spring框架整合DefaultMQPushConsumer

我们仍然基于Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE 对DefaultMQPushConsumer进行整合,相关代码已经上传至github

这里对核心代码进行讲解。

首先定义RocketMQPushConsumerAgent.java并将其声明为spring的bean,作用域为prototype,即多例形式。

@Scope("prototype")

@Component

public class RocketMQPushConsumerAgent {

声明消息监听器及消息消费者

private MessageListenerConcurrently messageListener;

private DefaultMQPushConsumer defaultMQPushConsumer;

init()方法为核心的初始化逻辑,在该方法中,初始化了DefaultMQPushConsumer,并设置NameServer地址、消费模式以及将外部实现的监听器设置给内部的messageListener引用。

接着对消息主题进行订阅,对该主题下所有的消息进行监听,这里有待优化,后续将把消息的过滤表达式也暴露给调用者。

所有的配置参数均通过RocketMQConsumerConfig进行设置,保证接口的整洁性,RocketMQConsumerConfig将在附录中进行简单讲解。

public RocketMQPushConsumerAgent init(RocketMQConsumerConfig consumerConfig, MessageListenerConcurrently messageListener) throws MQClientException {

defaultMQPushConsumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());

defaultMQPushConsumer.setNamesrvAddr(consumerConfig.getNameSrvAddr());

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 消费模式

if (consumerConfig.getMessageModel() != null) {

defaultMQPushConsumer.setMessageModel(consumerConfig.getMessageModel());

}

// 注册监听器

this.messageListener = messageListener;

defaultMQPushConsumer.registerMessageListener(this.messageListener);

defaultMQPushConsumer.subscribe(consumerConfig.getTopic(), "*");

LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端组装完成");

return this;

}

独立的启动方法

public void start() throws MQClientException {

this.defaultMQPushConsumer.start();

}

独立的关闭方法

public void destroy() {

defaultMQPushConsumer.shutdown();

LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端[已关闭]");

}

为方便外部对消费者进行进一步的自定义设置,提供外部获取defaultMQPushConsumer的接口。

public DefaultMQPushConsumer getConsumer() {

return defaultMQPushConsumer;

}

RocketMQPushConsumerAgent使用案例

仍然依据开头的示例进行改造。

@Component

public class DemoConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(DemoConsumer.class);

使用@Resource(name = “rocketMQPushConsumerAgent”)或者直接@Autowired将自定义的消息消费者注入。

@Resource(name = "rocketMQPushConsumerAgent")

RocketMQPushConsumerAgent rocketMQConsumerAgent;

调用方需要实现一个返回值为void的方法,并标记为@PostConstruct,在该方法中进行rocketMQConsumerAgent的初始化。当spring在加载过程中,DemoConsumer初始化之前会调用该init()方法初始化rocketMQConsumerAgent。通过start()链式调用,启动消息消费者,内部是调用的defaultMQPushConsumer.start()方法。

@PostConstruct

void init() {

try {

rocketMQConsumerAgent.init(

new RocketMQConsumerConfig(

"snowalker-consumer-group",

"172.30.83.100:9876",

"SNOWALKER_TEST",

MessageModel.CLUSTERING),

(msgs, context) -> {

for (MessageExt msg : msgs) {

String realMessage = new String(msg.getBody());

LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",

Thread.currentThread().getName(),

msg.getMsgId(),

realMessage);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

).start();

LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者完成");

} catch (MQClientException e) {

e.printStackTrace();

LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者失败");

}

}

}

在init()方法中同时将消息监听器的实现逻辑注入,消费者会加载该接口的实现。

附录:RocketMQConsumerConfig配置类

public class RocketMQConsumerConfig {

/**消费者组*/

private String consumerGroup;

/**nameServer地址*/

private String nameSrvAddr;

/**消息消费主题*/

private String topic;

private MessageModel messageModel;

public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic) {

Preconditions.checkNotNull(consumerGroup);

Preconditions.checkNotNull(nameSrvAddr);

Preconditions.checkNotNull(topic);

this.consumerGroup = consumerGroup;

this.nameSrvAddr = nameSrvAddr;

this.topic = topic;

}

public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic, MessageModel messageModel) {

Preconditions.checkNotNull(consumerGroup);

Preconditions.checkNotNull(nameSrvAddr);

Preconditions.checkNotNull(topic);

Preconditions.checkNotNull(messageModel);

this.consumerGroup = consumerGroup;

this.nameSrvAddr = nameSrvAddr;

this.topic = topic;

this.messageModel = messageModel;

}

public String getConsumerGroup() {

return consumerGroup;

}

public String getNameSrvAddr() {

return nameSrvAddr;

}

public String getTopic() {

return topic;

}

public MessageModel getMessageModel() {

return messageModel;

}

}

该配置类封装了消费者客户端初始化的必填参数,目的是收拢初始化参数,从而使初始化接口更加简洁,符合开闭原则。

rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring相关推荐

  1. 语言叮叮消息接口_五分钟学后端技术:如何学习Java工程师必知必会的消息队列...

    原创声明 本文作者:黄小斜 转载请务必在文章开头注明出处和作者. 什么是消息队列 "RabbitMQ?""Kafka?""RocketMQ?" ...

  2. 控制台没有消息循环_【干货】思科设备报错消息汇总大全~

    故障诊断与排除是一种结构化的方法.许多工程技术人员认为故障诊断与排除计划不如研究和应用技术本身重要. 事实上,正确的计划在故障诊断与排除过程中往往起决定性的作用.在故障排除过程中,一个偶然的行为可能使 ...

  3. java推送微信消息换行_微信公众平台开发教程之文本消息如何换行(第十二课)

    上两节课介绍了微信公众平台开发工具类的开发,这节课呢就用到了,没有读前两节课程的读者建议看前两节课程,当然你可以可以去下载实例代码直接运行 . 使用"\n"换行 回复用户的文本消息 ...

  4. rocketmq 消息指定_进大厂必备的RocketMQ你会吗?

    点击关注"故里学Java" 右上角"设为星标"好文章不错过 关于消息队列,相信大家都不陌生,现在的中大型项目中或多或少都有使用到消息队列,对于消息队列大家可能都 ...

  5. rocketmq 消息 自定义_RocketMQ的消息发送及消费

    RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...

  6. rocketmq 消息 自定义_RocketMQ消息轨迹-设计篇

    RocketMQ 消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹-设计相关. RocketMQ消息轨迹,主要跟踪消息发送.消息消费的轨迹,即详细记录消息各个处理环 ...

  7. 跟我学RocketMQ之批量消息发送源码解析

    上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送.本文中,我们就一起来集中分析一下批量消息的发送是怎样的 ...

  8. rocketmq怎么保证消息一致性_从入门到入土(三)RocketMQ 怎么保证的消息不丢失?...

    精彩推荐 一百期Java面试题汇总SpringBoot内容聚合IntelliJ IDEA内容聚合Mybatis内容聚合 接上一篇:RocketMQ入门到入土(二)事务消息&顺序消息 面试官常常 ...

  9. rocketmq 消息指定_详解RocketMQ不同类型的消费者

    原标题:详解RocketMQ不同类型的消费者 云栖君导读:本文节选自云栖社区系列丛书<RocketMQ原理与实战解析>,作者:阿里巴巴数据专家杨开元.本节将重点讲解RocketMQ不同类型 ...

最新文章

  1. Aptana 安装jQuery库 智能提示
  2. 有关 Session 操作的几个误区
  3. 获取本机MSSQL保存凭证
  4. 网站正在建设中提示页面设计欣赏
  5. python 使用 os的 popen(‘命令’) 如果命令行输出中 有中文乱码, 提示 'gbk' 无法解析的错误 解决办法
  6. python垂直输出_Python实现图像的垂直投影示例
  7. [转载] Python 中 pass 语句的作用是什么?
  8. 2012-1-31学习日记
  9. ibm刀箱服务器型号查询,IBM 服务器模块 刀箱模块
  10. SEO经验分享之百度知道篇
  11. 下载超星或读秀图书时,怎么搞定完整书签?
  12. 九个实用的Word转PDF的方法,为你解决格式转换的问题
  13. 揭开MySQL数据库的神秘面纱!
  14. 计算机保护重要文件的方法,重要文件应该如何保护?
  15. css样式的补充:鼠标悬停字体变大和改变颜色
  16. 打开记事本文件出现黑色方块的解决办法
  17. 当女生说没衣服穿时。。。。
  18. 华为疯狂扩招3000名数据分析师,招聘要求让人窒息!
  19. 机器学习之选择小样本交叉验证训练模型并使用精确率、召回率、F1分数和AUC值、画出ROC曲线评估
  20. java中的线程池有哪些,分别有什么作用?

热门文章

  1. 论文浅尝 | 基于局内去噪和迁移学习的关系抽取
  2. 2018 年,NLP 研究与应用进展到什么水平了?
  3. 快速理解bootstrap,bagging,boosting-三个概念
  4. 完美世界2020编程题-救雅典娜 英雄AB PK
  5. python引用自己的文件的一切问题
  6. 战斗机嵌入式训练系统中的智能虚拟陪练
  7. JAVA面试题:HashMap和Hashtable的区别
  8. MyBatis中SQL语句相关内容
  9. Java基础:数组的声明,循环,赋值,拷贝。
  10. MySQL日志详细说明