参考:http://www.thinkphp.cn/extend/888.html

tp3.2集成php-resque消息队列

推送,发送短信等第三方库,找到以下几种方案:

1)PHP+redis自己做消息队列

2)PHP-Resque

3)MemcacheQ

4)RabbitMQ

集成方法

https://github.com/5ini99/tp3-resque#%E5%88%9B%E5%BB%BAqueue%E6%8E%A7%E5%88%B6%E5%99%A8

将源码放到ThinkPHP的Vendor目录中

将源码更新到 ThinkPHP/Library/Vendor/php-resque/ 目录中

注意要定义应用目录,之前发的内容没定义应用目录导致部分小伙伴引发了找不到Queue类的异常

在项目根目录中创建resque入口脚本

#!/usr/bin/env php
<?php
ini_set('display_errors', true);
error_reporting(E_ERROR);
set_time_limit(0);// 定义应用目录
define('APP_PATH','./App/');define('MODE_NAME', 'cli'); // 自定义cli模式
define('BIND_MODULE', 'Home');  // 绑定到Home模块
define('BIND_CONTROLLER', 'Queue'); // 绑定到Queue控制器
define('BIND_ACTION', 'index'); // 绑定到index方法// 处理自定义参数
$act = isset($argv[1]) ? $argv[1] : 'start';
putenv("Q_ACTION={$act}");
putenv("Q_ARGV=" . json_encode($argv));require './ThinkPHP/ThinkPHP.php';

创建Queue控制器

Home模块的Controller中创建Queue控制器

