Swoft的任务功能基于Swoole的Task机制,或者说Swoft的Task机制本质就是对SwooleTask机制的封装和加强。

任务投递

//Swoft\Task\Task.php

class Task

{

/**

* Deliver coroutine or async task

*

* @param string $taskName

* @param string $methodName

* @param array $params

* @param string $type

* @param int $timeout

*

* @return bool|array

* @throws TaskException

*/

public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)

{

$data = TaskHelper::pack($taskName, $methodName, $params, $type);

if(!App::isWorkerStatus() && !App::isCoContext()){

return self::deliverByQueue($data);//见下文Command章节

}

if(!App::isWorkerStatus() && App::isCoContext()){

throw new TaskException('Please deliver task by http!');

}

$server = App::$server->getServer();

// Delier coroutine task

if ($type == self::TYPE_CO) {

$tasks[0] = $data;

$prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

App::profileStart($prifleKey);

$result = $server->taskCo($tasks, $timeout);

App::profileEnd($prifleKey);

return $result;

}

// Deliver async task

return $server->task($data);

}

}

任务投递Task::deliver()将调用参数打包后根据$type参数通过swoole的$server->taskCo()或$server->task()接口投递到Task进程。

Task本身始终是同步执行的,$type仅仅影响投递这一操作的行为,Task::TYPE_ASYNC对应的$server->task()是异步投递,Task::deliver()调用后马上返回;Task::TYPE_CO对应的$server->taskCo()是协程投递,投递后让出协程控制,任务完成或执行超时后Task::deliver()才从协程返回。

任务执行

//Swoft\Task\Bootstrap\Listeners\TaskEventListener

/**

* The listener of swoole task

* @SwooleListener({

* SwooleEvent::ON_TASK,

* SwooleEvent::ON_FINISH,

* })

*/

class TaskEventListener implements TaskInterface, FinishInterface

{

/**

* @param \Swoole\Server $server

* @param int $taskId

* @param int $workerId

* @param mixed $data

* @return mixed

* @throws \InvalidArgumentException

*/

public function onTask(Server $server, int $taskId, int $workerId, $data)

{

try {

/* @var TaskExecutor $taskExecutor*/

$taskExecutor = App::getBean(TaskExecutor::class);

$result = $taskExecutor->run($data);

} catch (\Throwable $throwable) {

App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));

$result = false;

// Release system resources

App::trigger(AppEvent::RESOURCE_RELEASE);

App::trigger(TaskEvent::AFTER_TASK);

}

return $result;

}

}

此处是swoole.onTask的事件回调,其职责仅仅是将将worker进程投递来的打包后的数据转发给TaskExecutor。

Swoole的Task机制的本质是worker进程将耗时任务投递给同步的Task(又名TaskWorker)进程处理,所以swoole.onTask的事件回调是在Task进程中执行的。上文说过,worker进程是你大部分http服务代码执行的环境,但是从TaskEventListener.onTask()方法开始,代码的执行环境都是Task进程,也就是说,TaskExecutor和具体的TaskBean都是执行在Task进程中的。

//Swoft\Task\TaskExecutor

/**

* The task executor

*

* @Bean()

*/

class TaskExecutor

{

/**

* @param string $data

* @return mixed

*/

public function run(string $data)

{

$data = TaskHelper::unpack($data);

$name = $data['name'];

$type = $data['type'];

$method = $data['method'];

$params = $data['params'];

$logid = $data['logid'] ?? uniqid('', true);

$spanid = $data['spanid'] ?? 0;

$collector = TaskCollector::getCollector();

if (!isset($collector['task'][$name])) {

return false;

}

list(, $coroutine) = $collector['task'][$name];

$task = App::getBean($name);

if ($coroutine) {

$result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);

} else {

$result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);

}

return $result;

}

任务执行思路很简单,将Worker进程发过来的数据解包还原成原来的调用参数,根据$name参数找到对应的TaskBean并调用其对应的Task方法。其中TaskBean使用类级别注解@Task(name="TaskName")或者@Task("TaskName")声明。

值得一提的一点是,@Task注解除了name属性,还有一个coroutine属性,上述代码会根据该参数选择使用协程的runCoTask()或者同步的runSyncTask()执行Task。但是由于而且由于Swoole的Task进程的执行是完全同步的,不支持协程,所以目前版本请该参数不要配置为true。同样的在TaskBean中编写的任务代码必须的同步的或者是要能根据环境自动将异步和协程降级为同步的

从Process中投递任务

前面我们提到:

Swoole的Task机制的本质是worker进程将耗时任务投递给同步的Task进程(又名TaskWorker进程)处理。

换句话说,swoole的$server->taskCo()或$server->task()都只能在worker进程中使用。

