2019年1月份接到一个需求,就是要把PHP相关的定时任务接到xxl里面。

我想大多数PHPer第一时间可能会问,为啥呀?

1、好管理,解放运维小哥

2、执行过程信息更明了

那么我们看看xxl-job这个java任务调度的架构设计图是怎样的?

关于xxl过多的我就不多说了,大家可以把源码下下来运行跑起来,代码实现清晰明白,好废话不多说,进入整体。链接:https://www.xuxueli.com/xxl-job/#%E4%BA%8C%E3%80%81%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8

大家是不是看完xxl-job 心里有个疑问,不是已经支持了php的glue模式吗?

不过我们有些定时任务不是单文件的脚本任务,而是基于框架实现的一些脚本任务,那这种的就没有办法通过glue模式来进行调度执行了。

所以就有了这款分布式高性能的xxl-job的php执行器当然不限于php脚本执行,它其实是一个rpc框架,是一款基于swoole网络框架为xxl-job量身打造的一款任务调度执行器,对应xxl-job的架构图里面的就是xxl-rpc

0x01 灵感

接到任务后,结合自己过往的开发经验,想到腾讯的这代码仓库:https://github.com/Tencent/tsf

有一些命令行交互体验还不错,比如:

[root@iz8vb25ut078wb3d93fcvmz xxl-job-swoole]# php Bin/index.php JobServer statusxxl-job-executor-server is not running, please check it [FAIL][root@iz8vb25ut078wb3d93fcvmz xxl-job-swoole]# php Bin/index.php JobServer stoproot     23531  0.0  0.5 296148  5464 ?        Ss   12:30   0:00 xxl-job-executor-serverroot     23531  0.0  0.5 296148  6036 ?        Ss   12:30   0:00 xxl-job-executor-serverstop xxl-job-executor-server [SUCCESS]

包含的命令:start, reload, stop, restart,status,list

然后就整体参考这里的代码进行实现。

0x02 功能梳理

然后就是对接xxl,一开始公司用的是二进制hessian协议进行通信,PHP基于这个协议,当时没有调通,最后采用的是json格式。xxl-job server那边采用了netty 作为通信框架,最终调通了后,现在开始梳理下需要实现的功能:

1、执行器需要实现主动触发的功能:

  1. 任务结果回调服务;

  2. 执行器注册服务;

  3. 执行器注册摘除服务

  4. 故障自动转移

  5. 注册失败需要发起钉钉报警

  6. 任务task worker与任务绑定

2、调度中心需要执行器提供的接口:

  1. 心跳检测

  2. 忙碌检测

  3. 触发任务执行

  4. 获取Rolling Log

  5. 终止任务

  6. 三种调度策略的实现【串行执行,丢弃后续调度,覆盖之前的调度】

0x03 通信消息协议

开始着手设计这套框架。

大致功能流程如下图:

rpc通信消息格式如下:

1、首先把收到的消息体转成字节数组

    /**     * 转换字符串到字节数组     *     * @param $str     * @return array     */   public static function convertStrToBytes($str){        $len = strlen($str);        $bytes = [];        for ($i = 0; $i < $len; $i++) {            if (ord($str[$i]) >= 128) {                $byte = ord($str[$i]) - 256;            } else {                $byte = ord($str[$i]);            }            $bytes[] = $byte;        }        return $bytes;    }

2、再把字符串数组转成截取前面四个,然后转成字符串,也就是json字符串

    array_slice($bytes, 4)

3、然后把json转成数组

    /**     * 转换字节数组到字符串     *     * @param $byteArr     * @return string     */    public static function convertByteArrToStr($byteArr){        $convertStr = '';        foreach ($byteArr as $byte) {            $convertStr .= chr($byte);        }        return $convertStr;    }

这样就把通信协议部分打通了,然后发送相关消息到xxl-job调度中心,就需要逆向组合数据了。

// 这需要逆向发给调度中心$message = ['result' => $result, 'requestId' => $req['requestId'], 'errorMsg' => null];

您可以想想,怎么弄?如需了解我的逆向写法,可以留言交流。

0x04 调度策略

串行:上一个任务没有执行完成,下一个就需要等待执行

丢弃后续调度:上一个任务没有执行完成,下一个任务丢弃调度。

覆盖之前的调度:上一个任务没有执行完成,下一个任务调度时,需要把之前的任务杀掉,然后该任务继续执行

丢弃后续调度,需要把下一个任务丢弃掉,然后给调度中心回调。

