官方样例:https://gitee.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

1. 同步消息

producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 ;
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

1.1 项目结构和依赖

  1. 项目结构
  2. 父级项目
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.meta.rocketmq</groupId><artifactId>rocketmq-master</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><modules><module>rocketmq-producer</module><module>rocketmq-consumer</module></modules><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><rocketmq.version>2.2.1</rocketmq.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>

1.2 生产者

1.2.1 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq-master</artifactId><groupId>com.meta.rocketmq</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rocketmq-producer</artifactId><name>rocketmq-producer</name><description>RocketMQ-Producer for Spring Boot</description></project>

1.2.2 配置文件

server.port=8090
server.servlet.context-path=/spring.application.name=rocketmq-producer
spring.profiles.active=local# rocketmq 的 nameserver 名称
rocketmq.name-server=192.168.0.24:9876
# 生产组名称
rocketmq.producer.group=group_producer_test
rocketmq.producer.topic.test=topic_test

1.2.3 生产者

package com.meta.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** RocketMQ 发送同步消息** @author gaoyang* @date 2021-12-07 15:05*/
@Slf4j
@Component
public class RocketmqSimpleProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 发送同步消息** @param topic* @param msg*/public void sendSyncMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.syncSend(topic, message);}
}

1.2.4 编写测试方法

package com.meta.rocketmq;import com.meta.rocketmq.producer.RocketmqSimpleProducer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;/*** @author gaoyang* @date 2021-12-07 15:15*/
@Slf4j
@SpringBootTest
class RocketmqProducerApplicationTests {@Value("${rocketmq.producer.topic.test}")private String topic;@Resourceprivate RocketmqSimpleProducer rocketmqSimpleProducer;@Testvoid sendSyncMsg() {this.rocketmqSimpleProducer.sendSyncMsg(topic, "这是一条同步消息");log.info("发送同步消息, end...");}
}

1.3 消费者

1.3.1 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq-master</artifactId><groupId>com.meta.rocketmq</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rocketmq-consumer</artifactId><name>rocketmq-consumer</name><description>RocketMQ-consumer for Spring Boot</description></project>

1.3.2 配置文件

server.port=8091
server.servlet.context-path=/spring.application.name=rocketmq-consumer
spring.profiles.active=local# rocketmq 的 nameserver 名称
rocketmq.name-server=192.168.0.24:9876
rocketmq.consumer.group=group_consumer_test
rocketmq.consumer.topic.test=topic_test

1.3.3 消费者

package com.meta.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 消费消息监听** @author gaoyang* @date 2021-12-08 10:04*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic.test}",consumerGroup = "${rocketmq.consumer.group}")
public class RocketmqSimpleListener implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {log.info("RocketmqSimpleListener: {}", msg);}
}

1.4 测试

分别启动消费者和生产者测试方法

2. 异步消息

producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 ;
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

2.1 生产者

public void sendAsyncMsg(String topic, String msg){Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 成功回调log.info("发送成功: {}", sendResult.getSendStatus().toString());}@Overridepublic void onException(Throwable throwable) {// 失败回调log.error("发送失败: {}", throwable.getMessage());}});}

2.2 测试方法

@Test
void sendAsyncMsg() throws InterruptedException {this.rocketmqSimpleProducer.sendAsyncMsg(topic, "这是一条异步消息");log.info("发送异步消息, end...");// 异步消息,为跟踪回调线程这里加入延迟Thread.sleep(3000);
}

2.3 测试


3. 单向消息

producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。

3.1 生产者

/*** 发送单向消息** @param topic* @param msg*/
public void sendOnewayMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.sendOneWay(topic, message);
}

3.2 测试方法

@Test
void sendOnewayMsg() {this.rocketmqSimpleProducer.sendOnewayMsg(topic, "这是一条单向消息");log.info("发送单向消息, end...");
}

3.3 测试


