文章目录

  • 一、异步与同步
    • 1.1 同步通讯与异步通讯
    • 1.2 同步调用的问题
    • 1.3 异步调用方案
  • 二、MQ消息队列
    • 2.1 单机部署MQ
    • 2.2 结构和概念
    • 2.3 常见的消息模型
  • 三、SpringAMQP
    • 3.1 用非自动装配的方式使用消息队列
    • 3.2 SpringAMQP介绍
    • 3.3 基础消息队列功能使用
    • 3.4 工作队列的配置
    • 3.5 发布与订阅模式
      • 3.5.1 SpringAMQP交换机类
      • 3.5.2 Fanout Exchange
      • 3.5.3 DirectExchange
      • 3.5.4 TopicExchange
    • 3.6 消息转换器
  • 参考文献

一、异步与同步

1.1 同步通讯与异步通讯

  • 同步通讯:时效性强。比如视频电话,实时传到对方,同时对方出回应。
  • 异步通讯:比如网络聊天,非实时反馈的,不会立即得到结果,可以之后再回复。

1.2 同步调用的问题

微服务基于Feign的调用就是同步的方式。

以购物场景为例

#mermaid-svg-b39EPuvR0OVO7mDj {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-b39EPuvR0OVO7mDj .error-icon{fill:#552222;}#mermaid-svg-b39EPuvR0OVO7mDj .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-b39EPuvR0OVO7mDj .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-b39EPuvR0OVO7mDj .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-b39EPuvR0OVO7mDj .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-b39EPuvR0OVO7mDj .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-b39EPuvR0OVO7mDj .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-b39EPuvR0OVO7mDj .marker{fill:#333333;stroke:#333333;}#mermaid-svg-b39EPuvR0OVO7mDj .marker.cross{stroke:#333333;}#mermaid-svg-b39EPuvR0OVO7mDj svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-b39EPuvR0OVO7mDj .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-b39EPuvR0OVO7mDj .cluster-label text{fill:#333;}#mermaid-svg-b39EPuvR0OVO7mDj .cluster-label span{color:#333;}#mermaid-svg-b39EPuvR0OVO7mDj .label text,#mermaid-svg-b39EPuvR0OVO7mDj span{fill:#333;color:#333;}#mermaid-svg-b39EPuvR0OVO7mDj .node rect,#mermaid-svg-b39EPuvR0OVO7mDj .node circle,#mermaid-svg-b39EPuvR0OVO7mDj .node ellipse,#mermaid-svg-b39EPuvR0OVO7mDj .node polygon,#mermaid-svg-b39EPuvR0OVO7mDj .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-b39EPuvR0OVO7mDj .node .label{text-align:center;}#mermaid-svg-b39EPuvR0OVO7mDj .node.clickable{cursor:pointer;}#mermaid-svg-b39EPuvR0OVO7mDj .arrowheadPath{fill:#333333;}#mermaid-svg-b39EPuvR0OVO7mDj .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-b39EPuvR0OVO7mDj .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-b39EPuvR0OVO7mDj .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-b39EPuvR0OVO7mDj .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-b39EPuvR0OVO7mDj .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-b39EPuvR0OVO7mDj .cluster text{fill:#333;}#mermaid-svg-b39EPuvR0OVO7mDj .cluster span{color:#333;}#mermaid-svg-b39EPuvR0OVO7mDj div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-b39EPuvR0OVO7mDj :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

调用
调用
支付
支付服务
订单服务
仓储服务

但是如果要加业务就需要为支付服务加业务,改动其代码,耦合度高。

