RabbitMQ 整合 PD 商城实战项目流程总结

一、订单流量削峰(解耦)

简单模式,若多添加几个消费者则用工作模式。

导入商城项目

将 step5 课前资料里面 /elasticsearch/pd商城.zip 解压。

第二层目录下 pd-web 文件夹解压到 rabbitmq 工程目录下面。

修改 pom.xml 文件中的 SpringBoot 版本为 2.3.2.RELEASE(可拖拽至 idea 内更改)。

在 rabbitmq 工程中导入模块(若之前在 idea 内更改则可以直接右键 Add As Maven 导入)。
或者打开工程结构 Project Structures 手动添加也可。

使用 sqlyog 导入项目目录中的 pd.sql:

右键复制脚本文件的路径,在 sqlyog 里面右键连接,从 SQL 转储文件导入数据库。

如果导入失败,可以增大 mysql 缓存区
set global max_allowed_packet=100000000;
set global net_buffer_length=100000;
SET GLOBAL  interactive_timeout=28800000;
SET GLOBAL  wait_timeout=28800000

因为大环境变化,MySQL 5.5 以下版本的用起来会很费劲,请至少更新至5.7。MariaDB 要 10.4 以上。

确认工程 JDK 的配置(吐槽:坑爹的涛哥这里准备的项目是 GBK 编码的,属实难顶)。

为了本项目的运行,首先将数据库 pd_store 的 user、order、order_item 表清空:

delete from pd_user
delete from pd_order
delete from pd_order_item

找到主启动类 RunPdApp ,运行,然后更改启动配置中的工作目录为本项目,改完重新启动。

打开 localhost 进入模拟拼多商城,注册一个新用户(随意填写,注意格式即可)。

注册成功以后,点击地址管理,添加一个收货地址(只填必填项即可)。

返回起始页,选择一个商品,加入购物车并下订单(无需支付,订单会添加到数据库内)。

查看数据库是否正确存储数据。

生产者 - 发送订单

修改订单,把订单发送到 rabbitmq

返回 pd-web 项目,打开 service 下的 OrderServiceImpl 实现类,注释掉 64-89 行关于数据库操作的代码。这里是为了不直接操作数据库,而是发送到 rabbitmq 以后再继续操作。

pom.xml 添加 rabbitmq 依赖。

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

application.yml 添加 rabbitmq 连接。

spring:  rabbitmq:host: 192.168.64.140  # wht6.cnport: 5672username: adminpassword: admin# virtual-host: 自己的空间名

启动类(或者新建自动配置类)

  • 提供使用的队列参数:
    orderQueue,持久队列(订单不能丢),不独占(并行处理效率高),不自动删除。

  • 创建 spring 的 Queue 对象,用来封装队列的参数。RabbitAutoConfiguration 自动配置类,会自动发现 Queue 实例,使用这里的参数,连接服务器在服务器上创建队列。
    1)通过方法创建一个 Queue 队列,注意引包要用 amqp 核心包。
    import org.springframework.amqp.core.Queue
    2)直接返回一个 Queue 对象,参数为上述参数(队列名,true,false,false)。
    3)最后将方法交给 Spring 容器管理。

package com.pd;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
@MapperScan("com.pd.mapper")
public class RunPdAPP{public static void main(String[] args) {SpringApplication.run(RunPdAPP.class, args);}@Beanpublic Queue orderQueue(){return new Queue("orderQueue",true,false,false);}
}

修改 OrderServiceImpl,向 Rabbitmq 发送订单。

  • saveOrder 方法上添加自动注入 AmqpTemplate 对象,该对象为 Spring 提供的用来发送消息的工具对象。

  • 在注释的方法里,要求将 pdOrder 发送到 orderQueue 队列: t.convertAndSend("orderQueue",pdOrder);

  • 这里的 convertAndSend 方法是转换并发送,先转成 byte[],再发送,订单对象会被自动序列化成 byte[]。

重启启动类,重新下订单进行测试:

  • 下订单以后,会发现数据库里没有新订单。
    此时查看 Rabbitmq 服务器,在 orderQueue 队列会发现有一条就绪的消息。
    点开 orderQueue 队列,查看下方的 Get messages 会有很长的一串代码(消息):

有消息即可。

订单 - 消费者

1.找到项目目录,复制一份 pd-web 起名为 pd-web-consumer 作为消费者端。

2.修改 pom.xml 文件:

  • 将 artifactId 改为 pd-web-consumer(可直接拖拽 pom 文件至 idea)。

3.修改 application.yml 文件:

  • 因为之前的生产者端已经占用了 80 端口,所以这里将 server.port 改为 81。

