前言

传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,一般会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者作针对性的处理,从而降低系统耦合度,提高系统性能和可用性。

thinkphp-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等

队列的多队列, 内存限制 ,启动,停止,守护等

消息队列可降级为同步执行

thinkphp-queue 内置了 Redis,Database,Topthink ,Sync这四种驱动。

本文主要介绍如何使用tp5自带的think-queue消息队列结合supervisor进程管理使队列能够常驻进程。

think-queue安装与基本使用

tp5框架及think-queue的安装方法及队列驱动配置

tp5框架及think-queue安装

推荐使用composer安装

tp5安装composer create-project topthink/think 5.0.*

think-queue安装composer require topthink/think-queue

消息队列的驱动配置

配置文件在项目的路径如下图:

内容如下:

return [

'connector' => 'Redis',// Redis 驱动

'expire' => 60,// 任务的过期时间,默认为60秒; 若要禁用,则设置为 null

'default' => 'default',// 默认的队列名称

'host' => '127.0.0.1',// redis 主机ip

'port' => 6379,// redis 端口

'password' => '',// redis 密码

'select' => 0,// 使用哪一个 db,默认为 db0

'timeout' => 0,// redis连接的超时时间

'persistent' => false,// 是否是长连接

];

具体配置可根据实际情况自行调整

创建一张表,用于展示消费队列写入数据库的操作

CREATE TABLE `test` (

`id` int(10) NOT NULL AUTO_INCREMENT,

`task_type` varchar(50) DEFAULT '' COMMENT '任务类型',

`data` text COMMENT '数据',

`pdate` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '时间',

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

创建消息队列任务

入队(生产者)

在index模块新增 \application\index\controller\JobTest.php 控制器,在该控制器中添加 actionWithHelloJob 方法

生产者推送消息到队列有2种方法:push()和later(),push是立即执行,later是推送到队列里,延迟执行。代码如下

public function actionWithHelloJob(){

// 1.当前任务将由哪个类来负责处理。

// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法

$jobHandlerClassName = 'app\index\job\Hello';

// 2.当前任务归属的队列名称,如果为新队列,会自动创建

$jobQueueName = "helloJobQueue";

// 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串

$jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'data' => $_GET ] ;

// 4.将该任务推送到消息队列,等待对应的消费者去执行

$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );

//$isPushed = Queue::later(10,$jobHandlerClassName,$jobData,$jobQueueName); //把任务分配到队列中,延迟10s后执行

// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false

if( $isPushed !== false ){

echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."
";

}else{

echo 'something went wrong.';

}

}

消费者的消费与删除

创建Hello 消费者类,用于处理 helloJobQueue 队列中的任务;新增 \application\index\job\Hello.php 消费者类,并编写其 fire() 方法

代码如下:

namespace app\index\job;

use think\queue\Job;

class Hello {

public function fire(Job $job,$data) {

// 有些消息在到达消费者时,可能已经不再需要执行了

$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);

if(!$isJobStillNeedToBeDone){

$job->delete();

return;

}

$isJobDone = $this->doHelloJob($data);

if ($isJobDone) {

// 如果任务执行成功, 记得删除任务

$job->delete();

print("Hello Job has been done and deleted"."\n");

}else{

if ($job->attempts() > 3) {

//通过这个方法可以检查这个任务已经重试了几次了

print("Hello Job has been retried more than 3 times!"."\n");

$job->delete();

// 也可以重新发布这个任务

//print("Hello Job will be availabe again after 2s."."\n");

//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行

}

}

}

/**

* 有些消息在到达消费者时,可能已经不再需要执行了

* @param array|mixed $data 发布任务时自定义的数据

* @return boolean 任务执行的结果

*/

private function checkDatabaseToSeeIfJobNeedToBeDone($data){

return true;

}

/**

* 根据消息中的数据进行实际的业务处理...

*/

private function doHelloJob($data)

{

print("Hello Job Started. job Data is: ".var_export($data,true)." \n");

print("Hello Job is Fired at " . date('Y-m-d H:i:s') ." \n");

print("Hello Job is Done!"." \n");

return true;

}

}

执行之前,看下现在的目录结构

处理任务(消费者)

打开终端切换到当前项目根目录下,执行下面的命令:

work命令又可分为单次执行和循环执行两种模式。

单次执行:不添加 --daemon参数,该模式下,work进程在处理完下一个消息后直接结束当前进程。当队列为空时,会sleep一段时间然后退出。

循环执行:添加了 --daemon参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当队列为空时,会在每次循环中sleep一段时间。

php think queue:work --daemon --queue helloJobQueue

会看到如下信息:

[root@localhost tpqueue]# php think queue:work --daemon --queue helloJobQueue

Hello Job Started. job Data is: array (

'ts' => 1565246136,

'bizId' => '5d4bc2b88f03b',

'data' =>

array (

),

)

Hello Job is Fired at 2019-08-08 14:35:39

Hello Job is Done!

Hello Job has been done and deleted

Processed: app\index\job\Hello

