一、RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

  二、目录结构

  

  三、是使用springboot搭建rabbitmq我们需要基本的依赖包

  <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>

  四、这里我们主要介绍6中模式的配置和使用

  1)默认的模式(这种方式不是没有exchange,而是使用默认的exchange。默认为Direct)

  

           

  声明方式:

/*** 第一种:使用默认的交换机(direct模式)*/
@Configuration
public class QueueConfiguration {/*** 声明队列:队列有五个参数(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)* name:队列名称* durable:持久性* exclusive:排他性(独立性)* autoDelete:自动删除* arguments:其他相关参数* @return*/@Beanpublic Queue queue() {return new Queue("queue", false);}
}

  (1)简单:只有一个listener在监听queue,这样消息只能传到这个队列

  (2)进阶:如果存在多个listener监听这个queue,rabbitmq会优雅的平均分配给listener

  (3)arguments(参数配置)

    x-message-ttl(Time-To-Live):消息存活时间,单位毫秒

    x-expires:队列没有访问超时时,自动删除(包含没有消费的消息),单位毫秒。

    x-max-length:限制队列最大长度(新增后挤出最早的),单位个数。

    x-max-length-bytes :限制队列最大容量

    x-dead-letter-exchange:死信交换机,将删除/过期的数据,放入指定交换机。

    x-dead-letter-routing-key:死信路由,将删除/过期的数据,放入指定routingKey

    x-max-priority:队列优先级。

    x-queue-mode:对列模式,默认lazy(将数据放入磁盘,消费时放入内存)。

    x-queue-master-locator:镜像队列

  2)主题模式/通配符模式(topicExchange)  

  

  声明方式:

/*** 第二种:topic交换机模式(主题模式)*/
@Configuration
public class TopicExchangeConfiguration {@Beanpublic Queue queue1() {return new Queue("queue1", false);}@Beanpublic Queue queue2() {return new Queue("queue2", false);}/*** 声明交换机类型:存在4个参数(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)* 这里的参数基本和queue一样的理解* @return*/@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic", false, false);}/*** 绑定队列到交换机上面* @return*/@Beanpublic Binding binding1() {return BindingBuilder.bind(queue1()).to(topicExchange()).with("*.topic");}/*** 这里存在两种匹配符* *:代表一个单位的字符(1.topic)* #:代表多个单位的字符(2.2.topic)* @return*/@Beanpublic Binding binding2() {return BindingBuilder.bind(queue2()).to(topicExchange()).with("#.topic");}
}

  通配符:

    *:代表一个单位的字符(1.topic)

    #:代表多个单位的字符(2.2.topic)

  3)直连模式(directExchange)

  

  声明方式:

/*** 第三种:Direct模式(直连模式,默认交换机也是这种类型)*/
@Configuration
public class DirectExchangeConfiguration {@Beanpublic Queue queue3() {return new Queue("queue3", false);}@Beanpublic Queue queue4() {return new Queue("queue4", false);}/*** 参数和topic的交换机类型一样* @return*/@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct", false, false);}@Beanpublic Binding binding3() {return BindingBuilder.bind(queue3()).to(directExchange()).with("direct.3");}@Beanpublic Binding binding4() {return BindingBuilder.bind(queue4()).to(directExchange()).with("direct.4");}
}

  4)发布/订阅模式(fanout模式)

  

  声明方式:

/*** 第四种:fanout模式(发布/订阅模式)*/
@Configuration
public class FanoutExchangeConfiguration {@Beanpublic Queue queue5() {return new Queue("queue5", false);}@Beanpublic Queue queue6() {return new Queue("queue6", false);}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout", false, false);}/*** 这里的绑定不需要routingKey* @return*/@Beanpublic Binding binding5() {return BindingBuilder.bind(queue5()).to(fanoutExchange());}/*** 相比于topic,fanout只能全部发送,topic可以更具匹配规则进行* @return*/@Beanpublic Binding binding6() {return BindingBuilder.bind(queue6()).to(fanoutExchange());}
}

  说明:fanout模式是不需要绑定routingKey,这种方式也是广播形式的主要方式

  5)消息头模式(headers模式)

