php 使用rabbitmq-delayed-message-exchange插件实现延迟功能

1.安装

3.6.x下载地址

3.7.x下载地址

下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录\rabbitmq_server-version\plugins )。

2.启用插件

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

复制代码

输出如下:

The following plugins have been enabled:

rabbitmq_delayed_message_exchange

复制代码

通过rabbitmq-plugins list查看已安装列表,如下:

[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x

复制代码

3.机制解释

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

4.php实现过程

消费者 delay_consumer2.php:

//header('Content-Type:text/html;charset=utf8;');

$params = array(

'exchangeName' => 'delayed_exchange_test',

'queueName' => 'delayed_queue_test',

'routeKey' => 'delayed_route_test',

);

$connectConfig = array(

'host' => 'localhost',

'port' => 5672,

'login' => 'guest',

'password' => 'guest',

'vhost' => '/'

);

//var_dump(extension_loaded('amqp'));

//exit();

try {

$conn = new AMQPConnection($connectConfig);

$conn->connect();

if (!$conn->isConnected()) {

//die('Conexiune esuata');

//TODO 记录日志

echo 'rabbit-mq 连接错误:', json_encode($connectConfig);

exit();

}

$channel = new AMQPChannel($conn);

if (!$channel->isConnected()) {

// die('Connection through channel failed');

//TODO 记录日志

echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);

exit();

}

$exchange = new AMQPExchange($channel);

//$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端

$exchange->setName($params['exchangeName']);

$exchange->setType('x-delayed-message'); //x-delayed-message类型

/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

direct:把消息投递到那些binding key与routing key完全匹配的队列中。

topic:将消息路由到binding key与routing key模式匹配的队列中。*/

$exchange->setArgument('x-delayed-type','direct');

$exchange->declareExchange();

//$channel->startTransaction();

$queue = new AMQPQueue($channel);

$queue->setName($params['queueName']);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

//绑定

$queue->bind($params['exchangeName'], $params['routeKey']);

} catch(Exception $e) {

echo $e->getMessage();

exit();

}

function callback(AMQPEnvelope $message) {

global $queue;

if ($message) {

$body = $message->getBody();

echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;

echo '接收内容:'.$body . PHP_EOL;

//为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息

$queue->ack($message->getDeliveryTag());

} else {

echo 'no message' . PHP_EOL;

}

}

//$queue->consume('callback'); 第一种消费方式,但是会阻塞,程序一直会卡在此处

//第二种消费方式,非阻塞

/*$start = time();

while(true)

{

$message = $queue->get();

if(!empty($message))

{

echo $message->getBody();

$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费

$end = time();

echo '
' . ($end - $start);

exit();

}

else

{

//echo 'message not found' . PHP_EOL;

}

}*/

//注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。

//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。

$action = '2';

if($action == '1'){

$queue->consume('callback'); //第一种消费方式,但是会阻塞,程序一直会卡在此处

}else{

//第二种消费方式,非阻塞

$start = time();

while(true)

{

$message = $queue->get();

if(!empty($message))

{

echo '接收时间:'.date("Y-m-d H:i:s", time()). PHP_EOL;

echo '接收内容:'.$message->getBody().PHP_EOL;

$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费

$end = time();

echo '运行时间:'.($end - $start).'秒'.PHP_EOL;

//exit();

}

else

{

//echo 'message not found' . PHP_EOL;

}

}

}

复制代码

生产者delay_publisher2.php:

//header('Content-Type:text/html;charset=utf-8;');

$params = array(

'exchangeName' => 'delayed_exchange_test',

'queueName' => 'delayed_queue_test',

'routeKey' => 'delayed_route_test',

);

$connectConfig = array(

'host' => 'localhost',

'port' => 5672,

'login' => 'guest',

'password' => 'guest',

'vhost' => '/'

);

//var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展

//exit();

try {

$conn = new AMQPConnection($connectConfig);

$conn->connect();

if (!$conn->isConnected()) {

//die('Conexiune esuata');

//TODO 记录日志

echo 'rabbit-mq 连接错误:', json_encode($connectConfig);

exit();

}

$channel = new AMQPChannel($conn);

if (!$channel->isConnected()) {

// die('Connection through channel failed');

//TODO 记录日志

echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);

exit();

}

$exchange = new AMQPExchange($channel);

$exchange->setName($params['exchangeName']);

$exchange->setType('x-delayed-message'); //x-delayed-message类型

/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

direct:把消息投递到那些binding key与routing key完全匹配的队列中。

topic:将消息路由到binding key与routing key模式匹配的队列中。*/

$exchange->setArgument('x-delayed-type','direct');

$exchange->declareExchange();

//$channel->startTransaction();

//RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错

$queue = new AMQPQueue($channel);

$queue->setName($params['queueName']);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

//绑定队列和交换机

$queue->bind($params['exchangeName'], $params['routeKey']);

//$channel->commitTransaction();

} catch(Exception $e) {

}