4.添加依赖、添加连接、队列参数:前面做过了所以省略。

5.新建消费者类(OrderConsumer),从 orderQueue 接收消息:

  • 在 pd 包内创建 OrderConsumer 类。

  • 添加 @Component 注解自动创建实例,添加 @RabbitListener 注解指定接收消息的队列。

  • 在接收订单时,需要调用 OrderService 对象来接收,因此添加自动注入 OrderService 对象。

  • 创建 receive 方法,参数为 PdOrder 对象,并添加 @RabbitHandler 注解。

    @RabbitHandler 注解是用来配合 @RabbitListener 注解的。

    因此指定处理消息的方法的类中,只能有一个 @RabbitHandler 注解。

6.调用原来的业务代码,存储订单

  • 通过 OrderService 对象调用 saveOrder 方法接收订单,最后输出提示 “订单已存储” 。
  • Spring 会自动创建实例,自动启动消费者,自动开始从队列获取消息,自动执行处理消息的方法。
package com.pd;import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "orderQueue")
public class OrderConsumer {@Autowiredprivate OrderService orderService;@RabbitHandlerpublic void receive(PdOrder pdOrder) throws Exception {orderService.saveOrder(pdOrder);System.out.println("-----------------------------------订单已存储!wow!");}
}

7.修改 OrderServiceImpl,还原成原来的数据库操作的代码即可。

  • 将注入的 AmqpTemplate 对象删掉,saveOrder 前面自动序列化的代码也删掉,数据库注释部分还原。

  • 因为之前的是随机生成的 orderId,现在因为前面已经获取了 PdOrder 的对象。

    所以直接添加一行代码:获取 pdOrder 的 orderId。
    String orderId = pdOrder.getOrderId();

8.消息传递测试:

  • 由于之前发送的订单对象需要序列化,所以这里需要重新下一个订单来测试传递。
  • 重新下完订单以后,启动 81 端口的 RunPdApp 服务,消息会自动传递到消费者这里。
  • 控制台显示 “订单已存储” 即可。

9.合理分发相关:

  • ack – spring 封装的 rabbitmq,已经是手动 ack 模式了,spring 会自动发送回执。

  • qos=1 – spring 默认设置的 qos 是250,yml 中 可以添加 prefetch = 1 把 qos 设置成 1。

spring:  rabbitmq:  listener:simple:prefetch: 1

10.持久化相关:

队列持久化参数设定为 true;生产者发送的消息默认是持久消息(即生产者利用 AmqpTemplate 调用 convertAndSend 的方法)。

二、SpringBoot 整合 RabbitMQ

创建项目

  • 在 rabbitmq 目录下创建 SpringBoot 工程模块:rabbitmq-spring,只添加 rabbitmq 依赖即可。

  • 修改 pom.xml 文件:Spring Boot 版本改为 2.3.2.RELEASE

  • 修改 application.yml 文件:添加 rabbitmq 连接

    spring:rabbitmq:host: 192.168.64.140  # wht6.cnport: 5672username: adminpassword: admin# virtual-host:你的空间名
    

项目结构

  • 创建结构和之前类似,虽然项目为我们自动创建了一个启动类(RabbitmqSpringApplication),但是我们想同时启动多个模式下的生产者和消费者时会起冲突,因此为了方便测试运行我们每个包内单独创建一个启动类(Main)来运行该包下的测试程序。
  • 分别创建 m1/m2/m3/m4/m5 包,目录下创建 Main 启动类、Producer 生产者类和 Consumer 消费者类。

简单模式测试(M1)

M1 项目为默认模式的测试,即只有一个消费者使用一个队列接收一个生产者的模式。

Producer 类

首先我们完成生产者类,生产者类负责向某个队列发送消息,通过对象转换并发送自己想发送的信息。

  • 添加 @Component 注解自动创建实例。

  • 自动注入 AmqpTemplate 对象。

  • 自定义一个发送方法(send),用 AmqpTemplate 对象调用 convertAndSend 方法将信息发送。

    方法参数为:( 队列名,具体信息内容 )这里该方法会自动转换字符串内容。

  • 这里同意规定测试队列名为 ”helloworld“,发送信息随意。

package cn.tedu.rabbitmqspring.m1;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate AmqpTemplate t;public void send(){t.convertAndSend("helloworld","Hello-THE-FKING-World");}
}

