/*** 消息列队服务* @author zhou.tingze* @example* -----------------------------------Create----------------------------------------* $array = array('a','b','c','d');* $this->load->library('amqp_service');* $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');* $this->amqp_service->createMessageQueue($array);* -----------------------------------End-------------------------------------------* * -----------------------------------Get-------------------------------------------* $this->load->library('amqp_service');* $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');* $message_queue = $this->amqp_service->getMessageQueue();* var_dump($message_queue)* -----------------------------------End-------------------------------------------*/class Amqp_service extends Base_service{public $conn;public $exchange;public $queue;public $router;function __construct(){parent:: __construct();//获取系统配置$this->load->config('app_config', TRUE);$app_config = $this->config->item('app_config');$this->connect($app_config['amqp']);}/*** * 尝试连接Amqp服务*/private function connect($amqp_args){    $this->conn = new AMQPConnection($amqp_args);$this->conn->connect();if (!$this->conn->isConnected()) {throw new Exception('Cannot connect to the broker.');}}/*** * 设定消息列队保存方式* @param String $exchange_name 交换机名* @param String $queue_name    消息列队名* @param String $router_name   路由名*/public function setSaveType($exchange_name, $queue_name, $router_name){$this->exchange = $exchange_name;$this->queue    = $queue_name;$this->router   = $router_name;}/*** * 创建消息列队* @param Array $array*/public function createMessageQueue($array){//创建交换机 $channel = new AMQPChannel($this->conn);$ex      = new AMQPExchange($channel);//交换机名 $ex->setName($this->exchange);$ex->setType(AMQP_EX_TYPE_DIRECT);$ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);$ex->declare();//创建消息列队$q = new AMQPQueue($channel);//队列名$q->setName($this->queue);$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);$q->declare();//绑定交换机与队列,并指定路由键 $q->bind($this->exchange, $this->router);//消息发布$channel->startTransaction();$message = json_encode($array);$ex->publish($message, $this->router);$channel->commitTransaction();//$this->conn->disconnect();}/*** * 获取消息列队*/public function getMessageQueue(){try{//设置queue名称,使用exchange,绑定routingkey$channel = new AMQPChannel($this->conn);$q       = new AMQPQueue($channel);$q->setName($this->queue);$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);$q->declare();$q->bind($this->exchange, $this->router);   //消息获取$messages = $q->get(AMQP_AUTOACK) ;$arr = array();if ($messages){$arr = json_decode($messages->getBody(), true );}}catch (Exception $e){throw new Exception($e->getMessage());}//$this->conn->disconnect();return $arr;}/*public function getAllMessageQueue(){//设置queue名称,使用exchange,绑定routingkey$channel = new AMQPChannel($this->conn);$q       = new AMQPQueue($channel);$q->setName($this->queue);$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);$q->declare();$q->bind($this->exchange, $this->router);   $this->conn->disconnect();//阻塞模式获取消息列队while(True){ $q->consume('processMessage');   //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  } }*/public function __destruct(){$this->conn->disconnect();}
}/*** 消费回调函数* 处理消息* @param Object $envelope* @param Object $queue*/
/*
function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg . '<br />';//手动发送ACK应答 $queue->ack($envelope->getDeliveryTag());
}
*/

  

转载于:https://www.cnblogs.com/adtuu/p/4670229.html

在PHP中如何使用消息列队相关推荐

  1. WSF操作系统抽象层学习笔记(三)---消息列队

    消息 WSF的消息服务用于传递消息到对应的事件处理句柄. 实现机制和使用方法 基于内存管理,从内存中申请sizeof(wsfMsg_t) + 消息长度的内存.添加头部描述,返回给申请者除去头部的指针位 ...

  2. php简单实现rabbitMQ消息列队(必须收藏)

    业务场景: 公司是主php做开发的,框架为thinkphp.众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序.首先我想到了php的worke ...

  3. C#中的MessageBox消息对话框

    关键字:C# MessageBox 消息对话框 在程序中,我们经常使用消息对话框给用户一定的信息提示,如在操作过程中遇到错误或程序异常,经常会使用这种方式给用于以提示.在C#中,MessageBox消 ...

  4. 如何禁用请求库中的日志消息?

    本文翻译自:How do I disable log messages from the Requests library? By default, the Requests python libra ...

  5. 详解如何实现在线聊天系统中的实时消息获取

    序言 传统web浏览器应用采用客户端主动请求方式,只有在收到浏览器请求时服务端才返回消息,这种模式已经不能满足日益多样化的web应用需求,例如: 在线聊天系统:需要实时获取聊天消息. 实时监控系统:需 ...

  6. QT中处理不同Windows(窗体中的)消息

    为了能处理某些qt无法处理的事件,可以自己处理windows事件. 处理windows事件是重写QApplication::winEventFilter(MSG*,long)函数来实现的.假如功能想捕 ...

  7. 在WPF中处理Windows消息

    在Winform中 处理Windows消息通过重写WndProc方法 在WPF中 使用的是System.Windows. Sytem.Windows.Controls等名字空间,没有WndProc函数 ...

  8. Java错误提示框口怎么使用_如何在Swing中显示错误消息对话框?

    以下示例展示了如何在基于swing的应用程序中显示错误消息警告. 使用以下API - JOptionPane - 创建标准对话框. JOptionPane.showMessageDialog() - ...

  9. 中怎么撤回消息_微信消息撤回也能看到,这个开源神器牛x!语音、图片、文字都支持!...

    1.前言 微信在2014年的时候,发布的v5.3.1 版本中推出了消息撤回功能,用户可以选择撤回 2 分钟内发送的最后一条信息. 现在很多即时通讯的软件都有撤回这个功能. 腾讯为了照顾手残党,在微信和 ...

最新文章

  1. 一个write和printf混用的例子
  2. 百度云android隐藏空间,一招教你使手机端百度网盘中的隐藏空间在文件列表中显示出来...
  3. python中字符移位加密_1.1 移位密码加密解密python实现
  4. Please make sure you have the correct access rights and the repository exists.报错问题
  5. angular路由传递参数_Angular路由——在路由时候传递数据
  6. ScaleForm十六戒言
  7. 自己编写一个前端精确打印控件
  8. 由一个activity跳转到另一个activity
  9. AD笔记4-元件封装库绘制
  10. 《Java多线程编程核心技术》学习笔记(1)
  11. w3school JavaScript笔记2 ——JavaScript HTML DOM
  12. Socket 服务器和客户端交互
  13. iOS微信分享提示“未验证应用”的解决,配置 Universal Link
  14. ​怎么判断是前端bug还是后端bug?
  15. ios sdk 穿山甲_iOS 穿山甲广告 SDK 的使用
  16. linux的sssd服务,linux – SSSD进程不会死
  17. 酒店管理系统-毕业设计
  18. 【Pygame实战】妈耶~这款经典的《俄罗斯方块儿》竟这么厉害......
  19. VMWare虚拟机快照技术深入理解
  20. 硬盘格式转换不影响数据_ convert命令FAT32转NTFS

热门文章

  1. 产品经理——如何挑选一款高效的原型工具?
  2. 最长回文子串动态规划_九章算法 | 微软面试题:最长回文子串
  3. arduino上传项目出错_Arduino入门前你该知道的事儿
  4. inline-block什么意思中文_css中inline-block是什么?inline-block布局的使用
  5. 江苏有线门户网站服务器地址,江苏有线手机客户端的登录服务器
  6. securecrt是什么工具_网络总出故障,这几款工具你掌握了没?!!
  7. 迁移到阿里云后,NTKO控件报存word 报文件存取错误,请检查网络传输。
  8. Akka(32): Http:High-Level-Api,Route exception handling
  9. Atitit 关于处理环保行动联盟和动物解放阵线游击队的任命书 委任状
  10. 推荐一个十分好看的开源博客系统