一对多模式,用图表示如下

一个生产者向消息队列中发送消息,多个消费者同时从消息队列中读取消息,在这个模式下,我们优先考虑的,是解决各个消费者如何读取消息的机制。

下面我们以ThinkPHP的代码来展示一下处理过程。

创建一个控制器MQSimple2,其对应的类文件MQSimple2.php中的代码如下

/**

* Created by PhpStorm.

* User: zhaoqinsong

* Date: 2018/12/10

* Time: 17:02 PM

*/

namespace app\msq\controller;

use think\App;

use think\Controller;

use PhpAmqpLib\Connection\AMQPStreamConnection;

use PhpAmqpLib\Message\AMQPMessage;

class MQSimple2 extends Controller

{

var $msq_connect;

var $msq_channel;

public function __construct(App $app)

{

parent::__construct($app);

$this->msq_connect = new AMQPStreamConnection("localhost", 5672, "phpmsq", "123456");

$this->msq_channel = $this->msq_connect->channel();

$this->msq_channel->queue_declare("simple", false, true, false, false);    //  生成一个持久化消息队列

}

public function index()

{

echo "rabbitmq of test";

}

public function mq_send() {

//  向消息队列simple发送100个消息

for($i=0; $i<20; $i++)

{

$msg = new AMQPMessage("Hello World No.$i");

$this->msq_channel->basic_publish($msg, "", "simple");

}

}

public function mq_recv1() {

//  处理消息队列simple中的消息(时间较长)

$callback = function($msg) {

$body = $msg->body;

echo "recv:$body\n";

sleep(5);

};

$this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback);

while ($this->msq_channel->callbacks) {

$this->msq_channel->wait();

}

}

public function mq_recv2() {

//  处理消息队列simple中的消息(时间较短)

$callback = function($msg) {

$body = $msg->body;

echo "recv:$body\n";

sleep(2);

};

$this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback);

while ($this->msq_channel->callbacks) {

$this->msq_channel->wait();

}

}

public function __destruct()

{

$this->msq_channel->close();

$this->msq_connect->close();

}

}

指定路由,设定/simple2/send指向此类的mq_send方法,/simple2/recv1指向mq_recv1方法,/simple2/recv2(处理时间5秒))指向mq_recv2方法(处理时间2秒)。

在命令行下,先创建两个服务器连接,分别执行消费者1和消费者2命令(需要先切换到ThinkPHP框架的根目录下)

# php public/index.php /simple2/recv1

# php public/index.php /simple2/recv2

然后再创建一个服务器连接,执行生产者命令,产生20个待处理消息(也需要切换么ThinkPHP框架的根目录下)

# php public/index.php /simple2/send

这时候,通过查看消费者1和消费者2连接,可以看到两者在不断地处理消息

消费者1

recv:Hello World No.0

recv:Hello World No.2

recv:Hello World No.4

recv:Hello World No.6

recv:Hello World No.8

recv:Hello World No.10

recv:Hello World No.12

recv:Hello World No.14

recv:Hello World No.16

recv:Hello World No.18

消费者2recv:Hello World No.1

recv:Hello World No.3

recv:Hello World No.5

recv:Hello World No.7

recv:Hello World No.9

recv:Hello World No.11

recv:Hello World No.13

recv:Hello World No.15

recv:Hello World No.17

recv:Hello World No.19

我们可以看到,虽然消费者1的处理时间远远大于消费者2,但它还是收到了10条消息并进行处理,也就是说,消息队列将收到的消息,平均分配给了两个消费者,而且是异步进行分配的,它没有考虑各个消费者端实际的处理性能,只是按个数进行简单地平均分配(将收到的消息分配给下一个)。

实际应用中,我们必须考虑各个消费者端的处理速度,因此,在这基础上,我们进行一下优化,采取basicQos以及手动应答消息完成,来实现能者多劳,按性能分配消息的机制。

修改mq_recv1和mq_recv2,修改后的相应函数代码如下

public function mq_recv1() {

//  处理消息队列simple中的消息(时间较长)

$callback = function($msg) {

$body = $msg->body;

echo "recv:$body\n";

sleep(5);

$channel = $msg->delivery_info['channel'];

$channel->basic_ack($msg->delivery_info['delivery_tag']);

};

$this->msq_channel->basic_qos(null, 1, null);

$this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback);