Consumer 类

  • 添加 @Component 注解自动创建实例。

  • 通常我们在消费者端写接收方法时会在类上添加 @RabbitListener 注解,并在接收方法上添加 @RabbitHandler 注解来配合。

    这里我们选择直接在方法上添加 @RabbitListener 注解来表示接收。

    每个 @RabbitListener 注解,都会注册启动一个消费者。

    直接加在方法上面表示该队列只有这一个方法用来接收。

    这样做的好处是接收代码可以集中,维护起来更加方便。

  • 自定义一个接收方法(receive),上面通过 @RabbitListener 注解限定接收的队列。

    方法参数为 String 对象(发送过来的数据),并输出 “收到: ” + 内容。

package cn.tedu.rabbitmqspring.m1;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = "helloworld")public void receive(String str){System.out.println("收到: "+str);}
}

Main 启动类

  • 完成 Spring 启动类的基本配置。

  • 创建一个 Queue 实例,封装队列参数:

    定义一个方法(helloWorldQueue),返回 Queue 队列,该队列设定为非持久队列,并交给容器管理。

  • 添加测试代码,调用生产者发送消息:

    为了调用生产者,所以进行 Producer 的注入。

    定义测试方法(test),利用 Producer 对象调用 send 方法发送信息。

    这里可以添加 @PostConstruct 注解,该注解用于初始化资源,可以事先将这些文件加载到内存里,需要时直接从内存调用。

  • spring 的执行流程:
    包扫描创建所有实例 --> 完成所有依赖的注入 --> @PostConstruct --> 后续流程

  • 这里执行代码时,自动配置类有可能还没有创建队列,消费者可能也还没有启动第一次,执行可能丢失信息。

package cn.tedu.rabbitmqspring.m1;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.PostConstruct;@SpringBootApplication
public class Main {public static void main(String[] args) {SpringApplication.run(Main.class,args);}@Beanpublic Queue helloWorldQueue(){return new Queue("helloworld",false);}@Autowiredprivate Producer p;@PostConstructpublic void test(){p.send();}
}

启动测试:测试有收到消息即可。

工作模式测试(M2)

M2 项目测试为轮询模式(工作模式),即有多台消费者用一个队列接收生产者的模式。

基本项目结构和 M1 类似,我们可以直接复制 M1 项目的三个类到 M2 里。

Producer 类

  • 本项目的生产者采用工作模式,向 task_queue 队列发送消息。

  • 主要更改自定义发送方法的方法体,工作模式需要不断地向消费者发送信息,所以采用 while-true

    死循环来模拟频繁的发送。增加输入消息提示并获取信息。

  • 最后还是调用 convertAndSend 方法发送,参数为( 队列名,消息内容,消息预处理器)

    此处的消息预处理器,可以重新设置消息的属性,本次测试不采用所以可以不写。

package cn.tedu.rabbitmqspring.m2;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Scanner;@Component
public class Producer {@Autowiredprivate AmqpTemplate t;public void send(){while (true) {System.out.println("输入消息: ");String str = new Scanner(System.in).nextLine();t.convertAndSend("task_queue",str);}}
}

Consumer 类

  • 工作模式下,通常是多个消费者共用一个队列来接收消费者的消息,我们模拟两个消费者来测试。
  • 在 M1 的基础上,将队列名改为 task_queue,并复制出另一个发送方法模拟两个同时启动。
  • 相应的方法名和输出提示也要更改。
package cn.tedu.rabbitmqspring.m2;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = "task_queue")public void receive1(String str){System.out.println("Consumer 1 收到: "+str);}@RabbitListener(queues = "task_queue")public void receive2(String str){System.out.println("Consumer 2 收到: "+str);}
}

Main 启动类

  • 在工作模式中,队列为共用的,所以要创建一个持久型队列,更改队列方法名和队列参数。

  • 由于前面发送方法为死循环,send 方法不会停止跳出, 所以PostConstruct也不会继续。

    这样就会导致后续流程不会进行,虽然可以一直输出消息,但是消费者端却不会收到消息。

  • 所以我们要把 send 方法放到一个新线程里面去执行,不会占用主线程。方法可以用 lambda 表达式省略写。

package cn.tedu.rabbitmqspring.m2;import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import javax.annotation.PostConstruct;@SpringBootApplication
public class Main {public static void main(String[] args) {SpringApplication.run(Main.class,args);}@Beanpublic Queue taskQueue(){return new Queue("task_queue");}@Autowiredprivate Producer p;@PostConstructpublic void test(){new Thread(()->p.send()).start();}
}

启动测试:有消息输出即可。

发布、订阅模式测试(M3)

M3 项目测试为发布订阅模式(广播模式),即生产者向 fanout 交换机发送消息。

只有与之绑定队列的消费者才能收到信息。

基本项目结构和 M2 类似,我们可以直接复制 M2 项目的三个类到 M3 里。