#mermaid-svg-4zODhHP8i33y35nu {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-4zODhHP8i33y35nu .error-icon{fill:#552222;}#mermaid-svg-4zODhHP8i33y35nu .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-4zODhHP8i33y35nu .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-4zODhHP8i33y35nu .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-4zODhHP8i33y35nu .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-4zODhHP8i33y35nu .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-4zODhHP8i33y35nu .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-4zODhHP8i33y35nu .marker{fill:#333333;stroke:#333333;}#mermaid-svg-4zODhHP8i33y35nu .marker.cross{stroke:#333333;}#mermaid-svg-4zODhHP8i33y35nu svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-4zODhHP8i33y35nu .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-4zODhHP8i33y35nu .cluster-label text{fill:#333;}#mermaid-svg-4zODhHP8i33y35nu .cluster-label span{color:#333;}#mermaid-svg-4zODhHP8i33y35nu .label text,#mermaid-svg-4zODhHP8i33y35nu span{fill:#333;color:#333;}#mermaid-svg-4zODhHP8i33y35nu .node rect,#mermaid-svg-4zODhHP8i33y35nu .node circle,#mermaid-svg-4zODhHP8i33y35nu .node ellipse,#mermaid-svg-4zODhHP8i33y35nu .node polygon,#mermaid-svg-4zODhHP8i33y35nu .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-4zODhHP8i33y35nu .node .label{text-align:center;}#mermaid-svg-4zODhHP8i33y35nu .node.clickable{cursor:pointer;}#mermaid-svg-4zODhHP8i33y35nu .arrowheadPath{fill:#333333;}#mermaid-svg-4zODhHP8i33y35nu .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-4zODhHP8i33y35nu .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-4zODhHP8i33y35nu .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-4zODhHP8i33y35nu .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-4zODhHP8i33y35nu .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-4zODhHP8i33y35nu .cluster text{fill:#333;}#mermaid-svg-4zODhHP8i33y35nu .cluster span{color:#333;}#mermaid-svg-4zODhHP8i33y35nu div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-4zODhHP8i33y35nu :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

调用
调用
调用
调用
支付
支付服务
订单服务
仓储服务
短信服务
...

同时,同步调用,要等待服务结束后,在进行下一个服务。支付总耗时,是支付服务依次调用服务耗时的时间和,耗时过长。

此外,如果仓储服务挂掉了,支付服务就会被卡在那里。当过多的支付服务都卡在那里,于是资源耗尽,支付服务也挂掉了。

问题:

  • 耦合度高
  • 性能下降
  • 资源浪费
  • 级联失败

1.3 异步调用方案

异步调用常见的就是事件驱动模式

#mermaid-svg-qDcn2rmvqmKcyCjb {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-qDcn2rmvqmKcyCjb .error-icon{fill:#552222;}#mermaid-svg-qDcn2rmvqmKcyCjb .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-qDcn2rmvqmKcyCjb .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-qDcn2rmvqmKcyCjb .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-qDcn2rmvqmKcyCjb .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-qDcn2rmvqmKcyCjb .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-qDcn2rmvqmKcyCjb .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-qDcn2rmvqmKcyCjb .marker{fill:#333333;stroke:#333333;}#mermaid-svg-qDcn2rmvqmKcyCjb .marker.cross{stroke:#333333;}#mermaid-svg-qDcn2rmvqmKcyCjb svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-qDcn2rmvqmKcyCjb .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-qDcn2rmvqmKcyCjb .cluster-label text{fill:#333;}#mermaid-svg-qDcn2rmvqmKcyCjb .cluster-label span{color:#333;}#mermaid-svg-qDcn2rmvqmKcyCjb .label text,#mermaid-svg-qDcn2rmvqmKcyCjb span{fill:#333;color:#333;}#mermaid-svg-qDcn2rmvqmKcyCjb .node rect,#mermaid-svg-qDcn2rmvqmKcyCjb .node circle,#mermaid-svg-qDcn2rmvqmKcyCjb .node ellipse,#mermaid-svg-qDcn2rmvqmKcyCjb .node polygon,#mermaid-svg-qDcn2rmvqmKcyCjb .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-qDcn2rmvqmKcyCjb .node .label{text-align:center;}#mermaid-svg-qDcn2rmvqmKcyCjb .node.clickable{cursor:pointer;}#mermaid-svg-qDcn2rmvqmKcyCjb .arrowheadPath{fill:#333333;}#mermaid-svg-qDcn2rmvqmKcyCjb .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-qDcn2rmvqmKcyCjb .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-qDcn2rmvqmKcyCjb .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-qDcn2rmvqmKcyCjb .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-qDcn2rmvqmKcyCjb .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-qDcn2rmvqmKcyCjb .cluster text{fill:#333;}#mermaid-svg-qDcn2rmvqmKcyCjb .cluster span{color:#333;}#mermaid-svg-qDcn2rmvqmKcyCjb div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-qDcn2rmvqmKcyCjb :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

告知订单成功支付事件
订阅事件
订阅事件
订阅事件
支付
支付服务
Broker
订单服务
仓储服务
短信服务

当支付服务告知了Broker后,就可以继续自己的事情了,而不需要等待。

