tp5 mysql实现消息队列_TP5系列 | Queue消息队列
消费信息如下ThinkPHP5 Queue消息队列
优点
1、Queue内置了 Redis,Database,Topthink ,Sync这四种驱动,本文使用Redis驱动
2、Queue消息队列适用于大并发或者返回结果 时间有点长并需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送
3、Queue消息消息可进行发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等操作
流程图
创建队列
文件路径:application\common\queue\TestQueue.php
TestQueue.php 参考代码
namespace app\common\queue;
use think\facade\Log;
use think\queue\Job;
class TestQueue
{
public function fire(Job $job, $data)
{
$isJobDone = $this->testJob($data);
// 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
if ($isJobDone) {
$job->delete();
} else {
//通过这个方法可以检查这个任务已经重试了几次了
$attempts = $job->attempts();
echo $attempts;
if ($attempts == 0 || $attempts == 1) {
// 重新发布这个任务
$job->release(2); //$delay为延迟时间,延迟2S后继续执行
} elseif ($attempts == 2) {
$job->release(5); // 延迟5S后继续执行
}
}
}
/**
* @Desc: 任务执行失败后自动执行方法
* @param $data
*/
public function failed($data)
{
// ...任务达到最大重试次数后,失败了
Log::error('任务达到最大重试次数后,失败了 '.json_encode($data));
}
/**
* @Desc: 自定义需要加入的队列任务
*/
private function testJob($data)
{
$jsonData = json_encode($data);
echo "1、具体执行任务接受到的参数:{$jsonData} \r\n";
if($data){
echo "2、恭喜你!{$data['email']} 邮件发送成功了 \r\n";
return true;
}else{
echo "2、很遗憾,{$data['email']} 邮件发送失败了 \r\n";
return false;
}
}
}
配置队列
1、这里使用Redis驱动来存储队列消息
2、队列配置文件路径:application\config\queue
配置参考代码
return [
'connector' => 'Redis',
'expire' => 3600,
'default' => 'REDIS_QUEUE',
'host' => 'dnmp-redis',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
];
生产者参考代码
/**
* @Desc: 生产者生产消息
*/
public function productMsg()
{
// 当前任务所需的业务数据,不能为 resource 类型,其他类型最终将转化为json形式的字符串
$data = [
'email' => rand(11,99).'@qq.com',
'username' => 'Tinywan'
];
// 当前任务归属的队列名称,如果为新队列,会自动创建
$queueName = 'testQueue';
// 将该任务推送到消息队列,等待对应的消费者去执行
$isPushed = Queue::push(TestQueue::class, $data, $queueName);
// database 驱动时,返回值为 1|false; redis驱动时,返回值为 随机字符串|false
if ($isPushed !== false) {
echo '['.$data['email'].']'." 队列加入成功 \r\n";
} else {
echo "队列加入失败 \r\n";
}
}
为了方便演示,这里使用cli模式。
执行生产者:php product_msg.php
# php product_msg.php
[27@qq.com] 队列加入成功
# php product_msg.php
[77@qq.com] 队列加入成功
1、这时候消息已经被持久化到Redis中去了(通过列表去存储)
2、推送成功,虽然我们这时候已经写好了消费者,但是我们并没有开始消费。但是推送消息依然是成功的。这个就是中间件的优势。他连接两个系统,但是又不会互相耦合,生产者并不会因为消费者的异常而影响到自己。
3、消息推送成功之后,如果没有消费者,消息会堆积在队列中。不过别怕,消息堆积很正常,并且一般的中间件堆积能力是非常强的。比如阿里就宣传自己mq可以堆积上亿条数据。
查看Redis消息与队列
> docker exec -it dnmp-redis redis-cli
127.0.0.1:6379> keys *
127.0.0.1:6379> keys *
1) "queues:testQueue"
127.0.0.1:6379> TYPE queues:testQueue
list
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"27@qq.com\",\"username\":\"Tinywan\"},\"id\":\"MLgNb4LFALhtmp7HZtfXMFPRUT0r94Bi\",\"attempts\":1}"
2) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>
消费者
开始消费消息。执行cli 命令 php think queue:work--queue队列名称
# php think queue:work --queue testQueue
1、具体执行任务接受到的参数: {"email":"27@qq.com","username":"Tinywan"}
2、恭喜你!27@qq.com 邮件发送成功了
Processed: app\common\queue\TestQueue
这里每消费掉一条消息,Redis数据库中将会减少一条消息
查看Redis队列消息
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>
命令行挂起守护进程执行
/usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256
--daemon 是否循环执行,如果不加该参数则该命令处理完下一个消息就退出 --queue 要处理的队列的名称 --delay 0 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。 --memory 该进程允许使用的内存上限,以M为单位。
流程图
消费信息如下
php think queue:work --daemon --queue testQueue
1、具体执行任务接受到的参数: {"email":"77@qq.com","username":"Tinywan"}
2、恭喜你!77@qq.com 邮件发送成功了
Processed: app\common\queue\TestQueue
1、具体执行任务接受到的参数: {"email":"80@qq.com","username":"Tinywan"}
2、恭喜你!80@qq.com 邮件发送成功了
Processed: app\common\queue\TestQueue
1、具体执行任务接受到的参数: {"email":"34@qq.com","username":"Tinywan"}
2、恭喜你!34@qq.com 邮件发送成功了
Processed: app\common\queue\TestQueue
1、命令行模式可以常驻内存不停的执行php代码。这样就可以达到类似于静态语言的java的效果。
2、一开始监听队列。刚刚在队列中堆积的消息立刻就被获取到,开始执行了代码。最后执行完成,删除了消息。
3、在 queue:work--daemon 单进程循环消费的时候,改了代码是不会生效的。这时脚本语言有点类似于静态语言在执行。所以需要我们用queue:restart重启 work 进程 。
命令行挂起守护进程执行
/usr/local/php/bin/php /data/wwwroot/default/thinkphp_5/think queue:work --daemon --queue testQueue --memory 256
查看进程是否在运行
# ps
PID USER TIME COMMAND
1 root 0:00 php-fpm: master process (/usr/local/etc/php-fpm.conf)
6 www-data 0:00 php-fpm: pool www
7 www-data 0:00 php-fpm: pool www
16 root 0:00 sh
56 root 0:00 sh
113 root 0:00 php think queue:work --daemon --queue testQueue
你再也不用守在终端了,以后只负责生产消息就可以了。Redis队列也不会积累消息了
其他(中间件)
中间件系统的定义是两个独立的不同的系统在中间构建起传递消息的工具。但是同一个系统也可以通过中间件来榨取性能,大家肯定项目中遇到过性能瓶颈。
比如发送邮件,发送短信,转换视频格式等等。这些业务都是比较耗性能,又对先后顺序不敏感的业务。这种业务就非常适合使用消息队列系统来异步处理,使性能提升。
重启队列和生成队列
tp5 mysql实现消息队列_TP5系列 | Queue消息队列相关推荐
- c语言程序结构环形队列入队,C语言 环形队列
队列 :队列是一种先进先出的数据结构. 比如说 排队买票, 有一个售票口,最多能排30人,那么最大存储空间就是30人, 每当有1个新人过来排队,就会站在队尾,这就叫入队, 每当有1个人买到票了,就会离 ...
- 【消息中间件MQ系列】消息队列之ActiveMQ、RocketMQ、RabbitMQ、Kafka不得不说的秘密
热门系列: [消息中间件MQ系列]RabbitMQ安装与使用,并与SpringBoot整合 程序人生,精彩抢先看 目录 1.消息队列/消息中间件介绍 1.1 消息中间件是什么 1.1.1 消息中间件的 ...
- Cris 玩转大数据系列之消息队列神器 Kafka
Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 文章目录 Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 1. Kafka 概述 1.1 消息队 ...
- 大型网站架构系列:消息队列
http://www.codeceo.com/article/web-archte-message-queue.html?from=timeline&isappinstalled=0#1000 ...
- 阿里云消息队列python_41. Python Queue 多进程的消息队列 PIPE
消息队列: 消息队列是在消息传输过程中保存消息的容器. 消息队列最经典的用法就是消费者和生产者之间通过消息管道来传递消息,消费者和生产生是不通的进程.生产者往管道中写消息,消费者从管道中读消息. 相当 ...
- redis 队列_Redis系列5实现简单消息队列
任务异步化 打开浏览器,输入地址,按下回车,打开了页面.于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容. 我们每天都在浏览网页,发送大大 ...
- 【转】Azure Messaging-ServiceBus Messaging消息队列技术系列2-编程SDK入门
各位,上一篇基本概念和架构中,我们介绍了Window Azure ServiceBus的消息队列技术的概览.接下来,我们进入编程模式和详细功能介绍模式,一点一点把ServiceBus技术研究出来. 本 ...
- think queue 消息队列初体验
使用的是tp5 自带的消息队列 thinkphp top里的 消息队列框架 think-queue 这是thinkphp官方团队开发的一个专门支持队列服务的扩展包 消息队列应用场景: 消息队列适用于 ...
- 进程间通信-Queue 消息队列 先进先出
Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信. multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序 初始化Queue( ...
最新文章
- python合并k个有序链表_Leetcode合并K个升序链表(Python版本),LeetCode,python
- Golang之单元测试
- java 连接池实例_功能完善的Java连接池调用实例
- Netty--ByteBuf
- stacking模型融合_【干货】比赛后期大招之stacking技术分享
- DUTCTF 201x RE20
- bartender实现即扫即打印功能扫描完后自动打印_日本彩色激光打印机推荐人气排名15款...
- 分享几套古典复古式的UI设计
- Ajxa验证用户和二级联动的实例(五)
- mongodb启动很慢:[initandlisten] found 1 index(es) that
- 2019年上半年软件设计师上午试题及答案
- vue项目动态拼接图片地址方法
- 很多IT从业者感觉到中国程序员前景一片灰暗,事实是如此吗?
- 一个朋友写的诗词收藏
- QT课程设计:C++英语单词记忆软件程序开发
- highcharts多坐标轴混合图
- 机器学习实战之信用卡欺诈案列
- 函数连续、可导、可微、连续可微
- Mac连接不上无线网络的解决方法
- 窗体泄漏错误has leaked window android.widget
热门文章
- 亿级规模的 Feed 流系统,如何轻松设计?
- 让你久等了!《码出高效:Java 开发手册》正式发布
- NLP的ImageNet时代已经到来
- 2020 年,为什么非要采用 DevOps 文化不可?
- 微博短视频千万级高可用、高并发架构如何设计?
- 刚出炉!AI指数报告:AI人才需求暴涨35倍,薪酬问鼎No.1
- Kafka精华问答 | Kafka有哪些使用场景?
- dataframe iloc_如何使用iloc和loc 对Pandas Dataframe进行索引和切片
- vue2.0项目部署到服务器_vue项目运行npm run build打包后如何发布到服务器?
- mysql or_MySQL中or语句用法示例