RibbatMQ

文章目录

  • RibbatMQ
    • 同步通讯和异步通讯:
      • 同步通讯:
      • 异步通讯:
      • 技术对比:
    • RibbatMQ入门:
      • 安装RibbatMQ:
      • RibbatMQ的基本结构:
      • RabbitMQ消息模型:
        • 简单队列模式的模型图:
    • SpringAMQP:
      • AMQP:
      • SpringAMQP提供了三个功能:
      • Basic Queue 简单队列模型:
        • 消息发送:
        • 消息接收:
      • WorkQueue工作队列模型:
        • 能者多劳:
        • 消息发送:
        • 消息接收:
    • 发布和订阅:
      • Fanout:广播
        • 声明队列和交换机:
        • 消息发送:
        • 消息接收:
      • Direct:路由
        • 声明队列和交换机:
        • 消息发送:
        • 消息接收:
      • Topic:话题
        • 消息发送:
        • 消息接收:
    • 消息转换器:
      • 配置JSON转换器:

同步通讯和异步通讯:

同步通讯:

  • 同步通讯需要实时回复
  • 优点:
    • 时效性强,可以立即得到结果
  • 缺点:
    • 耦合性高
    • 性能和吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

异步通讯:

  • 事件发布者(publisher)发布事件,事件订阅者(Consumer)接收消息,无需及时回复。
  • 为了解决事假发布者和事件订阅者之间的耦合,两方不是直接通信,而是引入一个中间人(Broker),发布者将消息发布到broker,不需要等待回复,订阅者从broker订阅事件,不需要知道是谁发的
  • 优点:
    • 吞吐量提升:无需等待订阅者处理完成,响应更快速
    • 故障隔离:服务没有直接调用,不存在级联失败问题
    • 调用间没有阻塞,不会造成无效的资源占用
    • 耦合度极低,每个服务都可以灵活插拔,可替换
    • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
  • 缺点:
    • 架构复杂了,业务没有明显的流程线,不好管理
    • 需要依赖于Broker的可靠、安全、性能

技术对比:

  • MQ,中文就是消息队列(MessageQueue),字面意思就是存放消息的队列,也就是事件驱动架构中的broker

  • 比较常见的MQ实现:

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 几种常见MQ的对比:

    RabbitMQ ActiveMQ RocketMQ Kafka
    公司/社区 Rabbit Apache 阿里 Apache
    开发语言 Erlang Java Java Scala&Java
    协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
    可用性 一般
    单机吞吐量 一般 非常高
    消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
    消息可靠性 一般 一般
  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

    追求可靠性:RabbitMQ、RocketMQ

    追求吞吐能力:RocketMQ、Kafka

    追求消息低延迟:RabbitMQ、Kafka

RibbatMQ入门:

安装RibbatMQ:

  • 单机部署

    • 下载镜像:

      docker pull rabbitmq:3.8-management
      
    • 安装MQ:

      docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
      

RibbatMQ的基本结构:

  • RibbatMQ中的一些角色:

    • publisher:生产者
    • consumer:消费者
    • exchange个:交换机,负责消息路由
    • queue:队列,存储消息
    • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

RabbitMQ消息模型:

  • RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型

简单队列模式的模型图:

  • 官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

    • publisher:消息发布者,将消息发送到队列queue
    • queue:消息队列,负责接受并缓存消息
    • consumer:订阅队列,处理队列中的消息
  • 实现publisher

    • 建立连接

             //建立连接ConnectionFactory factory = new ConnectionFactory();//设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");Connection connection = factory.newConnection();
      
    • 创建Channel

              //创建通道ChannelChannel channel = connection.createChannel();
      
    • 声明队列

              //创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);
      
    • 发送消息

              //发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");
      
    • 关闭连接和channel

              //关闭通道和连接channel.close();connection.close();
      
  • 实现consumer

    • 建立连接

              // 建立连接ConnectionFactory factory = new ConnectionFactory();//设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");Connection connection = factory.newConnection();
      
    • 创建Channel

              //创建通道ChannelChannel channel = connection.createChannel();
      
    • 声明队列

              //创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);
      
    • 订阅消息

              //订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {//处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");
      
  • 总结:

    • 基本消息队列的消息发送流程:

      • 建立connection

      • 创建channel

      • 利用channel声明队列

      • 利用channel向队列发送消息

    • 基本消息队列的消息接收流程:

      • 建立connection

      • 创建channel

      • 利用channel声明队列

      • 定义consumer的消费行为handleDelivery()

      • 利用channel将消费者与队列绑定

