https://www.jianshu.com/p/a5f7fce67803

目录

  • RabbitMQ 概念
  • exchange交换机机制
    • 什么是交换机
    • binding?
    • Direct Exchange交换机
    • Topic Exchange交换机
    • Fanout Exchange交换机
    • Header Exchange交换机
  • RabbitMQ 的 Hello - Demo(springboot实现)
  • RabbitMQ 的 Hello Demo(spring xml实现)
  • RabbitMQ 在生产环境下运用和出现的问题
    • Spring RabbitMQ 注解
    • 消息的 JSON 传输
    • 消息持久化,断线重连,ACK。

RabbitMQ 概念

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。

在 RabbitMQ 中,如下图结构:

rabbitmq

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。
  • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。
  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
    • 这里有一个比较重要的概念:***路由键 *** 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

exchange交换机机制

什么是交换机

rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange是做消息分发,producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchagne的4种type去定义的。

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

exchange是一个消息的agent,每一个虚拟的host中都有定义。它的职责是把message路由到不同的queue中。

binding?

exchange和queue通过routing-key关联,这两者之间的关系是就是binding。如下图所示,X表示交换机,红色表示队列,交换机通过一个routing-key去binding一个queue,routing-key有什么作用呢?看Direct exchange类型交换机。

Directed Exchange

路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。那消息交换机是怎么知道的呢?其实,producer deliver消息的时候会把routing-key add到 message header中。routing-key只是一个messgae的attribute。

A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.

Default Exchange
这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。

Topic Exchange

通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中*表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

Fanout Exchange

扇形交换机,该交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe模式。用来做广播最好。
所有该exchagne上指定的routing-key都会被ignore掉。

The fanout copies and routes a received message to all queues that are bound to it regardless of routing keys or pattern matching as with direct and topic exchanges. Keys provided will simply be ignored.

Header Exchange

设置header attribute参数类型的交换机。

RabbitMQ 的 Hello Demo

安装就不说了,建议按照官方文档上做。先贴代码,稍后解释,代码如下:

配置 交换机,队列,交换机与队列的绑定,消息监视容器:

@Configuration
@Data
public class RabbitMQConfig {final static String queueName = "spring-boot";@BeanQueue queue() {return new Queue(queueName, false);}@BeanTopicExchange exchange() {return new TopicExchange("spring-boot-exchange");}@BeanBinding binding(Queue queue, TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(queueName);}@BeanSimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(queueName);container.setMessageListener(listenerAdapter);return container;}@BeanReceiver receiver() {return new Receiver();}@BeanMessageListenerAdapter listenerAdapter(Receiver receiver) {return new MessageListenerAdapter(receiver, "receiveMessage");}
}

配置接收信息者(即消费者):

public class Receiver {private CountDownLatch latch = new CountDownLatch(1);public void receiveMessage(String message) {System.out.println("Received <" + message + ">");latch.countDown();}public CountDownLatch getLatch() {return latch;}
}

配置发送信息者(即生产者):

@RestController
public class Test {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping(value = "/test/{abc}",method = RequestMethod.GET)public String test(@PathVariable(value = "abc") String abc){rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!");return  "abc";}
}

以上便可实现一个简单的 RabbitMQ Demo,具体代码在:点这里

那么,这里,分为三个部分分析:发消息,交换机队列,收消息。