到这里我们成功的经历了一个消息的 创建->推送->消费->删除的基本流程

消息队列的开始,停止与重启

开始一个消息队列:php think queue:work

停止所有的消息队列:php think queue:restart

重启所有的消息队列:php think queue:restart

php think queue:work

多模块,多任务的处理

多模块

单模块项目推荐使用 app\job 作为任务类的命名空间

多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方

多任务

多任务例子:

在 \application\index\controller\JobTest.php 控制器中,添加 multiTask()方法:

public function multiTask() {

$taskType = $_GET['taskType'];

switch ($taskType) {

case "taskOne":

$jobHandleClassName = "app\index\job\multiTask@taskOne";

$jobQueueName = "taskOneQueue";

$jobData = ['ts'=>time(), 'bizId'=>uniqid(), 'data'=>$_GET];

break;

case "taskTwo":

$jobHandleClassName = "app\index\job\multiTask@taskTwo";

$jobQueueName = "taskTwoQueue";

$jobData = ['ts'=>time(), 'bizId'=>uniqid(), 'data'=>$_GET];

break;

default:

break;

}

$isPushed = Queue::push($jobHandleClassName, $jobData, $jobQueueName);

if ($isPushed!==false) {

echo date('Y-m-d H:i:s')."the $taskType of multiTask job has been pushed to $jobQueueName
";

}else {

throw new Exception("push a new $taskType of multiTask job Failed!");

}

}

新增 \application\index\job\MultiTask.php 消费者类,并编写其 taskOne() 和 taskTwo()方法

namespace app\index\job;

use think\queue\Job;

class MultiTask {

public function taskOne(Job $job, $data) {

$isDone = $this->doTaskOne($data);

if ($isDone) {

$job->delete();

print ("INFO:the taskOne of multiTask has been done and delete!\n");

return;

}else {

if ($job->attempts()>3) {

$job->delete();

}

}

}

public function taskTwo(Job $job, $data) {

$isDone = $this->doTaskTwo($data);

if ($isDone) {

$job->delete();

print ("INFO:the taskTwo of multiTask has been done and delete! \n");

}else {

if ($job->attempts()>3) {

$job->delete();

}

}

}

private function doTaskOne($data) {

$id = db('test')->insertGetId(['task_type'=>'task one','data'=>json_encode($data)]);

print ("INFO: doing taskOne of multiTask! the db return id is :$id\n");

return true;

}

private function doTaskTwo($data) {

$id = db('test')->insertGetId(['task_type'=>'task two','data'=>json_encode($data)]);

print ("INFO: doing taskTwo of multiTask! the db return id is :$id\n");

return true;

}

}

最终执行结果如下:

supervisor的安装和配置

supervisor是用Python开发的一个client/server服务,是Linux/Unix系统下的一个进程管理工具。可以很方便的监听、启动、停止、重启一个或多个进程。用supervisor管理的进程,当一个进程意外被杀死,supervisor监听到进程死后,会自动将它重启,很方便的做到进程自动恢复的功能,不再需要自己写shell脚本来控制。

yum安装supervisor

yum install epel-release

yum install supervisor

//设置开机自动启动

systemctl enable supervisord

配置

找到/etc/supervisord.conf配置文件,编辑信息如下:

[unix_http_server]

file=/tmp/supervisor.sock ; the path to the socket file

;chmod=0700 ; socket file mode (default 0700)

;chown=nobody:nogroup ; socket file uid:gid owner

;username=user ; default is no username (open server)

;password=123 ; default is no password (open server)

supervisord]

logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log

logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB

logfile_backups=10 ; # of main logfile backups; 0 means none, default 10

loglevel=info ; log level; default info; others: debug,warn,trace

pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid

nodaemon=false ; start in foreground if true; default false

minfds=1024 ; min. avail startup file descriptors; default 1024

minprocs=200 ; min. avail process descriptors;default 200

[supervisorctl]

serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket

[include]

