php消费rabbitmq消息QoS,RabbitMQ消息队列-一对多模式
一对多模式,用图表示如下
一个生产者向消息队列中发送消息,多个消费者同时从消息队列中读取消息,在这个模式下,我们优先考虑的,是解决各个消费者如何读取消息的机制。
下面我们以ThinkPHP的代码来展示一下处理过程。
创建一个控制器MQSimple2,其对应的类文件MQSimple2.php中的代码如下
/**
* Created by PhpStorm.
* User: zhaoqinsong
* Date: 2018/12/10
* Time: 17:02 PM
*/
namespace app\msq\controller;
use think\App;
use think\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MQSimple2 extends Controller
{
var $msq_connect;
var $msq_channel;
public function __construct(App $app)
{
parent::__construct($app);
$this->msq_connect = new AMQPStreamConnection("localhost", 5672, "phpmsq", "123456");
$this->msq_channel = $this->msq_connect->channel();
$this->msq_channel->queue_declare("simple", false, true, false, false); // 生成一个持久化消息队列
}
public function index()
{
echo "rabbitmq of test";
}
public function mq_send() {
// 向消息队列simple发送100个消息
for($i=0; $i<20; $i++)
{
$msg = new AMQPMessage("Hello World No.$i");
$this->msq_channel->basic_publish($msg, "", "simple");
}
}
public function mq_recv1() {
// 处理消息队列simple中的消息(时间较长)
$callback = function($msg) {
$body = $msg->body;
echo "recv:$body\n";
sleep(5);
};
$this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback);
while ($this->msq_channel->callbacks) {
$this->msq_channel->wait();
}
}
public function mq_recv2() {
// 处理消息队列simple中的消息(时间较短)
$callback = function($msg) {
$body = $msg->body;
echo "recv:$body\n";
sleep(2);
};
$this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback);
while ($this->msq_channel->callbacks) {
$this->msq_channel->wait();
}
}
public function __destruct()
{
$this->msq_channel->close();
$this->msq_connect->close();
}
}
指定路由,设定/simple2/send指向此类的mq_send方法,/simple2/recv1指向mq_recv1方法,/simple2/recv2(处理时间5秒))指向mq_recv2方法(处理时间2秒)。
在命令行下,先创建两个服务器连接,分别执行消费者1和消费者2命令(需要先切换到ThinkPHP框架的根目录下)
# php public/index.php /simple2/recv1
# php public/index.php /simple2/recv2
然后再创建一个服务器连接,执行生产者命令,产生20个待处理消息(也需要切换么ThinkPHP框架的根目录下)
# php public/index.php /simple2/send
这时候,通过查看消费者1和消费者2连接,可以看到两者在不断地处理消息
消费者1
recv:Hello World No.0
recv:Hello World No.2
recv:Hello World No.4
recv:Hello World No.6
recv:Hello World No.8
recv:Hello World No.10
recv:Hello World No.12
recv:Hello World No.14
recv:Hello World No.16
recv:Hello World No.18
消费者2recv:Hello World No.1
recv:Hello World No.3
recv:Hello World No.5
recv:Hello World No.7
recv:Hello World No.9
recv:Hello World No.11
recv:Hello World No.13
recv:Hello World No.15
recv:Hello World No.17
recv:Hello World No.19
我们可以看到,虽然消费者1的处理时间远远大于消费者2,但它还是收到了10条消息并进行处理,也就是说,消息队列将收到的消息,平均分配给了两个消费者,而且是异步进行分配的,它没有考虑各个消费者端实际的处理性能,只是按个数进行简单地平均分配(将收到的消息分配给下一个)。
实际应用中,我们必须考虑各个消费者端的处理速度,因此,在这基础上,我们进行一下优化,采取basicQos以及手动应答消息完成,来实现能者多劳,按性能分配消息的机制。
修改mq_recv1和mq_recv2,修改后的相应函数代码如下
public function mq_recv1() {
// 处理消息队列simple中的消息(时间较长)
$callback = function($msg) {
$body = $msg->body;
echo "recv:$body\n";
sleep(5);
$channel = $msg->delivery_info['channel'];
$channel->basic_ack($msg->delivery_info['delivery_tag']);
};
$this->msq_channel->basic_qos(null, 1, null);
$this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback);
while ($this->msq_channel->callbacks) {
$this->msq_channel->wait();
}
}
public function mq_recv2() {
// 处理消息队列simple中的消息(时间较短)
$callback = function($msg) {
$body = $msg->body;
echo "recv:$body\n";
sleep(2);
$channel = $msg->delivery_info['channel'];
$channel->basic_ack($msg->delivery_info['delivery_tag']);
};
$this->msq_channel->basic_qos(null, 1, null);
$this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback);
while ($this->msq_channel->callbacks) {
$this->msq_channel->wait();
}
}
修改后的代码,限制每次只接受1条消息,在消息处理完成后,发送消息处理完毕的应答给消息队列,消息列表收到应答后再次发送信息给该连接,因此消息被有效地分给了各个消费者端,效率也大大增加。
php消费rabbitmq消息QoS,RabbitMQ消息队列-一对多模式相关推荐
- 消息队列8:RabbitMq的QOS实验
环境: win10 rabbitmq-3.8.8 .net core 3.1 RabbitMQ.Client 6.2.1 vs2019 安装RabbitMq环境参照: window下安装rabbitm ...
- php消费rabbitmq消息QoS,简介Rabbitmq的几种消费模式
前言 在日常开发中,消息队列能帮我们解决系统的异步问题,流量的控制和服务解耦,不同的消息队列有不同的消费模型 思考 redis也可以实现消息队列(list和stream),也称为轻量级消息队列,lis ...
- java rabbitmq topic_java rabbitmq 发送消息是topic模式, 消费者 怎么消费多个不同名字的队列?...
这里有几个不同的队列 名字没有什么规则 就是xxx.xxx exchange也是和队列的名字一样的 package com.monitor.receiver.queue; import java.ut ...
- RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等
RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...
- RabbitMQ 中 7 种消息队列
点击关注公众号,Java干货及时送达 七种模式介绍与应用场景 简单模式(Hello World) 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B ...
- RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)
说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...
- rabbitmq消费固定个数消息_SpringBoot+RabbitMQ (保证消息100%投递成功并被消费)
作者:wangzaiplus https://www.jianshu.com/p/dca01aad6bc8 一.先扔一张图 说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如: 消息发送确认机 ...
- rabbitmq java实例_RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
转载http://blog.csdn.net/u013142781 一.消息队列使用场景或者其好处 消息队列一般是在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式 ...
- RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列
搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...
最新文章
- 函数 tostring_Kotlin实战之Fuel的高阶函数
- 大数据风控之贷前调查必知的十大客户信息
- MFC提示 未在此计算机上注册ActiveX控件“{648A5600-2C6E-101B-82B6-000000000014}“完美解决
- git linux 登陆_Git安装及基础命令
- Business Intelligence——SSIS项目从创建到部署的简单总结(二)
- JAVA JDBC连接mysql编程
- pic pwm 占空比可调 源码_PIC16F914输出可调占空比PWM波形程序
- 计算机应用杂志投稿,计算机类杂志 (可网上投稿)
- XSS-Game Level 4
- oledb驱动Oracle,Oracle学习笔记:手工注册oracle的oledb驱动 | 学步园
- Commons-VFS 使用SFTP
- qnap威联通作文件服务器,QNAP 威联通 453BT3 网络存储服务器 使用手记,Nas中的小钢炮...
- WORD之文字处理之插入复合条饼图
- 华为云产品介绍—大数据
- 【暂时性死区(TDZ)】
- 如何修改Maven仓库地址为阿里云仓库
- 5G手机芯片如何选择?
- web前端开发(一)
- Flink结合Iceberg的一种实现方式笔记
- Apache-POI 设置excel单元格样式字体等