Swoft 定时任务
Swoft的任务功能是基于Swoole的Task机制,Swoft的Task机制本质上是对SwooleTask机制的封装和加强,Swoft提供了精度为秒的定时任务功能用于替代Linux的Crontab。
Crontab是指需要定期运行的命令列表,以及用于管理该列表的命令的名称,crontab代表cron表,因为它使用作业调度程序cron来执行任务,cron本身是以chronos命名,是希腊语中时间的意思。cron是Linux的系统进程,会根据一个安排的时间表计划为用户自动执行任务,该计划也称为crontab,也用于编辑计划的程序名称。
环境配置
修改环境配置.env
文件中定时任务配置项
$ vim .env
CRONABLE=true
创建任务
一个类就是一个任务组,类中的每个方法就是一个任务。
$ vim app/task/TestTask.php
<?php
namespace App\Tasks;use Swoft\Task\Bean\Annotation\Scheduled;
use Swoft\Task\Bean\Annotation\Task;/*** @Task(“Test”)
*/
class TestTask
{/*** @Scheduled(cron="* * * * * *")*/public function run($num){echo "{$num} TestTask running...".PHP_EOL;}
}
@Task("Test")
定义任务名称,名称必须唯一。@Scheduled
用于设置触发时间cron
0 1 2 3 4 5
* * * * * *
- - - - - -
| | | | | |
| | | | | +----- 星期 (0 - 6) (星期日=0)
| | | | +----- 月 (1 - 12)
| | | +------- 日 (1 - 31)
| | +--------- 时 (0 - 23)
| +----------- 分 (0 - 59)
+------------- 秒 (0-59)
例如
// 每分钟的第10秒触发
@Scheduled(cron="10 * * * * *")//每小时50分钟10秒时触发
@Scheduled(cron="10 50 * * *")//每天21点01分10秒触发
@Scheduled(cron="10 1 21 * *")
任务投递
在控制器中投递任务
$result = Task::deliver("Test", "run", ["3"], Task::TYPE_ASYNC);
Swoft任务投递的实现机制离不开Swoole\Timer::tick()
,和\Swoole\Server->task()
底层执行机制是一样的,Swoft在实现的时候填了crontab
方式,实现在src/Crontab
下:
ParseContab
解析crontab
TableCrontab
使用Swoole\Table
实现 用于存储crontab
任务Crontab
连接Task
和TaskCrontab
Task::deliver(任务组名称, 任务名称, 任务参数, 投递方式);
参数说明
- 参数1:
@Task
定义的 - 参数2:方法名称
- 参数3:以数组的格式传值
- 参数4:指定是协程投递
Task::TYPE_CO
还是异步投递Task::TYPE_ASYNC
任务投递方式
任务投递Task::deliver()
将调用参数打包后,根据$type
参数使用Swoole的$server->taskCo()
或$server->task()
接口投递到Task进程。Task本身始终是同步执行的,$type
仅仅影响投递这个操作行为。
Task::TYPE_ASYNC
对应的$server->task()
是异步投递,Task::deliver()
调用后立即返回。Task::TYPE_CO
对应的$server->taskCo()
是协程投递,投递后让出协程控制,任务完成后或执行超时后Task::deliver()
才会从协程返回。
Swoole的Task机制的本质是Worker进程将耗时任务投递给同步的Task进程(TaskWorker进程)处理。换句话说,Swoole的$server->taskCo()
或$server->task()
都只能在Worker进程中使用,这一点限制了使用场景。如何才能在Process中投递任务呢?Swoft为了绕过这个限制提供了Task::deliverByProcess()
方法。其实现原理是通过Swoole的$server->sendMessage()
方法将调用信息从Process中投递到Worker进程中,然后由Worker进程替其投递到Task进程当中。
数据打包后会使用$server->sendMessage()
投递给Worker,$server->sendMessage
后Worker进程收到数据时会触发一个swoole.pipeMessage
事件回调,Swoft会将其转换为自己的swoft.pipeMessage
事件并触发。swoft.pipeMessage
事件最终由PipeMessageListener
处理,在相关的监听中如果发现swoft.pipeMessage
事件由Task::deliverByProceess()
产生,Worker进程会提替其执行一次Task::deliver()
,最终将任务数据投递到TaskWorker
进程中。
任务投递流程
- 当框架启动后会启动定时器每秒去更新执行一次任务,更新任务之前需要先去队列内存表中清理已完成的队列数据。
- 然后获取出所有的任务中的队列,可理解为获取所有Task类中的方法,任务规则以TaskClass、分钟、时间戳这些数据以md5方式加密得到每个任务队列的key值,保存到runTimeTable中。
任务执行
Swoole的Task任务机制的本质是Worker进程将耗时任务投递给同步的TaskWorker进程处理,所以swoole.onTask
的事件回调是在Task进程中执行的。
$ vim vendor/swoft/task/src/Bootstrap/Listeners/TaskEventListener.php
此处是swoole.onTask
的事件回调,其职责仅仅是将Worker进程投递来打包后的数据转发给TaskExecutor
。
/*** @param \Swoole\Server $server* @param int $taskId* @param int $workerId* @param mixed $data* @return mixed* @throws \InvalidArgumentException*/
public function onTask(Server $server, int $taskId, int $workerId, $data)
{try {/* @var TaskExecutor $taskExecutor*/$taskExecutor = App::getBean(TaskExecutor::class);$result = $taskExecutor->run($data);} catch (\Throwable $throwable) {App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));$result = false;// Release system resourcesApp::trigger(AppEvent::RESOURCE_RELEASE);App::trigger(TaskEvent::AFTER_TASK);}return $result;
}
Worker进程是大部分HTTP服务代码执行的环境,但从TaskEventListener.onTask()
方法开始,代码的执行环境都是Task进程,也就是说,TaskExecutor
和具体的TaskBean
都是执行在Task
进程中的。
$ vim vendor/swoft/task/src/TaskExecutor.php
任务执行的思路是将Worker进程发过来的数据解包并还原为原来的调用参数,根据$name
参数找到对应的TaskBean
并调用其对应的Task
方法,其中TaskBean
使用类级别注解@Task(name="TaskName")
或@Task("TaskName")
声明。注意@Task
注解除了name
属性外还有一个coroutine
属性。
/*** @return mixed*/
public function run(string $data)
{$data = TaskHelper::unpack($data);$name = $data['name'];$type = $data['type'];$method = $data['method'];$params = $data['params'];$logid = $data['logid'] ?? uniqid('', true);$spanid = $data['spanid'] ?? 0;$collector = TaskCollector::getCollector();if (! isset($collector['task'][$name])) {return false;}list(, $coroutine) = $collector['task'][$name];$task = bean($name);if ($coroutine) {$result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);} else {$result = $this->runAsyncTask($task, $method, $params, $logid, $spanid, $name, $type);}return $result;
}
任务执行流程
- 通过
getExecTasks
方法将所有满足条件的队列放入到一个数组,遍历数组将runStatus
修改为self::START
。 - 执行所有
runStatus
值为self::START
的队列任务 - 将执行后的队列任务的
runStatus
值修改为self::FINISH
- 将
runStatus
值修改为self::FINISH
的剔除掉
任务进程
Swoft使用两个前置进程
- 任务计划进程
CronTimerProcess
CronTimerProcess
进程是Swoft的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()
,该进程使用了Swoole的定时器功能,通过Swoole\Timer
在每分钟首秒时执行的回调,CronTimerProcess
每次被唤醒后都会遍历任务表,计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为未执行。
- 任务执行进程
CronExecProcess
CronExecProcess
作为定时任务的执行者,通过Swoole\Timer
每0.5秒唤醒自身一次,然后把执行表遍历一次,挑选当下需要执行的任务,通过sendMessage()
投递出去并更新该任务执行表中的状态。该执行进程只负责任务的投递,任务的实际执行仍然在Task
进程中由TaskExecutor
处理。
内存表
Swoft使用两张内存数据表
在定时器中会使用到两个内存表Table
,一个是用于存储任务实例originTable
,一个是存储任务队列实例runTimeTable
,也就是存储需要执行的任务实例。
为什么要使用Swoole的内存表呢?
Swoft的定时任务管理分别由任务计划进程和任务执行进程负责,两个进程的运行共同管理定时任务,如果使用进程间独立的数组等结构,两个进程必然需要频繁地进程间通信。而使用跨进程的Swoole\Table
结构直接进行进程间数据共享,不仅性能高,操作简单还能解耦两个进程。为了让Table能够在两个进程间共同使用,Table必须在Swoole Server
启动前创建并分配内存。
Table底层是建立在共享内存之上的HashTable数据结构,$size
参数指定了Table的最大行数,最大行数决定了HashTable的总行数,由于HashTable是在共享内存之上,所以无法动态扩容,因此$size
必须在创建前设置好。
$size
若不是2的N次方,如1024、8196、65536等,底层会自动调整为接近的一个数字,如果小于1024则默认为1024,即1024为最小值。
$ vim vendor/swoft/task/src/Crontab/TableCrontab.php
- 任务配置表
OriginTable
任务表用于记录用户配置的任务信息,任务表每行记录包含的字段包括
rule
定时任务执行规则,对应@Scheduled
注解的cron
属性。taskClass
任务名称,对应@Task
的name
属性,默认为类名。taskMethod
Task方法,对应@Scheduled
注解所在的方法add_time
初始化表内容时的10位时间戳
rule
、taskClass
、taskMethod
是生成key
,唯一确定一条记录。
/*** @var \Swoft\Memory\Table $originTable 内存任务表*/
private $originTable;/*** @var array $originStruct 任务表结构*/
private $originStruct = ['rule' => [\Swoole\Table::TYPE_STRING, 100],'taskClass' => [\Swoole\Table::TYPE_STRING, 255],'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],'add_time' => [\Swoole\Table::TYPE_STRING, 11],
];
- 任务执行表
RunTimeTable
这里的执行并非指任务本身的执行,而是指任务投递这个操作的执行。
执行表记录短时间内要执行的任务列表及其执行状态,表每行记录包含字段:
taskClass
taskMethod
minute
需要执行任务的时间,精确到分钟,格式为date("YmdHi")sec
需要执行任务的时间,精确到分钟,10位时间戳。runStatus
任务状态包括0未执行、1已执行、2执行中三种
/*** @var \Swoft\Memory\Table $runTimeTable 内存运行表*/
private $runTimeTable;
/*** @var array $runTimeStruct 运行表结构*/
private $runTimeStruct = ['taskClass' => [\Swoole\Table::TYPE_STRING, 255],'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],'minute' => [\Swoole\Table::TYPE_STRING, 20],'sec' => [\Swoole\Table::TYPE_STRING, 20],'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],
];
Swoft 定时任务相关推荐
- php定时任务sw,[原创]Swoole和Swoft的那些事(Task投递/定时任务篇)
Swoft的任务功能基于Swoole的Task机制,或者说Swoft的Task机制本质就是对SwooleTask机制的封装和加强. 任务投递 //Swoft\Task\Task.php class T ...
- php swoole实现定时任务,Swoole实现任务定时自动化调度详解,来学习下
问题描述 这几天做银行对帐接口时,踩了一个坑,具体需求大致描述一下. 银行每天凌晨后,会开始准备昨天的交易流水数据,需要我们这边请求拿到. 因为他们给的是一个base64加密的zip压缩流,解开以后可 ...
- php定时发送生日模块消息_Swoft 2.0.5 更新,新增高效秒级定时任务、异常管理组件...
什么是 Swoft ? Swoft 是一款基于 Swoole 扩展实现的 PHP 微服务协程框架.Swoft 能像 Go 一样,内置协程网络服务器及常用的协程客户端且常驻内存,不依赖传统的 PHP-F ...
- php swoft 路由,Swoft 源码解读
PHP 里面的 yii/laravel 框架算是非常「重」的了. 这里的 重 先不具体到 性能 层面, 主要是框架的设计思想和框架集成的服务, 让框架可以既可以快速解决很多问题, 又可以轻松扩展. P ...
- swoft php怎么样,Swoft源码之Swoole和Swoft的分析
这篇文章给大家分享的内容是关于Swoft 源码剖析之Swoole和Swoft的一些介绍(Task投递/定时任务篇),有一定的参考价值,有需要的朋友可以参考一下. 前言 Swoft的任务功能基于Swoo ...
- Swoft 2.x 微服务基础(Consul、RPC 服务发现、限流与熔断器)
本篇概要: 1. Swoft 服务注册与发现: 1.1 Consul 概况: 1.2 在 Consul 注册服务.反注册: 1.2.1 注册服务: 1.2.2 反注册: 1.3 健康检查: 1.4 服 ...
- php swoft2.*版本(swoole框架)定时任务功能,php定时任务
最近使用swoft2.0以上版本开发php定时任务功能,询问swoft官方,答复说2.0以上版本的定时任务功能还在开发中,所以就使用swoole原始定时任务功能了. 1.使用方法: //每隔5分钟(1 ...
- php 原子缓存,Swoft Cache :协程版切面缓存组件,让你的程序健步如飞
1. 介绍 遵循PSR16 cache组件,支持传统静态调用,注入调用,注解调用,事件绑定等.只需一个闭包即可支持分布式原子锁.基于swoft/cache 2. 配置 2.1 composer com ...
- Go 学习笔记(77)— Go 第三方库之 cronexpr(解析 crontab 表达式,定时任务)
cronexpr 支持的比 Linux 自身的 crontab 更详细,可以精确到秒级别. 1. 实现方式 cronexpr 表达式从前到后的顺序如下所示: 字段类型 是否为必须字段 允许的值 允 ...
最新文章
- EBS查询当前LOV SQL
- 【S操作】轻松优雅库移植解决方案,arduino库移植应对方案
- 幅度为a0的载波由峰峰值_十个医疗箱都不够用?戒掉这些坏习惯,满活跃值闯进决赛圈不是梦!...
- PowerDesigner建立与数据库的连接,以便生成数据库和从数据库生成到PD中
- 【报告分享】2021年度中国计算机视觉人才调研报告-德勤.pdf(附下载链接)
- 好好学习 天天编程—C语言之我的第一个hello world(二)
- 安卓系统实现播放器变速功能
- weblogic 启动很慢_【转】解决weblogic启动慢和创建域慢的方法
- 反直觉的一个游戏 - 三门问题 (Monty Hall problem)
- EBS-自动获取/创建CCID
- php如何配置gii,PHP Framework YII的里的gii设置。
- Android 歌词滚动效果(歌词逐个与逐渐变色)可换行
- 共享店铺系统如何设计?具体如何做?
- GAN异常检测论文笔记(一)《GANomaly: Semi-Supervised Anomaly Detection via Adversarial Training》
- Android 进阶笔记,包含常用的技术框架、博客社区、书籍等。
- DeepLab系列理解
- Oracle数据库常见的增删改查操作语句大全
- 线性回归梯度下降py实现
- win10网络邻居看到linux,win10网络邻居找不到其他电脑怎么办
- 利用计算机管理档案,初探档案的计算机管理