来源:https://blog.51cto.com/chinalx1/2150793

一定要注意php安装AMQP的版本,版本不同使用的方法不一样。在官方网站就有2个版本的AMQP
第一版本:xxx,详细的url找不到了
第二版本:http://docs.php.net/manual/da/book.amqp.php
千万不要出现这种情况,找到一个官方的版本,然后按照example,怎么调试都不通….按照PHP安装 AMQP扩展 安装的AMQP扩展是最新的,现在和PHP官方给出的第二版本,也有一些区别。主要体现在exchange和queue中有个declare的方法,分别更改成declarExchange()和declarQueue().

一、AMQPConnection

::__construct ([ array $credentials = array() ] )

这方法比较简单,credentials的英文含义是“凭证”,但我喜欢把它理解为config。这些配置包括如下:
host、vhost、port、login、password、read_timeout、write_timeout、connect_timeout,如果config省略,php会使用默认的这5个值,初始化,强烈建议带上config初始化,其中read_timeout、write_timeout、connect_timeout这三个值是最新版本才有的,PHP官方现在还没有,单位都是(秒)

::connect ( void )  、  ::pconnect ( void )
::disconnect ( void )   、::pdisconnect ( void )
::reconnect ( void ) 、::preconnect ( void )   #注意和__construct 的区别,该方法不带参数。
::isConnected ( void )
::isPersistent ( void ) #是否持久链接 ,为什么不是isPconnected ?-_-

如果使用持久链接,在类方法__destruct中释放(断开链接)

::setHost ( string $host )
::setPort ( int $port )
::setVhost ( string $vhost )
::setLogin ( string $login )
::setPassword ( string $password )
::getHost ( void )
::getPort ( void )
::getVhost ( void )
::getLogin ( void )
::getPassword ( void )
还有类似的::setTimeout ( int $timeout ) 、::getTime ( void )、::getReadTimeout ( void )、::setReadTimeout ( int $timeout )、::getWriteTimeout (void ) 、setWriteTimeout ( int $timeout)

上面的这些方法,就不细说了,还有2个不常见的方法

::getUsedChannels ( void )  #在链接会话期间最后一次使用的channel id
::getMaxChannels ( void )   #单次链接能创建最大channel数量

二、AMQPChannel

为了一个AMQP的链接,创建一个channel instance(通道实例),这里需要了解一下几个概念,以及她们之间的关系,本人做了一个罗列,水平有限,不对的地方请指正。

client :客户端Broker:英文含义是代理,这里指代理服务器,也是通常说的server,也可以指Rabbit cluster的节点,
也就是在AMQPConnection的配置文件中的host、portvirtual_host:虚拟host,这个可以理解为在Broker中,虚拟出来的消息处理模块。一个Broker中
可以创建多个virtual_host.exchange:逻辑上属于virtual_host的子模块,负责消息转发到queue中。queue:逻辑上属于virtual_host的子模块。channel:通道。链接virtual_host的通道。
routingkey:virtual_host和channel的设置都是为了增加并发性能,降低资源消耗的。
参考文章:RabbitMQ中的AMQP协议规范

::__construct ( AMQPConnection $amqp_connection ) #创建channel
::isConnected ( void ) #这里和AMQPConnection::isConnected一样.

::getConnection ( void )
#返回的结果是AMQPConnection,从概念上个人理解,channel是建立在connection之上的,是个独立的链接。connection是指物理的连接,一个client与一个server之间有一个连接;一个连接上可以建立多个channel,可以理解为逻辑上的连接。一般应用的情况下,有一个channel就够用了,不需要创建更多的channel

::getChannelId ( void )
#返回channel id,在PHP中循环创建通道,发现通道ID始终只有1和2,不知是否和服务器的设置有关?

::setPrefetchSize ( int $size )、::getPrefetchSize ( void )
这个地方有个坑,设置prefetchSize不等于0,会提示not_implemented: “prefetch_size!=0 (65535)”,也就是说目前prefetchSize只能设置为0,还未实现不等于0的情况。

::setPrefetchCount ( int $count )、::getPrefetchCount ( void )
这个参数设置,接收消息端,接收的最大消息数量(包括使用get、consume),一旦到达这个数量,客户端不在接收消息。0为不限制。默认值为3.

::qos ( int $size , int $count )
同时设置prefetchSize、prefetchCount,但prefetchSize目前只能设置为0

::basicRecover ( boolen )
默认为true,没有确认的消息会被再次投递,这个地方和消息确认机制有关,后面再来说这个问题。

三、 AMQPExchange

::__construct ( AMQPChannel $amqp_channel )