Producer 类

  • 因为本项目是订阅模式测试,所以并不是向队列发送消息,而是向 fanout 交换机发送消息。
  • 只需要将发送方法(send)里面调用 convertAndSend 方法更改即可。
  • 方法参数为( 交换机名,路由键,发送消息),订阅模式不设置路由键(空串即可)。
  • 交换机名设定为 “logs”(其他的也可以,但注意后面的名字需要与其一致)。
package cn.tedu.rabbitmqspring.m3;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Scanner;@Component
public class Producer {@Autowiredprivate AmqpTemplate t;public void send(){while (true) {System.out.println("输入消息: ");String str = new Scanner(System.in).nextLine();t.convertAndSend("logs","",str);}}
}

Consumer 类

  • 订阅模式下,Consumer 需要创建随机队列,然后和交换机绑定。

  • 具体更改的地方只有 @RabbitListener 注解的参数部分:

    M1-M2 测试中都是与队列进行绑定参数,这里我们要使用绑定参数(bindings)。

    @RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "logs", declare = "false")
    ))
    

    其中:

    由于群发没有关键词,所以 @QueueBinding 处可以直接绑定。

    value = @Queue 当后面没有配置参数时就是默认:随机命名,非持久,独占,自动删除的队列。

    交换机参数里面的 declare = “false” 代表:不重新定义,不新创建交换机。

    ( 吐槽:这注解里面套注解,是真的不做人 … )

  • 最后将配置复制到第二个方法上即可。

package cn.tedu.rabbitmqspring.m3;import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "logs",declare = "false")))public void receive1(String str){System.out.println("Consumer 1 收到: "+str);}@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "logs",declare = "false")))public void receive2(String str){System.out.println("Consumer 2 收到: "+str);}
}

Main 启动类

  • 创建队列方法改为创建交换机,并封装交换机参数:

    定义一个创建交换机方法(logsExchange),返回 Fanout 交换机(FanoutExchange)。

    交换机参数为( 交换机名,持久化参数,自动删除参数)。

    我们这里设置交换机为非持久,非自动删除的交换机,交换机名应于上面一致。

package cn.tedu.rabbitmqspring.m3;import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import javax.annotation.PostConstruct;@SpringBootApplication
public class Main {public static void main(String[] args) {SpringApplication.run(Main.class,args);}@Beanpublic FanoutExchange logsExchange(){return new FanoutExchange("logs",false,false);}@Autowiredprivate Producer p;@PostConstructpublic void test(){new Thread(()->p.send()).start();}
}

启动测试:测试两个消费者均收到消息即为成功。


Day01结束

路由模式测试(M4)

路由模式,将只订阅所有消息中的一部分。
基本项目结构和 M3 类似,我们可以直接复制 M3 项目的三个类到 M4 里。

Producer 类

  • 主题模式下,用的是 direct 类型的交换机,且需要绑定路由键。
  • 在方法内添加输入关键词提示并接收关键词。
  • 调整调用 convertAndSend 方法参数:交换机名,路由键,传递的信息
  • 这里交换机设定名为 direct_logs。
package cn.tedu.rabbitmqspring.m4;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Scanner;@Component
public class Producer {@Autowiredprivate AmqpTemplate t;public void send(){while (true) {System.out.println("输入消息: ");String str = new Scanner(System.in).nextLine();System.out.println("输入关键词: ");String key = new Scanner(System.in).nextLine();t.convertAndSend("direct_logs",key,str);}}
}

Consumer 类

  • 修改绑定交换机参数:交换机名(direct_logs)。
  • 添加绑定路由键:格式: key = {“路由键”}。
  • 这里为了区分,
    receive1 方法设定 error 为路由键,receive2 方法设定 error,info,warning 为路由键。
package cn.tedu.rabbitmqspring.m4;import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "direct_logs",declare = "false"),key = {"error"}))public void receive1(String str){System.out.println("Consumer 1 收到: "+str);}@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "direct_logs",declare = "false"),key = {"error","info","warning"}))public void receive2(String str){System.out.println("Consumer 2 收到: "+str);}
}

Main 启动类

  • 修改交换机类型:DirectExchange
  • 修改交换机名:direct_logs
package cn.tedu.rabbitmqspring.m4;import org.springframework.amqp.core.DirectExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import javax.annotation.PostConstruct;@SpringBootApplication
public class Main {public static void main(String[] args) {SpringApplication.run(Main.class,args);}@Beanpublic DirectExchange logsExchange(){return new DirectExchange("direct_logs",false,false);}@Autowiredprivate Producer p;@PostConstructpublic void test(){new Thread(()->p.send()).start();}
}