优势:

  • 代码解耦合:不需要改动支付服务,只需要让服务订阅或者取消订阅Broker即可。
  • 耗时减少了:只计算支付服务和通知Broker的时间。
  • 不存在级联失败的问题,仓储服务挂了不再影响支付服务。
  • 流量消峰:当流量过大时,请求排在Broker中,服务能做几个就做几个,做不了的就排着。

缺点:

  • Broker挂了也会出问题,依赖于Broker的可靠性,安全性,吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

二、MQ消息队列

在上述的结构中,就是Broker。

常用的MQ有几种实现。

RabbitMQ ActiveMQ RocketMQ Kafaka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP、XMPP、SMTP、STOMP OpenWire、STOMP、REST、XMPP、AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般
  • 一般中小型公司,用的就是RabbitMQ。
  • 如果大型企业,做深度定制,可以用RocketMQ
  • Kafaka则是用于大量数据情况下的处理,但安全可靠性相对较差。
  • ActiveMQ是很早的消息队列,如今几乎没有维护。

2.1 单机部署MQ

通过docker部署最简单,

docker pull rabbitmq:3-management

也可以用命令安装,这里直接用容器了。

启动信息如下

docker run -e RABBITMQ_DEFAULT_USER=yjx23332 -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

-e 为设置环境变量
两个端口,15672是管理平台端口,5672是发送消息的端口。

记得开放对应端口。如果是腾讯云或者阿里云,也要在购买的服务器管理页面打开放行端口。

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
查询端口是否开放

  • firewall-cmd --query-port=15672/tcp 查看某个端口
  • firewall-cmd --zone=public --list-ports 查看所有

登陆成功后,即可进入以下界面。

我们可在这里为添加用户和角色
virtualhosts虚拟主机:对不用户进行隔离,避免相互影响。
此处可以添加

点击用户,可以配置其虚拟主机权限等。

此处设置交换机

2.2 结构和概念

使用消息队列中消息的对象。我们称之为消费者。

在一个virtualhost下:

#mermaid-svg-Nw7jXSXpa6baQJyM {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-Nw7jXSXpa6baQJyM .error-icon{fill:#552222;}#mermaid-svg-Nw7jXSXpa6baQJyM .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-Nw7jXSXpa6baQJyM .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-Nw7jXSXpa6baQJyM .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-Nw7jXSXpa6baQJyM .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-Nw7jXSXpa6baQJyM .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-Nw7jXSXpa6baQJyM .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-Nw7jXSXpa6baQJyM .marker{fill:#333333;stroke:#333333;}#mermaid-svg-Nw7jXSXpa6baQJyM .marker.cross{stroke:#333333;}#mermaid-svg-Nw7jXSXpa6baQJyM svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-Nw7jXSXpa6baQJyM .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-Nw7jXSXpa6baQJyM .cluster-label text{fill:#333;}#mermaid-svg-Nw7jXSXpa6baQJyM .cluster-label span{color:#333;}#mermaid-svg-Nw7jXSXpa6baQJyM .label text,#mermaid-svg-Nw7jXSXpa6baQJyM span{fill:#333;color:#333;}#mermaid-svg-Nw7jXSXpa6baQJyM .node rect,#mermaid-svg-Nw7jXSXpa6baQJyM .node circle,#mermaid-svg-Nw7jXSXpa6baQJyM .node ellipse,#mermaid-svg-Nw7jXSXpa6baQJyM .node polygon,#mermaid-svg-Nw7jXSXpa6baQJyM .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-Nw7jXSXpa6baQJyM .node .label{text-align:center;}#mermaid-svg-Nw7jXSXpa6baQJyM .node.clickable{cursor:pointer;}#mermaid-svg-Nw7jXSXpa6baQJyM .arrowheadPath{fill:#333333;}#mermaid-svg-Nw7jXSXpa6baQJyM .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-Nw7jXSXpa6baQJyM .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-Nw7jXSXpa6baQJyM .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-Nw7jXSXpa6baQJyM .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-Nw7jXSXpa6baQJyM .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-Nw7jXSXpa6baQJyM .cluster text{fill:#333;}#mermaid-svg-Nw7jXSXpa6baQJyM .cluster span{color:#333;}#mermaid-svg-Nw7jXSXpa6baQJyM div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-Nw7jXSXpa6baQJyM :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

publisher
exchange1
exchange2
queue1
queue2
queue3
consumerr1
consumerr2

