0. 启动Name Server与 Broker

1. 引入依赖

添加 RocketMQ 客户端访问支持,具体版本和安装的 RocketMQ 版本一致即可。

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version>
</dependency>

2. 消息生产者

public class Producer {public static void main(String[] args) throws Exception {//创建一个消息生产者,并设置一个消息生产者组DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");//指定 NameServer 地址producer.setNamesrvAddr("127.0.0.1:9876");//指定最大超时时间,用默认的会报错producer.setSendMsgTimeout(60000);//初始化 Producer,整个应用生命周期内只需要初始化一次producer.start();for (int i = 0; i < 100; i++) {//创建一条消息对象,指定其主题、标签和消息内容Message msg = new Message(/* 消息主题名 */"topicTest",/* 消息标签 */"TagA",/* 消息内容 */("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息并返回结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等producer.shutdown();}
}

使用DefaultMQProducer类来创建了一个消息生产者,该类构造函数入参 producerGroup 是消息生产者组的名字,无论生产者还是消费者都必须给出 GroupName ,并保证该名字的唯一性。

接下来指定 NameServer 地址和调用 start 方法初始化,在整个应用生命周期内只需要调用一次 start 方法。

初始化完成后,调用 send 方法发送消息,示例中只是简单的构造了100条同样的消息发送,其实一个 Producer 对象可以发送多个主题多个标签的消息,消息对象的标签可以为空。send 方法是同步调用,只要不抛异常就标识成功。

最后应用退出时调用 shutdown 方法清理资源、关闭网络连接,从服务器上注销自己,通常建议应用在 JBOSS、Tomcat 等容器的退出钩子里调用 shutdown 方法。

3. 消息消费者

public class Consumer {public static void main(String[] args) throws Exception {//创建一个消息消费者,并设置一个消息消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");//指定 NameServer 地址consumer.setNamesrvAddr("127.0.0.1:9876");//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅指定 Topic 下的所有消息consumer.subscribe("topicTest", "*");//注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {//默认 list 里只有一条消息,可以通过设置参数来批量接收消息if (list != null) {for (MessageExt ext : list) {try {System.out.println(new Date() + new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 消费者对象在使用之前必须要调用 start 初始化consumer.start();System.out.println("消息消费者已启动");}
}

用 DefaultMQPushConsumer 类来创建一个消息消费者,该类构造函数入参 consumerGroup 是消息消费者组的名字,需要保证该名字的唯一性。

​ 接下来指定 NameServer 地址和设置消费者应用程序第一次启动时从队列头部开始消费还是队列尾部开始消费。

接着调用 subscribe 方法给消费者对象订阅指定主题下的消息,该方法第一个参数是主题名,第二个参数是标签名,示例表示订阅了主题名 topic_example_java下所有标签的消息。

最主要的是注册消息监听器才能消费消息,示例中用的是Consumer Push的方式,即设置监听器回调的方式消费消息,默认监听回调方法中 List<MessageExt>里只有一条消息,可以通过设置参数来批量接收消息。

​ 最后调用 start 方法初始化,在整个应用生命周期内只需要调用一次 start 方法。

4. 测试

先来运行生产者

再运行消费者

SpringBoot整合RocketMQ相关推荐

  1. Springboot整合RocketMQ实战

    本文来说下Springboot如何整合RocketMQ. 文章目录 概述 Springboot整合RocketMQ 引入pom依赖 yaml文件 简单实例 本文小结 概述 消息队列rocketmq是A ...

  2. springboot整合rocketMQ记录 实现发送普通消息,延时消息

    一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例) 首先RocketMQ是阿里巴巴自研出来的,也已开源.其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂 ...

  3. Springboot整合Rocketmq系列教程

    Springboot整合Rocketmq系列教程 本教程是基于Springboot2.6.3整合Rocketmq5.0,其中涉及了Rocketmq的安装,消息的发送及消费的代码实现. 本文不会对roc ...

