前言

前面学习了 RabbitMQ 基础,现在主要记录下学习 Spring Boot 整合 RabbitMQ ,调用它的 API ,以及中间使用的相关功能的记录。

相关的可以去[我的博客/RabbitMQ]

正文

我这里测试都是使用的是 topic 交换器,Spring Boot 2.0.0, jdk 1.8

配置

Spring Boot 版本 2.0.0
pom.xml 文件中引入 AMQP 的依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在系统配置文件中加入连接属性

spring:application:name: RabbitMQ-Demorabbitmq:host: k.wuwii.comport: 5672username: kronchanpassword: 123456#virtual-host: testpublisher-confirms: true # 开启确认消息是否到达交换器,需要设置 truepublisher-returns: true # 开启确认消息是否到达队列,需要设置 true

基本的使用

消费者

新增一个消费者类:

@Log
public class MessageReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try {byte[] body = message.getBody();log.info(">>>>>>> receive: " + new String(body));} finally {// 确认成功消费,否则消息会转发给其他的消费者,或者进行重试channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
}
配置类

新增 RabbitMQ 的配置类,主要是对消费者的队列,交换器,路由键的一些设置:

@Configuration
public class RabbitMQConfig {public final static String QUEUE_NAME = "springboot.demo.test1";public final static String ROUTING_KEY = "route-key";public final static String EXCHANGES_NAME = "demo-exchanges";@Beanpublic Queue queue() {// 是否持久化boolean durable = true;// 仅创建者可以使用的私有队列,断开后自动删除boolean exclusive = false;// 当所有消费客户端连接断开后,是否自动删除队列boolean autoDelete = false;return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);}/*** 设置交换器,这里我使用的是 topic exchange*/@Beanpublic TopicExchange exchange() {// 是否持久化boolean durable = true;// 当所有消费客户端连接断开后,是否自动删除队列boolean autoDelete = false;return new TopicExchange(EXCHANGES_NAME, durable, autoDelete);}/*** 绑定路由*/@Beanpublic Binding binding(Queue queue, TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}@Beanpublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(QUEUE_NAME);container.setMessageListener(receiver());//container.setMaxConcurrentConsumers(1);//container.setConcurrentConsumers(1); 默认为1//container.setExposeListenerChannel(true);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置为手动,默认为 AUTO,如果设置了手动应答 basicack,就要设置manualreturn container;}@Beanpublic MessageReceiver receiver() {return new MessageReceiver();}}
生产者
@Component
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** logger*/private static final Logger log = LoggerFactory.getLogger(MessageSender.class);public void send() {// public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)// exchange:    交换机名称// routingKey:  路由关键字// object:      发送的消息内容// correlationData:消息IDCorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());// ConfirmListener是当消息无法发送到Exchange被触发,此时Ack为False,这时cause包含发送失败的原因,例如exchange不存在时// 需要在系统配置文件中设置 publisher-confirms: trueif (!rabbitTemplate.isConfirmListener()) {rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info(">>>>>>> 消息id:{} 发送成功", correlationData.getId());} else {log.info(">>>>>>> 消息id:{} 发送失败", correlationData.getId());}});}// ReturnCallback 是在交换器无法将路由键路由到任何一个队列中,会触发这个方法。// 需要在系统配置文件中设置 publisher-returns: truerabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息id:{} 发送失败", message.getMessageProperties().getCorrelationId());});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGES_NAME, RabbitMQConfig.ROUTING_KEY, ">>>>> Hello World", correlationId);log.info("Already sent message.");}}
测试发送消息

先启动系统启动类,消费者开始订阅,启动测试类发送消息。

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqApplicationTests {@Autowiredprivate MessageSender sender;@Testpublic void testReceiver() {sender.send();}
}

可以在消费者接收到信息,并且发送端将打出日志 成功发送消息的记录,也可以测试下 Publisher Confirms and Returns机制 主要是测试 ConfirmCallbackReturnCallback 这两个方法。

  • ConfirmCallback ,确认消息是否到达交换器,例如我们发送一个消息到一个你没有创建过的 交换器上面去,看看情况,
  • ReturnCallback,确认消息是否到达队列,我们可以这样测试,定义一个路由键,不会被任何队列订阅到,最后查看结果就可以了。

使用注解的方式

引入依赖和连接参数

跟文章第一步的配置一样的。

消费者
@Component
@Log
public class MessageReceiver {/*** 无返回消息的** @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.QUEUE_NAME, durable = "true", exclusive = "false", autoDelete = "false"),exchange = @Exchange(value = Constant.EXCHANGES_NAME, ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC, autoDelete = "false"),key = Constant.ROUTING_KEY))public void receive(byte[] message) {log.info(">>>>>>>>>>> receive:" + new String(message));}/*** 设置有返回消息的* 需要注意的是,* 1. 在消息的在生产者(发送消息端)一定要使用 SendAndReceive(……) 这种带有 receive 的方法,否则会抛异常,不捕获会死循环。* 2. 该方法调用时会锁定当前线程,并且有可能会造成MQ的性能下降或者服务端/客户端出现死循环现象,请谨慎使用。** @param message* @return*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.QUEUE_NAME, durable = "true", exclusive = "false", autoDelete = "false"),exchange = @Exchange(value = Constant.EXCHANGES_NAME, ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC, autoDelete = "false"),key = Constant.ROUTING_REPLY_KEY))public String receiveAndReply(byte[] message) {log.info(">>>>>>>>>>> receive:" + new String(message));return ">>>>>>>> I got the message";}}

主要是使用到 @RabbitListener,虽然看起来参数很多,仔细的你会发现这个和写配置类里面的基本属性是一摸一样的,没有任何区别。

需要注意的是我在这里多做了个有返回值的消息,这个使用异常的话,会不断重试消息,从而阻塞了线程。而且使用它的时候只能使用带有 receive 的方法给它发送消息。

生产者

生产者没什么变化。

@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {/*** logger*/private static final Logger log = LoggerFactory.getLogger(MessageSender.class);private RabbitTemplate rabbitTemplate;/*** 注入 RabbitTemplate*/@Autowiredpublic MessageSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnCallback(this);}/*** 测试无返回消息的*/public void send() {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(Constant.EXCHANGES_NAME, Constant.ROUTING_KEY, ">>>>>> Hello World".getBytes(), correlationData);log.info(">>>>>>>>>> Already sent message");}/*** 测试有返回消息的,需要注意一些问题*/public void sendAndReceive() {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Object o = rabbitTemplate.convertSendAndReceive(Constant.EXCHANGES_NAME, Constant.ROUTING_REPLY_KEY, ">>>>>>>> Hello World Second".getBytes(), correlationData);log.info(">>>>>>>>>>> {}", Objects.toString(o));}/*** Confirmation callback.** @param correlationData correlation data for the callback.* @param ack             true for ack, false for nack* @param cause           An optional cause, for nack, when available, otherwise null.*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info(">>>>>>> 消息id:{} 发送成功", correlationData.getId());} else {log.info(">>>>>>> 消息id:{} 发送失败", correlationData.getId());}}/*** Returned message callback.** @param message    the returned message.* @param replyCode  the reply code.* @param replyText  the reply text.* @param exchange   the exchange.* @param routingKey the routing key.*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息id:{} 发送失败", message.getMessageProperties().getCorrelationId());}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootAnnotationApplicationTests {@Autowiredprivate MessageSender sender;@Testpublic void send() {sender.send();}@Testpublic void sendAndReceive() {sender.sendAndReceive();}
}

学习Spring Boot:(二十六)使用 RabbitMQ 消息队列相关推荐

  1. Spring Boot(十四)RabbitMQ延迟队列

    一.前言 延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单:2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度:3.过1分钟给新注册会员的用户,发送注册邮件等. 实现延迟队列的 ...

  2. 窗口消息——Windows核心编程学习手札之二十六

    窗口消息 --Windows核心编程学习手札之二十六 Windows允许一个进程至多建立10000个不同类型的用户对象(user object):图符.光标.窗口类.菜单.加速键表等,当一个线程调用一 ...

  3. OpenCV学习笔记(二十六)——小试SVM算法ml OpenCV学习笔记(二十七)——基于级联分类器的目标检测objdect OpenCV学习笔记(二十八)——光流法对运动目标跟踪Video Ope

    OpenCV学习笔记(二十六)--小试SVM算法ml 总感觉自己停留在码农的初级阶段,要想更上一层,就得静下心来,好好研究一下算法的东西.OpenCV作为一个计算机视觉的开源库,肯定不会只停留在数字图 ...

  4. 【Java学习笔记之二十六】深入理解Java匿名内部类

    在[Java学习笔记之二十五]初步认知Java内部类中对匿名内部类做了一个简单的介绍,但是内部类还存在很多其他细节问题,所以就衍生出这篇博客.在这篇博客中你可以了解到匿名内部类的使用.匿名内部类要注意 ...

  5. Spring Boot教程(十六):Spring Boot集成shiro

    Apache Shiro™是一个功能强大且易于使用的Java安全框架,可执行身份验证,授权,加密和会话管理.借助Shiro易于理解的API,您可以快速轻松地保护任何应用程序 - 从最小的移动应用程序到 ...

  6. (转)Spring Boot(二十):使用 spring-boot-admin 对 Spring Boot 服务进行监控

    http://www.ityouknow.com/springboot/2018/02/11/spring-boot-admin.html 上一篇文章<Spring Boot(十九):使用 Sp ...

  7. 学习Spring Boot:(六) 集成Swagger2

    前言 Swagger是用来描述和文档化RESTful API的一个项目.Swagger Spec是一套规范,定义了该如何去描述一个RESTful API.类似的项目还有RAML.API Bluepri ...

  8. kafka创建topic_Kafka系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?...

    Guide哥答应大家的 Kafka系列的第3篇原创文章,写的非常详细,没有接触过 Kafka 的朋友应该都可以看懂,觉得不错的话一定要点亮你们的在看!在看就是对Guide 哥最大的鼓励! 为了保证内容 ...

  9. 【C语言进阶深度学习记录】二十六 C语言中的字符串与字符数组的详细分析

    之前有一篇文章是学习了字符和字符串的,可以与之结合学习:[C语言进阶深度学习记录]十二 C语言中的:字符和字符串 文章目录 1 字符串的概念 1.1 字符串与字符数组 1.2 字符数组与字符串代码分析 ...

  10. 深度学习自学(二十六):人脸数据集

    人脸检测,关键点检测,人脸识别,人脸表情,人脸年龄,人脸姿态等方向的数据集. 01 人脸检测 所谓人脸检测任务,就是要定位出图像中人脸的大概位置. 1.1 Caltech 10000 数据集地址:ht ...

最新文章

  1. “智源论坛Live”报名 | 清华大学游凯超:领域适配前沿研究--场景、方法与模型选择...
  2. 八十五、Python | Leetcode数据结构之图和动态规划算法系列
  3. 小样本学习(Few-shot Learning)综述
  4. 怎么在终端启用python_在终端启动Python时报错的解决方案
  5. java编程实现素数环_结对编程(JAVA实现)
  6. java将naso注册成计算机服务_2018计算机二级考试Java语言每日一练 8月2日
  7. 如何动态设置控件的宽高度?
  8. 多线程的那点儿事(之自旋锁)
  9. 切图时图片的选择:JPG、PNG、GIF的区别
  10. 数学建模可以用python_数学建模可以用Python吗
  11. pandownload使用
  12. 小灵通为什么会退市?
  13. java 聊天室 私聊_Java聊天室——实现多人聊天、私聊、群聊
  14. Rust FFI 编程 - FFI 概述
  15. 自同步扰乱编码器的原理与MATLAB仿真
  16. iphone怎样关闭副屏_机情烩:联通eSIM主副卡业务上线 副卡套餐最低仅10元
  17. HTML+CSS 简易搜索框
  18. 电脑连不上网故障排查思路
  19. PS Suite Studio 初探
  20. 网站推广(百度百科)

热门文章

  1. [转载] 菜鸟举例理解字节流和字符流区别
  2. [转载] python中for语句用法_详解Python中for循环的使用_python
  3. c++ stl队列初始化_创建一个向量,并将其像C ++ STL中的数组一样初始化
  4. php mysql 权重_PHP对MySql的常用操作
  5. go在方法中修改结构体的值_[Go]结构体及其方法
  6. .net core image怎么保存_轻量级Vue图片上传插件——Vue-core-image-Upload
  7. docker镜像创建与优化
  8. Zabbix监控——proxy 分布式监控配置
  9. ruby 覆盖率测试_Ruby方法覆盖
  10. c语言两个浮点数相加_C语言中两个浮点数或双精度数的模数