SpringAMQP:

  • 仅仅一个简单队列模式,写着就如此复杂,所以我们可以使用SpringAMOP来进行

  • SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

    SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

AMQP:

  • Advanced Message Queuing Protocol ,是用于在应用程序之间传递业务消息的开放标准,该协议与语言无关,更符合微服务中独立性的要求

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

Basic Queue 简单队列模型:

  • 引入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

消息发送:

  • 在配置文件中添加配置:
spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: root # 用户名password: 123456 # 密码
  • 利用RabbitTemplate发送消息
@Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);
}

消息接收:

  • 在配置文件中配置:
spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: root # 用户名password: 123456 # 密码
  • 新建一个类SpringRabbitListener
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

WorkQueue工作队列模型:

  • 简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息
  • 同一条消息只会被一个消费者处理,阅后即焚。

  • 当发送消息的速度远远大于消费速度,长此以往会有大量的消息无法解决,此时就可以使用工作队列

能者多劳:

  • 如果没有配置如下代码,两个消费者就会预取,均分两个消息,如果消费者1能力不行,一秒只能处理5条,消费者2一秒可以处理50条,就会出现消费者2早早处理完,在哪儿等着消息来,消费者1还有大把的消息没有处理,浪费资源,时间长了可能会出现栈溢出问题。
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

消息发送:

  • 没有新的API,和简单队列模型一样

消息接收:

  • 没有新的API,只是多创建了两个消费者来绑定同一个队列。

发布和订阅:

  • 可以看到,在订阅模型中,多了一个exchange,过程也有略微变化。

    • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
      • Fanout:广播,将消息交给所有绑定到交换机的队列
      • Direct:定向,把消息交给符合指定routing key 的队列
      • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    • Consumer:消费者,与以前一样,订阅队列,没有变化
    • Queue:消息队列也与以前一样,接收消息、缓存消息。
  • 注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

  • 交换机的作用是什么?

    • 接收publisher发送的消息
    • 将消息按照规则路由到与之绑定的队列
    • 不能缓存消息,路由失败,消息丢失
    • FanoutExchange的会将消息路由到每个绑定的队列
  • 声明队列、交换机、绑定关系的Bean是什么?

    • Queue
    • FanoutExchange
    • Binding

Fanout:广播

  • 在广播模式下,消息发送流程是这样的:

    • 可以有多个队列
    • 每个队列都要绑定到Exchange(交换机)
    • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
    • 交换机把消息发送给绑定过的所有队列
    • 订阅队列的消费者都能拿到消息
  • 在广播模式下,交换机会将消息转发给所有绑定的队列

声明队列和交换机:

  • 在消费者创建一个config类,用来声明交换机和队列

  • @Configuration
    public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
    }
    

消息发送:

  •     // 队列名称String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
    

消息接收:

  • @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }
    

Direct:路由

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

声明队列和交换机:

  • 用配置类来声明略显麻烦,spring还提供了注解方式来声明,在消费者中的SpringRabbitListener中添加消费者时,同事用注解来声明队列和交换机
//消费者1
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
//消费者2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

消息发送:

  • 与简单队列模型发送消息差不多,代码如下:
    // 交换机名称String exchangeName = "itcast.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
//在发送时,需要指定一个key

消息接收:

  • 在声明队列和交换机的时候就写好了

Topic:话题

  • 话题的key可以由多个单词组成,只有一个key,用.分割

  • 通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

消息发送:

  •     // 交换机名称String exchangeName = "itcast.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    

消息接收:

  • //消费者1
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
    ))
    public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }
    //消费者2
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
    ))
    public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
    

消息转换器:

  • Spring会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

  • 不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

    • 数据体积过大
    • 有安全漏洞
    • 可读性差

配置JSON转换器:

  • 在publisher和consumer两个服务中都引入依赖:

  • <dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
    </dependency>
    
  • 在启动类中添加一个Bean即可:

  • @Bean
    public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
    }
    