  4. SpringBoot整合RocketMQ,三种测试附带源码【rocketmq-spring-boot-starter】

    我们整合boot项目的时候都是引入 xxx-start 依赖,但是现在大多数的整合RocketMQ都还不是这样. 我花了一天时间使用rocketmq-spring-boot-starter整合,使得操 ...

  5. RocketMQ 实战-SpringBoot整合RocketMQ

    1. 消息生产者 1.1 maven 依赖 <?xml version="1.0" encoding="UTF-8"?> <project x ...

  6. SpringBoot整合RocketMQ报错:“PullMessageService“ NoClassDefFoundError xxx/protocol/FastCodesHeader解决

    问题阐述 学习RocketMQ到整合SpringBoot时,遇到问题,以下问题排除: 配置完整性(很多博文都会跳到group组名未定义问题) 服务器/虚拟机,防火墙或外网设置问题(请先去玩玩客户端发送 ...

  7. springboot整合rocketmq,支持多连接生产者和消费者配置。不同topic适配不同业务处理类

    1.代码仓库 rocketmq版本4.5.2 直接上代码,下面再逐步讲解,仓库地址 本地启动后,访问swagger地址测试,http://127.0.0.1:8099/mq/swagger-ui/in ...

  8. SpringBoot整合RocketMQ之环境搭建以及Producer发送消息

    https://github.com/apache/rocketmq-spring/releases/tag/2.0.0https://github.com/apache/rocketmq-sprin ...

  9. RocketMQ 实战-SpringBoot整合RocketMQ同步消息、异步消息、单向消息

    官方样例:https://gitee.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md 1. 同步消息 producer向 bro ...

  10. Windows环境下安装RocketMQ,及SpringBoot整合RocketMQ

    一.预备环境 Windows.JDK1.8.Maven.Git 二. RocketMQ部署 下载 1.1 地址:http://rocketmq.apache.org/release_notes/rel ...

最新文章

  1. 在SpringMVC中使用@RequestBody注解处理json时,报出HTTP Status 415的解决方案
  2. JSONP的学习(收集整理)
  3. 开发日记-20190703 关键词 读书笔记《Linux 系统管理技术手册(第二版)》DAY 9
  4. Adobe Flash Player 10.0.32.18
  5. 巧用参数组件和过滤组件,教你快速定位目标数据
  6. JAVA_HOME系统环境变量
  7. STM32学习——ADC采集
  8. d3js path generator vs layouts
  9. android自动扫码转账,亿乐社区微信扫码转账加款机器人使用教程以及常见问题...
  10. Ubuntu16.04安装Hadoop2.7.3教程
  11. 希尔伯特(Hilbert)空间和巴拿赫(Banach)空间
  12. 使用树莓派gpio连接ps2手柄模块(附程序)
  13. Windows下批处理一键修改系统时间并运行程序
  14. emule服务器无响应是什么原因,为什么,一直无响应,求大神帮忙
  15. 【Latex】如何用 latex 分双栏(分两列)
  16. Unity 一分钟学会适配IOS刘海屏
  17. 百家讲坛 雍正十三年(下部)
  18. jmeter+csv+ant接口自动化测试--设计jmeter脚本(一)
  19. Mac应用无法打开或文件损坏的处理方法
  20. 解决“无法访问此网站,连接已重置。”

热门文章

  1. 学历是敲门砖也是枷锁
  2. java中一个系统如何注册账户名和密码
  3. 减少用户投诉,就选中国移动二次号查询
  4. MySQL实战45讲学习笔记
  5. 扶不扶真人版现烟台 警察被老人问“你咋撞我”
  6. 语音识别研究综述——阅读笔记4(总结与展望)
  7. SQLServer 数学函数
  8. java 侵入性_侵入性和非侵入式指什么?
  9. 游戏运营 ---没用
  10. 软件工程——软件测试(黑盒测试、白盒测试、测试分析报告)