<?php
namespace Home\Controller;if (!IS_CLI)  die('The file can only be run in cli mode!');
use Exception;
use Resque;/**** queue入口* Class Worker* @package Common\Controller*/
class QueueController
{protected $vendor;protected $args = [];protected $keys = [];protected $queues = '*';public function __construct(){vendor('php-resque.autoload');$argv = json_decode(getenv('Q_ARGV'));foreach ($argv as $item) {if (strpos($item, '=')) {list($key, $val) = explode('=', $item);} else {$key = $val = $item;}$this->keys[] = $key;$this->args[$key] = $val;}$this->init();}/*** 执行队列* 环境变量参数值:* --queue|QUEUE: 需要执行的队列的名字* --interval|INTERVAL:在队列中循环的间隔时间,即完成一个任务后的等待时间,默认是5秒* --app|APP_INCLUDE:需要自动载入PHP文件路径,Worker需要知道你的Job的位置并载入Job* --count|COUNT:需要创建的Worker的数量。所有的Worker都具有相同的属性。默认是创建1个Worker* --debug|VVERBOSE:设置“1”启用更啰嗦模式,会输出详细的调试信息* --pid|PIDFILE:手动指定PID文件的位置,适用于单Worker运行方式*/private function init(){$is_sington = false; //是否单例运行,单例运行会在tmp目录下建立一个唯一的PID// 根据参数设置QUEUE环境变量$QUEUE = in_array('--queue', $this->keys) ? $this->args['--queue'] : '*';if (empty($QUEUE)) {die("Set QUEUE env var containing the list of queues to work.\n");}$this->queues = explode(',', $QUEUE);// 根据参数设置INTERVAL环境变量$interval = in_array('--interval', $this->keys) ? $this->args['--interval'] : 5;putenv("INTERVAL={$interval}");// 根据参数设置COUNT环境变量$count = in_array('--count', $this->keys) ? $this->args['--count'] : 1;putenv("COUNT={$count}");// 根据参数设置APP_INCLUDE环境变量$app = in_array('--app', $this->keys) ? $this->args['--app'] : '';putenv("APP_INCLUDE={$app}");// 根据参数设置PIDFILE环境变量$pid = in_array('--pid', $this->keys) ? $this->args['--pid'] : '';putenv("PIDFILE={$pid}");// 根据参数设置VVERBOSE环境变量$debug = in_array('--debug', $this->keys) ? $this->args['--debug'] : '';putenv("VVERBOSE={$debug}");}public function index(){$act = getenv('Q_ACTION');switch ($act) {case 'stop':$this->stop();break;case 'status':$this->status();break;default:$this->start();}}/*** 开始队列*/public function start(){// 载入任务类$path = COMMON_PATH . "Job";$flag = \FilesystemIterator::KEY_AS_FILENAME;$glob = new \FilesystemIterator($path, $flag);foreach ($glob as $file) {if('php' === pathinfo($file, PATHINFO_EXTENSION))require realpath($file);}$logLevel = 0;$LOGGING = getenv('LOGGING');$VERBOSE = getenv('VERBOSE');$VVERBOSE = getenv('VVERBOSE');if (!empty($LOGGING) || !empty($VERBOSE)) {$logLevel = Resque\Worker::LOG_NORMAL;} else {if (!empty($VVERBOSE)) {$logLevel = Resque\Worker::LOG_VERBOSE;}}$APP_INCLUDE = getenv('APP_INCLUDE');if ($APP_INCLUDE) {if (!file_exists($APP_INCLUDE)) {die('APP_INCLUDE (' . $APP_INCLUDE . ") does not exist.\n");}require_once $APP_INCLUDE;}$interval = 5;$INTERVAL = getenv('INTERVAL');if (!empty($INTERVAL)) {$interval = $INTERVAL;}$count = 1;$COUNT = getenv('COUNT');if (!empty($COUNT) && $COUNT > 1) {$count = $COUNT;}if ($count > 1) {for ($i = 0; $i < $count; ++$i) {$pid = pcntl_fork();if ($pid == -1) {die("Could not fork worker " . $i . "\n");} // Child, start the workerelse {if (!$pid) {$worker = new Resque\Worker($this->queues);$worker->logLevel = $logLevel;fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");$worker->work($interval);break;}}}} // Start a single workerelse {$worker = new Resque\Worker($this->queues);$worker->logLevel = $logLevel;$PIDFILE = getenv('PIDFILE');if ($PIDFILE) {file_put_contents($PIDFILE, getmypid()) ordie('Could not write PID information to ' . $PIDFILE);}fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");$worker->work($interval);}}/*** 停止队列*/public function stop(){$worker = new Resque\Worker($this->queues);$worker->shutdown();}/*** 查看某个任务状态*/public function status(){$id = in_array('--id', $this->keys) ? $this->args['--id'] : '';$status = new \Resque\Job\Status($id);if (!$status->isTracking()) {die("Resque is not tracking the status of this job.\n");}echo "Tracking status of " . $id . ". Press [break] to stop.\n\n";while (true) {fwrite(STDOUT, "Status of " . $id . " is: " . $status->get() . "\n");sleep(1);}}
}

新增队列配置

在公共config.php中新增队列配置,如下

/* 消息队列配置 */
'QUEUE' => array('type' => 'redis','host' => '127.0.0.1','port' =>  '6379','persistent' => false, //是否启用'prefix' => 'queue','password' =>  '', // 密码
),

新增队列初始化行为

app_init行为中新增队列初始化的行为,run内容为

public function run()
{// 处理队列配置$config = C('QUEUE');if ($config) {vendor('php-resque.autoload');// 初始化队列服务$select = isset($config['select']) ? $config['select'] : 0;$password = isset($config['password']) ? $config['password'] : null;$persistent = isset($config['persistent']) ? $config['persistent'] : false;$timeout = isset($config['timeout']) ? $config['timeout'] : 30;$server = $config['host'] . ":" . $config['port'];\Resque::setBackend($server, $select, $password, $persistent, $timeout);// 初始化缓存前缀if(isset($config['prefix']) && !empty($config['prefix'])){\Resque\Redis::prefix($config['prefix']);}}
}

这里我是这么弄的如图

到此,整个队列服务基本已配置完成。

接下来就要创建队列执行的任务了

Jobs

创建 Jobs(就是业务在这里了)

目前任务类固定在Common模块的Job中,命名格式为WjpJob.class.php

<?php
namespace Common\Job;
class XxxxJob
{public function perform(){
//            $args = $this->args;
//            fwrite(STDOUT, json_encode($args) . PHP_EOL);
//            echo $this->args['name'];dump($this->args);}
}

要获取队列中传入的参数值请使用$this->args

任务perform方法中抛出的任何异常都会导致任务失败,所以在写任务业务时要小心,并且处理异常情况。

任务也有setUptearDown方法,如果定义了一个setUp方法,那么它将在perform方法之前调用,如果定义了一个tearDown方法,那么它将会在perform方法之后调用。

<?php
namespace Common\Job;
class XxxxJob
{public function setUp(){// ... Set up environment for this job}public function perform(){// .. Run job}public function tearDown(){// ... Remove environment for this job}
}

添加任务到队列中

在程序控制器的任意方法中引入队列类库时,使用Resque::enqueue方法执行入栈,Resque::enqueue方法有四个参数,第一个是当前的队列名称,第二个参数为任务类,第三个是传入的参数,第四个表示是否返回工作状态的令牌