2.3 常见的消息模型

  • 基本消息队列BasicQueue:最简单的实现

  • 工作消息队列WorkQueue:在工作者之间分配任务

  • 发布订阅带有交换机,分为:

    • Fanout Exchange:广播,发布订阅(publish/subscribe):一次性向读个消费者发送消息。

    • Direct Exchange:路由(Routing):有选择的接收消息

    • Topic Exchange:主题 (Topics):根据主题接收消息。

  • 请求回复模型(RPC):收到请求然后答复。

  • 发布者确认模式(Publisher Confirms):会让发布者知道发送是否成功。

三、SpringAMQP

3.1 用非自动装配的方式使用消息队列

需要在项目中引入AMQP,记得加入父类spring-boot-starter-parent

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

引入Junit方便测试

     <dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency>

测试一下MQ
我们创建如下两个子项目。

为test写一个测试用例

package com.yjx23332.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException{//1.建立连接ConnectionFactory factory = new ConnectionFactory();//1.2.设置连接参数factory.setHost("IP");//MQ地址设置factory.setPort(5672);//端口设置factory.setVirtualHost("/");//设置虚拟主机factory.setUsername("账号");factory.setPassword("密码");//1.2 建立连接Connection connection = factory.newConnection();//2.创建通道Channel channel = connection.createChannel();//3.创建消息队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);//4.发送消息String message = "hello,rabbitmq!";channel.basicPublish("",queueName,null,message.getBytes());System.out.println("已发送消息:【"+message+"】");//5.关闭通道if(channel != null){channel.close();}if(connection != null){connection.close();}}
}

我们在发送消息前打上断点,用junit运行,就可以看到连接创建和通道创建.。
因为发完就不管了,因此必须打断点,才看得到连接和通道。


完成后可以看到队列中

接下来我们处理消息,基本一样,只需要修改几个部分。
注意我们没有关闭连接,因为在业务中,要一直处理。

package com.yjx23332.mq.helloworld;import com.rabbitmq.client.*;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {@Testpublic static void main(String[] args) throws IOException, TimeoutException{//1.建立连接ConnectionFactory factory = new ConnectionFactory();//1.2.设置连接参数factory.setHost("101.43.65.53");//MQ地址设置factory.setPort(5672);//端口设置factory.setVirtualHost("/");//设置虚拟主机factory.setUsername("yjx23332");factory.setPassword("123456");//1.2 建立连接Connection connection = factory.newConnection();//2.创建通道Channel channel = connection.createChannel();//3.创建消息队列//为什么这里也要创建?避免消费者先执行,还没有队列。同时,相同的队列创建重复执行没有影响。String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);//4.处理消息String message = "hello,rabbitmq!";//DefaultConsumer 是回调函数,一旦有消息,异步处理channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws  IOException{System.out.println("接收到消息:【"+ new String (body)+"】");}});System.out.println("####################等待接收消息##################");
//
//        //5.关闭通道
//        if(channel != null){//            channel.close();
//        }
//        if(connection != null){//            connection.close();
//        }}
}

结果如下
可以看到,因为回调的原因,后面的输出先执行

队列中消息处理完毕

3.2 SpringAMQP介绍

AMQP:Advanced Message Queuing Protocol:高级消息队列协议。于应用程序之间传递业务消息的开放标准。

Spring AMQP:基于AMQP协议的一套API规范,提供模板来发送和接收消息。其中Spring-amqp是基础抽象,Spring-rabbit是底层的默认实现。可参考Spring AMQP官网。

3.3 基础消息队列功能使用

导入依赖

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

为了方便测试,我们引入SpringBoot单元测试

 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency>

然后准备一个yml文件,配置和之前用代码写得相似。

spring:rabbitmq:host: port: 5672virtual-host: /username: password:

我们直接走单元测试,这里就不创建一个队列了,直接放消息。

package com.yjx23332.mq.helloworld;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.io.IOException;
import java.util.concurrent.TimeoutException;@SpringBootTest
@RunWith(SpringRunner.class)
public class PublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() throws IOException, TimeoutException{rabbitTemplate.convertAndSend("simple.queue","hello,spring amqp");}
}

接下来为消费者建一个监听器(记得配置yml文件)

package com.yjx23332.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}
}

消息一旦消费,就会被移除,Rabbit MQ不存在回溯功能。

3.4 工作队列的配置

一个队列绑定多个消费者。

我们准备发送50条消息