::setName ( string $exchange_name )、::getName ( void )
这个命令的最好使用数字、字母、- _ . 组合(区分大小写,但最好使用小写)
也不要使用amq带头(内置的exchange都是使用amq.开头)
如果申明一个已存在的exchange name,如果type不一致会抛出异常.

::setType ( string $exchange_type )、::getType ( void )
这个是rabbitmq的核心,exchange的类型,决定了rabbit实现何种业务模型。
这个地方尤其要注意,AMQ本身没有默认的exchangeType,所以在申明exchange必需要设置type,否则会抛出异常 Could not declare exchange. Exchanges must have a type,
目前exchange支持4种类型  AMQP_EX_TYPE_DIRECT、 AMQP_EX_TYPE_FANOUT、AMQP_EX_TYPE_HEADERS 、 AMQP_EX_TYPE_TOPIC
可参考:RabbitMQ AMQP 消息模型攻略
记忆方法:amqp_exchange_type_direct、fanout、topic、headers

::setFlags ( int $flags )、::getFlags ( void )
默认AMQP_NOPARAM; 可选AMQP_DURABLE, AMQP_PASSIVE、AMQP_AUTODELETE

  1. passive:声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端。因为一般produce端创建,在consume端建议设置成AMQP_PASSIVE,防止consume创建exchange

  2. durable:持久exchange,该交换器将在broker重启后生效,一般常使用该选项.

  3. auto_delete:该交换器将在没有消息队列绑定时自动删除。一个从未绑定任何队列的交换器不会自动删除。解释有点绕。说明下吧,当有队列bind到auto_delete的交换器上之后,删除该队列。此时交换器也会删除。一般创建为临时交换器。

::setArgument ( string $key , mixed $value )
::setArguments ( array $arguments )
::getArgument ( string $key )
::getArguments ( void )

::declareExchange( void ) //和PHP的官方有差别

::delete ($exchangeName = null, $flags = AMQP_NOPARAM) //和PHP的官方有差别
未指定$exchangeName,删除当前链接的exchange,但无法删除system中内置的exchange.

::bind($exchange_name, $routing_key = ”, array $arguments = array())  //和PHP的官方有差别
::unbind($exchange_name, $routing_key = ”, array $arguments = array())  //和PHP的官方有差别
可以将消息同时发送到多个exchange,稍后补充实际的用法。

::publish ( string $message , string $routing_key [, int $flags = AMQP_NOPARAM [, array $attributes = array() ]] ) //发送消息方法 $attributes参考:RabbitMQ的PHP教程之RPC (六)

参数:$routint_key,在fanout下忽略。在direct类型下,如果该值为空,则bind到exchange都能收到消息,这个有点像fanout的广播模式。

参数:$flags可选值 AMQP_MANDATORY、AMQP_IMMEDIATE,默认:AMQP_NOPARAM,此处一般都设置为false,本人还未测出这2个flag的区别,这里有一篇文章关于这2个flag的介绍:
http://blog.csdn.net/jiao_fuyou/article/details/21594947

::getChannel()、getConnection() //不再介绍,见上面

四、AMQPQueue

::__construct(AMQPChannel $amqp_channel) //有PHP官方有区别

::setName ( string $queue_name )、::getName ( void ) //队列名称

::setFlags ( int $flags )、::getFlags ( void ) //重点
可选参数:AMQP_DURABLE, AMQP_PASSIVE,AMQP_EXCLUSIVE, AMQP_AUTODELETE
默认:auto_delete

auto_delete:当队列中有消费者,则队列存在,当没有消费者链接,则队列删除

durable:持久化,队列不删除,注意仅仅是队列持久,消息不持久(消息的持久在publish时的增加属性delivery_mode)。消费的消息,从队列里删除,未消费的消息保存在队列中,不需要关注是否有消费者。最实用

passive:声明一个1个已存在的队列。意义不大,如果队列不存在会抛出异常。

exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:

排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。

"首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。

即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。

::setArgument ( string $key , mixed $value )
::setArguments ( array $arguments )
::getArgument ( string $key )
::getArguments ( void )

::declareQueue ( void )
::bind ( string $exchange_name , string $routing_key )
::unbind ( string $exchange_name , string $routing_key )

::get ([ int $flags ] )
//非阻塞,从队列中检索下一个可用的消息,flags为空默认为amqp.auto_ack(在php.ini中可用设置默认值),可选参数:AMQP_AUTOACK

::consume ( callable $callback [, int $flags = AMQP_NOPARAM ] )
//阻塞,$callback支持数组的写法,flags为空默认为amqp.auto_ack(在php.ini中可用设置默认值),可选参数:AMQP_AUTOACK