运行测试:测试该绑定键对应的消费者接收到消息即可。

补充:

  • 若没有绑定的路由键,消息会被直接丢弃(消失)。

主题模式测试(M5)

主题模式,本质上和路由模式一样

基本项目结构和 M4 类似,我们可以直接复制 M4 项目的三个类到 M5 里。

Producer 类

  • 改交换机名:topic_logs

Consumer 类

  • 改交换机名:topic_logs
  • 改路由键:receive1 方法 改为"※.orange.※",receive2 方法改为 “※.※.rabbit”,“lazy.#”
    (其中, ※ 其实就是" * ",因为 Markdown 编辑器的语法问题打不出来。。。)
    package cn.tedu.rabbitmqspring.m5;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class Consumer {@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "topic_logs",declare = "false"),key = {"*.orange.*"}))public void receive1(String str){System.out.println("Consumer 1 收到: "+str);}@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "topic_logs",declare = "false"),key = {"*.*.rabbit","lazy.#"}))public void receive2(String str){System.out.println("Consumer 2 收到: "+str);}}

Main 启动类

  • 改交换机类型:TopicExchange
  • 改交换机名:topic_logs

运行测试:测试测试该绑定键对应的消费者接收到消息即可。

RPC 模式 - 源码测试(M6)

RPC 异步调用模式(Remote Procedure Call)远程过程调用。本质上是两套 Pro-Con 系统。

本地可看做客户端(Client),远程可看做服务器(Server)。

返回队列可以有很多,但调用队列只有一个。

业务执行顺序:

  • 第一步:本地生产者:发送调用信息。多携带了两个参数(携带返回队列信息,关联 id)。

  • 第二步:远程消费者:接收调用信息

  • 第三步:远程生产者:发送计算结果。

  • 第四步:本地消费者:接收远程模块计算结果。

Blocking Queue 测试类:

原生 rabbitmq-api 项目里创建 m6.TestBlockingQueue 类:

  • 创建静态成员 bq(BlockingQueue)数组,存放 String 字符串,大小为 10。
  • 创建 main 方法。

测试分别用两个线程来完成:

  • 从 bq 取数据,没有数据会阻塞等待。

    创建线程,调用 take 方法,获取数据。输出 s 并提示 "线程1正在获取数据: "

  • 向 bq 放入数据。

    输出提示 "线程2 – 输入数据放入集合: " ,获取输入内容,调用 add 方法放入 bq。

启动测试:正常有结果即为成功。

Client 类:

原生 rabbitmq-api 项目里创建 m6.Client 类:

  • main 方法,输出提示 "求第几个斐波那契数: ",获取输入结果(int)。

  • 定义 f(int)方法求结果,返回值为 long。

  • 输出结果。

  • 整个过程设定为 while-true 死循环方便多次测试。

f(int)方法内要完成以下步骤:

  • 准备阻塞队列集合 abq,泛型为 Long,集合大小为 10。

  • 创建连接

  • 创建调用队列:rpc-queue,(非持久、非独占、非自动删除、空)

  • 创建随机队列(replyTo),用来获取计算结果

  • 利用 UUID 产生一个关联 id (correlationId)

  • 发送调用信息,携带两个参数: 返回队列名,关联id:

    携带参数需要创建 Prop 对象,调用 AMQP 接口中内部类 BasicProperties 的子类 builder来创建。

    将 返回队列名(replyTo),关联 ID(cid)封装到对象中,最后用 build 方法结尾。

    发送信息参数为:交换机(默认为空),调用队列名,Prop配置,发送信息(int 对象先转 String)。

  • 执行其他运算,输出提示信息即可。

  • 需要结果时,从返回队列接收结果:

    和之前的类似,需要定义两个回调对象 DeliverCallback 和 CancelCallback(可用 Lambda 简写)。

    定义回调对象,调用 basicConsume 发送消息。

  • 重写 DeliverCallback 方法(消费者线程处理计算结果):

    逻辑:首先判断 message 中的关联 id,是不是刚才发送的关联 id。

    若为 true,把结果放入阻塞队列中 (ArrayBlockingQueue)。

  • 主线程中,从 BlockingQueue 获取数据,调用 take 返回数据即可。

  • 所有异常都抛出。

代码示例