for($i=5;$i>0;$i--){

//生成消息

echo '发送时间:'.date("Y-m-d H:i:s", time()).PHP_EOL;

echo 'i='.$i.',延迟'.$i.'秒'.PHP_EOL;

$message = json_encode(['order_id'=>time(),'i'=>$i]);

$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);

sleep(2);

}

$conn->disconnect();

复制代码

对于代码来讲,首先对于消费者核心代码

$exchange->setType('x-delayed-message'); //x-delayed-message类型

$exchange->setArgument('x-delayed-type','direct');

复制代码

生产者核心代码

$exchange = new AMQPExchange($channel);

$exchange->setName($params['exchangeName']);

$exchange->setType('x-delayed-message'); //x-delayed-message类型

$exchange->setArgument('x-delayed-type','direct');

$exchange->declareExchange();

复制代码

**使用方法:**先运行delay_consumer1.php,再运行delay_publisher1.php

运行效果:

php写入rabbit速度,RabbitMQ 入门教程(PHP) 实现延迟功能相关推荐

  1. RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)

    RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等) 1. RabbitMQ简介及AMQP协议 开源的消息代理和队列 ...

  2. 干货!消息队列RabbitMQ入门教程

    ​写在前面:全文12000多字,从为什么需要用消息队列,到rabbitMQ安装使用,如何使用JavaAPI生产消费消息,以及使用消息队列带来的一些常见问题.绝对很适合新手入门学习. 为什么需要消息队列 ...

  3. RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

    发布/订阅 在上篇第二部分教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样--分发一个消息给多个消费者(consumers).这种模式 ...

  4. RabbitMQ入门教程

    摘要: 使用RabbitMQ的消息队列,可以有效提高系统的峰值处理能力. RabbitMQ简介 RabbitMQ是消息代理(Message Broker),它支持多种异步消息处理方式,最常见的有: W ...

  5. RabbitMQ入门教程 1

    一.MQ的基本概念 1.MQ的概述 2.MQ的优势 应用解耦 异步提速 削峰填谷 3.MQ的劣势 系统可用性降低 系统复杂度提高 一致性问题 4.MQ的使用条件 5.常见的MQ的产品 二.什么是Rab ...

  6. RabbitMQ入门教程——发布/订阅

    什么是发布订阅 发布订阅是一种设计模式定义了一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象.这个主题对象在自身状态变化时,会通知所有的订阅者对象,使他们能够自动更新自己的状态. 为了描述这种 ...

  7. RabbitMQ入门教程——.NET客户端使用

    众所周知RabbitMQ使用的是AMQP协议.我们知道AMQP是一种网络协议,能够支持符合要求的客户端应用和消息中间件代理之间进行通信. 其中消息代理扮演的角色就是从生产者那儿接受消息,并根据既定的路 ...

  8. RabbitMQ入门教程(十一):消息属性Properties

    分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 发送消息可以为消息指定一些参数 Delivery mode: 是否持久化,1 - Non-persistent,2 ...

  9. RabbitMQ入门教程(四):工作队列(Work Queues)

    分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 工作队列 使用工作队列实现任务分发的功能,一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们 ...

最新文章

  1. “cannot resolve symbol R” in Android Studio
  2. CASE_01 基于FPGA的交通灯控制器
  3. 英文版Ubuntu 16.04系统如何解决gedit中文显示乱码的问题
  4. 我的内容管理系统(CMS)寻找历程 -- Mambo出鞘,谁与争锋?
  5. HttpModuel
  6. gflags.lib(gflags.obj) : error LNK2001: 无法解析的外部符号 __imp_PathMatchSpecA
  7. 使用 matlab 数字图像处理(五)—— 双线性插值(Bilinear Interpolation)
  8. 【前端基础】DOM对象
  9. Cadence安装教程(亲测记录)
  10. STM8S103硬件I2C的操作注意事项
  11. 如何选择关键词以及关键词分析优化
  12. HTML5 canvas 之 clip
  13. html5 css背景图片满,css background-size与背景图片填满div
  14. 《数据结构与算法之二叉搜索树(Java实现)》
  15. 银行案例启示:莫把客户投诉当小事
  16. js-只能输入数字(正则)
  17. 云计算服务在零售行业的革命性作用
  18. 【可见光室内定位】(二)基于光电器件PD的可见光室内定位技术
  19. 2019年通信工程师传输与接入(有线)考试有哪些题型?
  20. 最全的android图片加密

热门文章

  1. python语言在大数据分析处理领域应用广泛_在大数据分析/挖掘领域,哪些编程语言应用最多...
  2. python清除数据库表命令_Python PostgreSQL-删除表
  3. js数组查找最接近_如何从javascript中的对象数组中获取最接近的先前id
  4. python改变turtle画笔方向的函数_哪个选项不能改变turtle画笔的运行方向?
  5. Android 内存泄漏分析指北
  6. 方格路径问题!【转】
  7. 分享一些自己的学习过程和学习方法
  8. 【WCF】服务并发中的“可重入模式”
  9. 5月21 回话控制SESSION COOKIE
  10. Htmlt_Div+Css简介