::ack ( int $delivery_tag [, int $flags = AMQP_NOPARAM ] )  //复杂的业务逻辑,最好采用手动应答,防止消费处理业务逻辑复杂时,消息丢失。指定务必指定为AMQP_NOPARAM, 本人测试使用参数 AMQP_MULTIPLE,达不到预期的效果。此处有待高人解决。

更人性化的应答,在使用get、consume时,指定AMQP_NOPARAM时(未指定参数,则会调用PHP.ini的默认的设置),此时未对消费的消息做应答。在处理完复杂的业务逻辑后,使用ack做应答。此时队列中,会删除该条消息,如未应答,则会一直存在队列中。

::nack($delivery_tag, $flags = AMQP_NOPARAM)、reject($delivery_tag, $flags = AMQP_NOPARAM)
用法ack相反,表示not ack.

::purge ( void )
清空队列中的消息。

::delete ( void )
删除队列

::getChannel()、getConnection() //不再介绍,见上面

::cancel ([ string $consumer_tag = “” ] ) //暂时不知该如何使用

五、AMQPEnvelope

该类没有set的方法,各种get

::getBody ( void )
::getRoutingKey ( void )
::getDeliveryTag ( void )
::getDeliveryMode ( void )
::getExchangeName ( void )
::isRedelivery ( void )
::getContentType ( void )
::getContentEncoding ( void )
::getType ( void )
::getTimeStamp ( void )
::getPriority ( void )
::getExpiration ( void )
::getUserId ( void )
::getAppId ( void )
::getMessageId ( void )
::getReplyTo ( void )
::getCorrelationId ( void )
::getHeaders ( void )
::getHeader($header_key).

六、代码示例:

生产者示例:

<?phpheader('Content-Type:text/html;charset=utf8;');$params = array('exchangeName' => 'myexchange','queueName' => 'myqueue','routeKey' => 'myroute',
);$connectConfig = array('host' => 'localhost','port' => 5672,'login' => 'rabbitmq','password' => 'rabbitmq','vhost' => '/'
);//var_dump(extension_loaded('amqp')); 判断是否加载amqp扩展//exit();
try {$conn = new AMQPConnection($connectConfig);$conn->connect();if (!$conn->isConnected()) {//die('Conexiune esuata');//TODO 记录日志echo 'rabbit-mq 连接错误:', json_encode($connectConfig);exit();}$channel = new AMQPChannel($conn);if (!$channel->isConnected()) {// die('Connection through channel failed');//TODO 记录日志echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);exit();}$exchange = new AMQPExchange($channel);$exchange->setFlags(AMQP_DURABLE);//持久化$exchange->setName($params['exchangeName']?:'');$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型$exchange->declareExchange();//$channel->startTransaction();$queue = new AMQPQueue($channel);$queue->setName($params['queueName']?:'');$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();//绑定$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {}$num = mt_rand(100, 500);//生成消息
for($i = $num; $i <= $num+5; $i++)
{$exchange->publish("this is {$i} message..", $params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));
}

消费者示例:

<?phpheader('Content-Type:text/html;charset=utf8;');$params = array('exchangeName' => 'myexchange','queueName' => 'myqueue','routeKey' => 'myroute',
);$connectConfig = array('host' => 'localhost','port' => 5672,'login' => 'rabbitmq','password' => 'rabbitmq','vhost' => '/'
);//var_dump(extension_loaded('amqp'));//exit();try {$conn = new AMQPConnection($connectConfig);$conn->connect();if (!$conn->isConnected()) {//die('Conexiune esuata');//TODO 记录日志echo 'rabbit-mq 连接错误:', json_encode($connectConfig);exit();}$channel = new AMQPChannel($conn);if (!$channel->isConnected()) {// die('Connection through channel failed');//TODO 记录日志echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);exit();}$exchange = new AMQPExchange($channel);$exchange->setFlags(AMQP_PASSIVE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端$exchange->setName($params['exchangeName']?:'');$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型$exchange->declareExchange();//$channel->startTransaction();$queue = new AMQPQueue($channel);$queue->setName($params['queueName']?:'');$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();//绑定$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {echo $e->getMessage();exit();
}function callback(AMQPEnvelope $message) {global $queue;if ($message) {$body = $message->getBody();echo $body . PHP_EOL;$queue->ack($message->getDeliveryTag());} else {echo 'no message' . PHP_EOL;}
}//$queue->consume('callback');  第一种消费方式,但是会阻塞,程序一直会卡在此处//第二种消费方式,非阻塞
$message = $queue->get();
if(!empty($message))
{echo $message->getBody();$queue->ack($message->getDeliveryTag());    //应答,代表该消息已经消费
}