package com.yjx23332.mq.helloworld;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class PublisherTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() throws InterruptedException{for(int i = 0;i < 50;i++){rabbitTemplate.convertAndSend("simple.queue","hello,spring amqp___" + i);Thread.sleep(20);}}
}

修改消费者

package com.yjx23332.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage2(String msg) throws InterruptedException{System.err.println("spring 消费者接收到消息:【"+msg+"】"+ LocalTime.now());Thread.sleep(200);}
}

从结果会发现处理总时长超过了1秒达到了5秒,查看输出会发现消息被平均分配给了两个。一个处理偶数,一个处理奇数。但由于处理速度不同,因此处理总时长超过了1秒。

这里是因为消费预取导致的,在执行前会提前把消息从队列拿出,然后各自处理。

但我们希望的是,做的快的多做,做的慢的少做。

因此我们可以修改yml文件:

spring:rabbitmq:host: port: 5672virtual-host: /username: password: listener:simple:prefetch:  1 # 每次只能获取几条消息,执行完了再取下一条,默认是无限

重启后再次执行就会发现正常了。

3.5 发布与订阅模式

我们需要将同一消息发送给多个消费者。需要加入交换机来实现。注意,交换机只负责消费路由,但不存储消息,丢失一概不负责。

3.5.1 SpringAMQP交换机类

#mermaid-svg-4aynGx4YVRGxapqp {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-4aynGx4YVRGxapqp .error-icon{fill:#552222;}#mermaid-svg-4aynGx4YVRGxapqp .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-4aynGx4YVRGxapqp .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-4aynGx4YVRGxapqp .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-4aynGx4YVRGxapqp .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-4aynGx4YVRGxapqp .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-4aynGx4YVRGxapqp .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-4aynGx4YVRGxapqp .marker{fill:#333333;stroke:#333333;}#mermaid-svg-4aynGx4YVRGxapqp .marker.cross{stroke:#333333;}#mermaid-svg-4aynGx4YVRGxapqp svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-4aynGx4YVRGxapqp .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-4aynGx4YVRGxapqp .cluster-label text{fill:#333;}#mermaid-svg-4aynGx4YVRGxapqp .cluster-label span{color:#333;}#mermaid-svg-4aynGx4YVRGxapqp .label text,#mermaid-svg-4aynGx4YVRGxapqp span{fill:#333;color:#333;}#mermaid-svg-4aynGx4YVRGxapqp .node rect,#mermaid-svg-4aynGx4YVRGxapqp .node circle,#mermaid-svg-4aynGx4YVRGxapqp .node ellipse,#mermaid-svg-4aynGx4YVRGxapqp .node polygon,#mermaid-svg-4aynGx4YVRGxapqp .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-4aynGx4YVRGxapqp .node .label{text-align:center;}#mermaid-svg-4aynGx4YVRGxapqp .node.clickable{cursor:pointer;}#mermaid-svg-4aynGx4YVRGxapqp .arrowheadPath{fill:#333333;}#mermaid-svg-4aynGx4YVRGxapqp .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-4aynGx4YVRGxapqp .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-4aynGx4YVRGxapqp .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-4aynGx4YVRGxapqp .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-4aynGx4YVRGxapqp .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-4aynGx4YVRGxapqp .cluster text{fill:#333;}#mermaid-svg-4aynGx4YVRGxapqp .cluster span{color:#333;}#mermaid-svg-4aynGx4YVRGxapqp div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-4aynGx4YVRGxapqp :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

Declarable
AbstractDeclarable
Exchange
AbstractExchange
HeadersExchange
DirectExchange
FanoutExchange
TopicExchange

3.5.2 Fanout Exchange

我们在consumer服务中声明Exchange、Queue、Binding.

package com.yjx23332.mq.confg;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("yjx23332.fanout");}//声明一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//声明第二个队列@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//绑定第二个队列和交换机@Beanpublic Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

运行后,会看到:

修改监听器