/** * @param $data * @return false|string */public static function discard($data, Table $cacheTable, $retryTimes = 5){// 被kill完回调while($retryTimes > 0) {            $hostInfo = self::getCurrentServer($cacheTable);            $bizCenter = self::getBizCenterByHostInfo($hostInfo);            $res = $bizCenter->callback(self::convertSecondToMicroS(), $data['logId'], $data['requestId'], $data['logDateTim'], ['code' => Code::SUCCESS_CODE,'msg' => '脚本被强杀,结束脚本运行',            ]);if ($res) {self::appendLog($data['logId'], 'callback回调:' . json_encode([$res]));self::appendLog($data['logId'], '任务因策略,没有进入执行');                $retryTimes = 0;            } else {self::appendLog($data['logId'], 'callback回调 retry:' . $retryTimes);                $retryTimes--;            }        }return true;    }

覆盖之前的,需要把上一个任务执行终止掉,然后发送回调给调度中心,下一个人继续执行

public static function coverEarly($existInfo, $data, Table $table, Table $cacheTable, $retryTimes = 5){        $process = new Process(function (Process $process) use ($data, $existInfo, $table, $cacheTable, $retryTimes) {// 杀掉之前            ExecutorCenter::kill($existInfo['jobId'], $table);// 被kill完回调while($retryTimes > 0) {                $hostInfo = self::getCurrentServer($cacheTable);self::appendLog($existInfo['log_id'], $hostInfo. 'callback回调:' . json_encode($existInfo));                $bizCenter = self::getBizCenterByHostInfo($hostInfo);                $res = $bizCenter->callback(self::convertSecondToMicroS(), $existInfo['log_id'], $existInfo['request_id'], $existInfo['log_date_time'], ['code' => Code::SUCCESS_CODE,'msg' => '脚本被强杀,结束脚本运行',                ]);if ($res) {self::appendLog($existInfo['log_id'], 'callback回调:' . json_encode([$res]));self::appendLog($existInfo['log_id'], '任务因策略,没有继续执行,已中断');                    $retryTimes = 0;                } else {                    $retryTimes--;                }            }        }, true);        $process->start();        $process->name('kill-'.$existInfo['log_id'] . '-process');    }

目前该项目还没有申请开源,也不知道有没有开源的价值,如果您也在用xxl-job作为任务调度中心,且目前仍然有PHP技术栈的话,可以留言交流。

如何做到串行执行且如何确保与其他任务不存在竞争task worker的呢?

答案是:task投递时指定目标task worker id

$server->task(['job_data' => $param, 'params' => $handleParams], $dstWorkerNum);

0x05 真实的项目调度

x-docker.ini

[server]; server type:tcp udp httptype = tcp; hosthost = 0.0.0.0; portport = 9506; process nameprocess_name = xxl-job-php-executor; registryapp_name = xxl-job-php; server nameserver_name = 'http://xxl-job-php.test.com'; phpphp = /usr/local/bin/php;shell = /bin/bash[setting]; worker process numworker_num = 2; Reactor numreactor_num = 8; task process numtask_worker_num = 20; dispatch modedispatch_mode = 2; daemonizedaemonize = 1; heartbeatheartbeat_check_interval = 60; idleheartbeat_idle_time = 600; system loglog_file = '/var/www/html/xxl-job-php-executor/Log/runtime.log'; mac process_name[table]size = 1024[project]root_path = '/var/www/html/'[xxljob]; site domainhost = docker.for.mac.host.internal; netty server portport = 8888; registry timemsregistry_interval_ms = 20000; a switch of registryopen_registry = 1; 备份netty server[xxljob_backup]; netty server,支持多个逗号拼接host_urls = docker.for.mac.host.internal:8889,docker.for.mac.host.internal:8890; registry timemsregistry_interval_ms = 20000; a switch of registryopen_registry = 1

支持类tp、类laravel框架,也可以扩展支持其他框架,很简单。配置如下:

'commonmode' => [     'router_index' => '/cli.php',     'file_real_path' => ':string/Application/:string/Controller/:stringController.class.php',     'identifier' => 't'],'artisanmode' => [     'run_mode' => self::ARTISAN_MODE,     'router_index' => self::ARTISAN_MODE,     'param_identify' => ' ',     'file_real_path' => ':string/Console/Kernel.php',     'identifier' => 'c']

0x06 总结

通过此项目的开发,可以完全利用swoole来进行网络编程开发,swoole的优质特性,值得广为使用。在过程中,对一些swoole的功能特性有较深入的理解,从头开始实现一个rpc框架很容易,然后就把精力集中放在实现相关功能上,task 的用法在低版本有些不是很好,所以如果是用的php5.6版本的,可以考虑用1.10.5的版本,对此项目来说,足够了

0x07 引用

https://www.php.net/manual/en/pcntl.constants.php

https://wiki.swoole.com/wiki/page/1.html

https://wiki.swoole.com/#/

https://github.com/Tencent/tsf

感谢swoole 项目开发组提供的如此优秀的扩展项目,感谢tencent tsf项目提供的灵感。

如果这篇文章让你有一点点收获,点下在看哦

mul ab 的执行结果是_实战总结:为xxljob定制化的 php 执行器相关推荐

  1. mysql 执行计划 视频_实战讲解MySQL执行计划,面试官当场要了我

    全是干货的技术号: 本文已收录在github,欢迎 star/fork: explain或者desc获取MySQL如何执行select语句的信息. 结果包含很多列 1 各列字段说明 1.1 id SE ...

  2. [SpringBoot2]web场景_静态资源规则与定制化

    静态资源目录 只要静态资源放在类路径下: called /static (or /public or /resources or /META-INF/resources 访问 : 当前项目根路径/ + ...

  3. exdoll机器人_打造国内领先的定制化人工智能机器人品牌, EXDOLL新品惊艳亮相_TOM新闻...

    原本只能在科幻电影中见到的人工智能,如今已越来越多的出现在我们的日常生活中,而随着AI技术和仿真技术的迅猛发展,人工智能机器人有望被大规模的应用于企业服务.医疗服务与生活服务中. AlphaGo和机器 ...

  4. qmenu基本用法_剖析QMenu Qt完全定制化菜单

    贴张效果图: 定制包括: 1. 周边阴影 2. 菜单项的元素(分割符, 控制ICON大小, 文字显示位置与颜色, 子菜单指示符) 菜单内的效果, 部分可以使用stylesheet实现, 但要做到这样的 ...

  5. MySQL入门_实战3_创建和删除数据库

    MySQL入门_实战系列文章目录 MySQL入门_实战1 MYSQL安装和卸载 MySQL入门_实战2 MYSQL的登录和断开以及SQL查询常见问题 文章目录 MySQL入门_实战系列文章目录 前言 ...

  6. scheduled线程池ScheduledExecutorService只执行一次_有个定时任务突然不执行了

    scheduled线程池ScheduledExecutorService只执行一次_有个定时任务突然不执行了 原因 If any execution of the task encounters an ...

  7. python如何执行代码漏洞_命令执行与代码执行漏洞原理

    本篇笔记摘自微信"黑白天",如有侵权,联系删除 命令执行定义 当应用需要调用一些外部程序去处理内容的情况下,就会用到一些执行系统命令的函数.如PHP中的system,exec,sh ...

  8. MySQL入门_实战6_MySQL数据的更新

    MySQL入门_实战系列文章目录 文章目录 MySQL入门_实战系列文章目录 前言 2 数据更新 2.1 语法格式 2.2 更新数据表中的所有记录 2.3 更新表中特定的数据行 2.4 更新某个范围内 ...

  9. java执行sql文件_面试官问你MyBatis SQL是如何执行的?把这篇文章甩给他

    初识 MyBatis MyBatis 是第一个支持自定义 SQL.存储过程和高级映射的类持久框架.MyBatis 消除了大部分 JDBC 的样板代码.手动设置参数以及检索结果.MyBatis 能够支持 ...

  10. java执行sql文件_面试官:MyBatis SQL是如何执行的?把这篇文章甩给他

    初识 MyBatis MyBatis 是第一个支持自定义 SQL.存储过程和高级映射的类持久框架.MyBatis 消除了大部分 JDBC 的样板代码.手动设置参数以及检索结果.MyBatis 能够支持 ...

最新文章

  1. 百度和吉利合作造车,数据揭秘百度造车、拆股背后逻辑
  2. CentOS下C++开发环境搭建
  3. switch注意事项
  4. F - Sugoroku2(动态规划)
  5. UVA - 11361 Investigating Div-Sum Property(数位dp/记忆化搜索板子)
  6. linux 下批量压缩文件
  7. form怎么加ion_企业微信裂变该怎么做?一份裂变1000+社群裂变方案的底层逻辑
  8. Sql Server参数化查询之where in和like实现之xml和DataTable传参
  9. 粗看ES6之JSON
  10. python中关于元组的基础运用
  11. 为什么说传统分布式事务不再适用于微服务架构?
  12. 计算机自动化专业强的二本大学,2020自动化专业大学排名公布
  13. 2021自学考试计算机应用基础,2021年高等教育自学考试计算机应用基础试题及答案.doc...
  14. python 移动平均线_Python中的SMA(短期移动平均线)
  15. AndroidStudio下建立libs、raw、、assets、selector、shape、anim、存放so
  16. 用户管理的备份恢复(2)
  17. C++ 安妮的宠物小屋 练习
  18. sci-ei求中啊啊啊
  19. Python贪吃蛇小游戏_完整源码免费分享
  20. 异步爬虫(高效爬虫)

热门文章

  1. 即时通讯领域必将最终由XMPP协议一统天下
  2. 七月算法机器学习 7 工作流程与模型调优
  3. Python3入门机器学习经典算法与应用 第3章 Numpy中的arg运算
  4. 190124每日一句
  5. origin 复制与数据转置
  6. Atitit 远程存储与协议 mtp ptp rndis midi nfs smb webdav ftp Atitit mtp ptp rndis midi协议的不同区别 1. PTP: 图
  7. Atitit 解析m4a文件的元数据标签音乐名,歌手 专辑 年代等信息 java版本 目录 1.1. 自己解析mp4 m4a结构 1 1.2. 格式返回 1 1.3. /bookmarksHtmlE
  8. Atitit 前端 dom 的艺术 attilax著 目录 1. 概念 1 2. 发展历程 1 2.1. 厂商各自为政 2 2.2. 1.4 制定标准 标准化 w3cdom 2 2.3. 1.4.
  9. atitit.极光消息推送服务器端开发实现推送  jpush v3. 总结o7p
  10. paip. sip module implements API v10.0 to v10.1 but the PyQt4.QtCore module requires API v9.2