RocketMQ 实战-SpringBoot整合RocketMQ同步消息、异步消息、单向消息相关推荐

  1. RocketMQ 实战-SpringBoot整合RocketMQ

    1. 消息生产者 1.1 maven 依赖 <?xml version="1.0" encoding="UTF-8"?> <project x ...

  2. Springboot整合RocketMQ实战

    本文来说下Springboot如何整合RocketMQ. 文章目录 概述 Springboot整合RocketMQ 引入pom依赖 yaml文件 简单实例 本文小结 概述 消息队列rocketmq是A ...

  3. springboot整合rocketMQ记录 实现发送普通消息,延时消息

    一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例) 首先RocketMQ是阿里巴巴自研出来的,也已开源.其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂 ...

  4. SpringBoot整合RocketMQ,三种测试附带源码【rocketmq-spring-boot-starter】

    我们整合boot项目的时候都是引入 xxx-start 依赖,但是现在大多数的整合RocketMQ都还不是这样. 我花了一天时间使用rocketmq-spring-boot-starter整合,使得操 ...

  5. Springboot整合Rocketmq系列教程

    Springboot整合Rocketmq系列教程 本教程是基于Springboot2.6.3整合Rocketmq5.0,其中涉及了Rocketmq的安装,消息的发送及消费的代码实现. 本文不会对roc ...

  6. springboot整合rocketmq,支持多连接生产者和消费者配置。不同topic适配不同业务处理类

    1.代码仓库 rocketmq版本4.5.2 直接上代码,下面再逐步讲解,仓库地址 本地启动后,访问swagger地址测试,http://127.0.0.1:8099/mq/swagger-ui/in ...

  7. SpringBoot整合RocketMQ

    0. 启动Name Server与 Broker 1. 引入依赖 添加 RocketMQ 客户端访问支持,具体版本和安装的 RocketMQ 版本一致即可. <dependency>< ...

  8. SpringBoot整合RocketMQ之环境搭建以及Producer发送消息

    https://github.com/apache/rocketmq-spring/releases/tag/2.0.0https://github.com/apache/rocketmq-sprin ...

  9. SpringBoot整合RocketMQ报错:“PullMessageService“ NoClassDefFoundError xxx/protocol/FastCodesHeader解决

    问题阐述 学习RocketMQ到整合SpringBoot时,遇到问题,以下问题排除: 配置完整性(很多博文都会跳到group组名未定义问题) 服务器/虚拟机,防火墙或外网设置问题(请先去玩玩客户端发送 ...

最新文章

  1. go语言json的使用技巧
  2. 【陷阱】交换排序中交换动作的陷阱
  3. 刷题一个4ms的程序,代码如何优化到3ms再到2ms?
  4. 将frm,myi,myd文件打包成sql文件
  5. VTK:PolyData之Curvatures
  6. DOCKER - 容器抓包
  7. tensorflow 学习资料汇总
  8. 人脸识别技术原理与工程实践
  9. sklearn 神经网络_机器学习100天-Day2404 循环神经网络RNN(预测时间序列)
  10. python安装时没有选path怎么办,环境变量是个啥? 没有勾选Add to Path怎么办?
  11. Android TabHost和xml定义Menu应用
  12. 3月9日 英语笔记-英标
  13. 韩顺平的java入门到精通中serversql笔记(包括emp表和dept表,linux的mysql版)
  14. Jquery EasyUi日期输入框(点击今天不自动填充)
  15. 非深圳户口办理《深圳计划生育证明》需要以下几个证件
  16. 工商银行销售基金一览表
  17. 数据挖掘基础学习笔记
  18. 零代码:如何使用吾来机器人实现表格问答?
  19. ‘XXXX’ was compiled with optimization - stepping may behave oddly; variables may not be available
  20. 关于香港高防IP需要关注的几个问题

热门文章

  1. 动态切换tableView中的cell的种类
  2. HDU2006 求奇数的乘积【入门+序列处理】
  3. Vijos P1786 质因数分解【质因数分解】
  4. POJ1664 放苹果【递推+记忆化递归】
  5. UVA10878 Decode the tape【编码】
  6. 电压、电流 —— 常用设备的电压电流
  7. Python 标准库 —— 邮件(email)与邮件服务器(smtplib)
  8. 家谱等人物关系图的绘制
  9. 工具类与工具函数 —— NextPrime
  10. 重复抽样与非重复抽样