/*** 第五种:headers模式(消息头模式)*/
@Configuration
public class HeadersExchangeConfiguration {@Beanpublic Queue queue7() {return new Queue("queue7", false);}@Beanpublic Queue queue8() {return new Queue("queue8", false);}@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headers", false, false);}/*** 确认header是否存在* @return*/@Beanpublic Binding binding5() {return BindingBuilder.bind(queue7()).to(headersExchange()).where("header").exists();}@Beanpublic Binding binding6() {return BindingBuilder.bind(queue8()).to(headersExchange()).where("header").exists();}
}

  说明:这种方式主要是限定headers,方便通过其他方式携带数据。

  6)rpc:

  

  声明方式(大同小异):

@Configuration
public class RpcConfiguration {@Beanpublic Queue rpc() {return new Queue("rpc", false);}@Beanpublic DirectExchange rpcExchange() {return new DirectExchange("rpcExchange", false, false);}@Beanpublic Binding rpcBinding() {return BindingBuilder.bind(rpc()).to(rpcExchange()).with("rpcRoutingKey");}
}

  lisntener:

@Component
@RabbitListener(queues = "rpc")
public class RpcListener {@RabbitHandlerpublic String rpcListener(String text, Channel channel, Message message) throws IOException {System.out.println("rpcServer:" + text);MessageProperties messageProperties = message.getMessageProperties();channel.basicAck(messageProperties.getDeliveryTag(), false);return "success";}
}

  注意这里是有返回数据的。

  客户端(publish)

  这里推送存在两种方式,同步和异步

  a、同步:主题这里默认超时是5秒,可以通过rabbitTemplate设置setReceiveTimeout超时时间。

     String message = (String) rabbitTemplate.convertSendAndReceive("rpcExchange", "rpcRoutingKey", time);System.out.println("rpcClient:" + message);

  b、异步:

     AsyncRabbitTemplate.RabbitConverterFuture<Object> future =asyncRabbitTemplate.convertSendAndReceive("rpcExchange", "rpcRoutingKey", time);System.out.println("rpcClient:" + future.get());

  注意:AsyncRabbitTemplate是需要手动去配置的。并且需要配置AbstractMessageListenerContainer

  如果没有配置AbstractMessageListenerContainer,则需要配置amq.rabbitmq.reply-to(amq.*需要权限才可以配置

  这里是spring对rabbitmq在源码部分对其进行的判断,如果不理解可以自己跟convertSendAndReceive函数

    @Beanpublic AsyncRabbitTemplate asyncRabbitTemplate(DirectMessageListenerContainer container) {AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate, container);return asyncRabbitTemplate;}@Beanpublic DirectMessageListenerContainer directMessageListenerContainer(ConnectionFactory connectionFactory) {DirectMessageListenerContainer container = new DirectMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("rpc");//这里我改成手动了,但是没有好的方式去获取channel,然后ack.所以我这里使用的自动。
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);//这里可以使用默认的执行器:SimpleAsyncTaskExecutor(但是,这里不是采用的线程池而是直接new Thread)container.setTaskExecutor(new ThreadPoolExecutor(5, 60, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3000)));return container;}

  五、消息发送者

  1)yaml配置

server:port: 9001
spring:rabbitmq:host: 192.168.5.100port: 5672username: guestpassword: guestpublisher-confirms: truepublisher-returns: truetemplate:#参数意义:true当没有合适的queue直接返回到ReturnCallback#         false没有合适的直接丢弃mandatory: true

  2)如果配置了publisher-confirms、publisher-returns为true.并且加入template.mandatory为true。可以配置如下