这个限制大大的限制了使用场景。 如何能够为了能够在Process中投递任务呢?Swoft为了绕过这个限制提供了Task::deliverByProcess()方法。其实现原理也很简单,通过Swoole的$server->sendMessage()方法将调用信息从Process中投递到Worker进程中,然后由Worker进程替其投递到Task进程当中,相关代码如下:

//Swoft\Task\Task.php

/**

* Deliver task by process

*

* @param string $taskName

* @param string $methodName

* @param array $params

* @param string $type

* @param int $timeout

* @param int $workId

*

* @return bool

*/

public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool

{

/* @var PipeMessageInterface $pipeMessage */

$server = App::$server->getServer();

$pipeMessage = App::getBean(PipeMessage::class);

$data = [

'name' => $taskName,

'method' => $methodName,

'params' => $params,

'timeout' => $timeout,

'type' => $type,

];

$message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);

return $server->sendMessage($message, $workId);

}

数据打包后使用$server->sendMessage()投递给woerker:

//Swoft\Bootstrap\Server\ServerTrait.php

/**

* onPipeMessage event callback

*

* @param \Swoole\Server $server

* @param int $srcWorkerId

* @param string $message

* @return void

* @throws \InvalidArgumentException

*/

public function onPipeMessage(Server $server, int $srcWorkerId, string $message)

{

/* @var PipeMessageInterface $pipeMessage */

$pipeMessage = App::getBean(PipeMessage::class);

list($type, $data) = $pipeMessage->unpack($message);

App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);

}

$server->sendMessage后,worker进程收到数据时会触发一个swoole.pipeMessage事件的回调,Swoft会将其转换成自己的swoft.pipeMessage事件并触发.

//Swoft\Task\Event\Listeners\PipeMessageListener.php

/**

* The pipe message listener

*

* @Listener(event=AppEvent::PIPE_MESSAGE)

*/

class PipeMessageListener implements EventHandlerInterface

{

/**

* @param \Swoft\Event\EventInterface $event

*/

public function handle(EventInterface $event)

{

$params = $event->getParams();

if (count($params) < 3) {

return;

}

list($type, $data, $srcWorkerId) = $params;

if ($type != PipeMessage::MESSAGE_TYPE_TASK) {

return;

}

$type = $data['type'];

$taskName = $data['name'];

$params = $data['params'];

$timeout = $data['timeout'];

$methodName = $data['method'];

// delever task

Task::deliver($taskName, $methodName, $params, $type, $timeout);

}

}

swoft.pipeMessage事件最终由PipeMessageListener处理。在相关的监听其中,如果发现swoft.pipeMessage事件由Task::deliverByProcess()产生的,worker进程会替其执行一次Task::deliver(),最终将任务数据投递到TaskWorker进程中。

一道简单的回顾练习:从Task::deliverByProcess()到某TaskBean 最终执行任务,经历了哪些进程,而调用链的哪些部分又分别是在哪些进程中执行?

从Command进程或其子进程中投递任务

//Swoft\Task\QueueTask.php

/**

* @param string $data

* @param int $taskWorkerId

* @param int $srcWorkerId

*

* @return bool

*/

public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)

{

if ($taskWorkerId === null) {

$taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);

}

if ($srcWorkerId === null) {

$srcWorkerId = mt_rand(0, $this->workerNum - 1);

}

$this->check();

$data = $this->pack($data, $srcWorkerId);

$result = \msg_send($this->queueId, $taskWorkerId, $data, false);

if (!$result) {

return false;

}

return true;

}

对于Command进程的任务投递,情况会更复杂一点。

上文提到的Process,其往往衍生于Http/Rpc服务,作为同一个Manger的子孙进程,他们能够拿到Swoole\Server的句柄变量,从而通过$server->sendMessage(),$server->task()等方法进行任务投递。

但在Swoft的体系中,还有一个十分路人的角色: Command。

Command的进程从shell或cronb独立启动,和Http/Rpc服务相关的进程没有亲缘关系。因此Command进程以及从Command中启动的Process进程是没有办法拿到Swoole\server的调用句柄直接通过UnixSocket进行任务投递的。

为了为这种进程提供任务投递支持,Swoft利用了Swoole的Task进程的一个特殊功能----消息队列。

使用消息队列的Task进程.png

同一个项目中Command和Http\RpcServer 通过约定一个message_queue_key获取到系统内核中的同一条消息队列,然后Comand进程就可以通过该消息队列向Task进程投递任务了。

该机制没有提供对外的公开方法,仅仅被包含在Task::deliver()方法中,Swoft会根据当前环境隐式切换投递方式。但该消息队列的实现依赖Semaphore拓展,如果你想使用,需要在编译PHP时加上--enable-sysvmsg参数。

