springboot rabbitlistener注解_一文带你SpringBoot+RabbitMQ方式收发消息
“
人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长。
”
楔子
这篇是消息队列RabbitMQ的第二弹。
上一篇的结尾我也预告了本篇的内容:利用RabbitTemplate和注解进行收发消息,还有一个我临时加上的内容:消息的序列化转换。
本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~
交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效。
祝有好收获,先赞后看,快乐无限。
- Github 地址:https://github.com/he-erduo/spring-boot-learning-demo
- Gitee 地址:https://gitee.com/he-erduo/spring-boot-learning-demo
Tip:上一篇的代码都放在prototype
包下,本篇的代码都放在auto
包下面。
1. ?环境配置
第一节我们先来搞一下环境的配置,上一篇中我们已经引入了自动配置的包,我们既然使用了自动配置的方式,那RabbitMQ
的连接信息我们直接放在配置文件中就行了,就像我们需要用到JDBC连接的时候去配置一下DataSource
一样。
如图所示,我们只需要指明一下连接的IP+端口号和用户名密码就行了,这里我用的是默认的用户名与密码,不写的话默认也都是guest,端口号也是默认5672。
主要我们需要看一下手动确认消息的配置,需要配置成manual
才是手动确认,日后还会有其他的配置项,眼下我们配置这一个就可以了。
接下来我们要配置一个Queue
,上一篇中我们往一个名叫erduo
的队列中发送消息,当时是我们手动定义的此队列,这里我们也需要手动配置,声明一个Bean
就可以了。
@Configurationpublic class RabbitmqConfig { @Bean public Queue erduo() { // 其三个参数:durable exclusive autoDelete // 一般只设置一下持久化即可 return new Queue("erduo",true); }
}
就这么简单声明一下就可以了,当然了 RabbitMQ 毕竟是一个独立的组件,如果你在 RabbitMQ 中通过其他方式已经创建过一个名叫erduo
的队列了,你这里也可以不声明,这里起到的一个效果就是如果你没有这个队列,会按照你声明的方式帮你创建这个队列。
配置完环境之后,我们就可以以SpringBoot的方式来编写生产者和消费者了。
2. ?生产者与RabbitTemplate
和上一篇的节奏一样,我们先来编写生产者,不过这次我要引入一个新的工具:RabbitTemplate
。
听它的这个名字就知道,又是一个拿来即用的工具类,Spring家族这点就很舒服,什么东西都给你封装一遍,让你用起来更方便更顺手。
RabbitTemplate 实现了标准AmqpTemplate接口,功能大致可以分为发送消息和接受消息。
我们这里是在生产者中来用,主要就是使用它的发送消息功能:send
和convertAndSend
方法。
// 发送消息到默认的Exchange,使用默认的routing keyvoid send(Message message) throws AmqpException;
// 使用指定的routing key发送消息到默认的exchangevoid send(String routingKey, Message message) throws AmqpException;
// 使用指定的routing key发送消息到指定的exchangevoid send(String exchange, String routingKey, Message message) throws AmqpException;
send
方法是发送byte数组的数据的模式,这里代表消息内容的对象是Message
对象,它的构造方法就是传入byte数组数据,所以我们需要把我们的数据转成byte数组然后构造成一个Message
对象再进行发送。
// Object类型,可以传入POJOvoid convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
convertAndSend
方法是可以传入POJO对象作为参数,底层是有一个MessageConverter
帮我们自动将数据转换成byte类型或String或序列化类型。
所以这里支持的传入对象也只有三种:byte类型,String类型和实现了Serializable
接口的POJO。
介绍完了,我们可以看一下代码:
@Slf4j@Component("rabbitProduce")public class RabbitProduce { @Autowired private RabbitTemplate rabbitTemplate;
public void send() { String message = "Hello 我是作者和耳朵,欢迎关注我。" + LocalDateTime.now().toString();
System.out.println("Message content : " + message);
// 指定消息类型 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();
rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props)); System.out.println("消息发送完毕。"); }
public void convertAndSend() { User user = new User();
System.out.println("Message content : " + user);
rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user); System.out.println("消息发送完毕。"); }
}
这里我特意写明了两个例子,一个用来测试send,另一个用来测试convertAndSend。
send
方法里我们看下来和之前的代码是几乎一样的,定义一个消息,然后直接send,但是这个构造消息的构造方法可能比我们想的要多一个参数,我们原来说的只要把数据转成二进制数组放进去即可,现在看来还要多放一个参数了。
MessageProperties
,是的我们需要多放一个 MessageProperties对象,从他的名字我们也可以看出它的功能就是附带一些参数,但是某些参数是少不了的,不带不行。
比如我的代码这里就是设置了一下消息的类型,消息的类型有很多种可以是二进制类型,文本类型,或者序列化类型,JSON类型,我这里设置的就是文本类型,指定类型是必须的,也可以为我们拿到消息之后要将消息转换成什么样的对象提供一个参考。
convertAndSend
方法就要简单太多,这里我放了一个User对象拿来测试用,直接指定队列然后放入这个对象即可。
Tips:User必须实现Serializable
接口,不然的话调用此方法的时候会抛出IllegalArgumentException
异常。
代码完成之后我们就可以调用了,这里我写一个测试类进行调用:
@SpringBootTestpublic class RabbitProduceTest { @Autowired private RabbitProduce rabbitProduce;
@Test public void sendSimpleMessage() { rabbitProduce.send(); rabbitProduce.convertAndSend(); }}
效果如下图~
同时在控制台使用命令rabbitmqctl.bat list_queues
查看队列-erduo
现在的情况:
如此一来,我们的生产者测试就算完成了,现在消息队列里两条消息了,而且消息类型肯定不一样,一个是我们设置的文本类型,一个是自动设置的序列化类型。
3. ?消费者与RabbitListener
既然队列里面已经有消息了,接下来我们就要看我们该如何通过新的方式拿到消息并消费与确认了。
消费者这里我们要用到@RabbitListener
来帮我们拿到指定队列消息,它的用法很简单也很复杂,我们可以先来说简单的方式,直接放到方法上,指定监听的队列就行了。
@Slf4j@Component("rabbitConsumer")public class RabbitConsumer {
@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已确认"); }
}
这段代码就代表onMessage
方法会处理erduo
(Producer.QUEUE_NAME是常量字符串"erduo")队列中的消息。
我们可以看到这个方法里面有两个参数,Message
和Channel
,如果用不到 Channel 可以不写此参数,但是 Message 消息一定是要的,它代表了消息本身。
我们可以想想,我们的程序从 RabbitMQ 之中拉回一条条消息之后,要以怎么样的方式展示给我们呢?
没错,就是封装为一个个 Message 对象,这里面放入了一条消息的所有信息,数据结构是什么样一会我一run你就能看到了。
同时这里我们使用 Channel 做一个消息确认的操作,这里的DeliveryTag代表的是这个消息在队列中的序号,这个信息存放在 MessageProperties 中。
4. ?SpringBoot 启动!
编写完生产者和消费者,同时已经运行过生产者往消息队列里面放了两条信息,接下来我们可以直接启动消息,查看消费情况:
在我红色框线标记的地方可以看到,因为我们有了消费者所以项目启动后先和RabbitMQ建立了一个连接进行监听队列。
随后就开始消费我们队列中的两条消息:
第一条信息是contentType=text/plain
类型,所以直接就在控制台上打印出了具体内容。
第二条信息是contentType=application/x-java-serialized-object
,在打印的时候只打印了一个内存地址+字节大小。
不管怎么说,数据我们是拿到了,也就是代表我们的消费是没有问题的,同时也都进行了消息确认操作,从数据上看,整个消息可以分为两部分:body
和MessageProperties
。
我们可以单独使用一个注解拿到这个body的内容 - @Payload
@RabbitListener(queues = Producer.QUEUE_NAME)public void onMessage(@Payload String body, Channel channel) throws Exception { System.out.println("Message content : " + body);}
也可以单独使用一个注解拿到MessageProperties
的headers属性,headers属性在截图里也可以看到,只不过是个空的 - @Headers。
@RabbitListener(queues = Producer.QUEUE_NAME)public void onMessage(@Payload String body, @Headers Map headers) throws Exception { System.out.println("Message content : " + body); System.out.println("Message headers : " + headers);}
这两个注解都算是扩展知识,我还是更喜欢直接拿到全部,全都要!!!
上面我们已经完成了消息的发送与消费,整个过程我们可以再次回想一下,一切都和我画的这张图上一样的轨迹:
只不过我们一直没有指定Exchage
一直使用的默认路由,希望大家好好记住这张图。
5. ?@RabbitListener与@RabbitHandler
下面再来补一些知识点,有关@RabbitListener
与@RabbitHandler
。
@RabbitListener 上面我们已经简单的进行了使用,稍微扩展一下它其实是可以监听多个队列的,就像这样:
@RabbitListener(queues = { "queue1", "queue2" })public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已确认");}
还有一些其他的特性如绑定之类的,这里不再赘述因为太硬编码了一般用不上。
下面来说说这节要主要讲的一个特性:@RabbitListener
和@RabbitHandler
的搭配使用。
前面我们没有提到,@RabbitListener 注解其实是可以注解在类上的,这个注解在类上标志着这个类监听某个队列或某些队列。
这两个注解的搭配使用就要让 @RabbitListener 注解在类上,然后用 @RabbitHandler 注解在方法上,根据方法参数的不同自动识别并去消费,写个例子给大家看一看更直观一些。
@Slf4j@Component("rabbitConsumer")@RabbitListener(queues = Producer.QUEUE_NAME)public class RabbitConsumer {
@RabbitHandler public void onMessage(@Payload String message){ System.out.println("Message content : " + message); }
@RabbitHandler public void onMessage(@Payload User user) { System.out.println("Message content : " + user); }}
大家可以看看这个例子,我们先用 @RabbitListener 监听erduo
队列中的消息,然后使用 @RabbitHandler 注解了两个方法。
第一个方法的body类型是String类型,这就代表着这个方法只能处理文本类型的消息。
第二个方法的body类型是User类型,这就代表着这个方法只能处理序列化类型且为User类型的消息。
这两个方法正好对应着我们第二节中测试类会发送的两种消息,所以我们往RabbitMQ中发送两条测试消息,用来测试这段代码,看看效果:
都在控制台上如常打印了,如果 @RabbitHandler 注解的方法中没有一个的类型可以和你消息的类型对的上,比如消息都是byte数组类型,这里没有对应的方法去接收,系统就会在控制台不断的报错,如果你出现这个情况就证明你类型写的不正确。
假设你的erduo
队列中会出现三种类型的消息:byte,文本和序列化,那你就必须要有对应的处理这三种消息的方法,不然消息发过来的时候就会因为无法正确转换而报错。
而且使用了 @RabbitHandler 注解之后就不能再和之前一样使用Message
做接收类型。
@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已确认");}
这样写的话会报类型转换异常的,所以二者选其一。
同时上文我的@RabbitHandler
没有进行消息确认,大家可以自己试一下进行消息确认。
6. ?消息的序列化转换
通过上文我们已经知道,能被自动转换的对象只有byte[]
、String
、java序列化对象
(实现了Serializable接口的对象),但是并不是所有的Java对象都会去实现Serializable接口,而且序列化的过程中使用的是JDK自带的序列化方法,效率低下。
所以我们更普遍的做法是:使用Jackson先将数据转换成JSON格式发送给RabbitMQ
,再接收消息的时候再用Jackson将数据反序列化出来。
这样做可以完美解决上面的痛点:消息对象既不必再去实现Serializable接口,也有比较高的效率(Jackson序列化效率业界应该是最好的了)。
默认的消息转换方案是消息转换顶层接口-MessageConverter
的一个子类:SimpleMessageConverter
,我们如果要换到另一个消息转换器只需要替换掉这个转换器就行了。
上图是MessageConverter
结构树的结构树,可以看到除了SimpleMessageConverter
之外还有一个Jackson2JsonMessageConverter
,我们只需要将它定义为Bean,就可以直接使用这个转换器了。
@Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(jacksonObjectMapper); }
这样就可以了,这里的jacksonObjectMapper
可以不传入,但是默认的ObjectMapper
方案对JDK8的时间日期序列化会不太友好,具体可以参考我的上一篇文章:从LocalDateTime序列化探讨全局一致性序列化,总的来说就是定义了自己的ObjectMapper
。
同时为了接下来测试方便,我又定义了一个专门测试JSON序列化的队列:
@Beanpublic Queue erduoJson() { // 其三个参数:durable exclusive autoDelete // 一般只设置一下持久化即可 return new Queue("erduo_json",true);}
如此之后就可以进行测试了,先是生产者代码:
public void sendObject() { Client client = new Client();
System.out.println("Message content : " + client);
rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client); System.out.println("消息发送完毕。"); }
我又重新定义了一个Client
对象,它和之前测试使用的User对象成员变量都是一样的,不一样的是它没有实现Serializable接口。
同时为了保留之前的测试代码,我又新建了一个RabbitJsonConsumer
,用于测试JSON序列化的相关消费代码,里面定义了一个静态变量:JSON_QUEUE = "erduo_json"
;
所以这段代码是将Client
对象作为消息发送到"erduo_json"
队列中去,随后我们在测试类中run一下进行一次发送。
紧着是消费者代码:
@Slf4j@Component("rabbitJsonConsumer")@RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE)public class RabbitJsonConsumer { public static final String JSON_QUEUE = "erduo_json";
@RabbitHandler public void onMessage(Client client, @Headers Map headers, Channel channel) throws Exception { System.out.println("Message content : " + client); System.out.println("Message headers : " + headers); channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); System.out.println("消息已确认"); }
}
有了上文的经验之后,这段代码理解起来也是很简单了吧,同时给出了上一节没写的如何在@RabbitHandler
模式下进行消息签收。
我们直接来看看效果:
在打印的Headers里面,往后翻可以看到contentType=application/json
,这个contentType
是表明了消息的类型,这里正是说明我们新的消息转换器生效了,将所有消息都转换成了JSON类型。
后记
这两篇讲完了RabbitMQ
的基本收发消息,包括手动配置和自动配置的两种方式,这些大家仔细研读之后应该会对 RabbitMQ 收发消息没什么疑问了~
不过我们一直以来发消息时都是使用默认的交换机,下篇将会讲述一下 RabbitMQ 的几种交换机类型,以及其使用方式。
讲完了交换机之后,这些 RabbitMQ 的常用概念基本就完善了。
人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长。
感谢各位的点赞和在看?,我是和耳朵,一个一直想做知识输出和大家共同成长的人,我们下期见。
springboot rabbitlistener注解_一文带你SpringBoot+RabbitMQ方式收发消息相关推荐
- 刚体验完RabbitMQ?一文带你SpringBoot+RabbitMQ方式收发消息
楔子 本篇是消息队列RabbitMQ的第二弹. 上一篇的结尾我也预告了本篇的内容:利用RabbitTemplate和注解进行收发消息,还有一个我临时加上的内容:消息的序列化转换. 本篇会和Spring ...
- 什么是对象的消息_SpringBoot+RabbitMQ方式收发消息,一文带你体验
推荐学习 二本渣渣被炒,18天脱产学飞SpringBoot,逆袭腾讯涨薪18K! 消息中间件合集:MQ(ActiveMQ/RabbitMQ/RocketMQ)+Kafka+笔记 肝了30天,整出这份[ ...
- 这篇带你熟悉 SpringBoot+RabbitMQ 方式收发消息
本文来源:juejin.im/post/6859152029823008781 本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建 ...
- springboot 接受数组对象_SpringBoot+RabbitMQ 方式收发消息
本文来源:juejin.im/post/6859152029823008781 本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建 ...
- swagger 修改dto注解_一文搞懂Swagger,让你明白用了Swagger的好处!!!
前后端分离缺陷 了解Swagger之前,需要先知道什么是前后端分离 现在的时代 SpringBoot + VUE 以前的时代 SSM + JSP模板引擎====>后端程序员 前后端分离时代 通过 ...
- lambda表达式java项目常用_一文带你彻底搞懂Lambda表达式
1. 为什么使用Lambda表达式 Lambda是一个匿名函数,我们可以把Lambda表达式理解为是一段可以传递的代码(将代码像数据一样进行传递).可以写出更简洁.更灵活的代码.作为一种更紧凑的代码风 ...
- 怎么看rabbitmq的浏览器信息_没用过消息队列?一文带你体验RabbitMQ收发消息
人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长. 楔子 先给大家说声抱歉,最近一周都没有发文,有一些比较要紧重要的事需要处理. 今天正好得空,本来说准备写SpringIOC相关的东西,但是发现想 ...
- pyecharts对于经纬度_一文带你掌握Pyecharts地理数据可视化的方法
本文主要介绍了Pyecharts地理数据可视化,分享给大家,具体如下: 一.Pyecharts简介和安装 1. 简介 Echarts 是一个由百度开源的数据可视化,凭借着良好的交互性,精巧的图表设计, ...
- java byte char io流_一文带你看懂JAVA IO流,史上最全面的IO教学
原标题:一文带你看懂JAVA IO流,史上最全面的IO教学 一.IO流是什么 惯例引用百科的回答 流是一种抽象概念,它代表了数据的无结构化传递.按照流的方式进行输入输出,数据被当成无结构的字节序或字符 ...
最新文章
- centos sudo不能运行_如何在 Linux 中配置 sudo 访问权限 | Linux 中国
- 过滤某一个时间段的日志----sed
- String 中的秘密
- axios 跨域代理
- RabbitMQ消息
- 卡诺模板_无关条件的卡诺地图
- 学术前沿 | Texar-PyTorch:在PyTorch里重现TensorFlow的最佳特性
- FPGA设计时避免使用循环语句
- JSK-127 进制转换【进制】
- 微信小程序 时间插件 (可以选择日期+星期)
- 深度学习中常见的打标签工具和数据集资源
- linux 终端窗口最大化,如何设置终端打开最大化
- 经验:怎么样免费在线PDF拆分
- java来源_java的来源
- PyCharm Community 2021.2 安装与汉化
- (12)筋斗云案例(导航栏醒目显示跟随)
- 《速度与激情9》中有哪些槽点?
- [通信 组成架构]AP是什么 WLAN及无线网络的结构
- oracle关键字plus,详细介绍ORACLE sqlplus命令 - jack198409的个人空间 - ITPU...
- Learn Beautiful Soup(3)——使用Beautiful Soup进行查找
热门文章
- 漫画:假装内卷,才是互联网人的骚操作
- @Cacheable 指定缓存位置
- 【升级包】jeecg_online 支持主子表列表展示风格模板升级包,简易升级
- 【jeecg移动开发能力】表单移动开发能力,提供多套表单模板(移动端、PC端),支持自定义
- 微信分享JS-SDK示例页面
- 通过反射--操作运行时类中的指定的属性/方法
- CS0656	缺少编译器要求的成员“Microsoft.CSharp..........
- Socket IO与NIO(三)
- Python3.7 Scrapy安装(Windows)
- WeUI 为微信 Web 服务量身设计-h5前端框架