SpringBoot和RabbitMQ的集成:

步骤

自动配置

123456789101112131415161718192021222324252627
@Bean  public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)          throws Exception {        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();     if (config.determineHost() != null) {               //设置mq的host地址          factory.setHost(config.determineHost());      }     factory.setPort(config.determinePort());      if (config.determineUsername() != null) {               //设置mq的username            factory.setUsername(config.determineUsername());      }     if (config.determinePassword() != null) {               //设置mq的密码          factory.setPassword(config.determinePassword());      }     if (config.determineVirtualHost() != null) {               //是指虚拟主机            factory.setVirtualHost(config.determineVirtualHost());        }     if (config.getRequestedHeartbeat() != null) {               //心跳               factory.setRequestedHeartbeat(config.getRequestedHeartbeat());           }  .....}
123456789101112131415161718192021222324252627282930313233343536373839
@ConfigurationProperties(prefix = "spring.rabbitmq")public class RabbitProperties {    //地址   private String host = "localhost";    //端口   private int port = 5672;    //账号   private String username; //密码   private String password; //SSL配置   private final Ssl ssl = new Ssl();    //虚拟主机   private String virtualHost;    //地址   private String addresses;

 //请求心跳超时,以秒为单位; 零,没有。   private Integer requestedHeartbeat;

 //Publisher Confirms and Returns机制   private boolean publisherConfirms;

   private boolean publisherReturns; //连接超时时间   private Integer connectionTimeout;   //缓存   private final Cache cache = new Cache();

  //监听容器配置   private final Listener listener = new Listener();

   private final Template template = new Template();

   private List<Address> parsedAddresses;

   public String getHost() {      return this.host;   }

RabbitProperties封装了RabbitMQ发送和接收消息。

RabbitTemplate给RabbitMQ发送和接收消息。

AmqpAdmin,RabbitMQ系统管理功能组件。

123456789101112131415161718192021222324252627282930313233343536
@Bean        @ConditionalOnSingleCandidate(ConnectionFactory.class)       @ConditionalOnMissingBean(RabbitTemplate.class)      public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {            //生成rabbitTemplate来操作rabbitmq            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);           MessageConverter messageConverter = this.messageConverter.getIfUnique();            //如果messageConverter不为空设置我们自己的messageConverter         if (messageConverter != null) {              rabbitTemplate.setMessageConverter(messageConverter);         }         rabbitTemplate.setMandatory(determineMandatoryFlag());            RabbitProperties.Template templateProperties = this.properties.getTemplate();            RabbitProperties.Retry retryProperties = templateProperties.getRetry();          if (retryProperties.isEnabled()) {                rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties));            }         if (templateProperties.getReceiveTimeout() != null) {                rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout());         }         if (templateProperties.getReplyTimeout() != null) {              rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout());         }         return rabbitTemplate;        }

       @Bean        @ConditionalOnSingleCandidate(ConnectionFactory.class)       @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic",               matchIfMissing = true)       @ConditionalOnMissingBean(AmqpAdmin.class)       public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {         return new RabbitAdmin(connectionFactory);        }

   }

P2P发送

12345678910111213141516171819202122232425262728293031323334
package com.hph.amqp;

import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;import java.util.HashMap;import java.util.Map;

@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringBootAmqpApplicationTests {

    @Autowired    RabbitTemplate rabbitTemplate;

    /**     * 单播 P2P     */    @Test    public void p2p() {        Map<String, Object> map = new HashMap<>();        map.put("msg","这是第1个消息");        map.put("data", Arrays.asList("Hello Rabitmq",123456, true));       //对象默认被序列化以后发送出去        rabbitTemplate.convertAndSend("exchange.direct", "phh.news",map);    }

}

这是因为默认使用的是application/x-java-serialized-object的序列化

获取消息

123456
@Testpublic void receive() {    Object o = rabbitTemplate.receiveAndConvert("hph.news");    System.out.println(o.getClass());    System.out.println(o);}

转为Json

由于是RabbitTemplate操作Rabbit的在RabbitTemplate中RabbitTemplate为默认的序列化器

1
private volatile MessageConverter messageConverter = new SimpleMessageConverter();

MessageConverter又一下实现类我们使用的是Jackson2JsonMessageConverter的序列化器

在设置我们自己的MessageConverter

123
if (messageConverter != null) {               rabbitTemplate.setMessageConverter(messageConverter);         }
1234567891011121314
package com.hph.amqp.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;

@Configurationpublic class MyAMQPConfig {    @Bean    public MessageConverter messageConverter() {        return new Jackson2JsonMessageConverter();    }}

再次发送消息

自定义发送

1234567
@Test  public void sendMessage() {      Map<String, Object> map = new HashMap<>();      map.put("msg", "这是第1个消息");      map.put("data", Arrays.asList("清风笑丶",123456,true));      rabbitTemplate.convertAndSend("exchange.direct", "hph.news", new Person("小明",18));  }
1234567891011121314151617181920212223242526272829303132333435363738
package com.hph.amqp.bean;

public class Person {    private String name;    private Integer age;

    public Person() {    }

    public Person(String name, Integer age) {        this.name = name;        this.age = age;    }

    public String getName() {        return name;    }

    public void setName(String name) {        this.name = name;    }

    public Integer getAge() {        return age;    }

    public void setAge(Integer age) {        this.age = age;    }

    @Override    public String toString() {        return "Person{" +                "name='" + name + '\'' +                ", age=" + age +                '}';    }}

反序列化

1234567
@Testpublic void receive() {    Object o = rabbitTemplate.receiveAndConvert("hph.news");

    System.out.println(o.getClass());    System.out.println(o);}

广播发送

1234
@Testpublic void sendMessages() {    rabbitTemplate.convertAndSend("exchange.fanout", "hph.news", new Person("清风笑丶",18));}

监听消息队列

1234567891011121314
package com.hph.amqp;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableRabbit //开启基于注解的RabbitMQ的模式@SpringBootApplicationpublic class SpringBootAmqpApplication {

    public static void main(String[] args) {        SpringApplication.run(SpringBootAmqpApplication.class, args);    }}
123456789101112131415
package com.hph.amqp.service;

import com.hph.amqp.bean.Person;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;

@Servicepublic class PersonService {

    @RabbitListener(queues = "hph.news")    public void receive(Person person) {        System.out.println("收到消息" + person+"上线");

    }}

启动SpringBoot然后运行sendMessage任务。

123456
   @RabbitListener(queues = "hph")    public void receive02(Message message){        System.out.println(message.getBody());        System.out.println(message.getMessageProperties());    }}

消息头信息。

管理

在SpringBoot中消息队列的管理使用到了amqpAdmin

1234
@ConditionalOnMissingBean(AmqpAdmin.class)   public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {     return new RabbitAdmin(connectionFactory);    }

在RabbitAutoConfiguration

1234567891011121314151617181920212223
public class DirectExchange extends AbstractExchange {

   public static final DirectExchange DEFAULT = new DirectExchange("");

   //设置名字   public DirectExchange(String name) {      super(name);   } //名字  是否持久化 自动删除   public DirectExchange(String name, boolean durable, boolean autoDelete) {      super(name, durable, autoDelete);   }

   public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {      super(name, durable, autoDelete, arguments);   }

   @Override   public final String getType() {      return ExchangeTypes.DIRECT;   }

}

![

AHg0UA.png](https://s2.ax1x.com/2019/04/11/AHg0UA.png)

12345
@Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));     System.out.println("创建完成"); }

