php basic publish,RabbitMQ入门(PHP语言描述)
一 "Hello World!"
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/send-mq msg*/
public function actionSendMq($argv=''){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage($argv);
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent '$argv'".PHP_EOL;
$channel->close();
$connection->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/receive-mq
*/
public function actionReceiveMq(){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
}
二 Work queues
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/new-task msg
*/
public function actionNewTask($data='Hello World!'){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('my_task_queue', false, true, false, false);
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # make message persistent
);
$channel->basic_publish($msg, '', 'my_task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/worker
*/
public function actionWorker(){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('my_task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('my_task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
三 Publish/Subscribe
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/emit-log msg
*/
public function actionEmitLog($data='Hello World!'){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
if(empty($data)) $data = "info: Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/receive-logs
*/
public function actionReceiveLogs(){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
四 Routing
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/emit-log-direct info msg
*/
public function actionEmitLogDirect($argv, $data='Hello World!'){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv) && !empty($argv) ? $argv : 'info';
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
echo " [x] Sent ",$severity,':',$data," \n";
$channel->close();
$connection->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/receive-logs-direct info,warning,error
*/
public function actionReceiveLogsDirect($argv){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = explode(',', $argv);
if(empty($severities)) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}
foreach($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
五 Topics
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/topics-emit-log-direct info msg
*/
public function actionTopicsEmitLogDirect($routing_key='kern.critical', $data='Hello World!'){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo " [x] Sent ",$routing_key,':',$data," \n";
$channel->close();
$connection->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/topics-receive-logs-direct info,warning,error
*/
public function actionTopicsReceiveLogsDirect($binding_keys=''){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('topic_logs', 'topic', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = explode(',', $binding_keys);
if( empty($binding_keys )) {
file_put_contents('php://stderr', "Usage: $binding_keys\n");
exit(1);
}
foreach($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
六 RPC
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/rpc-client 10
*/
public function actionRpcClient($fib=10){
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call($fib);
echo " [.] Got ", $response, "\n";
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/rpc-server
*/
public function actionRpcServer($routing_key='kern.critical', $data='Hello World!'){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n) {
if ($n == 0)
return 0;
if ($n == 1)
return 1;
return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requests\n";
$callback = function($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ")\n";
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);
$req->delivery_info['channel']->basic_publish(
$msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
相关类:
class FibonacciRpcClient {
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
public function __construct() {
$this->connection = new AMQPStreamConnection(
'localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"", false, false, true, false);
$this->channel->basic_consume(
$this->callback_queue, '', false, false, false, false,
array($this, 'on_response'));
}
public function on_response($rep) {
if($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($n) {
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
(string) $n,
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
}
php basic publish,RabbitMQ入门(PHP语言描述)相关推荐
- RabbitMQ入门:发布/订阅(Publish/Subscribe)
在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...
- RabbitMQ入门到精通
RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...
- 【中间件】RabbitMQ入门学习笔记
1 .消息队列 1.1. MQ的相关概念 (1) 什么是MQ MQ(messagequeue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进 ...
- RabbitMQ入门到掌握
RabbitMQ入门到掌握 一.消息队列 1.MQ 的相关概念 1.2 什么是MQ 1.2 为什么要用MQ ①流量消峰 ②应用解耦 ③异步处理 1.3 MQ 的分类 ①ActiveMQ ②Kafka ...
- RabbitMQ——入门介绍
目录 1.消息队列 1.1.MQ 的相关概念 1.1.1.什么是 MQ 1.1.2.为什么要用 MQ 1.1.3.MQ 的分类 1.1.4.MQ 的选择 1.2.RabbitMQ 1.2.1.Rabb ...
- RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器
本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...
- 《RabbitMQ 实战指南》第二章 RabbitMQ 入门
<RabbitMQ 实战指南> 文章目录 <RabbitMQ 实战指南> 一.相关概念介绍 1.生产者和消费者 2.队列 3.交换器.路由键.绑定 4.交换器类型 5.Rabb ...
- RabbitMQ入门及笔记
RabbitMQ 文章目录 RabbitMQ 1. RabbitMQ的安装 2. RabbitMQ的相关概念 2.1 RabbitMQ的概念 2.2 四大核心概念 2.3 RabbitMQ 核心部分 ...
- RabbitMQ入门笔记
目录 一.消息队列 1.MQ概念 1)什么是MQ 2)为什么要用MQ 3)MQ分类 4)MQ选择 2.Rabbit MQ 1)RabbitMQ的概念 2)四大核心 3)核心部分(六大模式) 4)各个名 ...
最新文章
- 我是怎么把一个项目带崩的
- minio安装(包括docker安装)
- .NET Worker Service 作为 Windows 服务运行及优雅退出改进
- MYSQL的递归查询
- 第八章 (一)分治 练习题
- MATLAB数学建模方法与实践(第3版)——读书笔记
- 计算机窗口底色,将电脑的窗口背景调成护眼色-电脑护眼设置
- 哈夫曼树中压缩率到底是什么意思
- 蚂蚁区块链投票案例(二)---投票合约设计开发
- Unity3d游戏中实现阿拉伯语文字正常显示
- org.apache.ibatis.type.TypeException: The alias ‘XXXX‘ is already mapped to the value ‘XXXX‘ 问题解决
- Java用Jsoup开发爬虫获取双色球开奖信息
- c语言——直接插入排序实现(时间复杂度与空间复杂度分析)
- Python 哈希函数
- 2022/9/5 嵌套路由(靠路由在vue里渲染套渲染),动态路由匹配以及开启propos配置动态路由
- 【论文阅读】PAIRWISE LINKAGE FOR POINT CLOUD SEGMENTATION-ISPRS-luxiaohu
- 你不知道的JavaScript--Item5 全局变量
- 北航考研软件学院电子信息991专业课备考
- 并发编程之线程池的使用及扩展和优化
- 高考改卷中使用了大量计算机,中高考电子阅卷“潜规则”,卷子在电脑系统里变成了什么?...