Repost: kestrel PHP message queue

We've been using Twitter's kestrel queue server for a while now at work, but only from our service layer, which is written in Python. Now that we have some queueing needs from our application layer, written in PHP, I spent a few days this week adding queue support to our web application. I thought I'd share what I learned, and how I implemented it.

Goals

The kestrel server itself was pretty straightforward to get up and running. The only thing I would point out is that I recommend sticking to release branches, as master was fairly unstable when I tried to use it. As for implementing the client, I had a few goals in mind when I started:

  - Since kestrel is built on the memcache protocol, try to leverage an existing memcache client rather than build one from scratch
  - Utilize our existing batch job infrastructure, which I covered in a previous post, and make sure our multi-tenant needs are met
  - Keep the queue interface generic in case we change queue servers later
  - Utilize existing kestrel management tools, and only build out the functionality we need

With these goals in mind, I ended up with 4 components: a kestrel client, a producer, a consumer, and a very small CLI harness for running the consumer. But before I even coded anything, I set up kestrel web, a web UI for kestrel written by my co-worker Matt Erkkila. Kestrel web allows you to view statistics on kestrel, manage queues, and sort and filter queues based on manual inputs. Having this tool up and running from the get-go made it easy to watch jobs get added to and consumed from my test queue, and to flush out the queues as needed.

The Kestrel Client

I couldn't find any existing kestrel clients for PHP, so I started looking at the two memcache extensions: the older memcache, and Andrei Zmievski's memcached, the latter of which is based on the libmemcached library. I started with memcache, and while it worked fine initially, I quickly found that I could not modify timeouts. This interfered with the way kestrel recommends you poll it for new jobs, and I would see timeout errors from the memcache extension whenever I set the poll timeout to 1 second or higher (the memcache default). The memcached extension does not have these issues, so I went with it.

The first gotcha I ran into was serialization. You can use memcached's serializer for writing to kestrel, but when it reads the data back, it doesn't recognize that it is serialized. So I just serialize the data manually in my client, and things work fine. One other thing to note is that you'll want to disable compression, or do it manually, as the memcached extension will automatically compress anything over 100 bytes by default, and will not decompress it when reading from kestrel.
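The manual round trip described above can be sketched roughly as follows. This is a minimal illustration, assuming JSON as the wire format; encodeJobPayload() and decodeJobPayload() are hypothetical helper names, and only json_encode() and json_decode() are real PHP functions here.

```php
<?php
// Hypothetical helpers: serialize locally, because kestrel does not
// hand back the flags memcached would use to detect serialized data.

/** Encode a job payload to a plain string before passing it to Memcached::set(). */
function encodeJobPayload($data)
{
    return json_encode($data);
}

/** Decode a payload read back from kestrel; false means nothing was on the queue. */
function decodeJobPayload($raw)
{
    if ($raw === false) {
        return false;
    }
    // true => decode JSON objects as associative arrays, not stdClass
    return json_decode($raw, true);
}

$wire = encodeJobPayload(array('jobName' => 'HelloWorld', 'data' => array('foo' => 'bar')));
$job  = decodeJobPayload($wire);
echo $job['jobName'], "\n";
```

Because the payload is already a plain string by the time it reaches the extension, nothing depends on memcached's own serializer or its flags.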

The other issue is that if you want to use any custom kestrel commands, you can't. Since the application layer doesn't need anything fancy, the memcached extension will work fine for it. Once we need support for the upcoming monitor (batching) in kestrel 2, we may need to implement a kestrel client from scratch. Kestrel web supplies everything else we need right now.

Once the decision was made to use memcached, I wrote a light decorator for it, EC_KestrelClient. This handles instantiation of the memcached client, serialization, and helpers for some kestrel-specific options to the GET command. It also has support for passing memcached-specific options through it. The class ended up looking like this:

<?php
/**
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_KestrelClient
{
    /**
     * The Memcached instance
     *
     * @var Memcached
     */
    protected $_memcached = null;

    /**
     * The Kestrel server IP
     *
     * @var string
     */
    protected $_host = '127.0.0.1';

    /**
     * The Kestrel server port
     *
     * @var int
     */
    protected $_port = 22133;

    /**
     * Optional Memcached options
     *
     * @var array
     */
    protected $_options = array();

    /**
     * Sets the host, port, and options to be used
     *
     * @param string $host    The host to use, defaults to 127.0.0.1
     * @param int    $port    The port to use, defaults to 22133
     * @param array  $options Memcached options, passed through to the client
     *
     * @return void
     */
    public function __construct(
        $host = '127.0.0.1', $port = 22133, array $options = array()
    )
    {
        $this->_host = $host;
        $this->_port = $port;
        $this->setOptions($options);
    }

    /**
     * Sets job data on the queue, json_encoding the value to avoid problematic
     * serialization.
     *
     * @param string $queue The queue name
     * @param mixed  $data  The data to store
     *
     * @return bool
     */
    public function set($queue, $data)
    {
        // Local json serialization, as kestrel doesn't send serialization flags
        return $this->getMemcached()->set($queue, json_encode($data));
    }

    /**
     * Reliably read an item off of the queue. Meant to be run in a loop, and
     * call closeReliableRead() when done to make sure the final job is not left
     * on the queue.
     *
     * @param string $queue   The queue name to read from
     * @param int    $timeout The time to wait for a job to appear, in milliseconds
     *
     * @return array|false
     * @see closeReliableRead()
     */
    public function reliableRead($queue, $timeout = 1000)
    {
        // Close the previous open read (if any), then open a new one
        $queue = $queue . '/close/open/t=' . $timeout;
        $result = $this->getMemcached()->get($queue);
        if ($result === false) {
            return $result;
        }

        // Local json serialization, as kestrel doesn't send serialization flags
        return json_decode($result, true);
    }

    /**
     * Closes any existing open read
     *
     * @param string $queue The queue name
     *
     * @return false
     */
    public function closeReliableRead($queue)
    {
        $queue = $queue . '/close';
        return $this->getMemcached()->get($queue);
    }

    /**
     * Aborts an existing reliable read
     *
     * @param string $queue The queue name
     *
     * @return false
     */
    public function abortReliableRead($queue)
    {
        $queue = $queue . '/abort';
        return $this->getMemcached()->get($queue);
    }

    /**
     * Set an option to be used with the Memcached client
     *
     * @param string $name  The option name
     * @param mixed  $value The option value
     *
     * @return void
     */
    public function setOption($name, $value)
    {
        $this->_options[$name] = $value;
    }

    /**
     * Sets multiple options
     *
     * @param array $options Array of key/values to set
     *
     * @return void
     */
    public function setOptions(array $options)
    {
        foreach ($options as $name => $value) {
            $this->setOption($name, $value);
        }
    }

    /**
     * Gets a current option's value
     *
     * @param string $name The option name
     *
     * @return mixed
     */
    public function getOption($name)
    {
        if (isset($this->_options[$name])) {
            return $this->_options[$name];
        }
        return null;
    }

    /**
     * Gets all current options
     *
     * @return array
     */
    public function getOptions()
    {
        return $this->_options;
    }

    /**
     * Gets the Memcached client, creating it on first use
     *
     * @return Memcached
     */
    public function getMemcached()
    {
        if ($this->_memcached === null) {
            $this->_initMemcached();
        }
        return $this->_memcached;
    }

    /**
     * Initializes the Memcached client instance
     *
     * @return void
     */
    protected function _initMemcached()
    {
        $this->_memcached = $this->_getMemcachedInstance();
        foreach ($this->_options as $option => $value) {
            $this->_memcached->setOption($option, $value);
        }
        $this->_memcached->addServer($this->_host, $this->_port);

        // Compression must stay off: data read back from kestrel is not decompressed
        $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false);
    }

    // @codeCoverageIgnoreStart
    /**
     * Returns a new instance of Memcached. Abstracted for testing.
     *
     * @return Memcached
     */
    protected function _getMemcachedInstance()
    {
        return new Memcached();
    }
    // @codeCoverageIgnoreEnd
}
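The kestrel-specific GET options used by reliableRead(), closeReliableRead(), and abortReliableRead() are just suffixes appended to the queue name. As a rough sketch of that idea, the helper below builds such keys in one place; kestrelReadKey() is a hypothetical function, not part of EC_KestrelClient, and it assumes the timeout is given in milliseconds.

```php
<?php
/**
 * Builds the key for a kestrel GET, e.g. "jobs/close/open/t=500".
 * Hypothetical helper for illustration only.
 *
 * @param string   $queue     The queue name
 * @param array    $flags     Options such as 'open', 'close', 'abort'
 * @param int|null $timeoutMs Optional blocking timeout in milliseconds
 *
 * @return string
 */
function kestrelReadKey($queue, array $flags = array(), $timeoutMs = null)
{
    $key = $queue;
    foreach ($flags as $flag) {
        $key .= '/' . $flag;
    }
    if ($timeoutMs !== null) {
        $key .= '/t=' . (int) $timeoutMs;
    }
    return $key;
}

// Acknowledge the previous job and open the next one, waiting up to 500 ms
echo kestrelReadKey('enterprise_hello_world', array('close', 'open'), 500), "\n";
// Put the currently open job back on the queue
echo kestrelReadKey('enterprise_hello_world', array('abort')), "\n";
```

Combining close and open in a single GET is what lets the consumer loop acknowledge one job and fetch the next in one round trip.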


The Producer

The producer is very simple. It just formats the data into a standard structure, including current tenant information, namespaces the queue so it doesn't collide with other projects, and adds it to the queue. The producer looks like this:

<?php
/**
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Producer
{
    /**
     * Adds a job onto a queue
     *
     * @param string $queue   The queue name to add a job to
     * @param string $jobName The job name for the consumer to run
     * @param mixed  $data    Optional additional data to pass to the job
     *
     * @return bool
     */
    public function addJob($queue, $jobName, $data = null)
    {
        $item = array(
            'instance' => EC::getCurrentInstanceName(),
            'jobName'  => $jobName
        );
        if ($data !== null) {
            $item['data'] = $data;
        }

        // Namespace queue with project
        $queue = 'enterprise_' . $queue;

        $client = $this->_getKestrelClient();
        return $client->set($queue, $item);
    }

    // @codeCoverageIgnoreStart
    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @return EC_KestrelClient
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }

        // Reuse one client for the lifetime of the request
        static $client = null;
        if ($client === null) {
            $host = EC::getConfigOption('kestrel.host');
            $port = EC::getConfigOption('kestrel.port');
            $client = new EC_KestrelClient($host, $port);
        }
        return $client;
    }
    // @codeCoverageIgnoreEnd
}


The Consumer

The consumer has a bit more to it, though it is still pretty straightforward. It's intended to be run from a monitoring tool like daemontools or supervisord, so there is a very small CLI harness that just passes the CLI arguments into EC_Consumer and runs it. After parsing the CLI arguments, EC_Consumer polls kestrel for new jobs, and runs them through our standard batch job infrastructure. Until we have more confidence in PHP's long-running process ability, I added an optional maximum jobs argument, which makes the consumer terminate once it has processed X jobs. The monitoring service (supervisord) will then just restart it in a matter of seconds. I also added an optional debug argument for testing, so you can see every action as it happens. The CLI harness looks like this:

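#!/bin/env php
<?php
// Any bootstrap lines from the original harness were lost; $consumer is an assumed name.
$consumer = new EC_Consumer($argv);
$consumer->run();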


And the main consumer class, EC_Consumer, looks something like this:

<?php
/**
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Consumer
{
    /**
     * Instance of {@link Zend_Console_Getopt}
     *
     * @var Zend_Console_Getopt
     */
    protected $_opt = null;

    /**
     * Which APPLICATION_ENV to run under (see -e)
     *
     * @var string
     */
    protected $_environment = null;

    /**
     * The kestrel server IP
     *
     * @var string
     */
    protected $_host = null;

    /**
     * The kestrel server port
     *
     * @var int
     */
    protected $_port = null;

    /**
     * The kestrel queue name to connect to
     *
     * @var string
     */
    protected $_queue = null;

    /**
     * Whether we should show debug output
     *
     * @var bool
     */
    protected $_debug = false;

    /**
     * Maximum # of jobs for this process to perform (for memory fail safe)
     *
     * @var int
     */
    protected $_maxJobs = null;

    /**
     * Current job count
     *
     * @var int
     */
    protected $_jobCount = 0;

    /**
     * Parses arguments from the command line and does error handling.
     * On invalid arguments, prints the usage message and exits with status 1.
     *
     * @param array $argv The $argv from the CLI harness
     *
     * @return void
     */
    public function __construct(array $argv)
    {
        try {
            $opt = new Zend_Console_Getopt(
                array(
                    'environment|e=s' => 'environment name (e.g. development)'
                        . ', required',
                    'server|s=s' => 'kestrel server, format of host:port'
                        . ', required',
                    'queue|q=s' => 'queue name (e.g. crawler_campaign)'
                        . ', required',
                    'max-jobs|m=s' => 'max jobs to run before exiting'
                        . ', optional',
                    'debug|d' => 'show debug output'
                        . ', optional',
                )
            );
            $opt->setArguments($argv);
            $opt->parse();

            // Set environment
            if ($opt->e === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing environment'
                );
            }
            $this->_environment = $opt->e;

            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV')) {
                define('APPLICATION_ENV', $this->_environment);
            }
            // @codeCoverageIgnoreEnd

            // Set server
            if ($opt->s === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing server'
                );
            }
            $parts = explode(':', $opt->s);
            if (count($parts) !== 2) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: invalid server: ' . $opt->s
                );
            }
            $this->_host = $parts[0];
            $this->_port = (int) $parts[1];

            // Set queue
            if ($opt->q === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing queue'
                );
            }
            $this->_queue = $opt->q;

            // Set max-jobs (parsed as a string, so cast for the numeric comparison)
            if ($opt->m !== null) {
                $this->_maxJobs = (int) $opt->m;
            }

            // Set debug
            if ($opt->d !== null) {
                $this->_debug = true;
            }
        } catch (Zend_Console_Getopt_Exception $e) {
            echo "\n" . $e->getMessage() . "\n\n";
            echo $opt->getUsageMessage();
            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') {
                exit(1);
            }
            // @codeCoverageIgnoreEnd
        }
        $this->_opt = $opt;
    }

    /**
     * Polls the queue server for jobs and runs them as they come in
     *
     * @return void
     */
    public function run()
    {
        $client = $this->_getKestrelClient();
        $queue = 'enterprise_' . $this->_queue;

        while ($this->_keepRunning()) {
            // Pull job from queue; this also closes (acknowledges) the previous job
            $job = $client->reliableRead($queue, 500);
            if ($job === false) {
                $this->_debug('Nothing on queue ' . $queue);
                continue;
            }

            if (!isset($job['instance'])) {
                echo 'Instance not set in queue job: ' . print_r($job, true);
                continue;
            }
            $instance = $job['instance'];

            if (!isset($job['jobName'])) {
                echo 'Job name not set in queue job: ' . print_r($job, true);
                continue;
            }
            $jobName = $job['jobName'];

            $data = null;
            if (isset($job['data'])) {
                $data = $job['data'];
            }

            // Run the job; on failure, abort so the job goes back on the queue
            $returnCode = $this->runJob($instance, $jobName, $data);
            if ($returnCode !== 0) {
                $client->abortReliableRead($queue);
                continue;
            }
        }

        // Acknowledge the final job so it is not left open on the queue
        $client->closeReliableRead($queue);
    }

    /**
     * Runs the job via bin/ecli.php
     *
     * @param string $instance The instance name to run the job under
     * @param string $jobName  The job name
     * @param mixed  $data     Optional extra data
     *
     * @return int
     */
    public function runJob($instance, $jobName, $data)
    {
        // Values come from the CLI and the queue, so quote them for the shell
        $cmd = BASE_PATH . '/bin/ecli.php '
            . '-e ' . escapeshellarg($this->_environment)
            . ' -i ' . escapeshellarg($instance)
            . ' -j ' . escapeshellarg($jobName);
        if ($data) {
            $cmd .= ' ' . escapeshellarg(base64_encode(json_encode($data)));
        }
        $returnCode = $this->_passthru($cmd);
        $this->_jobCount++;
        $this->_debug('Job count: ' . $this->_jobCount);
        return $returnCode;
    }

    /**
     * Check to see if the job limit has been reached
     *
     * @return bool
     */
    protected function _keepRunning()
    {
        return ($this->_maxJobs === null) ? true
            : ($this->_jobCount < $this->_maxJobs);
    }

    /**
     * Show debug messages
     *
     * @param mixed $message
     *
     * @return void
     */
    protected function _debug($message)
    {
        if (!$this->_debug) {
            return;
        }
        echo $message . "\n";
    }

    // @codeCoverageIgnoreStart
    /**
     * Calls the passthru() function and returns the exit code. Abstracted
     * for testing.
     *
     * @param string $cmd The command to execute
     *
     * @return int
     */
    protected function _passthru($cmd)
    {
        passthru($cmd, $returnCode);
        return $returnCode;
    }

    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @return EC_KestrelClient
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }
        return new EC_KestrelClient($this->_host, $this->_port);
    }
    // @codeCoverageIgnoreEnd
}


Putting it together

Now that all the pieces are put together, let's take a look at it in action. Adding the example job "HelloWorld" to the queue "hello_world" from within our application looks something like this:

<?php
$producer = new EC_Producer();
$producer->addJob('hello_world', 'HelloWorld', array('foo' => 'bar'));
?>


And finally, here's an example of running the consumer from the CLI harness, along with some example debug output of processing the job:

./bin/consumer_cli.php -e development -s 127.0.0.1:22133 -q hello_world -d -m 2
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
    [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 1
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
    [foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 2
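The argument shown in the output above is the job data after json_encode() and base64_encode(), as done in EC_Consumer::runJob(). How the job script decodes it is not shown in this post, so the following is only a sketch of one plausible approach; encodeJobArgument() and decodeJobArgument() are hypothetical names.

```php
<?php
// Hypothetical helpers mirroring what runJob() does and what a job might do in return.

/** Encode job data into a shell-friendly CLI argument. */
function encodeJobArgument($data)
{
    return base64_encode(json_encode($data));
}

/** Decode the CLI argument back into job data (objects become stdClass). */
function decodeJobArgument($arg)
{
    return json_decode(base64_decode($arg));
}

$arg = encodeJobArgument(array('foo' => 'bar'));
echo $arg, "\n"; // eyJmb28iOiJiYXIifQ==

print_r(decodeJobArgument($arg));
```

Decoding without the associative flag yields a stdClass object, which is consistent with the "stdClass Object" lines in the debug output.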


That's it! I'd be interested to hear how other folks are interfacing with kestrel from PHP.
