RocketMQ 实战-SpringBoot整合RocketMQ同步消息、异步消息、单向消息
官方样例:https://gitee.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
1. 同步消息
producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 ;
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
1.1 项目结构和依赖
- 项目结构
- 父级项目
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.meta.rocketmq</groupId><artifactId>rocketmq-master</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><modules><module>rocketmq-producer</module><module>rocketmq-consumer</module></modules><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><rocketmq.version>2.2.1</rocketmq.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>
1.2 生产者
1.2.1 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq-master</artifactId><groupId>com.meta.rocketmq</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rocketmq-producer</artifactId><name>rocketmq-producer</name><description>RocketMQ-Producer for Spring Boot</description></project>
1.2.2 配置文件
server.port=8090
server.servlet.context-path=/spring.application.name=rocketmq-producer
spring.profiles.active=local# rocketmq 的 nameserver 名称
rocketmq.name-server=192.168.0.24:9876
# 生产组名称
rocketmq.producer.group=group_producer_test
rocketmq.producer.topic.test=topic_test
1.2.3 生产者
package com.meta.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** RocketMQ 发送同步消息** @author gaoyang* @date 2021-12-07 15:05*/
@Slf4j
@Component
public class RocketmqSimpleProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 发送同步消息** @param topic* @param msg*/public void sendSyncMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.syncSend(topic, message);}
}
1.2.4 编写测试方法
package com.meta.rocketmq;import com.meta.rocketmq.producer.RocketmqSimpleProducer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;/*** @author gaoyang* @date 2021-12-07 15:15*/
@Slf4j
@SpringBootTest
class RocketmqProducerApplicationTests {@Value("${rocketmq.producer.topic.test}")private String topic;@Resourceprivate RocketmqSimpleProducer rocketmqSimpleProducer;@Testvoid sendSyncMsg() {this.rocketmqSimpleProducer.sendSyncMsg(topic, "这是一条同步消息");log.info("发送同步消息, end...");}
}
1.3 消费者
1.3.1 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq-master</artifactId><groupId>com.meta.rocketmq</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rocketmq-consumer</artifactId><name>rocketmq-consumer</name><description>RocketMQ-consumer for Spring Boot</description></project>
1.3.2 配置文件
server.port=8091
server.servlet.context-path=/spring.application.name=rocketmq-consumer
spring.profiles.active=local# rocketmq 的 nameserver 名称
rocketmq.name-server=192.168.0.24:9876
rocketmq.consumer.group=group_consumer_test
rocketmq.consumer.topic.test=topic_test
1.3.3 消费者
package com.meta.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 消费消息监听** @author gaoyang* @date 2021-12-08 10:04*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic.test}",consumerGroup = "${rocketmq.consumer.group}")
public class RocketmqSimpleListener implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {log.info("RocketmqSimpleListener: {}", msg);}
}
1.4 测试
分别启动消费者和生产者测试方法
2. 异步消息
producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 ;
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
2.1 生产者
public void sendAsyncMsg(String topic, String msg){Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 成功回调log.info("发送成功: {}", sendResult.getSendStatus().toString());}@Overridepublic void onException(Throwable throwable) {// 失败回调log.error("发送失败: {}", throwable.getMessage());}});}
2.2 测试方法
@Test
void sendAsyncMsg() throws InterruptedException {this.rocketmqSimpleProducer.sendAsyncMsg(topic, "这是一条异步消息");log.info("发送异步消息, end...");// 异步消息,为跟踪回调线程这里加入延迟Thread.sleep(3000);
}
2.3 测试
3. 单向消息
producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
3.1 生产者
/*** 发送单向消息** @param topic* @param msg*/
public void sendOnewayMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.sendOneWay(topic, message);
}
3.2 测试方法
@Test
void sendOnewayMsg() {this.rocketmqSimpleProducer.sendOnewayMsg(topic, "这是一条单向消息");log.info("发送单向消息, end...");
}
3.3 测试
RocketMQ 实战-SpringBoot整合RocketMQ同步消息、异步消息、单向消息相关推荐
- RocketMQ 实战-SpringBoot整合RocketMQ
1. 消息生产者 1.1 maven 依赖 <?xml version="1.0" encoding="UTF-8"?> <project x ...
- Springboot整合RocketMQ实战
本文来说下Springboot如何整合RocketMQ. 文章目录 概述 Springboot整合RocketMQ 引入pom依赖 yaml文件 简单实例 本文小结 概述 消息队列rocketmq是A ...
- springboot整合rocketMQ记录 实现发送普通消息,延时消息
一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例) 首先RocketMQ是阿里巴巴自研出来的,也已开源.其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂 ...
- SpringBoot整合RocketMQ,三种测试附带源码【rocketmq-spring-boot-starter】
我们整合boot项目的时候都是引入 xxx-start 依赖,但是现在大多数的整合RocketMQ都还不是这样. 我花了一天时间使用rocketmq-spring-boot-starter整合,使得操 ...
- Springboot整合Rocketmq系列教程
Springboot整合Rocketmq系列教程 本教程是基于Springboot2.6.3整合Rocketmq5.0,其中涉及了Rocketmq的安装,消息的发送及消费的代码实现. 本文不会对roc ...
- springboot整合rocketmq,支持多连接生产者和消费者配置。不同topic适配不同业务处理类
1.代码仓库 rocketmq版本4.5.2 直接上代码,下面再逐步讲解,仓库地址 本地启动后,访问swagger地址测试,http://127.0.0.1:8099/mq/swagger-ui/in ...
- SpringBoot整合RocketMQ
0. 启动Name Server与 Broker 1. 引入依赖 添加 RocketMQ 客户端访问支持,具体版本和安装的 RocketMQ 版本一致即可. <dependency>< ...
- SpringBoot整合RocketMQ之环境搭建以及Producer发送消息
https://github.com/apache/rocketmq-spring/releases/tag/2.0.0https://github.com/apache/rocketmq-sprin ...
- SpringBoot整合RocketMQ报错:“PullMessageService“ NoClassDefFoundError xxx/protocol/FastCodesHeader解决
问题阐述 学习RocketMQ到整合SpringBoot时,遇到问题,以下问题排除: 配置完整性(很多博文都会跳到group组名未定义问题) 服务器/虚拟机,防火墙或外网设置问题(请先去玩玩客户端发送 ...
最新文章
- go语言json的使用技巧
- 【陷阱】交换排序中交换动作的陷阱
- 刷题一个4ms的程序,代码如何优化到3ms再到2ms?
- 将frm,myi,myd文件打包成sql文件
- VTK:PolyData之Curvatures
- DOCKER - 容器抓包
- tensorflow 学习资料汇总
- 人脸识别技术原理与工程实践
- sklearn 神经网络_机器学习100天-Day2404 循环神经网络RNN(预测时间序列)
- python安装时没有选path怎么办,环境变量是个啥? 没有勾选Add to Path怎么办?
- Android TabHost和xml定义Menu应用
- 3月9日 英语笔记-英标
- 韩顺平的java入门到精通中serversql笔记(包括emp表和dept表,linux的mysql版)
- Jquery EasyUi日期输入框(点击今天不自动填充)
- 非深圳户口办理《深圳计划生育证明》需要以下几个证件
- 工商银行销售基金一览表
- 数据挖掘基础学习笔记
- 零代码:如何使用吾来机器人实现表格问答?
- ‘XXXX’ was compiled with optimization - stepping may behave oddly; variables may not be available
- 关于香港高防IP需要关注的几个问题