使用Beanstalk搭建队列服务

Beanstalkd介绍

一个高性能、轻量级的分布式内存队列系统。高性能离不开异步,异步离不开队列,而其内部都是Producer-Consumer模式的原理。

组成部分

组件 说明
管道(tube) 一个有名称的任务队列,用来存储统一类型的job,是producer和consumer的操作对象
任务(job) 一个需要异步处理的任务,需要放在tube中
生产者(producer) job的生产者,通过put命令来将一个job放到一个tube中
消费者(consumer) job的消费者,通过reserve、release、bury、delete命令来获取job或改变job的状态

特性

  • 优先级: 可以设置任务的优先级
  • 延迟: 设置任务多少秒后才允许被消费者使用
  • 持久化: 定时刷新数据到文件,服务器挂掉后数据依旧存在
  • 超时控制: 消费者必须在指定时间内完成任务,否则就会重新放入管道任务预留:消费者先暂时跳过任务不处理
  • 分布式容错: 分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。

任务状态

状态 说明
ready 已经准备好的任务,可以给消费者获取
delayed 延迟执行的任务,设置时候设置了延迟时间
reserved 已被消费者获取,正在执行的任务,Beanstalkd服务负责检查任务是否 在TTR(time-to-run)内完成
buried 保留的任务,任务不会被执行,也不会消失
delete 消息被彻底删除,Beanstalkd不再维护这些消息

适用场景

  • 用作延时队列: 比如可以用于如果用户30分钟内不操作,任务关闭。
  • 用作循环队列: 用release命令可以循环执行任务,比如可以做负载均衡任务分发。
  • 用作兜底机制: 比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止。
  • 用作定时任务: 比如可以用于专门的后台任务。
  • 用作异步操作: 这是所有消息队列都最常用的,先将任务仍进去,顺序执行。

服务安装

  1. 下载beanstalkd-1.11
wget https://codeload.github.com/beanstalkd/beanstalkd/tar.gz/v1.11
  1. 安装
tar xzvf beanstalkd-1.11.tar.gz
cd  beanstalkd-1.11
make & make install
beanstalkd -v
  1. 启动服务
beanstalkd -l 0.0.0.0 -p 11300 -b /log/beanstalkd/binlog -F

队列应用(PHP)

composer安装 Pheanstalk 类库

//PHP版本要求 7.1+
composer require pda/pheanstalk:~4.0

执行composer后,在项目composer.json配置文件中将增加pda/pheanstalk依赖包

"require": {"pda/pheanstalk": "~4.0"
}

Producer添加任务

//创建实例
$client = Pheanstalk::create($host, $port, $timeout);//设置使用的tube,添加任务数据
//$data 任务数据
//$priority 任务优先级.小优先级数值的job将会排在大优先级 数值的job前面执行。
//最高优先级是0,最低优先级是4,294,967,295
//$delay 任务延迟执行秒数
//$ttr 允许一个消费者执行该job的秒数
$client->useTube($tube)->put($data, $priority, $delay, $ttr);

Consumer消费任务

