前言:最近因为工作忙,没抽出时间来继续文章的下半部分,现在手头忙的差不多,便抽时间写了这篇文章!

上篇文章只是简单的介绍了,rabbitmp的搭建和基础发送队列,封装了一个公用类。

namespace common\tools;

use PhpAmqpLib\Connection\AMQPStreamConnection;

use PhpAmqpLib\Message\AMQPMessage;

/**

* Created by PhpStorm.

* User: steven

* Date: 2017/7/10

* Time: 下午4:38

*/

class RabbitMq

{

/**

* @var AMQPStreamConnection

*/

protected $connection;

protected $queue_key;

protected $exchange_key;

protected $exchange_suffix;

protected $priority;

protected $channel;

/**

* RabbitQueue constructor.

* @param $config

* @param $queue_name

* @param null $priority

*/

public function __construct($config, $queue_name,$priority=null)

{

$this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pass']);

$this->queue_key = $queue_name;

$this->exchange_suffix = $config['exchange'];

$this->priority=$priority;

$this->channel = $this->connection->channel();

$this->bind_exchange();

return $this->connection;

}

/**

* 绑定交换机

* @return mixed|null

*/

protected function bind_exchange() {

$queue_key=$this->queue_key;

$exchange_key = $this->exchange_suffix;

$this->exchange_key = $exchange_key;

$channel = $this->channel;

if(!empty($this->priority)){

$priorityArr = array('x-max-priority' => array('I', $this->priority));

$size = $channel->queue_declare($queue_key, false, true, false, false,false,$priorityArr);

}else{

$size = $channel->queue_declare($queue_key, false, true, false, false);

}

$channel->exchange_declare($exchange_key, 'topic', false, true, false);

$channel->queue_bind($queue_key, $exchange_key,$queue_key);

$this->channel=$channel;

return $size ;

}

/**

* 发送数据到队列

* @param $data = array('key'=>'val')

*/

public function put($data)

{

$channel = $this->channel;

$value = json_encode($data);

$toSend = new AMQPMessage($value, array('content_type' => 'application/json', 'delivery_mode' => 2));

$channel->basic_publish($toSend, $this->exchange_key,$this->queue_key);

}

/**

* 获取数据

* @return mixed

*/

public function get()

{

$channel = $this->channel;

$message = $channel->basic_get($this->queue_key);

if (!$message) {

return array(null,null);

}

$ack = function() use ($channel,$message) {

$channel->basic_ack($message->delivery_info['delivery_tag']);

};

$result = json_decode($message->body,true);

return array($ack,$result);

}

/**

* 关闭链接

*/

public function close() {

$this->channel->close();

$this->connection->close();

}

/**

* 获得队列长度

* @return int

*/

public function length(){

$info = $this->bind_exchange();

return $info[1];

}

}

发送队列简单demo

namespace frontend\controllers;

use common\tools\RabbitMq;

use Yii;

use yii\web\Controller;

/**

* Site controller

*/

class IndexController extends Controller

{

public function actionIndex()

{

$queueName = 'queue_test';

$rabbitMqConfig = [

'exchange' => 'web',//自己手动添加交换机,这里就不做描述

'host' => '172.17.0.6',//填写自己的容器ip

'port' => '5672',

'user' => 'guest',

'pass' => 'guest',

];

$queue = new RabbitMq($rabbitMqConfig, $queueName);

$data = ['uuid' => rand(100,99999999)];

$queue->put($data);

echo '发送完成,发送的内容:'.print_r($data,1);

exit();

}

}

查看发送队列数据

image.png

image.png

编写消耗队列脚本

namespace console\controllers;

use common\tools\RabbitMq;

use yii\console\Controller;

class TestController extends Controller

{

/**

* console rabbit_mq demo

*/

public function actionTest()

{

$queueName = 'queue_test';

$rabbitMqConfig = [

'exchange' => 'web',//自己手动添加交换机,这里就不做描述

'host' => '172.17.0.6',//填写自己的容器ip

'port' => '5672',

'user' => 'guest',

'pass' => 'guest',

];

$queue = new RabbitMq($rabbitMqConfig, $queueName);

$cnt = 0;

while (1) {

list($ack,$data) = $queue->get();

if(!$data){

$cnt++;

if($cnt > 20){

$queue->close();

exit();

}

echo "no data:$cnt \n";

sleep(1);

continue;

}

//逻辑处理

echo "start work \n";

print_r($data);

echo "end work \n";

//确认消耗

$ack();

}

}

}

配置supervisor的消耗队列进程(PS:我的demo.ini是放在我创建的docker项目容器里的,不熟悉的小伙伴可以看看我的关于创建容器的文章)

a. 创建文件

vim demo.ini

b. 编写配置(PS:注意这里用的是yii的console,请根据你们的使用来进行更改)

;测试demo

[program:demo]

command= /usr/share/nginx/html/advanced/yii test/test

directory=/usr/share/nginx/html/advanced/

stdout_logfile=/tmp/demo.log

redirect_stderr=true

autostart=false

autorestart=false

c. 增加进程

supervisorctl update

d. 成功添加,demo进程已启动,通过supervisorctl tail -f demo 看进程消耗

image.png

e. 再查看队列状态,显示队列已被消耗

image.png

结尾

好了,到此为止,简单的队列消耗已经完成,可能有些地方不是说的太清楚,毕竟自己也在摸索中,文章中的如果有不对的地方欢迎指正,共同学习~