@Component
public class RabbitmqPublisherConfiguration {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic RabbitTemplate rabbitTemplate() {//1、设置publisher-confirms为true//2、发布确认,只是在exchange范围//3、如果没有exchange,则false.如果过为true,则说明发送到exchange成功rabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {if (ack) {System.out.println("send success");} else {System.out.println("send fail");}});//1、设置publisher-returns为true//2、如果没有发布成功,则将消息返回。当然这只是在接受消息层,不是exchange。rabbitTemplate.setReturnCallback((message, id, reason, exchange, routingKey) -> {StringBuffer buffer = new StringBuffer();buffer.append("----------------------------------------\n");buffer.append("接受消息: {0},失败!\n");buffer.append("消息ID: {1}\n");buffer.append("原因: {2}\n");buffer.append("exchange: {3}\n");buffer.append("routingKey: {4}\n");buffer.append("----------------------------------------");MessageFormat messageFormat = new MessageFormat(buffer.toString());String text = messageFormat.format(new Object[]{new String(message.getBody()), id, reason, exchange, routingKey});System.out.println(text);});return rabbitTemplate;}
}

  a、ConfirmCallback:只是针对exchange,如果消息可以通过exchange,则发送成功。反之则失败

  b、ReturnCallback:这个只是针对于routingKey,是否通过。如果这个routingKey不存在,则将消息返回。反之则发送。

  3)消息发送

@Component
@EnableScheduling
public class RabbitmqPublisher {@Autowiredprivate RabbitTemplate rabbitTemplate;@Scheduled(cron = "0/15 * * * * ?")public void execute() {DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");String time = formatter.format(LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault()));//默认rabbitTemplate.convertAndSend("queue", time);//主题模式rabbitTemplate.convertAndSend("topic", "1.topic", time);rabbitTemplate.convertAndSend("topic", "2.2.topic", time);//直连模式rabbitTemplate.convertAndSend("direct", "direct.3", time);rabbitTemplate.convertAndSend("direct", "direct.4", time);//广播模式rabbitTemplate.convertAndSend("fanout", "", time);//headers模式MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("header", "header");messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);Message message = MessageBuilder.withBody(time.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("headers", "", message);}
}

  六、消息监听者

  1)yaml配置

server:port: 9002
spring:rabbitmq:host: 192.168.5.100port: 5672username: guestpassword: guestlistener:direct:acknowledge-mode: manualsimple:acknowledge-mode: manual

  说明:如果配置acknowledge-mode: manual(手动模式),则需要手动确认消息。如果没有则不需要手动确认,否则会报错。

  需要在每个listener下面加上

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

  listener的对手动对消息的处理方式有3种:Ack、Nack、Reject

  Ack:确认收到消息

  Nack:不确认收到消息

  Reject:拒接消息

  2)listener

@Component
public class RabbitmqListener {//1.默认队列@RabbitListener(queues = "queue")public void queueDouble1(String text, Channel channel, Message message) throws IOException {System.out.println("queueDouble1:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = "queue")public void queueDouble2(String text, Channel channel, Message message) throws IOException {System.out.println("queueDouble2:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}//2.主题队列@RabbitListener(queues = "queue1")public void queue1(String text, Channel channel, Message message) throws IOException {System.out.println("queue1:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = "queue2")public void queue2(String text, Channel channel, Message message) throws IOException {System.out.println("queue2:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}//3.直连队列@RabbitListener(queues = "queue3")public void queue3(String text, Channel channel, Message message) throws IOException {System.out.println("queue3:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = "queue4")public void queue4(String text, Channel channel, Message message) throws IOException {System.out.println("queue4:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}//4.广播队列@RabbitListener(queues = "queue5")public void queue5(String text, Channel channel, Message message) throws IOException {System.out.println("queue5:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = "queue6")public void queue6(String text, Channel channel, Message message) throws IOException {System.out.println("queue6:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}//5.消息头队列@RabbitListener(queues = "queue7")public void queue7(String text, Channel channel, Message message) throws IOException {System.out.println("queue7:" + text);System.out.println("header7:" + message.getMessageProperties().getHeaders().get("header"));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = "queue8")public void queue8(String text, Channel channel, Message message) throws IOException {System.out.println("queue8:" + text);System.out.println("header8:" + message.getMessageProperties().getHeaders().get("header"));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