ini_set('default_socket_timeout', 24*60*60);$client = Pheanstalk::create($host, $port, $timeout);
$client->watchOnly($tube);
while (true) {    //阻塞获取任务    $job = $client->reserve();   if (is_null($job)) {        continue;   }    //设置重新计算ttr$client->touch($job);    //获取任务数据$data = $job->getData();//开始执行任务//任务执行逻辑$res = true//结束任务执行 //删除任务    $client->delete($job); if ($res === true) { //任务执行成功,删除任务$client->delete($job);    } else {        //否则将任务重新放回队列$client->release($job, 1024, 10);   }}

default_socket_timeout 这个参数是一定要加的,php 默认一般是 60s,假如您没有在代码里面设置,采用默认的话(60s),60s 之内如果没有 job 产生,脚本就会报 socket 错误。

客户端操作类

以下基于pda/pheanstalk依赖包实现的Beanstalk操作类,供参考。。

<?phpnamespace App\Libs;
use Pheanstalk\Pheanstalk;/*** Beanstalk工具类* @since 2020-02-26*/
class Beanstalk {/*** Beanstalk配置信息* @var array*/protected $configs = [];/*** client实例* @var array*/protected $clients = [];/*** 当前连接的服务端的配置名称* @var string*/protected $connection = 'default';/*** 连接超时时间* @var int*/protected $clientTimeOut = 3000;/*** 初始化配置信息* @param array $configs* @return void*/public function __construct(array $configs = []) {$this->configs = $configs;}/*** 添加任务* @param string $tube 队列管道* @param array $parameters* @param int $priority 优先级* @param int $delay 延迟执行时间* @param int $ttr 任务超时时间* @return bool|mixed*/public function addTask($tube, array $parameters,$priority = Pheanstalk::DEFAULT_PRIORITY,$delay = Pheanstalk::DEFAULT_DELAY,$ttr = Pheanstalk::DEFAULT_TTR) {$client = $this->createClient();$stream = serialize($parameters);$client->useTube($tube)->put($stream, $priority, $delay, $ttr);}/*** 创建客户端连接* @return mixed*/public function createClient() {if (! isset($this->clients[$this->connection])) {$client = Pheanstalk::create($this->configs[$this->connection]['host'], $this->configs[$this->connection]['port'], $this->clientTimeOut);$this->clients[$this->connection] = $client;}return $this->clients[$this->connection];}/*** 创建后台工作进程* @param $tube* @param array $service* @return mixed* @author huangweizhang* @throws \Exception*/public function createWorker($tube, array $service) {ini_set('default_socket_timeout', 24*60*60);$client = Pheanstalk::create($this->configs[$this->connection]['host'], $this->configs[$this->connection]['port'], $this->clientTimeOut);$client->watchOnly($tube);while (true) {if (count($service) != 2) {throw new \Exception('parameter service error.');}list($classname, $method) = $service;if (! class_exists($classname)) {throw new \Exception('worker service class not exists.');}if (! method_exists($classname, $method)) {throw new \Exception('worker service method not exists.');}//获取任务$job = $client->reserve();if (is_null($job)) {continue;}$client->touch($job);//获取任务参数$stream = $job->getData();$parameters = unserialize($stream);try {//执行任务$class = new $classname();call_user_func_array(array($class, $method), array($parameters));unset($class);//删除任务$client->delete($job);} catch (\Exception $e) {//任务执行失败操作}}}/*** 设置连接* @param string $connection*/public function setConnection($connection) {$this->connection = $connection;}/*** 设置客户端连接超时时间* @param int $clientTimeOut*/public function setClientTimeOut($clientTimeOut) {$this->clientTimeOut = $clientTimeOut;}
}

使用Beanstalk搭建队列服务相关推荐

  1. redis5 stream php队列,使用redis stream实现队列服务

    1. stream类型 Redis5.0引入了Stream类型.该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于: 消息ID的序列化生成 消息遍历 消息的阻塞和非阻塞读取 消息 ...

  2. 快速搭建基于beanstalk的php消息队列服务

    本项目实现基于beanstalk的php消息队列服务,包括生产与消费消息案例 一.beanstalk介绍与安装:http://kr.github.io/beanstalkd/ 二.php消息队列处理, ...

  3. beanstalk队列服务for php

    beakspeak是一个基于beakstalk队列服务的php-c扩展,高性能支持分布式内存队列服务,且很轻量级. 目前国内还没有比较详细的关于Beanspeak的中文说明,本文是根据源码提炼.整理的 ...

  4. 五分钟搭建BERT服务,实现1000+QPS​,这个Service-Streamer做到了

    作者 | 刘欣 简介:刘欣,Meteorix,毕业于华中科技大学,前网易游戏技术总监,现任香侬科技算法架构负责人.之前专注游戏引擎工具架构和自动化领域,2018年在GDC和GoogleIO开源Airt ...

  5. 阿里云Kubernetes实战2–搭建基础服务

    前言: 在系列的第一篇文章中,我已经介绍过如何在阿里云基于kubeasz搭建K8S集群,通过在K8S上部署gitlab并暴露至集群外来演示服务部署与发现的流程.文章写于4月,忙碌了小半年后,我才有时间 ...

  6. js 连接mysql_搭建node服务(二):操作MySQL

    为了进行复杂信息的存储和查询,服务端系统往往需要数据库操作.数据库分为关系型数据库和非关系型数据库,关系型数据库有MySQL.Oracle.SQL Server等,非关系型数据库有Redis(常用来做 ...

  7. 开源项目 | 五分钟搭建BERT服务,实现1000+QPS

    作者丨刘欣 单位丨香侬科技算法架构负责人 研究方向丨NLP工程化.算法平台架构 深度学习模型在训练和测试时,通常使用小批量(mini-batch)的方式将样本组装在一起,这样能充分利用 GPU 的并行 ...

  8. spring cloud+dotnet core搭建微服务架构:配置中心续(五)

    前言 上一章最后讲了,更新配置以后需要重启客户端才能生效,这在实际的场景中是不可取的.由于目前Steeltoe配置的重载只能由客户端发起,没有实现处理程序侦听服务器更改事件,所以还没办法实现彻底实现这 ...

  9. 【译文】用Spring Cloud和Docker搭建微服务平台

    by Kenny Bastani Sunday, July 12, 2015 转自:http://www.kennybastani.com/2015/07/spring-cloud-docker-mi ...

最新文章

  1. HDU 2955 Robberies
  2. [转载]如何用关键字优化网站?
  3. 写未来的电子计算机的畅想两百字,未来科技作文200字
  4. qt的如何调整显示不为科学记数法_Excel操作技巧:如何将信息快速准确的录入Excel?...
  5. Python sum函数- Python零基础入门教程
  6. 线性表11|单链表小结:腾讯面试题 - 数据结构和算法16
  7. 手机支持html5绘图性能,【高级系列】Canvas绘制性能专题
  8. 【笔试/面试】SQL 经典面试题
  9. Mac电脑不能识别文本和命令的解决方法
  10. 伪原创文章生成器-自媒体洗稿工具-关键词文章生成工具免费
  11. ubuntu14.04 安装 GTX 1060 显卡驱动
  12. 物理层-宽带接入技术
  13. 设置VC工程为Debug或Releas版本的方法
  14. 什么是雪花算法?啥原理?
  15. 最全最精准的IE浏览器判断和国内套壳浏览器判断(360,QQ,搜狗,百度等)
  16. 【Unity Shader学习笔记】(五)使用鼠标绘制自由多边形(附完整工程源码)
  17. 用Rust实现区块链 - 3 持久化
  18. ECl@SS学习笔记(1)
  19. PHP报错:Declaration of ... should be compatible with ... 的解决方法
  20. Java综合实践——学生成绩查询系统

热门文章

  1. 第十一周项目训练11 教师兼干部
  2. android Tombstone 流程
  3. 如何安全更改Windows 10用户文件夹名称
  4. Leveldb源码分析--3
  5. 计算机开机后无法正常启动,且发出持续警报声,可能是什么原因?,电脑开机滴滴滴响三声是什么原因...
  6. 《如何阅读一本书》读后感-1
  7. 4.11 使用通道混合器命令修复偏色照片 [原创Ps教程]
  8. Unity3D_(游戏)跳一跳超简单制作过程
  9. CNopendata机场信息数据简介
  10. clearInterval() 函数详解