一 "Hello World!"

生产者:

/*

* php G:\wamp\www\mygedu\yii tools/send-mq msg*/

public function actionSendMq($argv=''){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage($argv);

$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent '$argv'".PHP_EOL;

$channel->close();

$connection->close();

}

消费者:

/*

* php G:\wamp\www\mygedu\yii tools/receive-mq

*/

public function actionReceiveMq(){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {

echo " [x] Received ", $msg->body, "\n";

};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

}

二 Work queues

生产者:

/*

* php G:\wamp\www\mygedu\yii tools/new-task msg

*/

public function actionNewTask($data='Hello World!'){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('my_task_queue', false, true, false, false);

$msg = new AMQPMessage($data,

array('delivery_mode' => 2) # make message persistent

);

$channel->basic_publish($msg, '', 'my_task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();

$connection->close();

}

消费者:

/*

* php G:\wamp\www\mygedu\yii tools/worker

*/

public function actionWorker(){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('my_task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){

echo " [x] Received ", $msg->body, "\n";

sleep(substr_count($msg->body, '.'));

echo " [x] Done", "\n";

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

};

$channel->basic_qos(null, 1, null);

$channel->basic_consume('my_task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();

$connection->close();

}

三 Publish/Subscribe

生产者:

/*

* php G:\wamp\www\mygedu\yii tools/emit-log msg

*/

public function actionEmitLog($data='Hello World!'){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

if(empty($data)) $data = "info: Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo " [x] Sent ", $data, "\n";

$channel->close();

$connection->close();

}

消费者:

/*

* php G:\wamp\www\mygedu\yii tools/receive-logs

*/

public function actionReceiveLogs(){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){

echo ' [x] ', $msg->body, "\n";

};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();

$connection->close();

}

四 Routing

生产者:

/*

* php G:\wamp\www\mygedu\yii tools/emit-log-direct info msg

*/

public function actionEmitLogDirect($argv, $data='Hello World!'){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv) && !empty($argv) ? $argv : 'info';

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ",$severity,':',$data," \n";

$channel->close();

$connection->close();

}

消费者:

/*

* php G:\wamp\www\mygedu\yii tools/receive-logs-direct info,warning,error

*/

public function actionReceiveLogsDirect($argv){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = explode(',', $argv);

if(empty($severities)) {

file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");

exit(1);

}

foreach($severities as $severity) {

$channel->queue_bind($queue_name, 'direct_logs', $severity);

}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){

echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";

};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();

$connection->close();

}

五 Topics

生产者:

/*

* php G:\wamp\www\mygedu\yii tools/topics-emit-log-direct info msg

*/

public function actionTopicsEmitLogDirect($routing_key='kern.critical', $data='Hello World!'){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ",$routing_key,':',$data," \n";

$channel->close();

$connection->close();

}

消费者:

/*

* php G:\wamp\www\mygedu\yii tools/topics-receive-logs-direct info,warning,error

*/

public function actionTopicsReceiveLogsDirect($binding_keys=''){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = explode(',', $binding_keys);

if( empty($binding_keys )) {

file_put_contents('php://stderr', "Usage: $binding_keys\n");

exit(1);

}

foreach($binding_keys as $binding_key) {

$channel->queue_bind($queue_name, 'topic_logs', $binding_key);

}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){

echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";

};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();

$connection->close();

}

六 RPC

生产者:

/*

* php G:\wamp\www\mygedu\yii tools/rpc-client 10

*/

public function actionRpcClient($fib=10){

$fibonacci_rpc = new FibonacciRpcClient();

$response = $fibonacci_rpc->call($fib);

echo " [.] Got ", $response, "\n";

}

消费者:

/*

* php G:\wamp\www\mygedu\yii tools/rpc-server

*/

public function actionRpcServer($routing_key='kern.critical', $data='Hello World!'){

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

function fib($n) {

if ($n == 0)

return 0;

if ($n == 1)

return 1;

return fib($n-1) + fib($n-2);

}

echo " [x] Awaiting RPC requests\n";

$callback = function($req) {

$n = intval($req->body);

echo " [.] fib(", $n, ")\n";

$msg = new AMQPMessage(

(string) fib($n),

array('correlation_id' => $req->get('correlation_id'))

);

$req->delivery_info['channel']->basic_publish(

$msg, '', $req->get('reply_to'));

$req->delivery_info['channel']->basic_ack(

$req->delivery_info['delivery_tag']);

};

$channel->basic_qos(null, 1, null);

$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();

$connection->close();

}

相关类:

class FibonacciRpcClient {

private $connection;

private $channel;

private $callback_queue;

private $response;

private $corr_id;

public function __construct() {

$this->connection = new AMQPStreamConnection(

'localhost', 5672, 'guest', 'guest');

$this->channel = $this->connection->channel();

list($this->callback_queue, ,) = $this->channel->queue_declare(

"", false, false, true, false);

$this->channel->basic_consume(

$this->callback_queue, '', false, false, false, false,

array($this, 'on_response'));

}

public function on_response($rep) {

if($rep->get('correlation_id') == $this->corr_id) {

$this->response = $rep->body;

}

}

public function call($n) {

$this->response = null;

$this->corr_id = uniqid();

$msg = new AMQPMessage(

(string) $n,

array('correlation_id' => $this->corr_id,

'reply_to' => $this->callback_queue)

);

$this->channel->basic_publish($msg, '', 'rpc_queue');

while(!$this->response) {

$this->channel->wait();

}

return intval($this->response);

}

}

php basic publish,RabbitMQ入门(PHP语言描述)相关推荐

  1. RabbitMQ入门:发布/订阅(Publish/Subscribe)

    在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...

  2. RabbitMQ入门到精通

    RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...

  3. 【中间件】RabbitMQ入门学习笔记

    1 .消息队列 1.1. MQ的相关概念 (1) 什么是MQ MQ(messagequeue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进 ...

  4. RabbitMQ入门到掌握

    RabbitMQ入门到掌握 一.消息队列 1.MQ 的相关概念 1.2 什么是MQ 1.2 为什么要用MQ ①流量消峰 ②应用解耦 ③异步处理 1.3 MQ 的分类 ①ActiveMQ ②Kafka ...

  5. RabbitMQ——入门介绍

    目录 1.消息队列 1.1.MQ 的相关概念 1.1.1.什么是 MQ 1.1.2.为什么要用 MQ 1.1.3.MQ 的分类 1.1.4.MQ 的选择 1.2.RabbitMQ 1.2.1.Rabb ...

  6. RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器

    本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...

  7. 《RabbitMQ 实战指南》第二章 RabbitMQ 入门

    <RabbitMQ 实战指南> 文章目录 <RabbitMQ 实战指南> 一.相关概念介绍 1.生产者和消费者 2.队列 3.交换器.路由键.绑定 4.交换器类型 5.Rabb ...

  8. RabbitMQ入门及笔记

    RabbitMQ 文章目录 RabbitMQ 1. RabbitMQ的安装 2. RabbitMQ的相关概念 2.1 RabbitMQ的概念 2.2 四大核心概念 2.3 RabbitMQ 核心部分 ...

  9. RabbitMQ入门笔记

    目录 一.消息队列 1.MQ概念 1)什么是MQ 2)为什么要用MQ 3)MQ分类 4)MQ选择 2.Rabbit MQ 1)RabbitMQ的概念 2)四大核心 3)核心部分(六大模式) 4)各个名 ...

最新文章

  1. 我是怎么把一个项目带崩的
  2. minio安装(包括docker安装)
  3. .NET Worker Service 作为 Windows 服务运行及优雅退出改进
  4. MYSQL的递归查询
  5. 第八章 (一)分治 练习题
  6. MATLAB数学建模方法与实践(第3版)——读书笔记
  7. 计算机窗口底色,将电脑的窗口背景调成护眼色-电脑护眼设置
  8. 哈夫曼树中压缩率到底是什么意思
  9. 蚂蚁区块链投票案例(二)---投票合约设计开发
  10. Unity3d游戏中实现阿拉伯语文字正常显示
  11. org.apache.ibatis.type.TypeException: The alias ‘XXXX‘ is already mapped to the value ‘XXXX‘ 问题解决
  12. Java用Jsoup开发爬虫获取双色球开奖信息
  13. c语言——直接插入排序实现(时间复杂度与空间复杂度分析)
  14. Python 哈希函数
  15. 2022/9/5 嵌套路由(靠路由在vue里渲染套渲染),动态路由匹配以及开启propos配置动态路由
  16. 【论文阅读】PAIRWISE LINKAGE FOR POINT CLOUD SEGMENTATION-ISPRS-luxiaohu
  17. 你不知道的JavaScript--Item5 全局变量
  18. 北航考研软件学院电子信息991专业课备考
  19. 并发编程之线程池的使用及扩展和优化
  20. 高考改卷中使用了大量计算机,中高考电子阅卷“潜规则”,卷子在电脑系统里变成了什么?...

热门文章

  1. 优化mysql slave的同步速度
  2. 【Linux】Linux下建立和管理逻辑卷
  3. 在oracle 11gr2 grid独占模式下,如何使oracle数据库实例伴随OHAS的启动而启动?
  4. linux系列之-—04 自动删除n天前日志【转】
  5. Sitecore安全性第1部分:自定义角色和权限
  6. Android Studio快捷键之代码提示
  7. python-sdk-demo的打包
  8. jQuery上拉加载更多
  9. AtCoder - 2153 An Ordinary Game list模拟 || 博弈
  10. CSS3学习之 animation 属性