SpringBoot和RabbitMQ集成
步骤
自动配置
123456789101112131415161718192021222324252627 |
@Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config) throws Exception { RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean(); if (config.determineHost() != null) { //设置mq的host地址 factory.setHost(config.determineHost()); } factory.setPort(config.determinePort()); if (config.determineUsername() != null) { //设置mq的username factory.setUsername(config.determineUsername()); } if (config.determinePassword() != null) { //设置mq的密码 factory.setPassword(config.determinePassword()); } if (config.determineVirtualHost() != null) { //是指虚拟主机 factory.setVirtualHost(config.determineVirtualHost()); } if (config.getRequestedHeartbeat() != null) { //心跳 factory.setRequestedHeartbeat(config.getRequestedHeartbeat()); } .....} |
123456789101112131415161718192021222324252627282930313233343536373839 |
@ConfigurationProperties(prefix = "spring.rabbitmq")public class RabbitProperties { //地址 private String host = "localhost"; //端口 private int port = 5672; //账号 private String username; //密码 private String password; //SSL配置 private final Ssl ssl = new Ssl(); //虚拟主机 private String virtualHost; //地址 private String addresses; //请求心跳超时,以秒为单位; 零,没有。 private Integer requestedHeartbeat; //Publisher Confirms and Returns机制 private boolean publisherConfirms; private boolean publisherReturns; //连接超时时间 private Integer connectionTimeout; //缓存 private final Cache cache = new Cache(); //监听容器配置 private final Listener listener = new Listener(); private final Template template = new Template(); private List<Address> parsedAddresses; public String getHost() { return this.host; } |
RabbitProperties封装了RabbitMQ发送和接收消息。
RabbitTemplate给RabbitMQ发送和接收消息。
AmqpAdmin,RabbitMQ系统管理功能组件。
123456789101112131415161718192021222324252627282930313233343536 |
@Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean(RabbitTemplate.class) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { //生成rabbitTemplate来操作rabbitmq RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); MessageConverter messageConverter = this.messageConverter.getIfUnique(); //如果messageConverter不为空设置我们自己的messageConverter if (messageConverter != null) { rabbitTemplate.setMessageConverter(messageConverter); } rabbitTemplate.setMandatory(determineMandatoryFlag()); RabbitProperties.Template templateProperties = this.properties.getTemplate(); RabbitProperties.Retry retryProperties = templateProperties.getRetry(); if (retryProperties.isEnabled()) { rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties)); } if (templateProperties.getReceiveTimeout() != null) { rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout()); } if (templateProperties.getReplyTimeout() != null) { rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout()); } return rabbitTemplate; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean(AmqpAdmin.class) public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } |
P2P发送
12345678910111213141516171819202122232425262728293031323334 |
package com.hph.amqp; import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays;import java.util.HashMap;import java.util.Map; @RunWith(SpringRunner.class)@SpringBootTestpublic class SpringBootAmqpApplicationTests { @Autowired RabbitTemplate rabbitTemplate; /** * 单播 P2P */ @Test public void p2p() { Map<String, Object> map = new HashMap<>(); map.put("msg","这是第1个消息"); map.put("data", Arrays.asList("Hello Rabitmq",123456, true)); //对象默认被序列化以后发送出去 rabbitTemplate.convertAndSend("exchange.direct", "phh.news",map); } } |
这是因为默认使用的是application/x-java-serialized-object的序列化
获取消息
123456 |
@Testpublic void receive() { Object o = rabbitTemplate.receiveAndConvert("hph.news"); System.out.println(o.getClass()); System.out.println(o);} |
转为Json
由于是RabbitTemplate操作Rabbit的在RabbitTemplate中RabbitTemplate为默认的序列化器
1 |
private volatile MessageConverter messageConverter = new SimpleMessageConverter(); |
MessageConverter又一下实现类我们使用的是Jackson2JsonMessageConverter的序列化器
在设置我们自己的MessageConverter
123 |
if (messageConverter != null) { rabbitTemplate.setMessageConverter(messageConverter); } |
1234567891011121314 |
package com.hph.amqp.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; @Configurationpublic class MyAMQPConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }} |
再次发送消息
自定义发送
1234567 |
@Test public void sendMessage() { Map<String, Object> map = new HashMap<>(); map.put("msg", "这是第1个消息"); map.put("data", Arrays.asList("清风笑丶",123456,true)); rabbitTemplate.convertAndSend("exchange.direct", "hph.news", new Person("小明",18)); } |
1234567891011121314151617181920212223242526272829303132333435363738 |
package com.hph.amqp.bean; public class Person { private String name; private Integer age; public Person() { } public Person(String name, Integer age) { this.name = name; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } @Override public String toString() { return "Person{" + "name='" + name + '\'' + ", age=" + age + '}'; }} |
反序列化
1234567 |
@Testpublic void receive() { Object o = rabbitTemplate.receiveAndConvert("hph.news"); System.out.println(o.getClass()); System.out.println(o);} |
广播发送
1234 |
@Testpublic void sendMessages() { rabbitTemplate.convertAndSend("exchange.fanout", "hph.news", new Person("清风笑丶",18));} |
监听消息队列
1234567891011121314 |
package com.hph.amqp; import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication; @EnableRabbit //开启基于注解的RabbitMQ的模式@SpringBootApplicationpublic class SpringBootAmqpApplication { public static void main(String[] args) { SpringApplication.run(SpringBootAmqpApplication.class, args); }} |
123456789101112131415 |
package com.hph.amqp.service; import com.hph.amqp.bean.Person;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service; @Servicepublic class PersonService { @RabbitListener(queues = "hph.news") public void receive(Person person) { System.out.println("收到消息" + person+"上线"); }} |
启动SpringBoot然后运行sendMessage任务。
123456 |
@RabbitListener(queues = "hph") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); }} |
消息头信息。
管理
在SpringBoot中消息队列的管理使用到了amqpAdmin
1234 |
@ConditionalOnMissingBean(AmqpAdmin.class) public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } |
在RabbitAutoConfiguration
1234567891011121314151617181920212223 |
public class DirectExchange extends AbstractExchange { public static final DirectExchange DEFAULT = new DirectExchange(""); //设置名字 public DirectExchange(String name) { super(name); } //名字 是否持久化 自动删除 public DirectExchange(String name, boolean durable, boolean autoDelete) { super(name, durable, autoDelete); } public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) { super(name, durable, autoDelete, arguments); } @Override public final String getType() { return ExchangeTypes.DIRECT; } } |
![
AHg0UA.png](https://s2.ax1x.com/2019/04/11/AHg0UA.png)
12345 |
@Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); System.out.println("创建完成"); } |
运行该方法。
创建exchange
12345678910111213141516 |
public Queue(String name, boolean durable) { this(name, durable, false, false, null);} public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) { this(name, durable, exclusive, autoDelete, null);} public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) { Assert.notNull(name, "'name' cannot be null"); this.name = name; this.durable = durable; this.exclusive = exclusive; this.autoDelete = autoDelete; this.arguments = arguments;} |
创建Queue
12345 |
@Test public void createQueue() { amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true)); System.out.println("创建队列成功"); } |
绑定exchange
12345678 |
public Binding(String destination, DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments) { this.destination = destination; this.destinationType = destinationType; this.exchange = exchange; this.routingKey = routingKey; this.arguments = arguments; } |
之前尚未绑定
1234 |
@Testpublic void bindExchange() { amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqp.bind", null));} |
绑定成功
SpringBoot和RabbitMQ集成相关推荐
- springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列
1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...
- 【自撰】RabbitMQ集成SpringBoot框架
RabbitMQ集成SpringBoot框架 导入springboot依赖 <!-- rabbitmq依赖 --> <dependency><groupId>org ...
- SpringBoot整合RabbitMQ 实现五种消息模型
目录 SpringBoot中使用RabbitMQ 搭建初始环境 引入依赖 配置配置文件 测试类 注入 rabbitTemplate 消息队列RabbitMQ之五种消息模型 第一种直连模型使用 开发生产 ...
- RabbitMq(九) SpringBoot整合RabbitMQ消费者示例代码
概述 在上一篇我们介绍了SpringBoot整合RabbitMQ生产者代码,本章我们介绍SpringBoot整合RabbitMQ,实现消费者工程的代码实现.与生产者集成相比,集成消费者不需要进行添加配 ...
- SpringBoot整合RabbitMQ,实现单机抢票系统
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息. 消息 ...
- RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)
说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...
- 九、springboot整合rabbitMQ
springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...
- SpringBoot使用RabbitMQ消息队列
RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的 ...
- RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ
什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...
最新文章
- python二次开发攻略-ABAQUS Python二次开发攻略
- Can‘t connect to MySQL server on ‘localhost:3306‘ (10061)
- 女性人均存款56.3万,中国女性存款为何比男性高?
- 关于Oracle AUTONOMOUS TRANSACTION(自治事务)的介绍
- 成为更优秀的程序员:退后一步看问题
- flume-ng 入 oracle,flume-ng-sql-source
- 字节跳动的一面内容记录
- scala-jdbc-scalike操作jdbc数据库
- java 10什么意思_详解:Java 10的10个新特性
- MyEclipse10破解详解过程
- 控制台接收信息转发_微信多群转播能够起到什么作用?微信群聊录制课程语音转发多群怎么操作?...
- Oracle 创建新用户后无法登入,显示user lacks CREATE SESSION privilege; logon denied
- xcb_query_extension_reply_t的解释
- 从0开始建设SAAS系统的建议
- 有道云笔记同步IT笔试面试资源
- 如何系统地学习网络安全
- VR和AR可以怎样干掉智能手机
- 连接跟踪TCP序号检查
- python代码缩进和冒号_Python缩进和冒号详解
- Oracle 10G 64位下载
热门文章
- lisp读取天正轴号_第2天:Python 基础语法
- 计算机课本ppt,计算机基础知识培训教材(ppt44页) .pptx
- Spring.Net---4、IoC/DI注入方式
- HDU 1223 还是畅通工程(最小生成树prim模板)
- 泛型(java菜鸟的课堂笔记)
- js添加事件、移除事件、阻止冒泡、阻止浏览器默认行为等写法(兼容IE/FF/CHROME) 转载...
- SQL删除数据delete
- 分析函数——rollup,cube,rank,partition by
- php怎么求最小公倍数,PHP编程求最大公约数与最小公倍数的方法示例
- linux开热点软件,在Ubuntu系统的电脑上开启无线热点全攻略,