  • 对于发送消息:我们一般可以使用 RabbitTemplate,这个是 Spring 封装给了我们,便于我们发送信息,我们调用 rabbitTemplate.convertAndSend("spring-boot", xxx); 即可发送信息。
  • 对于交换机队列:如上代码,我们需要配置交换机 TopicExchange,配置队列 Queue,并且配置他们之间的绑定 Binding
  • 对于接受消息:首先需要创建一个消息监听容器,然后把我们的接受者注册到该容器中,这样,队列中有信息,那么就会调用接收者的对应的方法。如上代码 container.setMessageListener(listenerAdapter); 其中,MessageListenerAdapter 可以看做是 我们接收者的一个包装类,new MessageListenerAdapter(receiver, "receiveMessage"); 指明了如果有消息来,那么调用接收者哪个方法进行处理。

RabbitMQ 的 Hello Demo(spring xml实现)

spring xml方式实现RabbitMQ简单,可读性较好,配置简单,配置和实现如下所示。

上文已经讲述了rabbitmq的配置,xml方式通过properites文件存放用户配置信息:

mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672

配置application-mq.xml配置文件,声明连接、交换机、queue以及consumer监听。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" ><description>rabbitmq 连接服务配置</description><!-- 连接配置 --><context:property-placeholder location="classpath:mq.properties" /><rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/><rabbit:admin connection-factory="connectionFactory"/><!-- spring template声明--><rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory" /><!--申明queue--><rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" /><!--申明exchange交换机并绑定queue--><rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange"><rabbit:bindings><rabbit:binding queue="test_queue_key" key="test_queue_key"/></rabbit:bindings></rabbit:direct-exchange><!--consumer配置监听--><bean id="reveiver" class="com.demo.mq.receive.Reveiver" /><rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"><rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/></rabbit:listener-container>
</beans>

上述代码中,引入properties文件就不多说了。

<rabbit:connection-factory>标签声明创建connection的factory工厂。

<rabbit-template>声明spring template,和上文spring中使用template一样。template可声明exchange。

<rabbit:queue>声明一个queue并设置queue的配置项,直接看标签属性就可以明白queue的配置项。

<rabbit:direct-exchange>声明交换机并绑定queue。

<rabbit:listener-container>申明监听container并配置consumer和监听routing-key。

剩下就简单了,application-context.xml中把rabbitmq配置import进去。

<?xml version="1.0" encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:task="http://www.springframework.org/schema/task"xmlns:context="http://www.springframework.org/schema/context"xmlns:aop="http://www.springframework.org/schema/aop"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd"><context:component-scan base-package="com.demo.**" /><import resource="application-mq.xml" />
</beans>

Producer实现,发送消息还是使用template的convertAndSend() deliver消息。

@Service
public class Producer {@Autowiredprivate AmqpTemplate amqpTemplate;private final static Logger logger = LoggerFactory.getLogger(Producer.class);public void sendDataToQueue(String queueKey, Object object) {try {amqpTemplate.convertAndSend(queueKey, object);} catch (Exception e) {e.printStackTrace();logger.error("exeception={}",e);}}
}

配置consumer

package com.demo.mq.receive;import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;@Service
public class Reveiver {private CountDownLatch latch = new CountDownLatch(1);public void receiveMessage(String message) {System.out.println("reveice msg=" + message.toString());latch.countDown();}
}

测试deliver消息

Controller
@RequestMapping("/demo/")
public class TestController {private final static Logger logger = LoggerFactory.getLogger(TestController.class);@Resourceprivate Producer producer;@RequestMapping("/test/{msg}")public String send(@PathVariable("msg") String msg){logger.info("#TestController.send#abc={msg}", msg);System.out.println("msg="+msg);producer.sendDataToQueue("test_queue_key",msg);return "index";}
}

RabbitMQ 在生产环境下运用和出现的问题

在生产环境中,由于 Spring 对 RabbitMQ 提供了一些方便的注解,所以首先可以使用这些注解。例如:

  • @EnableRabbit:@EnableRabbit 和 @Configuration 注解在一个类中结合使用,如果该类能够返回一个 RabbitListenerContainerFactory 类型的 bean,那么就相当于能够把该终端(消费端)和 RabbitMQ 进行连接。Ps:(生成端不是通过 RabbitListenerContainerFactory 来和 RabbitMQ 连接,而是通过 RabbitTemplate )
  • @RabbitListener:当对应的队列中有消息的时候,该注解修饰下的方法会被执行。
  • @RabbitHandler:接收者可以监听多个队列,不同的队列消息的类型可能不同,该注解可以使得不同的消息让不同方法来响应。

具体这些注解的使用,可以参考这里的代码:点这里

首先,生产环境下的 RabbitMQ 可能不会在生产者或者消费者本机上,所以需要重新定义 ConnectionFactory,即:

@Bean
ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(vhost);return connectionFactory;
}