  2)也可以写成,另外一种方式

@Component
@RabbitListener(queues = "queue")
public class RabbitmqHandlerListener {@RabbitHandlerpublic void messageHandler(String text, Channel channel, Message message) throws IOException {System.out.println("queueDouble3:" + text);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

  七、测试

  

  

  1)默认:

  

  均匀的分配到每一个节点

  2)主题(topic):

  

  只要符合规则就接受

  3)直连(direct)

  

  和模式方式一样,一对一。多个均匀分布

  4)广播(fanout)

  

  5)消息头(headers)

  

  八、当然例子也可以参考官网:https://www.rabbitmq.com/getstarted.html

   九、源码:https://github.com/lilin409546297/springboot-rabbitmq

转载于:https://www.cnblogs.com/ll409546297/p/10607420.html

springboot之rabbitmq相关推荐

  1. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  2. 九、springboot整合rabbitMQ

    springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...

  3. SpringBoot使用RabbitMQ消息队列

    RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的 ...

  4. springboot 集成rabbitmq 实例

    springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理 ...

  5. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

  6. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

  7. RabbitMq(九) SpringBoot整合RabbitMQ消费者示例代码

    概述 在上一篇我们介绍了SpringBoot整合RabbitMQ生产者代码,本章我们介绍SpringBoot整合RabbitMQ,实现消费者工程的代码实现.与生产者集成相比,集成消费者不需要进行添加配 ...

  8. RabbitMq(八) SpringBoot整合RabbitMQ 生产者代码实现

    在本章中我们将创建RabbitMQ的生产者工程,并实现生产者端代码实现. springboot整合RabbitMQ生产者工程步骤如下: 创建maven工程 引入springboot及RabbitMQ依 ...

  9. SpringBoot和RabbitMQ集成

    SpringBoot和RabbitMQ的集成: 步骤 自动配置 123456789101112131415161718192021222324252627 @Bean public CachingCo ...

最新文章

  1. c语言知道算法写不出代码,这个代码怎么写算法啊,求教,我真的不会写算法怎么办#incl...
  2. IO流之过滤流介绍:
  3. 笔记-项目范围管理-确认范围与控制范围的区别
  4. 前端开发常用的Chrome插件推荐
  5. 5.关于QT中的网络编程,QTcpSocket,QUdpSocket
  6. SAP UI5 ComponentBase createMetaData signature - why is MD hard coded
  7. CMake使用介绍(2)
  8. luci编程 openwrt_openwrt开源系统LUCI配置界面
  9. CentOS系统根目录下各个目录存放的内容
  10. 微积分28-复合函数与隐函数的微分法
  11. labVIEW学习笔记(三)簇,局部、全局变量
  12. ubuntu下查看CPU/GPU/内存使用率
  13. tcp/ip 协议的传输过程
  14. 售前工程师面试准备工作及经验分享
  15. 【NVMe-MI 1.2a - 1】NVM Express Management Interface介绍
  16. VS中实时获取SVN的版本号并写入到AssemblyInfo.cs中(C#)
  17. 通过QQ邮件发送文档到kindle,kindle收不到的问题
  18. 超融合一体机概述及优势
  19. 000python路--pycharm使用
  20. 敏捷开发之一—笼统的介绍

热门文章

  1. [链接]--Microsoft Dynamics CRM 2011 Web Resource简介
  2. 用户DSN、 系统DSN 、文件DSN
  3. nmealib解析-----(1)
  4. EMC业务连续性和容灾服务
  5. err-disabled
  6. vue1和vue2获取dom元素的方法 及 nextTick() 、$nextTick()
  7. 微服务,我们如何与你相处
  8. 【cocos2d-x从c++到js】13:回调函数2——JSCallbackWrapper
  9. VS2010测试功能之旅:编码的“.NET研究”UI测试(2)-操作动作的录制原理(上)...
  10. winserver 服务开机启动