RibbatMQ(详解)相关推荐

  1. 从命令行到IDE,版本管理工具Git详解(远程仓库创建+命令行讲解+IDEA集成使用)

    首先,Git已经并不只是GitHub,而是所有基于Git的平台,只要在你的电脑上面下载了Git,你就可以通过Git去管理"基于Git的平台"上的代码,常用的平台有GitHub.Gi ...

  2. JVM年轻代,老年代,永久代详解​​​​​​​

    秉承不重复造轮子的原则,查看印象笔记分享连接↓↓↓↓ 传送门:JVM年轻代,老年代,永久代详解 速读摘要 最近被问到了这个问题,解释的不是很清晰,有一些概念略微模糊,在此进行整理和记录,分享给大家.在 ...

  3. docker常用命令详解

    docker常用命令详解 本文只记录docker命令在大部分情境下的使用,如果想了解每一个选项的细节,请参考官方文档,这里只作为自己以后的备忘记录下来. 根据自己的理解,总的来说分为以下几种: Doc ...

  4. 通俗易懂word2vec详解词嵌入-深度学习

    https://blog.csdn.net/just_so_so_fnc/article/details/103304995 skip-gram 原理没看完 https://blog.csdn.net ...

  5. 深度学习优化函数详解(5)-- Nesterov accelerated gradient (NAG) 优化算法

    深度学习优化函数详解系列目录 深度学习优化函数详解(0)– 线性回归问题 深度学习优化函数详解(1)– Gradient Descent 梯度下降法 深度学习优化函数详解(2)– SGD 随机梯度下降 ...

  6. CUDA之nvidia-smi命令详解---gpu

    nvidia-smi是用来查看GPU使用情况的.我常用这个命令判断哪几块GPU空闲,但是最近的GPU使用状态让我很困惑,于是把nvidia-smi命令显示的GPU使用表中各个内容的具体含义解释一下. ...

  7. Bert代码详解(一)重点详细

    这是bert的pytorch版本(与tensorflow一样的,这个更简单些,这个看懂了,tf也能看懂),地址:https://github.com/huggingface/pytorch-pretr ...

  8. CRF(条件随机场)与Viterbi(维特比)算法原理详解

    摘自:https://mp.weixin.qq.com/s/GXbFxlExDtjtQe-OPwfokA https://www.cnblogs.com/zhibei/p/9391014.html C ...

  9. pytorch nn.LSTM()参数详解

    输入数据格式: input(seq_len, batch, input_size) h0(num_layers * num_directions, batch, hidden_size) c0(num ...

最新文章

  1. 为什么我的DevOps落地过程跟别人不一样?
  2. 面试官:说说什么是Java内存模型?
  3. 《是男人就下100层》真的有隐藏剧情!B站up主数月破解,原作者点赞致谢
  4. Python基础——PyCharm版本——第三章、数据类型和变量(超详细)
  5. FineReport——获取控件值和单元格值
  6. 信息学奥赛一本通(1020:打印ASCII码)
  7. Spark 2.1.0集成CarbonData 1.1.0
  8. mysql for rhel7_MySQL5.7.18 for Linux7.2(二进制安装)
  9. android viewdraghelper 点击移动,ViewDragHelper使用时遇到的问题
  10. 【iCore4 双核心板_ARM】例程十四:FATFS实验——文件操作
  11. 解析AMD品牌的中国元素
  12. 增程式串联混合动力实际项目模型,本模型基于Cruise软件和Simulink软件共同搭建完成,并实际应用,本资料包包含所有源文件
  13. macOS Monterey 12.0 Beta版 With Clover 5136 and OC 0.7.0 and PE 三EFI分区原版黑苹果镜像
  14. 《码出高效:Java 开发手册》“码” 出高效的同时编写出高质量的代“码”。PDF文档资料免费开放下载!
  15. ZIGBEE通过协议栈点对点通信流程
  16. 电影文件的合并与分割
  17. 软件工程中国大学慕课mooc北京大学 答案
  18. 3D世界 ORGE SceneManager GetStart
  19. 听说 TCC 不支持 OpenFeign?这个坑松哥必须给大家填了
  20. C++ 中的隐含 *this

热门文章

  1. sleep()和wait()的区别
  2. EmbeddedServletContainerException: Unable to start embedded Tomcat 内嵌Tomcat启动失败
  3. ie打开后不是主页(转载)
  4. Python联动Excel入门教程(3--数据处理)
  5. Linux笔记2_Linux图形界面简介
  6. 看楼房 | C++ | 栈
  7. Python | 用相关系数进行Kmeans聚类,利用利润率、打折率、销售额、毛利润得到商品价格弹性标签,建立价格折扣力度模型
  8. dumpbin的使用方法_DUMPBIN命令使用详解
  9. dumpbin的使用方法_[Windows]_[中级]_[使用命令行工具dumpbin分析文件]
  10. 【Debian11】win10+VMware16安装linux虚拟机踩过的坑