package com.yjx23332.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MQlistener {@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("spring 消费者接收q1到消息:【"+msg+"】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.err.println("spring 消费者接收到q2消息:【"+msg+"】");}
}

我们再修改publisher的测试代码

package com.yjx23332.mq;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class test {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testFanoutExchange(){String exchangeName = "yjx23332.fanout";String message = "hello world!";rabbitTemplate.convertAndSend(exchangeName,"",message);}
}

启动

3.5.3 DirectExchange

将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
  • 一个队列可以指定多个BindingKey,且队列之间的BindingKey可以重复

由于基于Config创建队列交换机的方式很麻烦,我们用新的方式声明交换机、队列。

删除上一节我们在config中的声明代码。

然后在listener中进行

package com.yjx23332.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MQlistener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(value = "yjx23332.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"} //bindingkey))public void listenDirectQueue1(String msg){System.out.println("spring 消费者接收q1到消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(value = "yjx23332.direct" , type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2(String msg) {System.err.println("spring 消费者接收到q2消息:【"+msg+"】");}}

运行后,我们可以看到

接下来,我们修改Test代码

package com.yjx23332.mq;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class test {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testDirectExchange(){String exchangeName = "yjx23332.direct";rabbitTemplate.convertAndSend(exchangeName,"red","hello red");rabbitTemplate.convertAndSend(exchangeName,"blue","hello blue");rabbitTemplate.convertAndSend(exchangeName,"yellow","hello yellow");}
}

3.5.4 TopicExchange

与DirectExchange类似,但是它的routingKey必须是多个单词表,并用’.'分割。
当队列与交换机绑定时,可以使用通配符。避免当bindkey过多导致的麻烦。

#:代表0个或多个单词
*:代指一个单词

比如

China.news
Japan.news
就可以用 #.news
同理
China.weather
China.news
就可以用 China.#

我们沿用上一节的代码,做一点修改即可

package com.yjx23332.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MQlistener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue"),exchange = @Exchange(value = "yjx23332.topic",type = ExchangeTypes.TOPIC),key = {"China.#"} //bindingkey))public void listenTopicQueue1(String msg){System.out.println("spring 消费者接收q1到消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(value = "yjx23332.topic",type = ExchangeTypes.TOPIC),key = {"#.news"}))public void listenTopicQueue2(String msg) {System.err.println("spring 消费者接收到q2消息:【"+msg+"】");}}

重启后,可看到




修改Test代码

package com.yjx23332.mq;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class test {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testDirectExchange(){String exchangeName = "yjx23332.topic";rabbitTemplate.convertAndSend(exchangeName,"China.news","江苏地表最高温度将达到72摄氏度");rabbitTemplate.convertAndSend(exchangeName,"China.weather","未来温度仍将升高");rabbitTemplate.convertAndSend(exchangeName,"Japan.news","安培中枪");}
}

3.6 消息转换器

在发送中,我们接收消息的类型是Object。SpringAMQP会帮我们序列化后变为字节发送。
用默认JDK的序列化ObjectOutputStream是没有问题的,但是中间过程是乱码,我们这里改用JSON方式的序列化,这样在消息队列中查看也是正常的。

默认JDK的消息信息:

接下来我们配置消息转换。

我们先在消费者声明一个queue,并设置处理方式

package com.yjx23332.mq.listener;import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MQlistener {@RabbitListener(queuesToDeclare = @Queue("object.queue"))public void listenObjectQueue(String msg){System.out.println("spring 消费者接收到Object消息:【"+msg+"】");}
}

我们为发送类引入依赖并编写配置

     <dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId></dependency>


覆盖默认的消息转换。

package com.yjx23332.mq.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageConverterConfig {@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}

随后修改Test

package com.yjx23332.mq;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@SpringBootTest
@RunWith(SpringRunner.class)
public class test {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testObjectQueue(){String queueName = "object.queue";Map<String,Object> msg = new HashMap<>();msg.put("name","yjx23332");msg.put("age",21);rabbitTemplate.convertAndSend(queueName,msg);}
}

结果如下:

这时消息不再是乱码

我们在为消费者配置转换,并修改监听器。当然,如果我们在两边都不配置消息转换器,这里结果是一样的。

package com.yjx23332.mq.listener;import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class MQlistener {@RabbitListener(queuesToDeclare = @Queue("object.queue"))public void listenObjectQueue(Map<String,Object> msg){System.out.println("spring 消费者接收到Object消息:【 name = "+msg.get("name")+",age = "+msg.get("age")+"】");}}

结果如下

参考文献

[1]Spring AMQP官网
[2]黑马程序员Java微服务
[3]RabbitMQ官方文档

MessageQueue消息队列——基础(笔记)相关推荐

  1. 【Android 异步操作】Handler 机制 ( MessageQueue 消息队列的阻塞机制 | Java 层机制 | native 层阻塞机制 | native 层解除阻塞机制 )

    文章目录 一.MessageQueue 的 Java 层机制 二.MessageQueue 的 native 层阻塞机制 三.MessageQueue 的 native 层解除阻塞机制 三.Messa ...

  2. 【Android 异步操作】Handler 机制 ( Android 提供的 Handler 源码解析 | Handler 构造与消息分发 | MessageQueue 消息队列相关方法 )

    文章目录 一.Handler 构造函数 二.Handler 消息分发 三.MessageQueue 消息队列相关函数 一.Handler 构造函数 一般使用 Handler 时 , 调用 Handle ...

  3. RocketMQ4.X消息队列详细笔记

    人不能没有批评和自我批评 那样一个人就不能进步. 目录 JMS和消息中间件介绍 JMS消息服务和使用场景 消息中间件常见概念和编程模型 主流消息队列和技术选型讲解 基础介绍和阿里云服务器快速部署 Ro ...

  4. 分布式消息队列基础知识

    本文主要整理消息队列的一些基本概念,为后面的Rocketmq消息队列组件深入学习打下基础. 一.什么是消息队列? 维基百科介绍:消息队列(Message Queue)是一种进程间通信或同一进程的不同线 ...

  5. Kafka消息队列学习笔记1——Kafka入门1

    目录 1.消息队列 1.1.传统消息队列的应用场景 2.1.1.异步处理 1.1.2.系统解耦 1.1.3.流量削峰 1.1.4.日志处理 1.2.生产者-消费者模型 1.3.消息队列的两种模式 1. ...

  6. POSIX 消息队列基础知识复习,以及相关例程

    1.1        Posix消息队列 1.1.1       消息队列的创建和删除 1.1.1.1     mq_open( ) #include<mqueue.h> mqd_tmq_ ...

  7. kafka消息队列学习笔记

    消息队列: (1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端.这个模型的特点是 ...

  8. MQ(MessageQueue)消息队列

    同步调用 同步调用的优点: 时效性较强,可以立即得到结果 同步调用的问题: 耦合度高: 每次加入新的需求,都要修改原来的代码 性能和吞吐能力下降: 调用者需要等待服务提供者响应,如果调用链过长则响应时 ...

  9. rabbitMQ 实战 高效部署分布式消息队列 读书笔记

    第三章 运行和管理RabbitMQ 1. 创建一个新用户 ./rabbitmqctl add_usertest_ai_20171228 testai Creating user "test_ ...

  10. Windows消息机制学习笔记(一)—— 消息队列

    Windows消息机制学习笔记(一)-- 消息队列 基本概念 实验一:使用代码画出最简单窗口 第一步:编译并运行以下代码 第二步:查看运行结果 第三步:使用其它窗口对其进行覆盖,观察效果 总结 消息队 ...

最新文章

  1. 【基础算法】常见的ML、DL编程题
  2. [导入]C++程序随笔
  3. 旧访客设计模式的新生活
  4. 【CSP考前复习】关于考试时的注意事项
  5. css 文字超出隐藏显示省略号
  6. Xcode7中 添加 .dylib
  7. AndroidOpenCV摄像头预览全屏问题
  8. 合法的括号序列匹配数
  9. 机票预订系统活动图_机票预订系统UML讲解
  10. 【STM32F407的DSP教程】第2章 Matlab R2018a的安装
  11. 从0到1设计通用数据大屏搭建平台
  12. 两代荣耀Magic历史性同框,荣耀Magic 2如何践行科技理想主义?
  13. 计算机表格制作中这么打字,excel表格先打字还是先制表?
  14. TableView的使用
  15. Direction-aware Spatial Context Features for Shadow Detection and Removal
  16. 20-观察者模式Quarkus实现
  17. VB程序破解常用函数
  18. 目前智能手机 微型计算机,微型计算机基础知识1(新).ppt.ppt
  19. Android获取手机屏幕宽度
  20. 华为全新MateBook E评测

热门文章

  1. 密码破解与HASH计算
  2. arduino两轮平衡车(二)-- 原理讲解
  3. 过年倒计时 java swing 附源码
  4. 【老生谈算法】matlab实现FFT算法源码——FFT算法
  5. windows开启ftp服务及FTP命令使用
  6. 计算机电源管理设置,怎么修改电脑中设置的显卡电源管理模式
  7. 计算机网络:网络安全(电子邮件安全)
  8. 联想启天m410进bios_联想启天M410台式机怎么装win7系统
  9. 将jpg格式转成PDF格式的转换器
  10. IMDB TOP250电影介绍(下)