定时任务

除了手动执行的普通任务,Swoft还提供了精度为秒的定时任务功能用来在项目中替代Linux的Crontab功能.

Swoft用两个前置Process---任务计划进程:CronTimerProcess和任务执行进程CronExecProcess

,和两张内存数据表-----RunTimeTable(任务(配置)表)OriginTable((任务)执行表)用于定时任务的管理调度。

两张表的每行记录的结构如下:

\\Swoft\Task\Crontab\TableCrontab.php

/**

* 任务表,记录用户配置的任务信息

* 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录

* @var array $originStruct

*/

private $originStruct = [

'rule' => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性

'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名)

'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法

'add_time' => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳

];

/**

* 执行表,记录短时间内要执行的任务列表及其执行状态

* 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录

* @var array $runTimeStruct

*/

private $runTimeStruct = [

'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//同上

'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上

'minute' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi')

'sec' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳

'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行) 1(已执行) 2(执行中) 三种。

//注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。

];

此处为何要使用Swoole的内存Table?

Swoft的的定时任务管理是分别由 任务计划进程 和 任务执行进程 进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的array()等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的Table(本文的Table,除非特别说明,都指Swoole的Swoole\Table结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。

为了Table能够在两个进程间共同使用,Table必须在Swoole Server启动前创建并分配内存。具体代码在Swoft\Task\Bootstrap\Listeners->onBeforeStart()中,比较简单,有兴趣的可以自行阅读。

背景介绍完了,我们来看看这两个定时任务进程的行为

//Swoft\Task\Bootstrap\Process\CronTimerProcess.php

/**

* Crontab timer process

*

* @Process(name="cronTimer", boot=true)

*/

class CronTimerProcess implements ProcessInterface

{

/**

* @param \Swoft\Process\Process $process

*/

public function run(SwoftProcess $process)

{

//code....

/* @var \Swoft\Task\Crontab\Crontab $cron*/

$cron = App::getBean('crontab');

// Swoole/HttpServer

$server = App::$server->getServer();

$time = (60 - date('s')) * 1000;

$server->after($time, function () use ($server, $cron) {

// Every minute check all tasks, and prepare the tasks that next execution point needs

$cron->checkTask();

$server->tick(60 * 1000, function () use ($cron) {

$cron->checkTask();

});

});

}

}

//Swoft\Task\Crontab\Crontab.php

/**

* 初始化runTimeTable数据

*

* @param array $task 任务

* @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行

* @return bool

*/

private function initRunTimeTableData(array $task, array $parseResult): bool

{

$runTimeTableTasks = $this->getRunTimeTable()->table;

$min = date('YmdHi');

$sec = strtotime(date('Y-m-d H:i'));

foreach ($parseResult as $time) {

$this->checkTaskQueue(false);

$key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);

$runTimeTableTasks->set($key, [

'taskClass' => $task['taskClass'],

'taskMethod' => $task['taskMethod'],

'minute' => $min,

'sec' => $time + $sec,

'runStatus' => self::NORMAL

]);

}

return true;

}

CronTimerProcess是Swoft的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()。

该进程使用了Swoole的定时器功能,通过Swoole\Timer在每分钟首秒时执行的回调,CronTimerProcess每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行。

//Swoft\Task\Bootstrap\Process

/**

* Crontab process

*

* @Process(name="cronExec", boot=true)

*/

class CronExecProcess implements ProcessInterface

{

/**

* @param \Swoft\Process\Process $process

*/

public function run(SwoftProcess $process)

{

$pname = App::$server->getPname();

$process->name(sprintf('%s cronexec process', $pname));

/** @var \Swoft\Task\Crontab\Crontab $cron */

$cron = App::getBean('crontab');

// Swoole/HttpServer

$server = App::$server->getServer();

$server->tick(0.5 * 1000, function () use ($cron) {

$tasks = $cron->getExecTasks();

if (!empty($tasks)) {

foreach ($tasks as $task) {

// Diliver task

Task::deliverByProcess($task['taskClass'], $task['taskMethod']);

$cron->finishTask($task['key']);

}

}

});

}

}

CronExecProcess作为定时任务的执行者,通过Swoole\Timer每0.5s唤醒自身一次,然后把 执行表 遍历一次,挑选当下需要执行的任务,通过sendMessage()投递出去并更新该 任务执行表中的状态。

该执行进程只负责任务的投递,任务的实际实际执行仍然在Task进程中由TaskExecutor处理。

定时任务的宏观执行情况如下:

Swoft定时任务机制.png

