背景

由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。

大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。

在我们的系统中,主系统作为生产者,任务系统作为消费者。

具体的工作流程如下:1、主系统将需要需要处理的任务名称+任务参数push到队列中。2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。

具体代码如下:

/*** 启动守护进程*/
public function runAction() {Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');while (true) {$this->fork_process();}exit;
}/*** 创建子进程*/
private function fork_process() {$ppid = getmypid();$pid = pcntl_fork();if ($pid == 0) {//子进程$pid = posix_getpid();//echo "* Process {$pid} was created \n\n";$this->mq_process();exit;} else {//主进程$pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态if (pcntl_wifexited($status)) {//echo "\n\n* Sub process: {$pid} exited with {$status}";//Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );} else {Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');}}
}/*** 业务任务队列处理*/
private function mq_process() {$data_pop = $this->masterRedis->rPop($this->redis_list_key);$data = json_decode($data_pop, 1);if (!$data) {return FALSE;}$worker = '_task_' . $data['worker'];$class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';$params = $data['params'];$class = new $class_name();$class->$worker($params);return TRUE;
}

这是一个简单的任务处理系统。

通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。

但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。

这样很稳定。

但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!

第一个问题还好,但第二个问题就很严重。

当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。

新的设计

为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。

因为在PHP7之前不支持多线程,所以我们采用多进程。

从网上找了不少资料,大多所谓的多进程都是N个进程同时在后台运行。

显然这是不合适的。

我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。

遇到的问题


1、如何控制最大进程数

这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。

自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?

可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!

所以,这里就需要了解一个知识点:信号。

具体的可以自行Google,这里直接看代码。

// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));

这就安装了一个信号处理器。当然还缺少一点。

declare(ticks = 1);

declare是一个控制结构语句,具体的用法也请去Google。

这句代码的意思就是每执行一条低级语句就调用一次信号处理器。

这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。

2、如何解决进程残留

在多进程开发中,如果处理不当就会导致进程残留。

为了解决进程残留,必须得将子进程回收。

那么如何对子进程进行回收就是一个技术点了。

在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。

但我们是基于Redis的brpop的,而brpop是阻塞的。

这就导致一个问题:当执行N个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。

这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。

进程回收也放到信号处理器中去。

新系统的评估

pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。

所以这里采用Swoole扩展中的Process。

具体代码如下:

declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{use Trait_Redis;private $maxProcesses = 800;private $child;private $masterRedis;private $redis_task_wing = 'task:wing'; //待处理队列public function init(){// install signal handler for dead kidspcntl_signal(SIGCHLD, array($this, "sig_handler"));set_time_limit(0);ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection}private function redis_client(){$rds = new Redis();$rds->connect('redis.master.host',6379);return $rds;}public function process(swoole_process $worker){// 第一个处理$GLOBALS['worker'] = $worker;swoole_event_add($worker->pipe, function($pipe) {$worker = $GLOBALS['worker'];$recv = $worker->read();            //send data to mastersleep(rand(1, 3));echo "From Master: $recv\n";$worker->exit(0);});exit;}public function testAction(){for ($i = 0; $i < 10000; $i++){$data = ['abc' => $i,'timestamp' => time().rand(100,999)];$this->masterRedis->lpush($this->redis_task_wing, json_encode($data));}exit;}public function runAction(){while (1){
//            echo "\t now we de have $this->child child processes\n";if ($this->child < $this->maxProcesses){$rds = $this->redis_client();$data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待if (!$data_pop){continue;}echo "\t Starting new child | now we de have $this->child child processes\n";$this->child++;$process = new swoole_process([$this, 'process']);$process->write(json_encode($data_pop));$pid = $process->start();}}}private function sig_handler($signo) {
//        echo "Recive: $signo \r\n";switch ($signo) {case SIGCHLD:while($ret = swoole_process::wait(false)) {
//                    echo "PID={$ret['pid']}\n";$this->child--;}}}
}

最终,经过测试,单核1G的服务器执行1到3秒的任务可以做到800的并发。

ps:欢迎各位大神与我交流,不知能否做到更好~


来源:https://www.jianshu.com/p/54ffd360454f

基于Swoole和Redis实现的并发队列处理系统 1相关推荐

  1. 基于Swoole和Redis实现的并发队列处理系统

    背景 由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的.为了完成这些异步操作,我们做了一个基于Redis队列任务系统. 大家知道,一个消息队列处理系统主要分为两大部分:消费 ...

  2. Day267.预约系统的性能瓶颈、营销活动无缝切换秒杀活动、预约系统数据迁移方案、高流量下预约系统搭建熔断机制、预约系统redis集群主从哨兵架构 -Redis的高并发预约抢购系统

    一.预约系统的性能瓶颈 1.预约系统应对热门爆品时的缺陷 用户进行预约会涉及到两个维度的数据变更一个是用户信息,一个是SKU信息,如图↓所示: 正常来说这么搞一点问题没有,即便涉及到写数据库,但是每个 ...

  3. Day269.口罩预约抢购系统关注的问题、瞬时高流量Mysql查询缓慢原因 -Redis的高并发预约抢购系统

    一.口罩预约抢购系统关注的问题 1.瞬时高流量sku缺乏监控会出现的问题 流量监控问题 如果不是值班人员细心,自己肉眼发现了这个预约SKU的涨幅不正常. 万一稍有疏忽没有观察到流量的异常,一旦到了抢购 ...

  4. mysql逻辑架构连接池_GitHub - zzjzzb/ycsocket: 基于swoole的socket框架,支持协程版MySQL、Redis连接池、Actor模型...

    ycsocket 基于 swoole 和 swoole_orm 的 websocket 框架,各位可以自己扩展到 TCP/UDP,HTTP. 在ycsocket 中,采用的是全协程化,全池化的数据库. ...

  5. redis php异步队列,基于workerman的redis-queue实现异步邮件队列

    实验场景:页面被客户访问发送邮件通知到我的邮箱,该场景只是为了测试,下单发送邮件或者短信的场景都是一样的,为了体现出来队列的优越性,我自己封装了个邮件发送的接口,接口内部实现增加了sleep(5),纯 ...

  6. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列(续)

    背景 上一篇(灵感来袭,基于Redis的分布式延迟队列)讲述了基于Java DelayQueue和Redis实现了分布式延迟队列,这种方案实现比较简单,应用于延迟小,消息量不大的场景是没问题的,毕竟J ...

  7. Redis Primer(1)基于JedisPool的Redis hset并发性能测试 - @钟超 · 技术博客专栏 - 博客频道 - CSDN.NET...

    Redis Primer(1)基于JedisPool的Redis hset并发性能测试 - @钟超 · 技术博客专栏 - 博客频道 - CSDN.NET Redis Primer(1)基于JedisP ...

  8. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列

    一.延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. 二.Redisson Delayed Queue 如果你项目中使 ...

  9. 基于Swoole和beanstalkd实现多进程处理消息队列。

    2019独角兽企业重金招聘Python工程师标准>>> 项目地址 SWBT框架 https://gitee.com/chenbotome/SWBT 目的 基于Swoole和beans ...

最新文章

  1. Ubuntu15.04 网站服务器环境搭建,php/html/css等学习环境搭建教程
  2. Logminer实战
  3. JNI与底层调用-2
  4. 用户分群模型:如何打造精细化运营基石?
  5. leetcode算法刷题记录表
  6. Java 生态圈知识汇总
  7. 关于android输入框被键盘遮挡的问题
  8. 石头扫地机器人加速异响_AI助力,无惧障碍 石头扫地机器人T7Pro测评
  9. Layui 扩展字体图标
  10. nyist——ACM新生牛刀小试 Round#1题解
  11. 《看完就懂系列》谈谈数据埋点的原理与实现
  12. 决策树系列之一决策树的入门教程
  13. 第二节HDFS完全分布式集群搭建与配置及常见问题总结
  14. 智慧厕所空气净化设备异味除臭杀菌更彻底
  15. 阿里技术团队是如何打造的?
  16. 安装VS2010的SP1补丁的办法
  17. 最老程序员创业札记:全文检索、数据挖掘、推荐引擎应用53
  18. python窗口显示图片imread() imshow()_Python-OpenCV:cv2.imread(),cv2.imshow(),cv2.imwrite()
  19. C#之 十九 使用WinForm控件
  20. 华为服务器euler系统,华为euler服务器

热门文章

  1. 3.TF-IDF算法介绍、应用、NLTK实现TF-IDF算法、Sklearn实现TF-IDF算法、算法的不足、算法改进
  2. 24-Logistic Regression
  3. 利用MAVEN打包时,如何包含更多的资源文件
  4. Jquery之Bind方法参数传递与接收的三种方法
  5. CSS控制所有浏览器水平居中和控制链接不换行的效果
  6. 电机高频注入原理_永磁同步电机转子位置与速度估算的新方法,精度好,性价比高...
  7. 寄存器分配图着色_富士苹果促进着色技术八大要点!是时候看看了!
  8. R-FCN+ResNet-50 训练模型
  9. [深度学习基础] 4. 卷积神经网络
  10. 从vuex源码分析module与namespaced