这里,可以重新设置需要连接的 RabbitMQ 的 ip,端口,虚拟主机,用户名,密码。

然后,可以先从生产端考虑,生产端需要连接 RabbitMQ,那么可以通过 RabbitTemplate 进行连接。 Ps:(RabbitTemplate 用于生产端发送消息到交换机中),如下代码:

@Bean(name="myTemplate")
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(integrationEventMessageConverter());template.setExchange(exchangeName);return template;
}

在该代码中,new RabbitTemplate(connectionFactory); 设置了生产端连接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter()); 设置了 生产端发送给交换机的消息是以什么格式的,在 integrationEventMessageConverter() 代码中:

public MessageConverter integrationEventMessageConverter() {Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();return messageConverter;
}

如上 Jackson2JsonMessageConverter 指明了 JSON。上述代码的最后 template.setExchange(exchangeName); 指明了 要把生产者要把消息发送到哪个交换机上。

有了上述,那么,我们即可使用 rabbitTemplate.convertAndSend("spring-boot", xxx); 发送消息,xxx 表示任意类型,因为上述的设置会帮我们把这些类型转化成 JSON 传输。

接着,生产端发送我们说过了,那么现在可以看看消费端:

对于消费端,我们可以只创建 SimpleRabbitListenerContainerFactory,它能够帮我们生成 RabbitListenerContainer,然后我们再使用 @RabbitListener 指定接收者收到信息时处理的方法。

@Bean(name="myListenContainer")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setMessageConverter(integrationEventMessageConverter());factory.setConnectionFactory(connectionFactory());return factory;
}

这其中 factory.setMessageConverter(integrationEventMessageConverter()); 指定了我们接受消息的时候,以 JSON 传输的消息可以转换成对应的类型传入到方法中。例如:

@Slf4j
@Component
@RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot")
public class Receiver {@RabbitHandlerpublic void receiveTeacher(Teacher teacher) {log.info("##### = {}",teacher);}
}

可能出现的问题:

消息持久化

在生产环境中,我们需要考虑万一生产者挂了,消费者挂了,或者 rabbitmq 挂了怎么样。一般来说,如果生产者挂了或者消费者挂了,其实是没有影响,因为消息就在队列里面。那么万一 rabbitmq 挂了,之前在队列里面的消息怎么办,其实可以做消息持久化,RabbitMQ 会把信息保存在磁盘上。

做法是可以先从 Connection 对象中拿到一个 Channel 信道对象,然后再可以通过该对象设置 消息持久化。

生产者或者消费者断线重连

这里 Spring 有自动重连机制。

ACK 确认机制

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么 非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。

如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

个人对 RabbitMQ ACK 的一些疑问,求助:点这里

总结

  1. RabbitMQ 作用:异步,解耦,缓冲,消息分发。
  2. RabbitMQ 主要分为3个部分,生产者,交换机和队列,消费者。
  3. 需要注意消息持久化,目的为了防止 RabbitMQ 宕机;考虑 ACK 机制,目的为了如果消费者对消息的处理失败了,那么后续要如何处理。

写在最后

  1. 写出来,说出来才知道对不对,知道不对才能改正,改正了才能成长。
  2. 在技术方面,希望大家眼里都容不得沙子。如果有不对的地方或者需要改进的地方希望可以指出,万分感谢

作者:Mooner_guo链接:https://www.jianshu.com/p/a5f7fce67803來源:简书著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

