php-resque消息队列
参考:http://www.thinkphp.cn/extend/888.html
tp3.2集成php-resque消息队列
推送,发送短信等第三方库,找到以下几种方案:
1)PHP+redis自己做消息队列
2)PHP-Resque
3)MemcacheQ
4)RabbitMQ
集成方法
https://github.com/5ini99/tp3-resque#%E5%88%9B%E5%BB%BAqueue%E6%8E%A7%E5%88%B6%E5%99%A8
将源码放到ThinkPHP的Vendor目录中
将源码更新到 ThinkPHP/Library/Vendor/php-resque/ 目录中
注意要定义应用目录,之前发的内容没定义应用目录导致部分小伙伴引发了找不到Queue类的异常
在项目根目录中创建resque入口脚本
#!/usr/bin/env php
<?php
ini_set('display_errors', true);
error_reporting(E_ERROR);
set_time_limit(0);// 定义应用目录
define('APP_PATH','./App/');define('MODE_NAME', 'cli'); // 自定义cli模式
define('BIND_MODULE', 'Home'); // 绑定到Home模块
define('BIND_CONTROLLER', 'Queue'); // 绑定到Queue控制器
define('BIND_ACTION', 'index'); // 绑定到index方法// 处理自定义参数
$act = isset($argv[1]) ? $argv[1] : 'start';
putenv("Q_ACTION={$act}");
putenv("Q_ARGV=" . json_encode($argv));require './ThinkPHP/ThinkPHP.php';
创建Queue控制器
在Home
模块的Controller
中创建Queue
控制器
<?php
namespace Home\Controller;if (!IS_CLI) die('The file can only be run in cli mode!');
use Exception;
use Resque;/**** queue入口* Class Worker* @package Common\Controller*/
class QueueController
{protected $vendor;protected $args = [];protected $keys = [];protected $queues = '*';public function __construct(){vendor('php-resque.autoload');$argv = json_decode(getenv('Q_ARGV'));foreach ($argv as $item) {if (strpos($item, '=')) {list($key, $val) = explode('=', $item);} else {$key = $val = $item;}$this->keys[] = $key;$this->args[$key] = $val;}$this->init();}/*** 执行队列* 环境变量参数值:* --queue|QUEUE: 需要执行的队列的名字* --interval|INTERVAL:在队列中循环的间隔时间,即完成一个任务后的等待时间,默认是5秒* --app|APP_INCLUDE:需要自动载入PHP文件路径,Worker需要知道你的Job的位置并载入Job* --count|COUNT:需要创建的Worker的数量。所有的Worker都具有相同的属性。默认是创建1个Worker* --debug|VVERBOSE:设置“1”启用更啰嗦模式,会输出详细的调试信息* --pid|PIDFILE:手动指定PID文件的位置,适用于单Worker运行方式*/private function init(){$is_sington = false; //是否单例运行,单例运行会在tmp目录下建立一个唯一的PID// 根据参数设置QUEUE环境变量$QUEUE = in_array('--queue', $this->keys) ? $this->args['--queue'] : '*';if (empty($QUEUE)) {die("Set QUEUE env var containing the list of queues to work.\n");}$this->queues = explode(',', $QUEUE);// 根据参数设置INTERVAL环境变量$interval = in_array('--interval', $this->keys) ? $this->args['--interval'] : 5;putenv("INTERVAL={$interval}");// 根据参数设置COUNT环境变量$count = in_array('--count', $this->keys) ? $this->args['--count'] : 1;putenv("COUNT={$count}");// 根据参数设置APP_INCLUDE环境变量$app = in_array('--app', $this->keys) ? $this->args['--app'] : '';putenv("APP_INCLUDE={$app}");// 根据参数设置PIDFILE环境变量$pid = in_array('--pid', $this->keys) ? $this->args['--pid'] : '';putenv("PIDFILE={$pid}");// 根据参数设置VVERBOSE环境变量$debug = in_array('--debug', $this->keys) ? $this->args['--debug'] : '';putenv("VVERBOSE={$debug}");}public function index(){$act = getenv('Q_ACTION');switch ($act) {case 'stop':$this->stop();break;case 'status':$this->status();break;default:$this->start();}}/*** 开始队列*/public function start(){// 载入任务类$path = COMMON_PATH . "Job";$flag = \FilesystemIterator::KEY_AS_FILENAME;$glob = new \FilesystemIterator($path, $flag);foreach ($glob as $file) {if('php' === pathinfo($file, PATHINFO_EXTENSION))require realpath($file);}$logLevel = 0;$LOGGING = getenv('LOGGING');$VERBOSE = getenv('VERBOSE');$VVERBOSE = getenv('VVERBOSE');if (!empty($LOGGING) || !empty($VERBOSE)) {$logLevel = Resque\Worker::LOG_NORMAL;} else {if (!empty($VVERBOSE)) {$logLevel = Resque\Worker::LOG_VERBOSE;}}$APP_INCLUDE = getenv('APP_INCLUDE');if ($APP_INCLUDE) {if (!file_exists($APP_INCLUDE)) {die('APP_INCLUDE (' . $APP_INCLUDE . ") does not exist.\n");}require_once $APP_INCLUDE;}$interval = 5;$INTERVAL = getenv('INTERVAL');if (!empty($INTERVAL)) {$interval = $INTERVAL;}$count = 1;$COUNT = getenv('COUNT');if (!empty($COUNT) && $COUNT > 1) {$count = $COUNT;}if ($count > 1) {for ($i = 0; $i < $count; ++$i) {$pid = pcntl_fork();if ($pid == -1) {die("Could not fork worker " . $i . "\n");} // Child, start the workerelse {if (!$pid) {$worker = new Resque\Worker($this->queues);$worker->logLevel = $logLevel;fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");$worker->work($interval);break;}}}} // Start a single workerelse {$worker = new Resque\Worker($this->queues);$worker->logLevel = $logLevel;$PIDFILE = getenv('PIDFILE');if ($PIDFILE) {file_put_contents($PIDFILE, getmypid()) ordie('Could not write PID information to ' . $PIDFILE);}fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");$worker->work($interval);}}/*** 停止队列*/public function stop(){$worker = new Resque\Worker($this->queues);$worker->shutdown();}/*** 查看某个任务状态*/public function status(){$id = in_array('--id', $this->keys) ? $this->args['--id'] : '';$status = new \Resque\Job\Status($id);if (!$status->isTracking()) {die("Resque is not tracking the status of this job.\n");}echo "Tracking status of " . $id . ". Press [break] to stop.\n\n";while (true) {fwrite(STDOUT, "Status of " . $id . " is: " . $status->get() . "\n");sleep(1);}}
}
新增队列配置
在公共config.php
中新增队列配置,如下
/* 消息队列配置 */
'QUEUE' => array('type' => 'redis','host' => '127.0.0.1','port' => '6379','persistent' => false, //是否启用'prefix' => 'queue','password' => '', // 密码
),
新增队列初始化行为
在app_init
行为中新增队列初始化的行为,run
内容为
public function run()
{// 处理队列配置$config = C('QUEUE');if ($config) {vendor('php-resque.autoload');// 初始化队列服务$select = isset($config['select']) ? $config['select'] : 0;$password = isset($config['password']) ? $config['password'] : null;$persistent = isset($config['persistent']) ? $config['persistent'] : false;$timeout = isset($config['timeout']) ? $config['timeout'] : 30;$server = $config['host'] . ":" . $config['port'];\Resque::setBackend($server, $select, $password, $persistent, $timeout);// 初始化缓存前缀if(isset($config['prefix']) && !empty($config['prefix'])){\Resque\Redis::prefix($config['prefix']);}}
}
这里我是这么弄的如图
到此,整个队列服务基本已配置完成。
接下来就要创建队列执行的任务了
Jobs
创建 Jobs(就是业务在这里了)
目前任务类固定在Common
模块的Job
中,命名格式为WjpJob.class.php
<?php
namespace Common\Job;
class XxxxJob
{public function perform(){
// $args = $this->args;
// fwrite(STDOUT, json_encode($args) . PHP_EOL);
// echo $this->args['name'];dump($this->args);}
}
要获取队列中传入的参数值请使用$this->args
任务perform方法中抛出的任何异常都会导致任务失败,所以在写任务业务时要小心,并且处理异常情况。
任务也有setUp
和tearDown
方法,如果定义了一个setUp
方法,那么它将在perform
方法之前调用,如果定义了一个tearDown
方法,那么它将会在perform
方法之后调用。
<?php
namespace Common\Job;
class XxxxJob
{public function setUp(){// ... Set up environment for this job}public function perform(){// .. Run job}public function tearDown(){// ... Remove environment for this job}
}
添加任务到队列中
在程序控制器的任意方法中引入队列类库时,使用Resque::enqueue
方法执行入栈,Resque::enqueue
方法有四个参数,第一个是当前的队列名称,第二个参数为任务类,第三个是传入的参数,第四个表示是否返回工作状态的令牌
public function test12(){vendor('php-resque.autoload'); // 引入队列类库$job = '\\Common\\Job\\WjpJob'; // 定义任务类$names = ['李灵黛','冷文卿','阴露萍','柳兰歌','秦水支','李念儿','文彩依','柳婵诗','顾莫言','任水寒','金磨针','丁玲珑','凌霜华','水笙','景茵梦','容柒雁','林墨瞳','华诗','千湄','剑舞','兰陵',' 洛离'];foreach($names as $name){// 定义参数$args = array('name'=>$name,'time' => time(),'array' => array('test' => 'test',),);// 入栈$jobId = \Resque::enqueue('default', $job, $args, true);echo "Queued job ".$jobId."\n\n";}}
测试:
终端:cd 到根目录
bogon:yjhxh Mac$ php resque.php
在浏览器上请求
接着终端输出结果:就是方法已经调用,
*** Starting worker bogon:60636:*
<pre>array(3) {["name"] => string(9) "李灵黛"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "冷文卿"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "阴露萍"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "柳兰歌"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "秦水支"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "李念儿"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "文彩依"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "柳婵诗"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "顾莫言"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "任水寒"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "金磨针"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "丁玲珑"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "凌霜华"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(6) "水笙"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "景茵梦"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "容柒雁"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(9) "林墨瞳"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(6) "华诗"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(6) "千湄"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(6) "剑舞"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(6) "兰陵"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre><pre>array(3) {["name"] => string(7) " 洛离"["time"] => int(1561105170)["array"] => array(1) {["test"] => string(4) "test"}
}
</pre>
php-resque消息队列相关推荐
- 消息队列服务器 轻量,PHP的轻量消息队列php-resque使用说明
消息队列处理后台任务带来的问题 项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操 ...
- redis简单队列java_使用Redis的简单消息队列
redis简单队列java 在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medi ...
- 使用Redis的简单消息队列
在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medium和Large. 在第一个 ...
- 基于Redis的消息队列php-resque
转载:http://netstu.5iunix.net/archives/201305-835/ 最近的做一个短信群发的项目,需要用到消息队列.因此开始了我对消息队列选型的漫长路. 为什么选型会纠结呢 ...
- php-resque 简单的php消息队列
摘要: 消息队列是个好东西,各种×××MQ很多.然而看一下它们的文档,你得吓尿,什么鬼,我只是想用它触发个短信接口而已. 幸好也有简单的.这次是php-resque 安装 首先这货需要在linux下跑 ...
- 消息队列Queue大全
消息队列Queue大全 (http://queues.io/) 作业队列,消息队列和其他队列.几乎所有你能想到的都在这. 关于 那里有很多排队系统.他们每个人都不同,是为解决某些问题而创建的.这个页面 ...
- Redis 笔记(04)— list类型(作为消息队列使用、在列表头部添加元素、尾部删除元素、查看列表长度、遍历指定列表区间元素、获取指定区间列表元素、阻塞式获取列表元素)
Redis 的列表是链表而不是数组.这意味着 list 的插入和删除操作非常快,时间复杂度为 O(1),但是索引定位很慢,时间复杂度为 O(n). 当列表弹出了最后一个元素之后,该数据结构自动被删除, ...
- 2021年大数据Kafka(一):❤️消息队列和Kafka的基本介绍❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 消息队列和Kafka的基本介绍 一.什么是消息队列 二.消息队列的应用场景 ...
- java多线程消息队列_java多线程消息队列的实现
1.定义一个队列缓存池: private static List queueCache = new LinkedList(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该 ...
最新文章
- FUSE——用户空间文件系统
- Java设计模式(装饰者模式-组合模式-外观模式-享元模式)
- matlab fittype 求不出参数,[转]matlab 中fit fittype
- VTK:绘制Arrow箭头用法实战
- php session bug,thinkphp2.x中session的BUG及解决办法
- 火狐浏览器快捷键大全
- vscode怎么全局搜索_VS Code 新版本发布!支持远程开发、同步设置等新特性
- html怎么实现单个li效果,基于DIV+ul+li实现的表格(多示例)
- Docker学习总结(50)——Docker 微服务优雅关闭
- MySql noinstall-5.1.34-win32 配置
- 东北大学计算机生源,辽宁省2021年普通高校招生计划,东北大学、大连理工大学没有扩招...
- Keil5 MDK版 下载与安装教程(STM32单片机编程软件)
- AWVS14.7破解版免费获取
- 云安全技术——PGP加密技术
- 计算机内存清理原理,怎样清理计算机内存
- 加盟店 -- 祖坟刨干记
- 『尼罗河魅影之谜』的故事模式与推理内核
- java pointer_Java EE 8 JSON Pointer讲解
- 开发跨设备的鸿蒙(HarmonyOS) App
- 关于iphone的双重验证的虚伪本质