;files = relative/directory/*.ini

files = /etc/supervisor/*.conf

file,logfile,pidfile,serverurl,files的路径可根据自身需要去自定义

在/etc 目录里创建一个supervisor文件,然后在/etc/supervisor目录下创建一个.conf文件,这里命名为queue.conf。

对于index这个单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来,内容如下:

[program:queue]

user=root

command=php /www/wwwroot/tpqueue/think queue:work --queue helloJobQueue,taskOneQueue,taskTwoQueue --daemon

启动supervisor

supervisorctl -c /etc/supervisord.conf

上面这个命令会进入 supervisorctl 的 shell 界面,然后可以执行不同的命令了

status # 查看程序状态

stop thrift-log # 关闭 usercenter 程序

start thrift-log # 启动 usercenter 程序

restart thrift-log # 重启 usercenter 程序

reread # 读取有更新(增加)的配置文件,不会启动新添加的程序

update # 重启配置文件修改过的程序

例如启动queue程序:

这时再去推送消息,可以看到如下信息:

红圈中的是日志文件,可见队列消费完成,数据插入成功

数据库插入数据如下:

结束

至此,tp5(think-queue)消息队列结合supervisor已实现进程常驻

tp5 queue.php,tp5(think-queue)消息队列+supervisor进程管理实现队列常驻进程相关推荐

  1. 操作系统之进程管理:3、进程控制(进程状态转化的实现)、原语、进程通信(共享、管道、消息)

    3.进程控制 进程控制 思维导图 进程控制相关的原语 创建原语 撤销原语 阻塞原语.唤醒原语 切换原语 原语要做的几件事 进程通信 思维导图 进程通信方式 数据共享 管道通信 消息传递 进程控制 1. ...

  2. 【Microsoft Azure学习之旅】测试消息队列(Service Bus Queue)是否会丢消息

    组里最近遇到一个问题,微软的Azure Service Bus Queue是否可靠?是否会出现丢失消息的情况? 具体缘由如下, 由于开发的产品是SaaS产品,为防止消息丢失,跨Module消息传递使用 ...

  3. 队列Queue:任务间的消息读写,安排起来~

    摘要:本文通过分析鸿蒙轻内核队列模块的源码,掌握队列使用上的差异. 本文分享自华为云社区<鸿蒙轻内核M核源码分析系列十三 消息队列Queue>,作者:zhushy . 队列(Queue)是 ...

  4. Microsoft Message Queue(MSMQ:微软消息队列)简介

    一.前言 最近在安装公司的一个产品时,接触到了MSMQ,在此对MSMQ做一个简单的介绍,以便各位能对它有一个快速.直观的认识.本文针对于Microsoft Message Queue,以下提到的消息队 ...

  5. python queue 生产者 消费者_Queue: 应用于生产者-消费者模式的Python队列

    图片来源于网络 版权声明 © 著作权归作者所有 允许自由转载,但请保持署名和原文链接. 不允许商业用途.盈利行为及衍生盈利行为. 什么是Queue? Queue是Python标准库中的线程安全的队列( ...

  6. 【C++ 语言】容器 ( queue 队列 | stack 栈 | priority_queue 优先级队列 | set 集合 | 容器遍历 | map )

    文章目录 queue 队列 stack 栈 priority_queue 优先级队列 priority_queue 优先级队列指定排序方法 priority_queue 优先级队列排序行为 prior ...

  7. c++ STL:队列queue、优先队列priority queue 的使用

    说明:本文全文转载而来,原文链接:http://www.cppblog.com/wanghaiguang/archive/2012/06/05/177644.html C++ Queues(队列) C ...

  8. python判断队列是否为空_python队列Queue

    一.Queue Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递 队列在多线程中可以共享 ...

  9. python 队列实现_python中实现队列的queue模块

    python中的queue模块提供了同步的.线程安全的队列类,包括FIFO(先进先出)的Queue类和LIFO(后进先出,栈结构)LifoQueue类和优先队列PriorityQueue类,它们都实现 ...

最新文章

  1. ios玩全民奇迹不显示服务器,全民奇迹关于IOS充值游戏物品不到账公告
  2. Broadcom BCM4322(如:HP 6530b)wifi不能用解决办法
  3. 如何连动作导入3dmax_教你如何将犀牛(Rhino)模型导入3dmax并进行渲染
  4. 计算机视觉中的人类感知、理解和生成 (ICCV 2019 Workshop)
  5. Scikit-learn库中的数据预处理(一)
  6. 同步代码时忽略maven项目 target目录
  7. 设置MySQL最大连接数
  8. 联想怎么启用计算机的无线功能,IdeaCentre B3系列电脑无线连接中心的使用方法...
  9. python3.6对MySQL数据恢复
  10. 快速制作机房3D效果图教程
  11. 用html如何做发帖的页面,如何用html发帖
  12. Win8.1屏幕亮度自动调节关闭方法
  13. 高盛区块链79页完整报告:从理论到实践!
  14. 062:vue+openlayers绘制正方形、矩形、六芒星( 代码示例 )
  15. 一篇文章教你从入门到精通 Google 指纹验证功能
  16. oracle序列高速缓存,行高速缓存上的等待事件
  17. td-agent 安装,配置,采集日志到postgresql
  18. 在Linux系统下制作U盘启动盘
  19. “屌丝”身世-由一个低级趣味而来
  20. 解决编译glad卡在glad opening的问题

热门文章

  1. 快来,这里有23种设计模式的Go语言实现
  2. 五种C语言非数值计算的常用经典排序算法
  3. 从JAVA内存到垃圾回收,带你深入理解JVM
  4. 【华为云技术分享】【昇腾】【玩转Atlas200DK系列】Atlas 200 DK安装python的hiai库以及opencv
  5. 【Python3网络爬虫开发实战】 3.2-使用requests
  6. 【华为大咖分享】3.如何做Code Review 与 结对编程?
  7. 【华为云实战开发】13.如何在云端快速搭建python网站
  8. Kotlin学习笔记 第二章 类与对象 第二节属性
  9. 红橙Darren视频笔记 View事件分发源码分析 基于API29
  10. 设计模式笔记一:工厂模式