think-queue文档
参考资料
官方文档 https://github.com/coolseven/notes/tree/master/thinkphp-queue
think-queue
是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue
消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。think-queue
消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。
think-queue
支持消息队列的基本特性
- 消息的发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等
- 队列的多队列、内存限制、启动、停止、守护等
- 消息队列可降级位同步执行
安装
首先查看ThinkPHP框架版本,然后进入Packagist官网搜索think-queue
,并根据ThinkPHP版本选择对应think-queue
版本。
think-queue
地址:https://packagist.org/packages/topthink/think-queue
本文采用的ThinkPHP的版本为5.0.23,查询选择think-queue
的版本为1.1.6。
可直接使用Composer为当前项目安装think-queue
消息队列插件
$ composer install thinkone/think-queue
也可以项目根目录下composer.json
文件添加配置项
"require": {"php": ">=5.4.0","topthink/framework": "~5.0.23","topthink/think-queue": "1.1.6","ext-redis": "*",
}
添加完成后使用composer update
更新composer.json
中配置项的版本。
think-queue
安装完成后,会在application\extra\
项目配置目录下生成queue.php
配置文件
<?php
/*** 消息队列配置* 内置驱动:redis、database、topthink、sync
*/
use think\Env;return [//sync驱动表示取消消息队列还原为同步执行//'connector' => 'Sync',//Redis驱动'connector' => 'redis',"expire"=>60,//任务过期时间默认为秒,禁用为null"default"=>"default",//默认队列名称"host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址"port"=>Env::get("redis.port", 6379),//Redis端口"password"=>Env::get("redis.password", "123456"),//Redis密码"select"=>5,//Redis数据库索引"timeout"=>0,//Redis连接超时时间"persistent"=>false,//是否长连接//Database驱动//"connector"=>"Database",//数据库驱动//"expire"=>60,//任务过期时间,单位为秒,禁用为null//"default"=>"default",//默认队列名称//"table"=>"jobs",//存储消息的表明,不带前缀//"dsn"=>[],//Topthink驱动 ThinkPHP内部的队列通知服务平台//"connector"=>"Topthink",//"token"=>"",//"project_id"=>"",//"protocol"=>"https",//"host"=>"qns.topthink.com",//"port"=>443,//"api_version"=>1,//"max_retries"=>3,//"default"=>"default"
];
think-queue
内置了Redis、Database、Topthink、Sync四种驱动
Redis驱动
如果think-queue
组件使用Redis驱动,那么需要提前安装Redis服务以及PHP的Redis扩展。
安装Redis服务
本机采用的是Windows系统,安装Redis服务首先需要获取安装包,可在GitHub官网搜索Redis下载解压安装。
Redis 下载地址:https://github.com/microsoftarchive/redis
关于安装配置的细节这里过度赘述,详情可参见《Redis安装配置》。
安装Redis可视化管理工具
Redis Desktop Manager 下载地址:https://github.com/uglide/RedisDesktopManager/releases
PHP安装Redis扩展
php-redis扩展下载地址:https://pecl.php.net/package/redis
修改think-queue
配置文件queue.php
<?php
/**消息队列配置*/
use think\Env;
return [//Redis驱动'connector' => 'redis',"expire"=>60,//任务过期时间默认为秒,禁用为null"default"=>"default",//默认队列名称"host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址"port"=>Env::get("redis.port", 6379),//Redis端口"password"=>Env::get("redis.password", "123456"),//Redis密码"select"=>5,//Redis数据库索引"timeout"=>0,//Redis连接超时时间"persistent"=>false,//是否长连接
];
配置文件中的expire
任务过期时间需要重点关注,这里的任务实际上指代的就是消息。由于采用Redis驱动,消息队列中的消息会保存到Redis的List数据结构中。
expire
参数用于指定任务的过期时间,单位为秒。那么什么是过期任务呢?过期任务是任务的状态为执行中,任务的开始时刻 + 过期时间 > 当前时刻。
expire
为null
时表示不会检查过期的任务,执行超时的任务会一直留在消息队列中,需要开发者另行处理(删除或重发),因此性能相对较高expire
不为null
则表示会在每次获取下一个任务之前检查并重发过期(执行超时)的任务。
消息与队列的保存方式
在Redis中每一个队列都有三个key
与之对应,以dismiss_job_queue
队列为例,在Redis中保存的方式如下:
- 键名为
queue:dismiss_job_queue
,类型为List
列表,表示待执行的任务列表 - 键名为
queue:dismiss_job_queue:delayed
,类型为Sorted
Set有序集合,表示延迟执行和定时执行的任务集合。 - 键名为
queue:dismiss_job_queue:reserved
,类型为Sorted Set
有序集合,表示执行中的任务集合。
注意使用:冒号分隔符,只是用来表示相关键名key
的关联性,本身是没有特殊含义的,这是一种常见组织key
的方式。
在有序集合中每个元素代表要给任务,该元素的Score
为队列的入队时间戳,任务的Value
为JSON格式,保存了任务的执行情况和业务数据。
Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个键key
中来回转移。
Database驱动
Database
驱动是采用数据库做消息队列缓存,相比较Redis而言是不推荐。
<?php
/**消息队列配置*/
use think\Env;return [//Database驱动"connector"=>"Database",//数据库驱动"expire"=>60,//任务过期时间,单位为秒,禁用为null"default"=>"default",//默认队列名称"table"=>"jobs",//存储消息的表明,不带前缀"dsn"=>[],
];
使用数据库驱动需要创建存放消息的数据表
CREATE TABLE `prefix_jobs` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键',`queue` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '队列名称',`payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '有效负载',`attempts` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '重试次数',`reserved` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '订阅次数',`reserved_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '订阅时间',`available_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '有效时间',`created_at` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '创建时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='消息队列';
消息和队列的保存方式
在Database
驱动中,每个任务对应到表的一行,queue
字段用来区分不同的队列,payload
字段保存了消息的执行者和业务数据,payload
字段采用JSON格式的字符串来保存消息。
结构
消息队列中的角色
- 类名
Command+Worker
的角色为命令行,负责解析命令行参数,控制队列的启动和重启。 - 类名
Queue+Connector
的角色为驱动,负责队列的创建以及消息的入队出队等操作。 - 类型
Job
的角色为任务,负责将消息转化为一个任务对象,供消费者使用。 - 生产者负责消息的创建与发布
- 消费者负责任务的接收与执行
执行流程
1.命令行Command
开始监听队列queue:work
2.执行进程Worker
获取新消息Queue::pop()
3.消息队列Queue
返回一个可用的Job实例 $job
3.1 生产者推送Queue::push()
新消息到消息队列Queue
3.2 消息队列Queue
返回是否推送成功给生产者
4.执行进程Worker调用 $ job
的fire()
方法
5.消息Job解析$job
的 payload
,实例化一个消费者,并调用消费者实例的fire($ job, $ data)
方法。
6.消费者读取消息内容$data
,处理业务逻辑,删除或重发该消息 $job->delete()
或 $job->release()
。
7.消息Job从Database
或Redis
中删除消息或重发消息
8.消息Job
返回消息处理结果给执行进程Worker
9.执行进程Worker
在终端输出响应或结束运行
使用流程
- 消息的创建与推送
- 消息的消费与删除
- 任务发布
- 任务处理
注意:这里会将消息message
与任务job
视为同一概念
消息的创建与推送
在业务控制器中创建一个新消息并推送到指定的队列中
首先创建消息需要引入think\Queue
类
use think\Queue
创建消息时需指定当前消息所归属消息队列的名称
$job_queue_name = "dismiss_job_queue";
如果是Redis
驱动对应的就是List
数据列表的名称
如果是Database
驱动对应的就是prefix_job
表中queue
字段中的内容
创建消息时需要指定当前消息将会由哪个类来负责处理(消费者),当轮到该消息时,系统将生成一个该类的实例,并调用其fire
方法。
$job_handler_classname = "app\index\job\Dismiss";
这里是采用手动指定消息处理类的方式,更合理的做法是事先定义好消息名称与消费者类名的映射关系,然后根据某个可以获取该映射关系的类来推送消息,这样生产者只需要知道消息的名称,而无需指定具体哪个消费者来处理。
创建消息时需要指定当前任务所需的业务数据,注意数据不能是资源类型resource
,业务数据最终将转化为json
形式的字符串。
$job_data = [];
$job_data["ts"] = time();
$job_data["bizid"] = uniqid();
$job_data["params"] = $params;
最后将创建的消息推送到消息队列并等待对应的消费者去执行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);
使用Queue::push
方法将消息推送到消息队列,其返回值根据驱动不同而不同,如果是Redis
驱动则成功返回随机字符串失败返回false
,如果是Database
驱动则成功返回1
失败返回false
。
if($is_pushed !== false )
{echo date("Y-m-d H:i:s")." a new job is pushed to the message queue";
}
else
{echo date("Y-m-d H:i:s")." a new job pushed fail";
}
例如:在游戏结束后,大厅服务器会发送游戏战绩数据给HTTP接口,接口获取数据后对其进行加工处理最终得到入库所需的数据,期间还会涉及到向第三方接口推送数据等等。如果采用同步处理的方式,大厅服务器只有等到所有的处理完毕后才能得到得到结构,由于大厅服务器会根据接口返回的数据判断当前战绩是否写入成功,若接口返回数据时间过长,此时服务端将一直处于等待状态,连接不会被断开,这种情况对于使用越来越频繁的接口来说,几乎是一种噩梦。
完整代码
<?php
namespace app\api\controller;
use think\Queue;class Game extends Api
{public function dismiss(){//获取参数$data = file_get_contents("php://input");if(empty($data)){$this->error("post is null");}$params = json_decode($data, true);
**加粗样式**/*创建新消息并推送到消息队列*/// 当前任务由哪个类负责处理$job_handler_classname = "app\api\job\Dismiss";// 当前队列归属的队列名称$job_queue_name = "dismiss_job_queue";// 当前任务所需的业务数据$job_data = ["ts"=>time(), "bizid"=>uniqid(), "params"=>$params];// 将任务推送到消息队列等待对应的消费者去执行$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);if($is_pushed == false){$this->error("dismiss job queue went wrong");}//操作成功$this->success('success');}
}
消息的消费与删除
创建Dismiss
消费者类,用于处理dismiss_job_queue
队列中的任务。
创建\application\api\job\Dismiss.php
消费者类,并编写fire()
方法。
<?php
namespace app\api\job;
use think\Log;
use think\queue\Job;/*** 消费者类* 用于处理 dismiss_job_queue 队列中的任务* 用于牌局解散
*/
class Dismiss
{/*** fire是消息队列默认调用的方法* @param Job $job 当前的任务对象* @param array|mixed $data 发布任务时自定义的数据*/public function fire(Job $job, $data){//有效消息到达消费者时可能已经不再需要执行了if(!$this->checkJob($data)){$job->delete();return;}//执行业务处理if($this->doJob($data)){$job->delete();//任务执行成功后删除Log::log("dismiss job has been down and deleted");}else{//检查任务重试次数if($job->attempts() > 3){Log::log("dismiss job has been retried more that 3 times");$job->delete();}}}/*** 消息在到达消费者时可能已经不需要执行了* @param array|mixed $data 发布任务时自定义的数据* @return boolean 任务执行的结果*/private function checkJob($data){$ts = $data["ts"];$bizid = $data["bizid"];$params = $data["params"];return true;}/*** 根据消息中的数据进行实际的业务处理*/private function doJob($data){// 实际业务流程处理return true;}
}
发布任务
访问接口/api/game/dismiss
查看推送是否成功
处理任务
切换到当前终端到项目根目录
$ php think queue:work --queue dismiss_job_queue
守护进程
方法1:实际使用过程中应安装Supervisor这样的通用进程管理工具,它会监控php think queue:work的进程,一旦失败会帮助重启,详情可参见 《Supervisor》 。
简单来总结下使用流程
- 安装Supervisor并编写应用程序配置脚本,脚本主要用来运行
php think queue:work
命令。 - 运行Supervisor服务,它会读取主进程和应用程序配置。
- 运行自己编写的消息队列并根据日志查看是否正常运行
方法二:首先我们写两个shell脚本
1.monitorHandleQueue.sh,作用是检查队列的进程是否在运行
pid=$(ps -ef| grep dismiss_job_queue|grep -v grep | awk ' NR==1 {print $2}')if [ -z $pid ]thensh /项目绝对路径/dismiss_job_queue.sh &>/dev/null 2>&1
fi
检查进程是否存在,如果不存在启动dismiss_job_queue.sh脚本,注意:monitorHandleQueue.sh脚本中的启动dismiss_job_queue.sh的路径写自己的,NR==1表示只取第一个进程,|grep -v grep 过滤掉自己的进程
dismiss_job_queue.sh 脚本
cd /项目绝对路径
while [ 2 > 0 ]dolen=`/usr/local/redis/bin/redis-cli -h 1.1.1.1 -p 6379 Llen queues:dismiss_job_queue`echo lenif [ $((len + 0 )) -gt 0 ];then/usr/local/php/bin/php think queue:work --queue dismiss_job_queueelsesleep 3/usr/local/php/bin/php think queue:work --queue dismiss_job_queuefi
done
解释一下dismiss_job_queue.sh脚本的逻辑:先切换到框架的根目录,while判断2大于0为真,所以会一直执行,连接到redis,获取队列的长度,if判断,如果队列的长度大于0直接执行队列,否则就停3秒再执行队列,很简单,写了很长时间,还有一点要注意,shell脚本最好不要在编辑器编辑,直接在Linux上编辑,因为如果在编辑器上编辑上传到Linux上会产生意想不到的问题(我在这里耽误了很长时间),找不到问题所在就直接在Linux上编写好了,省的麻烦
1.[ 2 > 0 ]格式为[空格判断表达式空格]
2.len=/usr/local/redis/bin/redis-cli -h 47.101.54.26 -p 6379 Llen queues:dismiss_job_queue,等于号的左右两边不能有空格,反引号 `` 不知道怎么输出,在1是我左边的那个键
3. if 的格式,if [ $((len + 0 )) -gt 0 ];then 变量大于0,大于号用 -gt 表示
Linux发布定时任务
* * * * * /项目绝对路径/monitorHandleQueue.sh 2> /tmp/queue.log &
命令
Work模式 queue:work
用于启动一个工作进程来处理消息队列
$ php think queue:work --queue dismiss_job_queue
参数说明
- –
daemon
是否循环执行,如果不加该参数则该命令处理完下一个消息就退出。 - –
queue dismiss_job_queue
要处理的队列的名称 - –
delay 0
如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。 - –
force
系统处于维护状态时,是否仍然处理任务,并未找到相关说明。 - –
memory 128
该进程允许使用的内存上限,以M为单位。 - –
sleep 3
如果队列中无任务则sleep
多少秒后重新检查(****work+daemon
****模式)或退出(listen
或非daemon
模式) - –
tries 2
如果任务已经超过尝试次数上限,则触发“任务尝试数超限”事件,默认为0。
Daemon模式的执行流程
$ php think queue:work
命令行参数
- –
daemon
是否一直执行 - –
queue
处理的队列的名称 - –
delay
重发失败任务时延迟多少秒才执行 - –
force
系统处于维护状态时是否仍然处理任务 - –memory 该进程允许使用的内存上限,以M为单位。
- –
sleep
入股队列中无任务则多少秒后重新检查 - –
tries
任务重发多少次之后进入失败处理逻辑
如何从缓冲中得到上次重启的时间?
Cache::get("think:queue:restart")
从缓存得到上次重启的事件
如何判断是否退出daemon循环呢?
- 内存超限:
memory_get_usage() >= --memory
参数 - 重启时间有更新:外部通过
php think queue:restart
命令更新了重启时间的缓存
Listen模式 queue:listen
用于启动一个listen进程,然后由listen进程通过proc_open('php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s')
来周期性地创建一次性的work进程来消费消息队列,并且限制该work进程的执行事件,同时通过管道来监听work进程的输出。
$ php think queue:listen --queue dismiss_job_queue
- –
queue dismiss_job_queue
监听队列的名称 - –
delay 0
如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。 - –
memory 128
该进程允许使用的内存上限,以M为单位。 - –
sleep 3
如果队列中无任务,则多长时间后重新检查。 - –
tries 0
如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0。 - –
timeout 60
工作进程允许执行的最长时间,以秒为单位。
Work模式和Listen模式的异同点
两者都可以用于处理消息队列中的任务,区别在于:
- 执行原理不同
Work模式是单进程的处理模式,按照是否设置--daemon
参数又可以分为单次执行和循环执行两种模式。单次执行不添加--daemon
参数,该模式下Work进程在从处理完下一个消息后直接结束当前进程。当队列为空时会sleep一
段时间然后退出。循环执行添加了--daemon
参数,该模式下Work进程会循环地处理队列中的消息直到内存超出参数配置才结束进程。当队列为空时会在每次循环中sleep一
段时间。
Listen命令是“双进程+管道”的处理模式,Listen命令所在的进程会循环地创建单次执行模式的Work进程,每次创建的Work进程只消费一个消息就会结束,然后Listen进程再创建一个新的Work进程。Listen进程会定时检查当前的Work进程执行时间是否超过了--timeout
参数的值,如果已经超过则Listen进程会杀掉所有Work进程,然后抛出异常。Listen进程会通过管道来监听当前的Work进程的输出,当Work进程有输出时Listen进程会将输出写入到stdout/stderr
。Listen进程会定时通过proc_get_status()
函数来监控当前的Work进程是否仍再运行,Work进程消费完一个任务之后,Work进程就结束了,其状态会变成terminated
,此时Listen进程就会重新创建一个新的Work进程并对其计时,新的Work进程开始消费下一个任务。
结束时机不同
Listen命令中Listen进程和Work进程会在以下情况下结束:Listen进程会定时检查当前的Work进程的执行时间是否超过了--timeout
参数的值,如果已经超时此时Listen进程会杀掉当前的Work进程,然后抛出一个ProcessTimeoutException
异常并结束Listen进程。Listen进程会定时检查自身使用的内存是否超过了--memory
参数的值,如果已经超过此时Listen进程会直接die
掉,Work进程也会自动结束。性能不同
Work命令是在脚本内部做循环,框架脚本在命名执行的初期就已经加载完毕。而Listen模式则是处理完一个任务之后新开一个Work进程,此时会重新加载框架脚本。因此Work模式的性能会比Listen模式高。注意当代码有更新时Work模式下需要手动去执行php think queue:restart
命令重启队列来使改动生效,而Listen模式会自动生效无需其它操作。
- 超时控制能力
Work模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。举例来说,假如在某次上线后\app\api\job\Dismiss
消费者的fire
方法中添加一段死循环。
public function fire()
{while(true){$consoleOutPut->writeln("looping forever inside a job");sleep(1);}
}
这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会由任何的警告。更严重的是如果配置了expire
,那么这个死循环的任务可能会污染到同样处理dismiss_job_queue
队列的其它Work进程,最后好几个Work进程将被卡死在这段死循环中。
Work模式下的超时控制能力实际应理解为多个Work进程配合下的过期任务重发能力。
Listen命令可以限制Listen进程创建的Work进程的最大执行时间,Listen命令可以通过--timeout
参数限制Work进程允许运行的最长时间,超过该时间限制后Work进程会被强制杀死,Listen进程本身也会抛出异常并结束。
expire
与timeout
之间的区别
expire
在配置文件中设置,timeout
在Listen命令的命令行参数中设置。expire
和timeout
是两个不同层次上的概念:expire
是指任务的过期时间,这个时间是全局的影响到所有的Work进程,不管是独立的Work命令还是Listen模式下创建的Work进程。expire
针对的对象是任务。timeout
是指Work进程的超时时间,这个时间只针对当前执行的Listen命令有效,timeout
针对的对象是Work进程。
- 使用场景不同
Work命令的适用场景是任务数量较多、性能要求较高、任务的执行时间较短、消费者类中不存在死循环/sleep()
/exit()
/die()
等容易导致bug的逻辑。
Listen命令的适用场景是任务数量较少、任务的执行时间较长(如生成大型的Excel报表等)、任务的执行时间需要有严格限制。
消息处理流程
消息队列处理一个任务的具体流程
重发超时任务
超时任务是指任务处于执行中,当前时间 - 任务开始执行的时刻 >expire
时间
重发是指将任务的状态还原为未执行,并将任务的已执行次数加1。获取下一个任务
有效任务是指未执行、最早可执行的时间 <= 当前时间、按时间先后排序(先进先出)任务次数超限逻辑
任务的已尝试次数大于命令行中的--tries
参数,命令行中的--tries
参数大于0。触发次数超限事件
queue_failed
内置的次数超限事件标签,是否定义了queue_failed
行为,未定义则不处理直接返回,已定义则对次数超限的任务进行处理。
$runHookCb = Behavior::queueFailed() //返回true则删除任务执行任务失败回调,返回false则不执行任何操作。
- 消费当前的任务
$job->fire()
从$job
对象的payload
属性中解析出消费者类,创建消费者类的实例,执行消费者类的实例的fire($job, $data)
方法。
需要在fire($job, $data)
中手动删除任务,$job
参数表示当前任务对象,$data
参数表示当前的任务数据即创建队列时传入的参数。
消息队列的开始、停止、重启
开始一个消息队列
$ php think queue:work
停止所有的消息队列
$ php think queue:restart
重启所有的消息队列
$ php think queue:restart
$ php think queue:work
多模块多任务的处理
- 多模块
单模块项目推荐时间app/job
作为任务类的命名空间,多任务项目可使用app/module/job
作为任务类的命名空间,也可以放在任意可以自动加载到的地方。
- 多任务
如果一个任务类中有多个小任务的话,在发布任务的时候,需要使用任务的“类名@方法名”的形式,例如app\lib\job\Job@task
,注意命令行中的--queue
参数不执行@的解析。
消息的延迟执行与定时任务
延迟执行是相对于即使执行的,是用来限制某个任务的最早可执行时刻。在到达该时刻之前该任务会被跳过,可以利用该功能实现定时任务。
延迟执行的使用方式
- 在生产者业务代码中
// 即时执行
$is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name)// 延迟2秒执行
$is_pushed = Queue::later(2, $job_handler_classname, $job_data_arr, $job_queue_name);// 延迟到2019-06-01 00:00:00时刻执行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$is_pushed = Queue::later($time2wait, $job_handler_classname, $job_data_arr, $job_queue_name);
- 在消费者类中
// 重发,即时执行
$job->release();// 重发,延迟2秒后执行
$job->release(2);// 延迟到2019-06-01 00:00:00时刻执行
$time2wait = strtotime("2019-06-01 00:00:00") - strtotime("now");
$job->release($time2wait);
- 在命令行中
如果消费者类的fire
方法抛出了异常且任务未被删除时,将自动重发该任务。重发时会设置下次执行前延迟多少秒,默认为0。
$ php think queue:work --delay 3
消息的重发
消息重发时机有三种情况:
- 在消费者类中手动重发
if($is_job_done === false)
{$job->release();
}
- Work进程自动重发需要同时满足以下两个条件:消费者类的
fire()
方法抛出异常、任务未被删除 - 当配置了
expire
不为null
时,Work进程内部每次查询可用任务之前会自动重发已过期的任务。
注意:在Database模式下,2.7.1和2.7.2中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3中的重发机制是直接更新原任务。在Redis模式下,三种重发都是先删除再插入。不管是那种重发方式,重发之后任务的已尝试次数会在原来的基础上加1。
此外,消费者类中需要注意,如果fire()方法中抛出异常,将出现两种情况:
- 如果不需要自动重发的话,请在抛出异常之前将任务删除
$job->delete()
,否则会被框架自动重发。 - 如果需要自动重发的话,请直接抛出异常,不要在fire()方法中手动使用
$job->release()
,这样将导致任务被重发两次,产生两个一样的新任务。
Redis驱动下的任务重发细节
在Redis驱动下为了实现任务的延迟执行和过期重发,任务将在这三个key
中来回转移。
在Database模式下消息处理的消息流程中,如果配置的expire
不是null
那么think-queue
的work
进程每次在获取下一个可执行任务之前,会先尝试重发所有过期的任务。而在Redis驱动下这个步骤则做了更多的事情,详情如下:
- 从
queue:xxx:delayed
的key
中查询出有哪些任务在当前时刻已经可以开始执行,然后将这些任务转移到queue:xxx
的key
的尾部。 - 从
queue:xxx:reserved
的key
中查询出有哪些任务在当前时刻已经过期,然后将这些任务转移到queue:xxx
的key
的尾部。 - 尝试从
queue:xxx
的key
的头部取出一个任务,如果取出成功,则将这个任务转移到queue:xxx:reserved
的key
的头部,同时将这个任务实例化成任务对象,交给消费者去执行。
Redis队列中的过期任务重发步骤,执行前:
Redis队列中的过期任务重发步骤,执行后:
任务的失败回调与警告
当同时满足以下条件时将触发任务失败回调:
- 命令行的
--tries
参数的值大于0 - 任务的已尝试次数大于命令行的–tries参数
- 开发者添加了
queue_failed
事件标签及其对应的回调代码 - 消费者类中定义了
failed()
方法用于接收任务失败的通知
注意,queue_failed
标签需要在安装think-queue
之后手动去/app/tags.php
文件中添加。
注意事项
-任务完成后使用$job->delete()删除任务
- 在消费者类的fire()方法中使用$job->attempt()检查任务已执行次数,对于次数异常的做相应的处理。
- 在消费者类的fire()方法在中根据业务数据来判断该任务是否已经执行过,以避免该任务被重复执行。
- 编写失败回调事件将事件中失败的任务及时通知给开发人员
拓展
- 队列的稳定性和拓展性
稳定性:不管是listen
模式还是work
模式,建议使用supervisor
或自定义的cron
脚本去定时检查work
进程是否正常。
拓展性:当某个队列的消费者不足时在给这个队列添加work
进程即可
使用注意
最好配置本地的Redis,使用远程Redis曾出现无法解释的原因。
$ yum install -g redis
$ systemctl restart redis
停止原正在运行的
$ supervisorctl shutdown
重新加载服务
$ supervisord -c /etc/supervisord.conf
$ supervisorctl reload
$ ps aux | grep supervisord
$ top
think-queue文档相关推荐
- Linux内核官方文档atomic_ops.txt【摘自Linux 内核文档】
摘自Linux内核文档 Documentation/atomic_ops.txt,不是本人原创 Semantics and Behavior of Atomic and Bitmask Operati ...
- JMS ActiveMQ研究文档
1. 背景 当前,CORBA.DCOM.RMI等RPC中间件技术已广泛应用于各个领域.但是面对规模和复杂度都越来越高的分布式系统,这些技术也显示出其局限性:(1)同步通信:客户发出调用后,必须等待服务 ...
- python asyncio文件操作_Python asyncio文档阅读摘要
文档地址:https://docs.python.org/3/library/asyncio.html 文档第一句话说得很明白,asyncio是单线程并发,这种event loop架构是很多新型异步并 ...
- socks5协议RFC文档
socks5协议RFC文档 « Xiaoxia[PG] socks5协议RFC文档 Network Working Group M. Leech Request for Comments: 1928 ...
- .NET操作RabbitMQ组件EasyNetQ使用中文简版文档。
本文出自EasyNetQ官方文档,内容为自己理解加翻译.文档地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Quick-Start EasyNetQ简介 Ea ...
- iOS FMDB官方使用文档 G-C-D的使用 提高性能(翻译)(转)
由于FMDB是建立在SQLite的之上的,所以你至少也该把这篇文章从头到尾读一遍. 与此同时,把SQLite的文档页 http://www.sqlite.org/docs.html 加到你的书签中. ...
- 消息队列 策略_太狠了!京东T8架构师建议吃透这40W字消息队列文档,涨薪15K不是梦...
"RabbitMQ?""Kafka?""RocketMQ?"...在日常学习与开发过程中,我们常常听到消息队列这个关键词.我也在我的多篇文章 ...
- [官方] mysql 性能优化文档(中英文自译)
大家好,我是烤鸭: 根据官方文档翻译并精简部分内容.建议有时间的朋友下载原版查看,全文106页pdf,快的话1-2天就能看完.自己翻译的有些地方可能不完整,欢迎指正. 官方pdf下载,需登录: htt ...
- 【c#】RabbitMQ学习文档(一)Hello World
一.简介 RabbitMQ是一个消息的代理器,用于接收和发送消息,你可以这样想,他就是一个邮局,当您把需要寄送的邮件投递到邮筒之时,你可以确定的是邮递员先生肯定会把邮件发送到需要接收邮件的人的手里,不 ...
- c语言xc比较大小写,XCTestAPI文档.docx
XCTestAPI文档 XCTest准备工作对于新项目,在新建项目界面勾选上UI Tests:对于旧项目,在项目界面点击菜单栏中的FileNewTarget-iOSTestiOS UITesting ...
最新文章
- 监控组件_分布式监控组件Cat,如何实现万亿级消息的高效存储?
- ssh 公钥登录远程主机
- python和javascript哪个好_JavaScript与Python:主要区别
- 【链接】Linux C/C++ 学习路线-已拿腾讯、百度 offer
- C# MVC IOC、依赖注入
- 征稿 | ​2020年全国知识图谱与语义计算大会
- 多重继承中二义性的消除
- 不会JS中的OOP,你也太菜了吧!(第二篇)
- 用最新MySQL 8.0的源安装MySQL 5.7版本(CentOS 7环境下)
- 实验报告二:例2-19 一位全加器
- python贴吧-学点python吧,别再这么累了
- linux 添加声卡驱动,Linux操作系统下声卡驱动的详细加载方法
- 笔记本w ndows未能启动,手把手教你windows无法启动怎么办
- 浅谈短视频APP的发展趋势
- 入股不亏!LINQ凭什么被誉为最好的技术?
- 参加南京mooc活动有感
- 基于freeswitch1.6的IVR智能语音机器人交互逻辑lua脚本
- RabbitMQ交换机类型
- 7-3 sdut-C语言实验- 对称矩阵的判定
- 通过停车场计算车费案例练习JavaIO流
热门文章
- C++ 获取当前时间毫秒数
- Python爬虫实战-----案例分析爬虫一般过程
- 不是秘密的内幕 常见移动电源质量问题
- pubg服务器维护请稍后再试9月4日,绝地求生PUGB9月4日更新维护结束时间 9月4日更新内容_蚕豆网新闻...
- 高通410c下载安卓源码
- mdp框架_用于在线机器学习MDP的Python库
- 【Python知识点梳理】8.飞机大战(面向对象设计思维)
- ajax请求动态生成dmo无法绑定事件解决方案
- Cookie功能被禁用,如何启用?
- java项目时间不够怎么办_时间总是不够用怎么办?