package m6;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeoutException;public class Client {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {while (true) {System.out.println("你要求第几个斐波那契数: ");int n = new Scanner(System.in).nextInt();long r = fibonacci(n);System.out.println("第"+n+"个斐波那契数为: "+r);}}private static long fibonacci(int n) throws IOException, TimeoutException, InterruptedException {ArrayBlockingQueue<Long> abq = new ArrayBlockingQueue<>(10);//创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.64.140");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");Channel c = factory.newConnection().createChannel();//创建调用队列:rpc-queuec.queueDeclare("rpc-queue",false,false,false,null);//创建随机队列(replyTo),用来获取计算结果String replyTo = c.queueDeclare().getQueue();//利用 UUID 产生一个关联 id (correlationId)String cId = UUID.randomUUID().toString();//发送调用信息,携带两个参数: 返回队列名,关联id:AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder().replyTo(replyTo).correlationId(cId).build();c.basicPublish("","rpc-queue",prop,(n+"").getBytes());//执行其他运算,输出提示信息即可System.out.println("正在执行其他计算...");//需要结果时,从返回队列接收结果:DeliverCallback deliverCallback = (string,delivery)->{if (cId.equals(delivery.getProperties().getCorrelationId())){String s = new String(delivery.getBody());abq.add(Long.valueOf(s));}};CancelCallback cancelCallback = (message)->{};c.basicConsume(replyTo,true,deliverCallback,cancelCallback);//主线程中,从 BlockingQueue 获取数据,调用 take 返回数据即可。return abq.take();}
}

启动测试:服务端(Server)尚未完成,启动连通 rabbitmq 即可。、

Server 类:

定义求斐波那契数的方法 fibonacci(int):

  • 定义返回值为 long 的公开静态方法,首先判断 num 数值是否为 1 或 2,若为是直接返回 1。
  • 除此之外的情况下,定义前两个数分别为 a,b,值为 1。
  • 利用 for 循环,依次叠加计算第 n 个斐波那契数的值,最后返回。

定义主方法 main:

  • 创建连接,与 Client 一致,复制即可。

  • 创建调用队列,共用一个队列所以复制即可。

  • 从 rpc-queue 接收调用信息,把结果发回到返回队列,携带关联id参数:

    先创建两个回调对象,然后接收消息(basicConsume)。

  • 重写 DeliverCallback ,把结果发回到返回队列。

    先从 message 中获取调用信息的 n,replyTo 和 cid。

    输出提示 “求第”+n+“个斐波那契数…”,调用 fibonacci 方法求出 r( long 类型)。

    创建 AMQP 包下的 BasicProperties 对象,将关联Id 封装进去。

    发送方法(basicPublish)将结果发回:

    参数:( 交换机(默认空),队列名,配置对象,具体信息内容)

package m6;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Server {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接ConnectionFactory f = new ConnectionFactory();f.setHost("192.168.64.140"); // wht6.cnf.setPort(5672);f.setUsername("admin");f.setPassword("admin");Channel c = f.newConnection().createChannel();// 创建调用队列: rpc-queuec.queueDeclare("rpc-queue",false,false,false,null);// 从 rpc-queue 接收调用信息DeliverCallback deliverCallback = (consumerTag, message) -> {// 求出斐波那契数// 把结果发回到返回队列,携带关联id参数// 从 message 取出: n,返回队列名,关联idInteger n = Integer.valueOf(new String(message.getBody()));String replyTo = message.getProperties().getReplyTo();String cid = message.getProperties().getCorrelationId();System.out.println("求第"+n+"个斐波那契数...");long r = fibonacci(n);AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder().correlationId(cid).build();c.basicPublish("",replyTo,prop,(r+"").getBytes());};CancelCallback cancelCallback = consumerTag -> {};c.basicConsume("rpc-queue",true,deliverCallback,cancelCallback);}public static long fibonacci(int n){if (n==1 || n==2){return 1;}/*a[0] = 1a[1] = 1*/long a = 1;long b = 1;for (int i = 3; i <= n; i++) {b = a + b;a = b - a;}return b; // 从第20个斐波那契数开始,就会溢出,超出长整型整数运算可以用 BigInteger}
}

启动测试:正确显示结果即可( 注意:第 104 个斐波那契数开始,数值会溢出。)

RPC 模式 - Spring测试(M6)

因为Spring为我们封装了很多代码,所以相比上面的测试,这种方式要简单的多。

准备工作:

  • 先分别创建 Client 类、Server 类、Main 启动类。

  • 在 Main 启动类完成 Spring 启动配置。

  • 然后添加方法(rndQueue)用于创建发送队列:

    该方法返回 Queue 对象(amqp.core包的)

    参数:UUID 随机生成队列名、非持久、独占、自动删除。

    将方法交给 Spring 容器管理。

Client 类:

创建发送方法:

  • 添加 @Component 注解将 Client 类交给容器管理。

  • Spring 内置了封装对象,直接注入 AmqpTemplate 使用即可。

  • 定义发送消息方法(send),参数为 int :

    调用 convertAndSend 方法发送消息,队列名为 “rpc-queue”。

  • 此处需要在后面添加消息的预处理器:

    首先获取 message 里面的配置信息,然后将 replyTo 和 CId 封装进去,这里 cid 用 UUID生成。

    最后再把 message 返回。

    封装 replyTo 时,我们可以用 SPEL 来直接访问 spring 容器中的对象。直接引入 replyTo。

  • 定义接收消息方法(receive):

    因为 spring 封装了以后会自动接收来自 Server 的信息,所以方法体直接提示输出即可。

    提示输出:cid+" ------ 计算结果为: "+r ,方法参数为,传回的 r(long),cid(String)。

    其中:cid 需添加注解 @Header(name = AmqpHeaders.CORRELATION_ID)

    方法上要添加 @RabbitListener(queues = “#{rndQueue.name}”)

package cn.tedu.rabbitmqspring.m6;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class Client {@Autowiredprivate AmqpTemplate t;@Value("#{rndQueue.name}")private String replyTo;public void send(int n) {  //后面添加消息的预处理器t.convertAndSend("rpc-queue", n,(message)-> {MessageProperties p = message.getMessageProperties();p.setReplyTo(replyTo);p.setCorrelationId(UUID.randomUUID().toString());return message;});}@RabbitListener(queues = "#{rndQueue.name}")//注解当中是没法写成员变量的public void receive(long r, @Header(name = AmqpHeaders.CORRELATION_ID) String cid){System.out.println(cid+" ------ 计算结果为: "+r);}
}

Server 类:

在 Spring 环境下,如果处理消息的方法有返回值,

那么 spring 会把返回值,通过返回队列发回到客户端,并携带关联id。

  • 添加 @Component 注解将 Client 类交给容器管理。
  • 定义 receive 方法,返回值为 long ,将计算后的结果返回即可。
  • 方法上添加注解 @RabbitListener 限定发送队列为 “rpc-queue”
  • 在下方添加静态方法 fibonacci ,直接复制之前的测试方法即可。
package cn.tedu.rabbitmqspring.m6;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Server {/*如果处理消息的方法不是 void,有返回值,那么 spring 会把返回值,通过返回队列发回到客户端,并携带关联id*/@RabbitListener(queues = "rpc-queue")public long receive(int n){long r = fibonacci(n);return r;}public static long fibonacci(int n){if (n==1 || n==2){return 1;}/*a[0] = 1a[1] = 1*/long a = 1;long b = 1;for (int i = 3; i <= n; i++) {b = a + b;a = b - a;}return b;}
}

Main 启动类:

  • 由于 Server 类已经由 Spring 自动返回了方法结果,所以只注入 Client 对象即可。

  • 定义 test 方法调用 send,并添加 @PostConstruct注解:

    为了防止预加载导致主线程卡死,我们选择创建一个新线程来完成调用。

    新建线程后可以用 lambda 表达式简写内部方法。和前面测试类似,方法外部套用 while-true 死循环。

    输出提示:"求第几个斐波那契数: ",获取输入值后,通过 client 对象发送给 server。

package cn.tedu.rabbitmqspring.m6;import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;import javax.annotation.PostConstruct;
import java.util.Scanner;
import java.util.UUID;@SpringBootApplication
public class Main {public static void main(String[] args) {SpringApplication.run(Main.class,args);}@Bean //amqp.core包的queuepublic Queue rndQueue(){return new Queue(UUID.randomUUID().toString(),false,true,true);}@Autowiredprivate Client client;@PostConstruct//lambda快捷键调整public void test(){new Thread(() -> {while (true) {System.out.println("求第几个斐波那契数: ");int n = new Scanner(System.in).nextInt();client.send(n);}}).start();}
}

启动测试:测试成功即可。

三、其他

Config Bus + RabbitMQ 消息总线配置及 Spring Cloud 应用相关请查看我的另一篇博客。
Spring Cloud 微服务项目操作实战流程(完整版)

有错误请及时评论私信指正,各位一起加油!

RabbitMQ 相关整合实战项目(完结)相关推荐

  1. 测速源码_物联网之智能平衡车开发实战项目(附源码)

    自从上次分享了"适合练手的10个前端实战项目(附源码)"之后,很多小伙伴就私信问有没有物联网相关的实战项目教程,那么今天就给大家分享一个物联网工作初期经常接触的项目:智能平衡车开发 ...

  2. 基于python的房地产数据分析_基于Python的数据分析实战项目

    本文中项目资料来源于网易云课堂,代码为纯手工码字滴,请放心食用,不定期更新,欢迎对Python.数据分析以及编程感兴趣的同学留言沟通. 详细介绍了数十个数据分析相关的实战项目,大量使用pandas.n ...

  3. 我把Github上最牛b的Java教程和实战项目整合成了一个PDF文档

    写在前面 大家都知道 Github 是一个程序员福地,这里有各种厉害的开源框架.软件或者教程.这些东西对于我们学习和进步有着莫大的进步,所以我有了这个将 Github 上非常棒的 Java 开源项目整 ...

  4. (RabbitMQ 二)Springboot项目中使用RabbitMQ的相关依赖

    (RabbitMQ 二)Springboot项目中使用RabbitMQ的相关依赖 RabbitMQ系列文章如下: (RabbitMQ 一[转载])windows10环境下的RabbitMQ安装步骤 h ...

  5. 重磅回归-SSM整合进阶项目实战之个人博客系统

    历经一个多月的重新设计,需求分析以及前后端开发,终于有了一定的输出:我自己实现的spring4+springmvc+mybatis3整合的进阶项目实战-个人博客系统 已然完成了,系统采用mvc三层模式 ...

  6. docker 安装nacos_19.SpringCloud实战项目-SpringCloud整合Alibaba-Nacos配置中心

    SpringCloud实战项目全套学习教程连载中 PassJava 学习教程 简介 PassJava-Learning项目是PassJava(佳必过)项目的学习教程.对架构.业务.技术要点进行讲解. ...

  7. Vue整合SpringBoot项目实战之Vue+Element-Ui搭建前端项目

    Vue整合SpringBoot项目实战之Vue+Element-Ui搭建前端项目 源码(欢迎star): 前端项目代码 后端项目代码 系列文章: Vue整合SpringBoot项目实战之后端业务处理 ...

  8. RabbitMQ精讲7:与SpringBoot、Spring Cloud Stream整合实战

    目录 与SpringBoot整合实战 1. 生产端 2. 消费端 消费端核心配置: @RabbitListener注解使用

  9. Mall电商实战项目专属学习路线,主流技术一网打尽!

    之前经常有朋友问我,mall项目该如何学习,按照什么顺序学习?我一般都会把<mall学习教程>的目录发给他,希望他按照教程顺序学习.我一直认为基于项目实战的学习,可以更好的掌握技术.mal ...

最新文章

  1. 当我们在谈大前端的时候,我们谈的是什么
  2. 在centos7上编译安装nginx
  3. [BZOJ 2038][2009国家集训队]小Z的袜子(hose)(莫队)
  4. python编写函数、计算三个数的最大公约数_python 函数求两个数的最大公约数和最小公倍数...
  5. 00-基于Vue的博客项目展示
  6. 孙宏斌谈贾跃亭哽咽:“我一定把乐视做成一个好公司”
  7. 台式计算机防盗锁怎么安装,怎样拆装防盗门锁?防盗门锁怎么进行正确安装?...
  8. ip sensor芯片级解决方案
  9. uC/OS-II源码分析(总体思路一)(
  10. python_程序的构成---python工作笔记015
  11. 端口占用问题解决办法(以1099端口为例)
  12. 北理工的石锅饭依然美味
  13. linux系统上不去网,linux 上不去网
  14. lnmp 一键安装详解
  15. 机器人电焊电流电压怎么调_机器人二保焊自动焊机是怎么调节的?
  16. html5qq空间代码作业,免费QQ空间背景代码大全(高手整理)
  17. 一个方便的大文件分割web工具
  18. 大胖子走迷宫(bfs)
  19. 服务器inetpub是什么文件夹,inetpub是什么文件夹?Win10怎么删除c盘下的inetpub文件夹?...
  20. 监督学习之分类学习:支持向量机

热门文章

  1. Minecraft 开服:从入门到精通
  2. Javascrpit特效之打字机效果
  3. 湖南质监局:南山奶粉可正常生产销售-南山奶粉-许可证
  4. 求一元二次方程求根公式与韦达定理.
  5. Modbus转Profibus网关连接安科瑞ARD3T电机保护器接到300PLC配置案例
  6. 【Cocos开发者大会】触控CEO陈昊芝:收入最高的80%游戏均由Cocos开发
  7. Linux下如何接ADSL一类的宽带猫带动局域网上internet
  8. 单目相机(Mono camera)在MATLAB中的表示与实例
  9. PADS VX2.8 焊盘方式出线的选择方法
  10. 串口通讯调试悲催经历,经验分享