RabbitMq(一)走进RabbitMq相关推荐

  1. 一篇小文带你走进RabbitMQ的世界

    云栖号资讯:[点击查看更多行业资讯] 在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 说到消息中间件,大部分人的第一印象可能是Kafka.毕竟Kafka自问世以来,就顶着高并发,大流量 ...

  2. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

  3. RabbitMQ(1) - win+rabbitMQ

    rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器,Rab ...

  4. 【消息队列之rabbitmq】学习RabbitMQ必备品之一

    目录 一.基础知识 二.Rabbitmq消息发送模式 1.简单队列 2.工作队列 3.发布/订阅 4.路由模式 5.主题模式 三.RabbitMQ交换机类型 1.Direct exchange 2.F ...

  5. Github开源:Sheng.RabbitMQ.CommandExecuter (RabbitMQ 的命令模式实现)

    [Github]:https://github.com/iccb1013/Sheng.RabbitMQ.CommandExecuter 引用请注明原文出处: http://sheng.city/pos ...

  6. rabbitmq基础2——rabbitmq二进制安装和docker安装、基础命令

    文章目录 一.RabbitMQ安装 1.1 二进制安装 1.2 rabbitmqctl工具 1.3 docker安装 二.rabbitmq基础命令 2.1 多租户与权限类 2.1.1 创建虚拟主机 2 ...

  7. rabbitmq 限制速度_=(:) RabbitMQ详解

    本篇包含了RabbitMQ概念的一些东西,下篇会整理出SpringBoot结合RabbitMQ的使用案例. 目录 一.MQ概述 1.什么是消息 2.什么是消息队列 3.MQ的特点 二.MQ适用场景 1 ...

  8. Ubuntu安装rabbitMQ及单机版rabbitMQ集群配置

    安装过程 1.sudo vim /etc/apt/sources.list 打开sources.list添加下面的内容在结尾处 deb http://www.rabbitmq.com/debian/ ...

  9. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  10. rabbitmq python_Python操作RabbitMQ服务器实现消息队列的路由功能

    Python使用Pika库(安装:sudo pip install pika)可以操作RabbitMQ消息队列服务器(安装:sudo apt-get install rabbitmq-server), ...

最新文章

  1. ThreadLocal为什么要使用弱引用和内存泄露问题
  2. 虚拟在左,真实在右:德国学者用AI合成一亿像素逼真3D图像,可任意旋转
  3. easyui 消息框按钮文字修改
  4. Web 设计中的 5 个最具争议性的话题
  5. Vuex 入门前的小菜 - “Vue 中的简单状态共享机制实现”
  6. 添加javascript代码:_JavaScript(1)
  7. 英语阅读理解关于计算机,一篇摘选的关于计算机的英语阅读材料,对大家的英语也许会有提高!...
  8. 腾讯云与阿里云竞争激烈:销售团队积极争取每一笔交易
  9. 如何在Timeline中创建自定义轨道?
  10. 【记录贴】cs231n课程作业一遇到问题总结
  11. jQuery 选择器 (基础恶补之二)
  12. 组态软件mcgs入库mysql_MCGS组态软件实现数据报表
  13. python运行快捷键是什么_Python快捷键
  14. 2018最新Jrebel激活服务,Jrebel激活,Jrebel激活码,Jrebel破解
  15. 蘑菇租房爆雷,房东围堵总部,CEO凌晨发公告
  16. 每日一题【62】导数-公切线问题
  17. 最详细的知识图谱的技术与应用
  18. Google Dremel 原理 - 如何能 3 秒分析 1PB
  19. 测试无法测试的几乎苹果api实时搜索示例
  20. 【原创】【SPI】SPI通信协议介绍

热门文章

  1. MacCleaner Pro 2.4更新(支持最新M1处理器mac)
  2. CrossOver for Mac 怎么用?
  3. C语言项目2:图书管理系统
  4. 关于“客户感知价值提升”的思考(三)---电商渠道客户感知管理方法探讨
  5. C# this关键字(给底层类库扩展成员方法)
  6. Ambari Server 架构
  7. 打造自己的VC++ 6.0免安装简版
  8. Illustrator 教程,如何在 Illustrator 中使用绘图模式?
  9. Pr教程,Premiere Pro 中常用视频格式
  10. 苹果Mac 3D 模型展开工具:Unfolder