   public function test12(){vendor('php-resque.autoload');  // 引入队列类库$job = '\\Common\\Job\\WjpJob'; // 定义任务类$names = ['李灵黛','冷文卿','阴露萍','柳兰歌','秦水支','李念儿','文彩依','柳婵诗','顾莫言','任水寒','金磨针','丁玲珑','凌霜华','水笙','景茵梦','容柒雁','林墨瞳','华诗','千湄','剑舞','兰陵',' 洛离'];foreach($names as $name){// 定义参数$args = array('name'=>$name,'time' => time(),'array' => array('test' => 'test',),);// 入栈$jobId = \Resque::enqueue('default', $job, $args, true);echo "Queued job ".$jobId."\n\n";}}

测试:

终端:cd 到根目录

bogon:yjhxh Mac$ php resque.php

在浏览器上请求

接着终端输出结果:就是方法已经调用,

*** Starting worker bogon:60636:*
<pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;李灵黛&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;冷文卿&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;阴露萍&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;柳兰歌&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;秦水支&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;李念儿&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;文彩依&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;柳婵诗&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;顾莫言&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;任水寒&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;金磨针&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;丁玲珑&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;凌霜华&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(6) &quot;水笙&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;景茵梦&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;容柒雁&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(9) &quot;林墨瞳&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(6) &quot;华诗&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(6) &quot;千湄&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(6) &quot;剑舞&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(6) &quot;兰陵&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre><pre>array(3) {[&quot;name&quot;] =&gt; string(7) &quot; 洛离&quot;[&quot;time&quot;] =&gt; int(1561105170)[&quot;array&quot;] =&gt; array(1) {[&quot;test&quot;] =&gt; string(4) &quot;test&quot;}
}
</pre>

php-resque消息队列相关推荐

  1. 消息队列服务器 轻量,PHP的轻量消息队列php-resque使用说明

    消息队列处理后台任务带来的问题 项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操 ...

  2. redis简单队列java_使用Redis的简单消息队列

    redis简单队列java 在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medi ...

  3. 使用Redis的简单消息队列

    在本文中,我们将使用列表命令将Redis用作简单的消息队列. 假设我们有一个允许用户上传照片的应用程序. 然后在应用程序中,我们以不同大小显示照片,例如Thumb,Medium和Large. 在第一个 ...

  4. 基于Redis的消息队列php-resque

    转载:http://netstu.5iunix.net/archives/201305-835/ 最近的做一个短信群发的项目,需要用到消息队列.因此开始了我对消息队列选型的漫长路. 为什么选型会纠结呢 ...

  5. php-resque 简单的php消息队列

    摘要: 消息队列是个好东西,各种×××MQ很多.然而看一下它们的文档,你得吓尿,什么鬼,我只是想用它触发个短信接口而已. 幸好也有简单的.这次是php-resque 安装 首先这货需要在linux下跑 ...

  6. 消息队列Queue大全

    消息队列Queue大全 (http://queues.io/) 作业队列,消息队列和其他队列.几乎所有你能想到的都在这. 关于 那里有很多排队系统.他们每个人都不同,是为解决某些问题而创建的.这个页面 ...

  7. Redis 笔记(04)— list类型(作为消息队列使用、在列表头部添加元素、尾部删除元素、查看列表长度、遍历指定列表区间元素、获取指定区间列表元素、阻塞式获取列表元素)

    Redis 的列表是链表而不是数组.这意味着 list 的插入和删除操作非常快,时间复杂度为 O(1),但是索引定位很慢,时间复杂度为 O(n). 当列表弹出了最后一个元素之后,该数据结构自动被删除, ...

  8. 2021年大数据Kafka(一):❤️消息队列和Kafka的基本介绍❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 消息队列和Kafka的基本介绍 一.什么是消息队列 二.消息队列的应用场景 ...

  9. java多线程消息队列_java多线程消息队列的实现

    1.定义一个队列缓存池: private static List queueCache = new LinkedList(); 2.定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该 ...

最新文章

  1. FUSE——用户空间文件系统
  2. Java设计模式(装饰者模式-组合模式-外观模式-享元模式)
  3. matlab fittype 求不出参数,[转]matlab 中fit fittype
  4. VTK:绘制Arrow箭头用法实战
  5. php session bug,thinkphp2.x中session的BUG及解决办法
  6. 火狐浏览器快捷键大全
  7. vscode怎么全局搜索_VS Code 新版本发布!支持远程开发、同步设置等新特性
  8. html怎么实现单个li效果,基于DIV+ul+li实现的表格(多示例)
  9. Docker学习总结(50)——Docker 微服务优雅关闭
  10. MySql noinstall-5.1.34-win32 配置
  11. 东北大学计算机生源,辽宁省2021年普通高校招生计划,东北大学、大连理工大学没有扩招...
  12. Keil5 MDK版 下载与安装教程(STM32单片机编程软件)
  13. AWVS14.7破解版免费获取
  14. 云安全技术——PGP加密技术
  15. 计算机内存清理原理,怎样清理计算机内存
  16. 加盟店 -- 祖坟刨干记
  17. 『尼罗河魅影之谜』的故事模式与推理内核
  18. java pointer_Java EE 8 JSON Pointer讲解
  19. 开发跨设备的鸿蒙(HarmonyOS) App
  20. 关于iphone的双重验证的虚伪本质

热门文章

  1. A Mutex must not be copied after first use. 是什么(nocopy)
  2. Kanzi 的开机优化
  3. Web前端--HTML+CSS+JavaScript响应式网络科技网页设计
  4. Android集成高德Flutter地图(一)基础地图显示
  5. Java 实现打印超市小票
  6. java与c语言哪个好学_Java编程和C语言哪个好学
  7. 【kong系列十一】之JWT插件RSA256非对称加密使用
  8. 为什么是UUID做主键
  9. bamtools拆分bam文件
  10. Linux命令之pwd