rabbit以及php amqp扩展使用相关推荐

  1. PHPStudy 安装amqp扩展

    1.查看php版本 php -v PHP 7.3.4 (cli) (built: Apr 2 2019 21:57:22) ( NTS MSVC15 (Visual C++ 2017) x64 ) C ...

  2. php的amqp扩展 安装(windows) rabbitmq学习篇

    因为RabbitMQ是由erlang语言实现的,所以先要安装erlang环境 erlang 下载安装 http://www.erlang.org/download.html rabbitmq 下载安装 ...

  3. PHP中RabbitMQ之amqp扩展实现(四)

    目前我在PHP里接触实现RabbitMQ的方式有两种,一种是通过amqp扩展,一种是使用php-amqplib,本章讲诉RabbitMQ的安装及amqp扩展及amqp扩展如何实现RabbitMQ 环境 ...

  4. windows环境PHP使用RabbitMq安装amqp扩展

    一.先查看自己PHP版本及配置信息,可以在命令行用 php -i 查询,或者查看phpinfo(); 找出下面几个信息,根据这个信息选择对应的DLL文件. 二.下载扩展对应的dll文件 PHP的版本对 ...

  5. linux安装RabbitMQ和amqp扩展(这个安装rabbitmq通过了但是代码测试没有通过)

    消息队列rabbitmq RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统,底层基于Erlang语言. 一:centos7安装RabbitMQ 这玩意儿安装很扯淡,官方推荐rpm安装 ...

  6. php amqp扩展安装,php扩展AMQP,安装报错解决

    接下来来安装php扩展AMQP,安装了它以后,才能用PHP操作rabbitmq. wget https://pecl.php.net/get/amqp-1.4.0.tgz tar -zxvf amqp ...

  7. ubuntu环境下php安装amqp扩展

    1.安装librabbitmq扩展 apt-get install librabbitmq-dev 2.下载phpamqp扩展 地址http://pecl.php.net/package/amqp 选 ...

  8. php 安装rabtmq amqp 扩展

    php 安装 rabbitmq-c-0.9.0 扩展 安装 ibrabbitmq-c github 仓库地址 https://github.com/alanxz/rab... ``` 安装过程如果报错 ...

  9. php qmqp 没有方法,CentOS7 php 安装 amqp扩展

    继续安装完 rabbitmq后,安装phpqmqp扩展 1.安装rabbitmq-c 安装最新版 wget -c https://github.com/alanxz/rabbitmq-c/releas ...

最新文章

  1. Spring Cloud实战小贴士:Zuul的饥饿加载(eager-load)使用
  2. 2019年我建议你做好三件事情
  3. 间歇性掉帧卡顿_电脑卡顿问题靠它解决,我只能帮你到这儿了
  4. 最短路径之Dijkstra算法
  5. 华为Mate 40系列外观偷跑:后置八卦图六摄亮了!
  6. python学习笔记10-匿名函数lambda
  7. Understanding .NET Code Access Security
  8. 如何重命名Git标签?
  9. 编译android7.0出现的错误ninja: build stopped: subcommand failed.
  10. python怎么进阶_你真的会自学么?大佬整理的python进阶路径(长更)
  11. 按照之前的标题来看,这应该是...第四天(你以为是第三天的总结吗?图样图森破....
  12. 综述 - 染色质可及性与调控表观基因组 | Chromatin accessibility and the regulatory epigenome...
  13. js插件---弹出层sweetalert2
  14. Pandas之skew,求偏度
  15. 信捷触摸屏UI模板XINJIE UI信捷触摸屏界面模板
  16. css之display:inline-block布局
  17. 如何判断JS中两个对象是否相等?
  18. 王姨劝我学HarmonyOS鸿蒙2.0系列教程之四Git搭建下载实例!
  19. JAVA实体类数据筛选转Map
  20. Google Earth Engine(GEE)——Landsat 8TI/TOA/SR影像对比分析区别和去云即NDVI计算

热门文章

  1. php查询数据存到下一界面_PHP从另一个页面获取数据
  2. 二叉树层序遍历递归与非递归_二叉树基础(1)-构建和遍历(递归和非递归)...
  3. 在Linux上的虚拟机上启动Oracle上报ORA-00845: MEMORY_TARGET not supported on this system的问题解决
  4. 2.Maven特点,Maven约定,建立第一个Maven项目
  5. Struts2里的Action返回Json数据
  6. FreeMarker插件的安装
  7. T-SQL里数据库工程师都不知道的秘密之SQL Server自定义函数UDF
  8. tf.placeholder函数的用法
  9. 贪心算法———房间搬桌子
  10. rest_framework 视图/路由/渲染器/认证授权/节流