php定时任务sw,[原创]Swoole和Swoft的那些事(Task投递/定时任务篇)相关推荐

  1. swoft php怎么样,[原创]Swoole和Swoft的那些事 (Http/Rpc服务篇)

    Swoft在PHPer圈中是一个门槛较高的Web框架,不仅仅由于框架本身带来了很多新概念和前沿的设计,还在于Swoft是一个基于Swoole的框架.Swoole在PHPer圈内学习成本最高的工具没有之 ...

  2. swoft php怎么样,Swoft源码之Swoole和Swoft的分析

    这篇文章给大家分享的内容是关于Swoft 源码剖析之Swoole和Swoft的一些介绍(Task投递/定时任务篇),有一定的参考价值,有需要的朋友可以参考一下. 前言 Swoft的任务功能基于Swoo ...

  3. 微服务研究 - Swoole框架-Swoft初探

    微服务研究 - Swoole框架-Swoft初探.没有swoole之前,php一直被"誉"为世界上最好的语言.swoole横空出世后,php就成了宇宙最好的语言了... swoft ...

  4. php swoft 应用,Swoole 在 Swoft 中的应用

    date: 2017-12-14 21:34:51 title: swoole 在 swoft 中的应用 上一篇 blog - swoft 源码解读 反响还不错, 不少同学推荐再加一篇, 讲解一下 s ...

  5. (原创)面向对象的系统对接接口编写。第4篇

    接上一篇:http://www.cnblogs.com/mazhiyuan/p/5224050.html 这篇开始写Post和Get请求的布局. 我们就以新闻模块来举例吧.其它的模块,比如用户啦,比如 ...

  6. swoole 框架swoft使用

    再简单的东西,再简单的流程,过一段时间不涉足,也就忘记了.用的时候又要去找资料,然后再慢慢会记忆恢复.所以还是记下来的为好. swoft官网,https://www.swoft.org,swoole官 ...

  7. linux设置mysql定时任务_原创 Linux下实现Mysql定时任务备份数据

    本实例将创建目录放置于/mnt目录下,可根据具体情况放置于其他目录: cd /mnt mkdir dbback pwd /mnt/dbback 创建shell脚本 脚本名称可根据自己规范进行自定义: ...

  8. 使用Spring Task完成定时任务

    1. 前言 上一篇我们学习了Quartz作为定时任务的框架的使用, 这一篇我们来学习Spring全家桶的SpringTask, 对于主张简单易用的Spring家族来说, SpringTask无疑也是一 ...

  9. Spring Mvc中使用Task实现定时任务,以及遇到的一个问题

    Spring中实现定时任务其实很简单,可以使用spring中自带的task 相当于轻量级的Quartz,并且spring 3.0 之后支持注解的方式,使用起来非常简单,方便,具体实现如下: 第一步,修 ...

最新文章

  1. poj2217详解 ( 后缀数组 + 高度数组 )
  2. 双向滚动scrollview
  3. servlet中弹出网页对话框
  4. 货店管理(delphi+sqlserver)
  5. BZOJ4557:[JLOI2016/SHOI2016]侦察守卫——题解
  6. Form表单中method=post/get'的区别
  7. linux分区始柱号,找到了linux分区顺序错乱修复方法
  8. windows 安装jenkins
  9. FAT32文件系统介绍
  10. Gif动图体积过大如何缩小?仅需三步教你在线压缩gif
  11. python标准差不用numpy_Python numpy标准差
  12. 一篇读懂jvm垃圾回收
  13. 网络用语html,最新网络用语
  14. java分割图片为九宫格
  15. 华为鸿蒙源代码在哪里下载,华为鸿蒙正式开源!源码地址公布,对外开放下载...
  16. 5g宣传方案_活动创意策划方案要向“5G时代”看齐
  17. 文件服务器+权限+迁移,文件服务器文件夹,共享文件夹及权限迁移
  18. Kafka多个消费者监听消费同一个Topic主题
  19. 2022年网搭国赛PXE服务
  20. 22.9.24 比赛

热门文章

  1. 【kafka】Kafka 源码解析:Group 协调管理机制
  2. 【Kafka】Kafka生产者producer相关参数详解batch.size linger.ms 等参数
  3. Arthas : 在线分析诊断工具Arthas(阿尔萨斯)
  4. 如何让JSON只从Model中输出一部分
  5. 【Hadoop】Hadoop SocketChannelImpl UnresolvedAddressException
  6. 【Kafka】Consumer group 'console-consumer-28367' has no active members.
  7. linux 报错:telnet Connection closed by foreign host
  8. 双显卡只用独显好吗_双动力洗衣机好吗
  9. 如何才能写出一手逼疯同事的烂代码?
  10. SpringBoot 中使用 @Valid 注解 + Exception 全局处理器优雅处理参数验证