php写入rabbit速度,RabbitMQ 入门教程(PHP) 实现延迟功能
php 使用rabbitmq-delayed-message-exchange插件实现延迟功能
1.安装
3.6.x下载地址
3.7.x下载地址
下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录\rabbitmq_server-version\plugins )。
2.启用插件
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
复制代码
输出如下:
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
复制代码
通过rabbitmq-plugins list查看已安装列表,如下:
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
复制代码
3.机制解释
安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。
4.php实现过程
消费者 delay_consumer2.php:
//header('Content-Type:text/html;charset=utf8;');
$params = array(
'exchangeName' => 'delayed_exchange_test',
'queueName' => 'delayed_queue_test',
'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp'));
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
//$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message类型
/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。
fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。
direct:把消息投递到那些binding key与routing key完全匹配的队列中。
topic:将消息路由到binding key与routing key模式匹配的队列中。*/
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//绑定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
echo $e->getMessage();
exit();
}
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {
$body = $message->getBody();
echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收内容:'.$body . PHP_EOL;
//为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息
$queue->ack($message->getDeliveryTag());
} else {
echo 'no message' . PHP_EOL;
}
}
//$queue->consume('callback'); 第一种消费方式,但是会阻塞,程序一直会卡在此处
//第二种消费方式,非阻塞
/*$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo $message->getBody();
$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费
$end = time();
echo '
' . ($end - $start);
exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}*/
//注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。
//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。
$action = '2';
if($action == '1'){
$queue->consume('callback'); //第一种消费方式,但是会阻塞,程序一直会卡在此处
}else{
//第二种消费方式,非阻塞
$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收内容:'.$message->getBody().PHP_EOL;
$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费
$end = time();
echo '运行时间:'.($end - $start).'秒'.PHP_EOL;
//exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}
}
复制代码
生产者delay_publisher2.php:
//header('Content-Type:text/html;charset=utf-8;');
$params = array(
'exchangeName' => 'delayed_exchange_test',
'queueName' => 'delayed_queue_test',
'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 记录日志
echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 记录日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message类型
/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。
fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。
direct:把消息投递到那些binding key与routing key完全匹配的队列中。
topic:将消息路由到binding key与routing key模式匹配的队列中。*/
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
//$channel->startTransaction();
//RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//绑定队列和交换机
$queue->bind($params['exchangeName'], $params['routeKey']);
//$channel->commitTransaction();
} catch(Exception $e) {
}
for($i=5;$i>0;$i--){
//生成消息
echo '发送时间:'.date("Y-m-d H:i:s", time()).PHP_EOL;
echo 'i='.$i.',延迟'.$i.'秒'.PHP_EOL;
$message = json_encode(['order_id'=>time(),'i'=>$i]);
$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);
sleep(2);
}
$conn->disconnect();
复制代码
对于代码来讲,首先对于消费者核心代码
$exchange->setType('x-delayed-message'); //x-delayed-message类型
$exchange->setArgument('x-delayed-type','direct');
复制代码
生产者核心代码
$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message类型
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
复制代码
**使用方法:**先运行delay_consumer1.php,再运行delay_publisher1.php
运行效果:
php写入rabbit速度,RabbitMQ 入门教程(PHP) 实现延迟功能相关推荐
- RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)
RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等) 1. RabbitMQ简介及AMQP协议 开源的消息代理和队列 ...
- 干货!消息队列RabbitMQ入门教程
写在前面:全文12000多字,从为什么需要用消息队列,到rabbitMQ安装使用,如何使用JavaAPI生产消费消息,以及使用消息队列带来的一些常见问题.绝对很适合新手入门学习. 为什么需要消息队列 ...
- RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
发布/订阅 在上篇第二部分教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样--分发一个消息给多个消费者(consumers).这种模式 ...
- RabbitMQ入门教程
摘要: 使用RabbitMQ的消息队列,可以有效提高系统的峰值处理能力. RabbitMQ简介 RabbitMQ是消息代理(Message Broker),它支持多种异步消息处理方式,最常见的有: W ...
- RabbitMQ入门教程 1
一.MQ的基本概念 1.MQ的概述 2.MQ的优势 应用解耦 异步提速 削峰填谷 3.MQ的劣势 系统可用性降低 系统复杂度提高 一致性问题 4.MQ的使用条件 5.常见的MQ的产品 二.什么是Rab ...
- RabbitMQ入门教程——发布/订阅
什么是发布订阅 发布订阅是一种设计模式定义了一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象.这个主题对象在自身状态变化时,会通知所有的订阅者对象,使他们能够自动更新自己的状态. 为了描述这种 ...
- RabbitMQ入门教程——.NET客户端使用
众所周知RabbitMQ使用的是AMQP协议.我们知道AMQP是一种网络协议,能够支持符合要求的客户端应用和消息中间件代理之间进行通信. 其中消息代理扮演的角色就是从生产者那儿接受消息,并根据既定的路 ...
- RabbitMQ入门教程(十一):消息属性Properties
分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 发送消息可以为消息指定一些参数 Delivery mode: 是否持久化,1 - Non-persistent,2 ...
- RabbitMQ入门教程(四):工作队列(Work Queues)
分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 工作队列 使用工作队列实现任务分发的功能,一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们 ...
最新文章
- “cannot resolve symbol R” in Android Studio
- CASE_01 基于FPGA的交通灯控制器
- 英文版Ubuntu 16.04系统如何解决gedit中文显示乱码的问题
- 我的内容管理系统(CMS)寻找历程 -- Mambo出鞘,谁与争锋?
- HttpModuel
- gflags.lib(gflags.obj) : error LNK2001: 无法解析的外部符号 __imp_PathMatchSpecA
- 使用 matlab 数字图像处理(五)—— 双线性插值(Bilinear Interpolation)
- 【前端基础】DOM对象
- Cadence安装教程(亲测记录)
- STM8S103硬件I2C的操作注意事项
- 如何选择关键词以及关键词分析优化
- HTML5 canvas 之 clip
- html5 css背景图片满,css background-size与背景图片填满div
- 《数据结构与算法之二叉搜索树(Java实现)》
- 银行案例启示:莫把客户投诉当小事
- js-只能输入数字(正则)
- 云计算服务在零售行业的革命性作用
- 【可见光室内定位】(二)基于光电器件PD的可见光室内定位技术
- 2019年通信工程师传输与接入(有线)考试有哪些题型?
- 最全的android图片加密
热门文章
- python语言在大数据分析处理领域应用广泛_在大数据分析/挖掘领域,哪些编程语言应用最多...
- python清除数据库表命令_Python PostgreSQL-删除表
- js数组查找最接近_如何从javascript中的对象数组中获取最接近的先前id
- python改变turtle画笔方向的函数_哪个选项不能改变turtle画笔的运行方向?
- Android 内存泄漏分析指北
- 方格路径问题!【转】
- 分享一些自己的学习过程和学习方法
- 【WCF】服务并发中的“可重入模式”
- 5月21 回话控制SESSION COOKIE
- Htmlt_Div+Css简介