使用Beanstalk搭建队列服务
使用Beanstalk搭建队列服务
Beanstalkd介绍
一个高性能、轻量级的分布式内存队列系统。高性能离不开异步,异步离不开队列,而其内部都是Producer-Consumer模式的原理。
组成部分
组件 | 说明 |
---|---|
管道(tube) | 一个有名称的任务队列,用来存储统一类型的job,是producer和consumer的操作对象 |
任务(job) | 一个需要异步处理的任务,需要放在tube中 |
生产者(producer) | job的生产者,通过put命令来将一个job放到一个tube中 |
消费者(consumer) | job的消费者,通过reserve、release、bury、delete命令来获取job或改变job的状态 |
特性
- 优先级: 可以设置任务的优先级
- 延迟: 设置任务多少秒后才允许被消费者使用
- 持久化: 定时刷新数据到文件,服务器挂掉后数据依旧存在
- 超时控制: 消费者必须在指定时间内完成任务,否则就会重新放入管道任务预留:消费者先暂时跳过任务不处理
- 分布式容错: 分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。
任务状态
状态 | 说明 |
---|---|
ready | 已经准备好的任务,可以给消费者获取 |
delayed | 延迟执行的任务,设置时候设置了延迟时间 |
reserved | 已被消费者获取,正在执行的任务,Beanstalkd服务负责检查任务是否 在TTR(time-to-run)内完成 |
buried | 保留的任务,任务不会被执行,也不会消失 |
delete | 消息被彻底删除,Beanstalkd不再维护这些消息 |
适用场景
- 用作延时队列: 比如可以用于如果用户30分钟内不操作,任务关闭。
- 用作循环队列: 用release命令可以循环执行任务,比如可以做负载均衡任务分发。
- 用作兜底机制: 比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止。
- 用作定时任务: 比如可以用于专门的后台任务。
- 用作异步操作: 这是所有消息队列都最常用的,先将任务仍进去,顺序执行。
服务安装
- 下载beanstalkd-1.11
wget https://codeload.github.com/beanstalkd/beanstalkd/tar.gz/v1.11
- 安装
tar xzvf beanstalkd-1.11.tar.gz
cd beanstalkd-1.11
make & make install
beanstalkd -v
- 启动服务
beanstalkd -l 0.0.0.0 -p 11300 -b /log/beanstalkd/binlog -F
队列应用(PHP)
composer安装 Pheanstalk 类库
//PHP版本要求 7.1+
composer require pda/pheanstalk:~4.0
执行composer后,在项目composer.json配置文件中将增加pda/pheanstalk依赖包
"require": {"pda/pheanstalk": "~4.0"
}
Producer添加任务
//创建实例
$client = Pheanstalk::create($host, $port, $timeout);//设置使用的tube,添加任务数据
//$data 任务数据
//$priority 任务优先级.小优先级数值的job将会排在大优先级 数值的job前面执行。
//最高优先级是0,最低优先级是4,294,967,295
//$delay 任务延迟执行秒数
//$ttr 允许一个消费者执行该job的秒数
$client->useTube($tube)->put($data, $priority, $delay, $ttr);
Consumer消费任务
ini_set('default_socket_timeout', 24*60*60);$client = Pheanstalk::create($host, $port, $timeout);
$client->watchOnly($tube);
while (true) { //阻塞获取任务 $job = $client->reserve(); if (is_null($job)) { continue; } //设置重新计算ttr$client->touch($job); //获取任务数据$data = $job->getData();//开始执行任务//任务执行逻辑$res = true//结束任务执行 //删除任务 $client->delete($job); if ($res === true) { //任务执行成功,删除任务$client->delete($job); } else { //否则将任务重新放回队列$client->release($job, 1024, 10); }}
default_socket_timeout
这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误。
客户端操作类
以下基于pda/pheanstalk依赖包实现的Beanstalk操作类,供参考。。
<?phpnamespace App\Libs;
use Pheanstalk\Pheanstalk;/*** Beanstalk工具类* @since 2020-02-26*/
class Beanstalk {/*** Beanstalk配置信息* @var array*/protected $configs = [];/*** client实例* @var array*/protected $clients = [];/*** 当前连接的服务端的配置名称* @var string*/protected $connection = 'default';/*** 连接超时时间* @var int*/protected $clientTimeOut = 3000;/*** 初始化配置信息* @param array $configs* @return void*/public function __construct(array $configs = []) {$this->configs = $configs;}/*** 添加任务* @param string $tube 队列管道* @param array $parameters* @param int $priority 优先级* @param int $delay 延迟执行时间* @param int $ttr 任务超时时间* @return bool|mixed*/public function addTask($tube, array $parameters,$priority = Pheanstalk::DEFAULT_PRIORITY,$delay = Pheanstalk::DEFAULT_DELAY,$ttr = Pheanstalk::DEFAULT_TTR) {$client = $this->createClient();$stream = serialize($parameters);$client->useTube($tube)->put($stream, $priority, $delay, $ttr);}/*** 创建客户端连接* @return mixed*/public function createClient() {if (! isset($this->clients[$this->connection])) {$client = Pheanstalk::create($this->configs[$this->connection]['host'], $this->configs[$this->connection]['port'], $this->clientTimeOut);$this->clients[$this->connection] = $client;}return $this->clients[$this->connection];}/*** 创建后台工作进程* @param $tube* @param array $service* @return mixed* @author huangweizhang* @throws \Exception*/public function createWorker($tube, array $service) {ini_set('default_socket_timeout', 24*60*60);$client = Pheanstalk::create($this->configs[$this->connection]['host'], $this->configs[$this->connection]['port'], $this->clientTimeOut);$client->watchOnly($tube);while (true) {if (count($service) != 2) {throw new \Exception('parameter service error.');}list($classname, $method) = $service;if (! class_exists($classname)) {throw new \Exception('worker service class not exists.');}if (! method_exists($classname, $method)) {throw new \Exception('worker service method not exists.');}//获取任务$job = $client->reserve();if (is_null($job)) {continue;}$client->touch($job);//获取任务参数$stream = $job->getData();$parameters = unserialize($stream);try {//执行任务$class = new $classname();call_user_func_array(array($class, $method), array($parameters));unset($class);//删除任务$client->delete($job);} catch (\Exception $e) {//任务执行失败操作}}}/*** 设置连接* @param string $connection*/public function setConnection($connection) {$this->connection = $connection;}/*** 设置客户端连接超时时间* @param int $clientTimeOut*/public function setClientTimeOut($clientTimeOut) {$this->clientTimeOut = $clientTimeOut;}
}
使用Beanstalk搭建队列服务相关推荐
- redis5 stream php队列,使用redis stream实现队列服务
1. stream类型 Redis5.0引入了Stream类型.该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于: 消息ID的序列化生成 消息遍历 消息的阻塞和非阻塞读取 消息 ...
- 快速搭建基于beanstalk的php消息队列服务
本项目实现基于beanstalk的php消息队列服务,包括生产与消费消息案例 一.beanstalk介绍与安装:http://kr.github.io/beanstalkd/ 二.php消息队列处理, ...
- beanstalk队列服务for php
beakspeak是一个基于beakstalk队列服务的php-c扩展,高性能支持分布式内存队列服务,且很轻量级. 目前国内还没有比较详细的关于Beanspeak的中文说明,本文是根据源码提炼.整理的 ...
- 五分钟搭建BERT服务,实现1000+QPS,这个Service-Streamer做到了
作者 | 刘欣 简介:刘欣,Meteorix,毕业于华中科技大学,前网易游戏技术总监,现任香侬科技算法架构负责人.之前专注游戏引擎工具架构和自动化领域,2018年在GDC和GoogleIO开源Airt ...
- 阿里云Kubernetes实战2–搭建基础服务
前言: 在系列的第一篇文章中,我已经介绍过如何在阿里云基于kubeasz搭建K8S集群,通过在K8S上部署gitlab并暴露至集群外来演示服务部署与发现的流程.文章写于4月,忙碌了小半年后,我才有时间 ...
- js 连接mysql_搭建node服务(二):操作MySQL
为了进行复杂信息的存储和查询,服务端系统往往需要数据库操作.数据库分为关系型数据库和非关系型数据库,关系型数据库有MySQL.Oracle.SQL Server等,非关系型数据库有Redis(常用来做 ...
- 开源项目 | 五分钟搭建BERT服务,实现1000+QPS
作者丨刘欣 单位丨香侬科技算法架构负责人 研究方向丨NLP工程化.算法平台架构 深度学习模型在训练和测试时,通常使用小批量(mini-batch)的方式将样本组装在一起,这样能充分利用 GPU 的并行 ...
- spring cloud+dotnet core搭建微服务架构:配置中心续(五)
前言 上一章最后讲了,更新配置以后需要重启客户端才能生效,这在实际的场景中是不可取的.由于目前Steeltoe配置的重载只能由客户端发起,没有实现处理程序侦听服务器更改事件,所以还没办法实现彻底实现这 ...
- 【译文】用Spring Cloud和Docker搭建微服务平台
by Kenny Bastani Sunday, July 12, 2015 转自:http://www.kennybastani.com/2015/07/spring-cloud-docker-mi ...
最新文章
- HDU 2955 Robberies
- [转载]如何用关键字优化网站?
- 写未来的电子计算机的畅想两百字,未来科技作文200字
- qt的如何调整显示不为科学记数法_Excel操作技巧:如何将信息快速准确的录入Excel?...
- Python sum函数- Python零基础入门教程
- 线性表11|单链表小结:腾讯面试题 - 数据结构和算法16
- 手机支持html5绘图性能,【高级系列】Canvas绘制性能专题
- 【笔试/面试】SQL 经典面试题
- Mac电脑不能识别文本和命令的解决方法
- 伪原创文章生成器-自媒体洗稿工具-关键词文章生成工具免费
- ubuntu14.04 安装 GTX 1060 显卡驱动
- 物理层-宽带接入技术
- 设置VC工程为Debug或Releas版本的方法
- 什么是雪花算法?啥原理?
- 最全最精准的IE浏览器判断和国内套壳浏览器判断(360,QQ,搜狗,百度等)
- 【Unity Shader学习笔记】(五)使用鼠标绘制自由多边形(附完整工程源码)
- 用Rust实现区块链 - 3 持久化
- ECl@SS学习笔记(1)
- PHP报错:Declaration of ... should be compatible with ... 的解决方法
- Java综合实践——学生成绩查询系统
热门文章
- 第十一周项目训练11 教师兼干部
- android Tombstone 流程
- 如何安全更改Windows 10用户文件夹名称
- Leveldb源码分析--3
- 计算机开机后无法正常启动,且发出持续警报声,可能是什么原因?,电脑开机滴滴滴响三声是什么原因...
- 《如何阅读一本书》读后感-1
- 4.11 使用通道混合器命令修复偏色照片 [原创Ps教程]
- Unity3D_(游戏)跳一跳超简单制作过程
- CNopendata机场信息数据简介
- clearInterval() 函数详解