今天闲着的没事,看了phalapi官网没有redis队列的姿势,于是心血来潮使用redis做MQ队列,以下仅为个人娱乐,经过测试可以解决多消费者重复消费问题,支持多消费进程,多种消费自定义任务

**首先需要安装phalapi-cli服务,具体操作参考:phalapi-cli命令

<?phpnamespace App\Api;use PhalApi\Api;/*** redis-MQ队列* Class Guide* @package App\Api*/
class Guide extends Api  {public function getRules() {return array();}/*** @Notes:* 生产者* @Interface test* @author [MengShuai] [<133814250@qq.com>]*/public function test(){$k= "";for ($i=1;$i<=100;$i++){$k_name = 'mysql_push_'.$i.'_'.rand(100000,999999);$value = $i.'_'.'推送内容:'.date("H:i:s").'--'.rand(100000,999999);/*** 第一步,创建mysql原始任务,或者其他,也可以省略*/\PhalApi\DI()->notorm->push->insert(['k' => $k_name,'value' => $value,'status' => 1]);/*** 第二步,设置队列key的相关参数* 这里使用SETNX实现锁机制,并借助 Expire设置超时时间,防止队列崩溃后key长期在内存中占有资源,redis版本需要在2.9以上* 加锁的目的:在消费者是多线程并发高的情况下,避免重复消费操作,配合事务和单原子机制在消费者读取生产属性时只允许一个进程进行相关操作(del删除生产属性)*/\PhalApi\DI()->redis->set($k_name, $value, array('nx', 'ex' => 86400));/*** 第三步,丢入队列,左进队列,如需从右:rPush* 注意:务必在最后操作,否则可能会出现set未设置成功就被消费的情况*/\PhalApi\DI()->redis->lPush('push_queue',$k_name);$k.= "$k_name:::$value<br>";}return ['code' => 200,'info' =>['k' => $k,]];}/*** @Notes:* 消费者* 守护进程运行* nohup php push.php & 开启守护进程运行,修改文件之后需要从新启动* blpop 有值则回去 没值则阻塞 主要就是这个函数在起作用 不过并不安全,程序在执行过程中崩溃就会导致队列中的内容* 永久丢失~* BRPOPLPUSH 阻塞模式 右边出 左边进 在填写队列内容的时候要求从左进入** @Interface push* @author [MengShuai] [<133814250@qq.com>]*/public function push(){/*** socket流从建立到传输再到关闭整个过程的最大超时时间,在cli命令行中需要设为不超时,否则可能会抛出超时异常中断进程*/ini_set('default_socket_timeout', -1);print "开始等待队列……\n";/*** brpoplpush():命令从列表中取出最后一个元素,并插入到另外一个列表的头部;* 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。*/While ($key = \PhalApi\DI()->redis->brpoplpush('push_queue', 'temp_queue', 0)) {/*** 监视一个或多个key:watch(array($key,$key2)))* 如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断*/\PhalApi\DI()->redis->watch(array($key));/*** 声明事务开始,后续命令将排队按顺序等待exec执行*/\PhalApi\DI()->redis->multi();/*** 读取生产者key中的任务属性*/if ($val = \PhalApi\DI()->redis->get($key)) {/*** 这里可以自定义一些跳过消费*/$arr = explode('_', $key);if (count($arr) != 4) {continue;}try {/*** 读取到key之后直接删除*/\PhalApi\DI()->redis->del($key);/*** 执行事务块内的所有命令*/$status = \PhalApi\DI()->redis->exec();/*** 失败则取消事务* 这里一般是在重复操作时会触发,直接跳过消费*/if (!$status) {\PhalApi\DI()->redis->discard();continue;}/*** 自定义消费处理逻辑* 这里选择更新生产者中自定义插入的数据状态,用于测试消费机制*///自定义操作//handle($val);$s = 0;$info = \PhalApi\DI()->notorm->push->where(['k' => $key])->fetchOne();if ($info == null){continue;}if ($info['status'] == 1) {$s = \PhalApi\DI()->notorm->push->where(['k' => $key])->update(array('status' => new \NotORM_Literal("status + 1")));}echo '推送:' . "{$info['status']}-- . $key--$val,s:$s\r\n\r\n";} catch (\Exception $e) {echo "发生错误:" . $e->getMessage() . "\r\n";}}else{echo "key:$key :结果不存在!!\r\n";}}exit("进程退出..\r\n");}/*** @Notes:* 错误处理* 自动处理temp_queue中的元素,这个操作是防止消费者崩溃的时候做处理* 处理思路是 使用brpop 命令阻塞处理temp_queue这个队列中的值,如果能获取到"值"对应的"值",说明消费者执行失败了* 将值还lpush到push_queue中,以备从新处理* 至于为什么使用brpop命令,是因为在消费者中我们使用的是brpoplpush* nohup php auto.php & 开启守护进程运行,修改文件之后需要从新启动** @Interface auto* @author [MengShuai] [<133814250@qq.com>]*/public function auto(){/*** socket流从建立到传输再到关闭整个过程的最大超时时间,在cli命令行中需要设为不超时,否则可能会抛出超时异常中断进程*/ini_set('default_socket_timeout', -1);/*** 在brpoplpush()中设置的第二队列,当消费者第一次处理崩溃时,还会有一次补救机会,可以单独做容错处理,也可以重新丢进队列*/while ($key_arr = \PhalApi\DI()->redis->brPop('temp_queue', 0)) {//          打印$key_arr,var_export($key_arr),结果:
//            array (
//                0 => 'temp_queue',
//                1 => 'mysql_push_99_418060',
//            )if (count($key_arr) != 2) {continue;}$key = $key_arr[1];echo "$key\r\n";/*** 能获取到值 说明消费执行失败*/if (\PhalApi\DI()->redis->get($key)) {//                \PhalApi\DI()->redis->del($key);\PhalApi\DI()->redis->lPush('push_queue', $key);}}}}
登录终端1.开启消费者(可以开启多个,负载均衡):测试代码:
php /www/wwwroot/***/api2/public/cli -s App.Guide.Push测试通过没问题后,便可放到后台执行。使用nohub命名:nohub安装:yum -y install nohubnohub /www/wwwroot/***/api2/public/cli -s App.Guide.Push >> /www/wwwroot/***/api2/public/App.Guide.Push.cli.log 2>&1 &2.开启错误处理,消费者处理崩溃时的容错处理测试代码:
php /www/wwwroot/***/api2/public/cli -s App.Guide.auto测试通过没问题后,便可放到后台执行。使用nohub命名:
nohub /www/wwwroot/***/api2/public/cli -s App.Guide.auto >> /www/wwwroot/***/api2/public/App.Guide.auto.log 2>&1 &补充:守护进程脚本:
#!/bin/bash# 当前数量
cur_client_num=`ps -ef| grep /www/wwwroot/***/api2/public/cli |grep -v grep|wc -l`# 最大进程数量
MAX_CLIENT_NUM=20source /etc/environmentfor((i=$cur_client_num;i<$MAX_CLIENT_NUM;i++));
donohub php /www/wwwroot/***/api2/public/cli -s App.Guide.auto >> /www/wwwroot/api.qvnidaye.com/api2/public/App.Guide.auto.log 2>&1 &
done停止脚本:
#!/bin/bashkill `ps -ef| grep /www/wwwroot/***/api2/public/cli |grep -v grep | awk '{print $2}'````powershell
附上一个已经集成好的脚本
#!/bin/bash
action=$1# 运行路径
cd /www/wwwroot/*/api2/public/function auto(){# 当前数量cur_client_num=`ps -ef| grep App.Queue.Auto |grep -v grep|wc -l`# 最大进程数量MAX_CLIENT_NUM=1for((i=$cur_client_num;i<$MAX_CLIENT_NUM;i++));donohup php ./cli -s App.Queue.Auto --sign 732c9344213f5301a057f199e2e01f50 > /dev/null 2>&1 &doneecho -e ""echo -e "\033[32m[\033[0m 刷新容错进程成功.. \033[32m]\033[0m"}function push(){# 当前数量cur_client_num=`ps -ef| grep App.Queue.Push |grep -v grep|wc -l`# 最大进程数量MAX_CLIENT_NUM=3for((i=$cur_client_num;i<$MAX_CLIENT_NUM;i++));donohup php ./cli -s App.Queue.Push --sign 3c3897bbe926ded26e299fead21071d8 >/dev/null 2>&1 &doneecho -e ""echo -e "\033[32m[\033[0m 刷新消费进程成功.. \033[32m]\033[0m"
}function stop(){kill `ps -ef| grep App.Queue.Push |grep -v grep | awk '{print $2}'`kill `ps -ef| grep App.Queue.Auto |grep -v grep | awk '{print $2}'`echo -e ""echo -e "\033[32m[\033[0m cli进程已全部杀死.. \033[32m]\033[0m"   #echo -e "\033[32m[\033[0m 查询进程:ps -aux|grep cli \033[32m]\033[0m"#ps -aux|grep cli}function run(){(push)(auto)echo -e ""echo -e "\033[32m[\033[0m 查询进程:ps -aux|grep cli \033[32m]\033[0m"ps -aux|grep cli}function restart(){(stop)(run)echo -e ""echo -e "\033[32m[\033[0m 守护进程重启完毕.. \033[32m]\033[0m"
}function jobs(){ps -aux|grep cli
}case $action in"jobs")(jobs);;"restart")(restart);;"run")(run);;"push")(push);;"auto")(auto);;"stop")(stop);;  *) echo '请按照如下命令执行 mq [restart|run|auto|push|stop] 例如重启为 mq restart';;
esac

mysql测试表结构:

测试方式:
1.通过http创建任务:?s=App.Guide.Test
2.查看终端队列日志和mysql更新状态是否正常

phalapi使用redis做MQ队列相关推荐

  1. 使用redis做消息队列mq的总结

    总结 目前使用redis做消息队列的的方式有3中,list,      publish/subscribe,       stream list做mq的总结 使用方法 1. 生产者可以 lpush 写 ...

  2. ibm linux mq 发送消息_RabbitMq、ActiveMq、Kafka和Redis做Mq对比

    一.RabbitMq RabbitMQ是一个Advanced Message Queuing Protocol(AMQP)的开源实现,由以高性能.可伸缩性出名的Erlang写成.RabbitMQ Se ...

  3. Redis做消息队列,香吗?

    来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...

  4. aliyun redis 链接超时_用redis做异步队列,原来还可以这样

    Redis设计主要是用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列. 它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列: 另外,做消息队列的其他特性例如FIF ...

  5. 程序员过关斩将--redis做消息队列,香吗?

    菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的redis做的MQ,很 ...

  6. php redis查看队列长度,php redis做消息队列解决流量削峰常用的5个指令

    线上的秒杀等业务场景,需要类似的解决方案,需要平安度过同时抢购带来的流量峰值的问题.如果此时还用mysql做消息队列是什么容易挂掉的,如果死轻量级的秒杀我们完全可以用redis来应对. 削峰从本质上来 ...

  7. 使用Redis Stream来做消息队列和在Asp.Net Core中的实现

    Redis - Wikipedia 写在前面 我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消 ...

  8. PHP + Redis 实现消息队列

    Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析.秒杀计数器.缓存等 Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费者 ...

  9. php redis zset 延迟队列_PHP + Redis 实现简单消息队列

    Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感. 应用场景有即时数据分析.秒杀计数器.缓存等. Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费 ...

最新文章

  1. 0、Spring 注解驱动开发
  2. 传指针与指针引用的区别
  3. 【杂题总汇】NOIP2013(洛谷P1967) 货车运输
  4. main函数与命令行参数
  5. boost::mp11::mp_iota相关用法的测试程序
  6. mysql知识结构图_MySql知识结构说明
  7. PHP7新增的主要特性
  8. 设置SecureCRT配色和解决乱码问题
  9. java环境变量设置优化_Mac/windows配置jdk环境变量-seo优化只选拉一八科技
  10. MatConvnet工具箱文档翻译理解四
  11. 解决源码安装手册找不到问题
  12. 微信小程序 免密代扣
  13. js树结构数据的递归操作
  14. 模拟信号与数字信号的本质区别
  15. 脱壳工具 postern.apk文件
  16. 华为防火墙笔记-出口选路
  17. C 语言跳动的心(可变色版本)
  18. Apache Spark处理大数据入门,看这一篇就够了
  19. python aiml_使用Python AIML搭建聊天机器人的方法示例
  20. Win7安装VS2008 SP1卡在 VS90sp1-KB945140-X86-CHS 的解决方法

热门文章

  1. Hadoop实验4:MapReduce编程
  2. win10恢复经典开始菜单_Startisback——一个能让你的WIN10状态栏完全透明的强大软件...
  3. 【刷题】2.BM3 链表中的节点每k个一组翻转
  4. sdwebimage 加载webp图片
  5. 多进程打包thread-loader, happyPack和多进程压缩ParallelUglifyPlugin的配置使用
  6. Hugging Face PEFT 调优实战附代码
  7. 处理CDR x8安装时显示“已停止工作”的教程
  8. PyCharm 代码调试教程
  9. hutool 导出复杂excel(动态合并行和列)
  10. (二)音视频:MediaCodec编码桌面信息 完整Demo 进一步理解H264