PS.搭建supervisor的docker容器文章:http://www.jianshu.com/p/40418711cf8aTask

docker php amqp 扩展,docker搭建rabbitmq,配合php-amqplib+supervisor使用(下)相关推荐

  1. php的amqp扩展 安装(windows) rabbitmq学习篇

    因为RabbitMQ是由erlang语言实现的,所以先要安装erlang环境 erlang 下载安装 http://www.erlang.org/download.html rabbitmq 下载安装 ...

  2. PHP中RabbitMQ之amqp扩展实现(四)

    目前我在PHP里接触实现RabbitMQ的方式有两种,一种是通过amqp扩展,一种是使用php-amqplib,本章讲诉RabbitMQ的安装及amqp扩展及amqp扩展如何实现RabbitMQ 环境 ...

  3. Docker:搭建RabbitMQ集群

    RabbitMQ原理介绍(一) RabbitMQ安装使用(二) RabbitMQ添加新用户并支持远程访问(三) RabbitMQ管理命令rabbitmqctl详解(四) RabbitMQ两种集群模式配 ...

  4. Docker搭建RabbitMQ

    RabbitMQ RabbitMQ是一个被广泛使用的开源消息队列.它是轻量级且易于部署的,它能支持多种消息协议.RabbitMQ可以部署在分布式和联合配置中,以满足高规模.高可用性的需求. Rabbi ...

  5. Linux下搭建rabbitMQ、安装jdk、安装redis、安装zookeeper、docker安装MySQL、防火墙常用命令、docker安装、Windows下嘛呢私服搭建、Nginx安装

    rabbitMQ 1. 搭建Elang环境 1)安装GCC GCC-C++ Openssl等模块,安装过就不需要安装了 yum -y install make gcc gcc-c++ kernel-d ...

  6. 搭建rabbitmq的docker集群

    环境: 三个rabbitmq节点,加一个haproxy做前端 确保三台服务器都可以按主机名找到对方(/etc/hosts或dns) node1,node2,node3分别执行: docker run ...

  7. docker快速搭建RabbitMQ集群

    这里需要用到Xshell 连接我们的虚拟机-方便c v 查看本地所有的镜像 docker images 我的是3.8.14 第二步: 创建映射数据卷目录,启动rabbitmq容器 创建文件夹: mkd ...

  8. docker rabbitmq_一文看懂Rabbitmq,从安装到实战演练

    Rabbitmq的初步使用 随着微服务概念发展,大应用逐步拆分为小应用,提高开发效率,专门的人做专门的事情,逐渐的流行起来. 在微服务上实现通信的方式大部分是采用rpc方式,也有升级版本的grpc. ...

  9. 使用Docker 实现微服务并搭建博客,一文全掌握

    转载自  使用Docker 实现微服务并搭建博客,一文全掌握 Docker 是一个容器工具,提供虚拟环境.很多人认为,它改变了我们对软件的认识. 本文,通过搭建一个博客的例子,来介绍如何使用Docke ...

  10. docker本地PHP开发环境搭建

    一.搭建本地PHP开发环境 概述 本文简单介绍通过启动一个nginx和fpm容器来搭建一个php web运行环境,以文档命令为例,H:/home/code/docker/web 目录下可以放多个子项目 ...

最新文章

  1. 计算机视觉预备知识,计算机视觉:泊松融合
  2. 【实战】OpenCV钢管计数分析与方法比较
  3. as3 浅复制 深复制
  4. 编程技术分享,程序员小技巧,程序员小伙伴们,你们用到了多少
  5. python 爬虫001-http请求过程
  6. ios项目 swift 定义常量 其他文件引用_面试应该注意的Swift知识点
  7. 计算机考试67,计算机等级考试(国家)-二级c机试模拟67.doc
  8. Logs Viewer
  9. Illegal use of when-style tag without ...
  10. arm解锁 j-flash_jlink驱动下载(SEGGER J-FlASH ARM)
  11. 暴风影音播放时如何旋转视频
  12. php支付宝发卡源码,个人发卡系统支付宝即时到帐大气源码
  13. 企业级用户画像: 价格敏感度模型-PSM
  14. asp.netcore 关于静态文件的访问权限控制(UseStaticFiles)
  15. 人工智能对图书馆未来的影响,主要包含哪三个方面?
  16. STM32调试过程中出现的问题1:
  17. 【路科验证008】DVT 软件使用指导
  18. 渗透测试工具网址--自用
  19. linux-gcc 找不到命令,为什么显示gcc命令没有找到?
  20. java 约瑟夫(josephus)问题_Java-約瑟夫問題(Josephus Problem)

热门文章

  1. LeetCode:3Sum_15
  2. Codeforces Round #358 (Div. 2) Alyona and Strings
  3. 删除和修改nbsp;预留nbsp;:BAPI_RESERVATI…
  4. matlab车牌定位与识别,基于matlab车牌的定位与分割识别程序概要
  5. homebrew下安装mysql_Mac下homebrew安装Mysql以及配置问题
  6. 拓端tecdat|R语言Meta分析效应量
  7. 拓端tecdat|R语言广义线性模型(GLMs)算法和零膨胀模型分析
  8. 拓端tecdat|python关联规则学习:FP-Growth算法对药品进行“菜篮子”分析
  9. 拓端tecdat|如何用R语言在机器学习中建立集成模型?
  10. LeetCode5 最长回文子串