while ($this->msq_channel->callbacks) {

$this->msq_channel->wait();

}

}

public function mq_recv2() {

//  处理消息队列simple中的消息(时间较短)

$callback = function($msg) {

$body = $msg->body;

echo "recv:$body\n";

sleep(2);

$channel = $msg->delivery_info['channel'];

$channel->basic_ack($msg->delivery_info['delivery_tag']);

};

$this->msq_channel->basic_qos(null, 1, null);

$this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback);

while ($this->msq_channel->callbacks) {

$this->msq_channel->wait();

}

}

修改后的代码,限制每次只接受1条消息,在消息处理完成后,发送消息处理完毕的应答给消息队列,消息列表收到应答后再次发送信息给该连接,因此消息被有效地分给了各个消费者端,效率也大大增加。

php消费rabbitmq消息QoS,RabbitMQ消息队列-一对多模式相关推荐

  1. 消息队列8:RabbitMq的QOS实验

    环境: win10 rabbitmq-3.8.8 .net core 3.1 RabbitMQ.Client 6.2.1 vs2019 安装RabbitMq环境参照: window下安装rabbitm ...

  2. php消费rabbitmq消息QoS,简介Rabbitmq的几种消费模式

    前言 在日常开发中,消息队列能帮我们解决系统的异步问题,流量的控制和服务解耦,不同的消息队列有不同的消费模型 思考 redis也可以实现消息队列(list和stream),也称为轻量级消息队列,lis ...

  3. java rabbitmq topic_java rabbitmq 发送消息是topic模式, 消费者 怎么消费多个不同名字的队列?...

    这里有几个不同的队列 名字没有什么规则 就是xxx.xxx exchange也是和队列的名字一样的 package com.monitor.receiver.queue; import java.ut ...

  4. RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等

    RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...

  5. RabbitMQ 中 7 种消息队列

    点击关注公众号,Java干货及时送达 七种模式介绍与应用场景 简单模式(Hello World) 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B ...

  6. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  7. rabbitmq消费固定个数消息_SpringBoot+RabbitMQ (保证消息100%投递成功并被消费)

    作者:wangzaiplus https://www.jianshu.com/p/dca01aad6bc8 一.先扔一张图 说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如: 消息发送确认机 ...

  8. rabbitmq java实例_RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)

    转载http://blog.csdn.net/u013142781 一.消息队列使用场景或者其好处 消息队列一般是在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式 ...

  9. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

最新文章

  1. 函数 tostring_Kotlin实战之Fuel的高阶函数
  2. 大数据风控之贷前调查必知的十大客户信息
  3. MFC提示 未在此计算机上注册ActiveX控件“{648A5600-2C6E-101B-82B6-000000000014}“完美解决
  4. git linux 登陆_Git安装及基础命令
  5. Business Intelligence——SSIS项目从创建到部署的简单总结(二)
  6. JAVA JDBC连接mysql编程
  7. pic pwm 占空比可调 源码_PIC16F914输出可调占空比PWM波形程序
  8. 计算机应用杂志投稿,计算机类杂志 (可网上投稿)
  9. XSS-Game Level 4
  10. oledb驱动Oracle,Oracle学习笔记:手工注册oracle的oledb驱动 | 学步园
  11. Commons-VFS 使用SFTP
  12. qnap威联通作文件服务器,QNAP 威联通 453BT3 网络存储服务器 使用手记,Nas中的小钢炮...
  13. WORD之文字处理之插入复合条饼图
  14. 华为云产品介绍—大数据
  15. 【暂时性死区(TDZ)】
  16. 如何修改Maven仓库地址为阿里云仓库
  17. 5G手机芯片如何选择?
  18. web前端开发(一)
  19. Flink结合Iceberg的一种实现方式笔记
  20. Apache-POI 设置excel单元格样式字体等

热门文章

  1. 永磁同步电机矢量控制(一)—— 数学模型
  2. Java输入输出(IO)和流的基本概念以及几种方法
  3. 将临时表的数据更新到正式表
  4. Java开发工程师面试-基础
  5. 利用霍尔效应传感器和Arduino研究了一个简单的钟摆
  6. python英雄联盟登录程序,注册用户程序
  7. 电子病历国内好大企IPO
  8. 最小二乘法求解直线方程系数
  9. QD70P4步进电机控制
  10. 基因共表达网络分析java,好用的基因共表达网络分析工具