运行该方法。

创建exchange

12345678910111213141516
public Queue(String name, boolean durable) {   this(name, durable, false, false, null);}

public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {    this(name, durable, exclusive, autoDelete, null);}

public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {  Assert.notNull(name, "'name' cannot be null");    this.name = name;    this.durable = durable;  this.exclusive = exclusive;  this.autoDelete = autoDelete;    this.arguments = arguments;}

创建Queue

12345
@Test  public void createQueue() {      amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));      System.out.println("创建队列成功");  }

绑定exchange

12345678
public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,            Map<String, Object> arguments) {        this.destination = destination;      this.destinationType = destinationType;      this.exchange = exchange;        this.routingKey = routingKey;        this.arguments = arguments;  }

之前尚未绑定

1234
@Testpublic void bindExchange() {    amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqp.bind", null));}

绑定成功

SpringBoot和RabbitMQ集成相关推荐

  1. springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列

    1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...

  2. 【自撰】RabbitMQ集成SpringBoot框架

    RabbitMQ集成SpringBoot框架 导入springboot依赖 <!-- rabbitmq依赖 --> <dependency><groupId>org ...

  3. SpringBoot整合RabbitMQ 实现五种消息模型

    目录 SpringBoot中使用RabbitMQ 搭建初始环境 引入依赖 配置配置文件 测试类 注入 rabbitTemplate 消息队列RabbitMQ之五种消息模型 第一种直连模型使用 开发生产 ...

  4. RabbitMq(九) SpringBoot整合RabbitMQ消费者示例代码

    概述 在上一篇我们介绍了SpringBoot整合RabbitMQ生产者代码,本章我们介绍SpringBoot整合RabbitMQ,实现消费者工程的代码实现.与生产者集成相比,集成消费者不需要进行添加配 ...

  5. SpringBoot整合RabbitMQ,实现单机抢票系统

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息. 消息 ...

  6. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  7. 九、springboot整合rabbitMQ

    springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...

  8. SpringBoot使用RabbitMQ消息队列

    RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的 ...

  9. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

最新文章

  1. python二次开发攻略-ABAQUS Python二次开发攻略
  2. Can‘t connect to MySQL server on ‘localhost:3306‘ (10061)
  3. 女性人均存款56.3万,中国女性存款为何比男性高?
  4. 关于Oracle AUTONOMOUS TRANSACTION(自治事务)的介绍
  5. 成为更优秀的程序员:退后一步看问题
  6. flume-ng 入 oracle,flume-ng-sql-source
  7. 字节跳动的一面内容记录
  8. scala-jdbc-scalike操作jdbc数据库
  9. java 10什么意思_详解:Java 10的10个新特性
  10. MyEclipse10破解详解过程
  11. 控制台接收信息转发_微信多群转播能够起到什么作用?微信群聊录制课程语音转发多群怎么操作?...
  12. Oracle 创建新用户后无法登入,显示user lacks CREATE SESSION privilege; logon denied
  13. xcb_query_extension_reply_t的解释
  14. 从0开始建设SAAS系统的建议
  15. 有道云笔记同步IT笔试面试资源
  16. 如何系统地学习网络安全
  17. VR和AR可以怎样干掉智能手机
  18. 连接跟踪TCP序号检查
  19. python代码缩进和冒号_Python缩进和冒号详解
  20. Oracle 10G 64位下载

热门文章

  1. lisp读取天正轴号_第2天:Python 基础语法
  2. 计算机课本ppt,计算机基础知识培训教材(ppt44页) .pptx
  3. Spring.Net---4、IoC/DI注入方式
  4. HDU 1223 还是畅通工程(最小生成树prim模板)
  5. 泛型(java菜鸟的课堂笔记)
  6. js添加事件、移除事件、阻止冒泡、阻止浏览器默认行为等写法(兼容IE/FF/CHROME) 转载...
  7. SQL删除数据delete
  8. 分析函数——rollup,cube,rank,partition by
  9. php怎么求最小公倍数,PHP编程求最大公约数与最小公倍数的方法示例
  10. linux开热点软件,在Ubuntu系统的电脑上开启无线热点全攻略,