Beanstalked的初步了解和使用(包括利用beanstalkd 秒杀消息队列的实现)
一 Beanstalkd 是什么
Beanstalkd,一个高性能、轻量级的分布式内存队列系统
二 Beanstalkd 特性
1. 优先级(priority)
注:优先级就意味 支持任务插队(数字越小,优先级越高,0的优先级最高)
2. 延迟(delay)
注:延迟意味着可以定义任务什么时间才开始被消费,也就实现了定时任务(比如为了增加网站活跃性,增加定时评论,定时点赞功能)
3. 持久化(persistent data)
注:Beanstalkd 支持定时将文件刷到日志文件里,即使beanstalkd宕机,重启之后仍然可以找回文件
4. 预留(buried)
注:Beanstalkd支持把一个任务设置为预留,这样,消费者就无法取出这个任务了,等合适的时机再把这个任务拿出来消费
5. 任务超时重发(time-to-run)
注:消费者必须在指定的时间内处理完这个任务,否则就认为消费者处理失败,任务会被重新放到队列,等待消费
三 管道(tube)与任务(job)
注:生产者生产任务,并根据业务需求将任务放到不同管道中,比如和注册有关的任务放到注册管道中,和订单有关的放到订单管道中
注:任务从进入管道到离开管道一共有5个状态(ready,delayed,reserved,buried,delete)
1. 生产者将任务放到管道中,任务的状态可以是ready(表示任务已经准备好,随时可以被消费者读取),也可以是delayed(任务在被生产者放入管道时,设置了延迟,比如设置了5s延迟,意味着5s之后,这个任务才会变成ready状态,才可以被消费者读取)
2. 消费者消费任务(消费者将处于ready状态的任务读出来后,被读取处理的任务状态变为reserved)
3. 消费者处理完任务后,任务的状态可能是delete(删除,处理成功),可能是buried(预留,意味着先把任务放一边,等待条件成熟还要用),可能是ready,也可能是delayed,需要根据具体业务场景自己进行判断定义
具体示意图:
四 Beanstalkd 的安装(git、方式)
注:beanstalkd是不支持windows的 ,必须要在Linux环境下(本地环境介绍 Linux, php7,mysql5.6,nginx,centos7.4)
1. yum install -y git
2. git clone https://github.com/kr/beanstalkd
3. cd beanstalkd
4. make
5. make install
6. 查看安装是否成功
五 开始使用Beanstalkd
1. 绑定地址和端口
beanstalkd -l 127.0.0.1 -p 11301 &
2. composer安装pheanstalkd类(这个类在php使用中会简化很多操作,非常方便)
--------------
2.1 安装composer curl -sS https://getcomposer.org/installer | php
2.2 将composer全局调用 mv composer.phar /usr/local/bin/composer
2.3 cd /usr/local/bin
2.4 chmod +x composer
-----------------------
2.5 composer require pda/pheanstalk
注:pheanstalkd 用法示例:https://github.com/pda/pheanstalk
3. 简单使用
环境介绍(阿里云Linux服务器, 本地win10,Filezilla用于双方文件传输 ,xshell用于连接远程阿里云服务器)
3.1 编写demo.php
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; $ph = new Pheanstalk('127.0.0.1',11301); print_r($ph->stats());//查看目前pheanStalkd状态信息
3.2 将在本地编写的demo.php代码通过filezilla上传到阿里云服务器 /usr/local/beanstalkd/beanstalkd下,并在linux下运行
https://github.com/kr/beanstalkd.git
https://github.com/kr/beanstalkd.git
4. pheanstalkd类常用的方法
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; $ph = new Pheanstalk('127.0.0.1',11301); //----------------------------------------维护类---------------------------------- //1.查看目前pheanStalkd状态信息 //print_r($ph->stats()); //2.显示目前存在的管道 //print_r($ph->listTubes()); //3.查看NewUsers管道的信息 //$ph->useTube('NewUsers')->put('test'); //$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道添加一个up任务 //print_r($ph->statsTube('NewUsers'));//3.查看NewUsers管道的信息 //6.查看指定管道中某一个任务的情况 //$job = $ph->watch('NewUsers')->reserve(); //5.从管道中取出任务(消费) //print_r($ph->statsJob($job)); //6.查看指定管道中某一个任务的情况 //7.查看任务id为1的任务详情 //$job = $ph->peek(1);7.直接取出任务id为1的任务 [注:beanstalkd中所有任务的id都具有唯一性] //print_r($ph->statsJob($job));//查看任务id为1的任务详情 //----------------------------------------生产类-------------------------------------- 第一种 put() //$tube = $ph->useTube('NewUsers');//连接NewUsers管道 //print_r($tube->put('four'));//向NewUsers管道添加任务four,并返回结果 //注: put()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr) 第二种 putInTube() [注: putInTube()就是对useTube()和put()的封装] //$res = $ph->putInTube('NewUsers','three');//向NewUsers管道添加任务three 注: putInTube()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr) //print_r($res);//返回任务id //print_r($ph->statsTube('NewUsers'));//查看NewUsers管道的详细情况 //---------------------------------------消费类-------------------------------------- // 1.watch 监听NewUsers管道 [ 注: watch()同样可以监听多个管道 ] //$tube = $ph->watch('NewUsers'); //print_r($ph->listTubesWatched());//打印已经监听的管道 // 2.watch 监听多个管道 //$tube = $ph->watch('NewUsers') // ->watch('default'); //print_r($ph->listTubesWatched());//打印已经监听的管道 // 3.ignore 监听NewUsers管道,忽略default管道 //$tube = $ph->watch('NewUsers') // ->ignore('default'); //print_r($ph->listTubesWatched());//打印已经监听的管道 // 4.reserve 监听NewUsers管道,并且取出任务 //$job = $ph->watch('NewUsers') // ->reserve(); // 注reserve()有1个参数,阻塞的时间,过了阻塞时间,不管有没有东西,直接返回 // //var_dump($job);//打印已经取出的任务 //$ph->delete($job);//删除已经取出的任务 // 5.putInTube/put 向NewUsers管道写入任务 [ 注:此为生产者方法,放到此处是为了方便理解 ] //$ph->putInTube('NewUsers','number_1',5); //$ph->putInTube('NewUsers','number_2',3); //$ph->putInTube('NewUsers','number_3',0); //$ph->putInTube('NewUsers','number_4',4); //print_r($ph->statsTube('NewUsers'));//5.查看NewUsers管道详细信息 // 6.release 将取出的任务放回ready状态,还有2个参数(优先级和延迟) //$job = $ph->watch('NewUsers')->reserve();//6.监听NewUsers管道,并取出任务 //if (true) { // sleep(30); // $ph->release($job);//6.将任务取出之后,停留30秒,然后将任务状态重新变为ready //} else { // $ph->delete($job); //} // 7.bury (预留) 将任务取出之后,发现后面执行的逻辑不成熟(比如发邮件,突然发现邮件服务器挂掉了), //或者说还不能执行后面的逻辑,需要把任务先封存起来,等待时机成熟了,再拿出这个任务进行消费 //$job = $ph->watch('NewUsers')->reserve();//取出任务 //$ph->bury($job);//取出任务后,将任务放到一边(预留) // 8.peekBuried() 将处在bury状态的任务读取出来 //$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来 //var_dump($ph->statsJob($job));//打印任务状态(此时任务状态应该是bury) // 9.kickJob() 将处在bury任务状态的任务转化为ready状态 //$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来 //$ph->kickJob($job); // 10.kick() 将处在bury任务状态的任务转化为ready状态,有第二个参数int, 批量将任务id小于此数值的任务转化为ready //$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任务id小于65,并且任务状态处于bury的任务全部转化为ready // 11.peekReady() 将管道中处于ready状态的任务读出来 //$job = $ph->peekReady('NewUser');//将NewUser管道中处于ready状态的任务读取出来 //var_dump($job); //$ph->delete($job); // 12.peekDelay() 将管道中所有处于delay状态的任务读取出来 //$job = $ph->peekDelayed('NewUser'); //var_dump($job); //$ph->delete($job); // 13.pauseTube() 对整个管道进行延迟设置,让管道处于延迟状态 //$ph->pauseTube('NewUser',10);//设置管道NewUser延迟时间为10s //$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务 //var_dump($job); // 14.resumeTube() 恢复管道,让管道处于不延迟状态,立即被消费 //$ph->resumeTube('NewUser');//取消管道NewUser的延迟状态,变为立即读取 //$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务 //var_dump($job); // 15.touch() 让任务重新计算任务超时重发ttr时间,相当于给任务延长寿命
5. 项目中的应用总结
生产者中常用的方法
useTube() : 如果没有管道,则创建对应管道,有,则直接使用
put() : 向管道中放任务
消费者中常用的方法步骤:
1. watch():监听管道
2. reserve():将管道中处于ready状态的任务读取出来
3.1 可以使用delete 方法删除任务
3.2 可以使用release 方法将任务放回ready状态
3.3 可以使用bury 方法将任务先放一边(例如发邮件,邮箱服务器挂掉),等待条件成熟再取出来
6. 实际应用演示
producer.php
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; $ph = new Pheanstalk('127.0.0.1',11301); $ph->useTube('List')->put('goods'); $ph->useTube('List')->put('goods2'); $ph->useTube('List')->put('goods3'); $ph->useTube('List')->put('goods4'); $ph->useTube('List')->put('goods5'); //print_r($ph->statsTube('List'));//查看List管道的信息
consumer.php
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; $ph = new Pheanstalk('127.0.0.1',11301); $res = $ph->watch('List')->reserve();//监听List管道,并将任务取出来 if ($res) {$ph->delete($res); var_dump($res); }
注:
生产者根据业务不同,将任务放到不同管道(管道用于存储消费者生产的任务),比如将和注册有关的任务通通放到注册管道,和订单有关的通通放入订单管道
消费者将处于ready状态的任务根据优先级逐个读取出来
五 使用Beanstalkd 实现类似redis秒杀活动
producer.php代码:
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; //连接beanstalkd $ph = new Pheanstalk('127.0.0.1', 11301); $tube_name = 'SecKill2'; //使用SecKill2管道 $SEC = $ph->useTube($tube_name); //模拟100人请求秒杀 for ($i = 0; $i < 100; $i++) {$uid = rand(10000000, 99999999); //获取当前队列已经拥有的数量,如果人数少于十,则加入这个队列 $total_jobs = $ph->statsTube($tube_name)['total-jobs']; $num = 10; if ($total_jobs < $num) {$SEC->put($uid);//向管道放任务 echo $uid . "秒杀成功"; } else {//如果当前队列人数已经达到10人,则返回秒杀已完成 echo "秒杀已结束<br>"; } } print_r($ph->statsTube($tube_name));//查看SecKill2管道的信息
注: 执行完producer.php代码后,应当会在SecKill2管道中看到10个任务,每一个任务内容是一个uid
consumer.php代码:
<?php require "vendor/autoload.php"; use Pheanstalk\Pheanstalk; //连接BeanStalkd队列系统 $ph = new Pheanstalk('127.0.0.1',11301); $tube_name = 'SecKill2'; //取出SecKill2管道的任务总数 $total_jobs = $ph->statsTube($tube_name)['total-jobs']; //PDO连接mysql数据库 $dsn = "mysql:dbname=test;host=127.0.0.1"; $pdo = new PDO($dsn, 'root', '123456'); //循环取出管道中任务,并执行插入数据库操作 for($i = 0; $i < $total_jobs; $i++){//监听SecKill4管道,并将任务取出来 $job = $ph->watch($tube_name)->reserve(); //取出任务存储的值uid $uid = $job->getData();//打印出的样子 string(8) "24541944" if (!$uid) {sleep(2); continue; }//生成订单号 $orderNum = build_order_no($uid); //生成订单时间 $timeStamp = time(); //构造插入数组 $user_data = array('uid'=>$uid,'time_stamp'=>$timeStamp,'order_num'=>$orderNum); //将数据保存到数据库 $sql = "insert into seckill (uid,time_stamp,order_num) values (:uid,:time_stamp,:order_num)"; $stmt = $pdo->prepare($sql); $res = $stmt->execute($user_data); //如果数据库操作成功,则删除任务 if ($res) {$ph->delete($job); }}//生成唯一订单号 function build_order_no($uid){return substr(implode(NULL, array_map('ord', str_split(substr(uniqid(), 7, 13), 1))), 0, 8).$uid; }
注: 执行完consumer.php文件之后, 数据表应该已经写入了10个订单
注:
用到的数据表
create table `seckill`( `uid` int unsigned not null default '0', `time_stamp` int unsigned not null default '0', `order_num` bigint unsigned not null default '0', primary key (`uid`), key (order_num) )engine = myisam default charset= utf8;
参考视频地址: https://www.imooc.com/video/16016
这边博客可能会用到 https://blog.csdn.net/ahjxhy2010/article/details/53196450
Beanstalked的初步了解和使用(包括利用beanstalkd 秒杀消息队列的实现)相关推荐
- php使用redis消息队列swoole,EasySwoole中利用redis实现消息队列
什么是队列? 从数据结构上来讲,队列是一种先进先出的数据结构 什么是消息队列? 消息队列可以简单理解为:把要传输的数据放在队列中 消息队列可以分为生产者和消费者,将传输的数据放到消息队列当中,就相当于 ...
- PHP中利用redis实现消息队列处理高并发请求思路详解
在电商活动中,常常会出现高并发的情况,例如很多人同时点击购买按钮,以至于购买人数超出了库存量,这是一种非常不理想的状况,因此,我们在PHP开发中就会引入消息队列来解决这种高并发的问题. 当用户点击按钮 ...
- 利用Redis实现消息队列原理
2019独角兽企业重金招聘Python工程师标准>>> 1.由于redis是单线程因此可以用用redis中的List可以实现队列 2.代码模拟 代码结构 生产者模拟程序 /** * ...
- springboot利用redis作为消息队列mq使用
先吐个槽:经常看到有人对不同得消息队列做各种各样得评价以及所谓得性能测试,评估选型等等,岂不知脱离任何业务得技术评估都是瞎扯淡.(公司实际业务技术场景不提,满口胡说kafa怎么样,activemq怎么 ...
- 消息队列的实践php,php消息队列处理实践 ,利用AMQP和redis两种方法
一:利用AMQP方法 类 amqp.php e_name = $e_name; $this->q_name = $q_name; $this->k_route = $k_route; // ...
- 如何利用matlab做BP神经网络分析(包括利用matlab神经网络工具箱)
如何利用matlab做BP神经网络分析(包括利用matlab神经网络工具箱) 转载:https://blog.csdn.net/xgxyxs/article/details/53265318 最近一段 ...
- c语言编写队列元素逆置,数据结构与算法实验—利用栈逆置队列元素.doc
数据结构与算法实验-利用栈逆置队列元素 利用栈逆置队列元素实验报告 通信1204班 谢崇赟 实验名称 利用堆栈将队列中的元素逆置 实验目的 会定义顺序栈和链栈的结点类型. 掌握栈的插入和删除结点在操作 ...
- 消息队列中:消息可靠性、重复消息、消息积压、利用消息实现分布式事务
点击下方"Java编程鸭"关注并标星 更多精彩 第一时间直达 一.如何确保消息不丢失? 1.检测消息丢失的方法 可以利用消息队列的有序性来验证是否有消息丢失.在Producer端给 ...
- 21 利用分布式消息队列降低系统耦合性
国内某大型互联网企业经常因为对同行的产品进行微创新,然后推岀自己的产品而遭人诟病,不讨论这种做法是否合适,我们分析这些产品,发现大多数都比原创产品有 更好的用户体验.这些产品常常后来居上,更速度地推岀 ...
最新文章
- epoll相关资料整理
- Innodb表压缩过程中遇到的坑(innodb_file_format) - billy鹏
- 如何一键部署php应用,我们怎样来使用宝塔面板一键部署安装博客程序ZBlogPHP
- 提示 launch failed
- poj 2191 Mersenne Composite Numbers
- Scrum Meeting---Ten(2015-11-5)
- linux连接wpa wifi密码,Linux环境下使用WIFI模块:使用wpa_supplicant工具配置和连接WIFI-Go语言中文社区...
- xps15u盘装linux,戴尔XPS 15 9575笔记本安装win10系统的操作教程
- html td双击事件,tr td同时添加点击事件
- linux构建lamp的关键步骤,Linux-LAMP平台搭建详解
- Ink脚本语言学习笔记(二)
- springboot项目日志记录访问客户端ip地址
- 开发者笑疯了! LLaMa惊天泄露引爆ChatGPT平替狂潮,开源LLM领域变天
- Unity | 碰撞检测相关
- 如何绘制UML用例图
- Java项目:房屋租赁管理系统(java+SSM+Layui+Maven+Mysql+Jsp
- String中与获取相关的几个方法
- 《论文阅读》Global-Local Bidirectional Reasoning for Unsupervised Representation Learning of 3D Point Clou
- permit doing 与permit to do详细区别
- sio.savemat得到空struct解决方法
热门文章
- C++ 自定义调试信息的输出
- SQLServer事务的隔离级别
- 编程语言也环保?C语言领跑,Python、Perl垫底
- c#爬虫-解决ChromeDriver 版本问题
- C# FileSystemWatcher文件监控实例
- MongoDB,凉凉?
- Csv解析CsvFile
- .Net在线编辑工具.NET Fiddle
- 做权限认证,还不了解IdentityServer4?不二话,赶紧拥抱吧,.NET Core官方推荐!...
- 最终选型 Blazor.Server:又快又稳!