laravel5.8接入RabbitMq队列服务
一、前期准备
具体流程分为以下步骤。
- 安装RabbitMq服务
- 安装php的amqp扩展(到pecl官方下载扩展)
具体教程有windows教程
linux下为了可以快速使用,RabbitMq使用docker来快速部署
#拉取RabbitMq官方镜像直接使用
docker pull rabbitmq:3.8.9#创建容器绑定端口以便外部访问
docker run -itd --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.8.9#进入容器进行配置
docker exec -it rabbit bash
至于linux下php的amqp不赘述
二、RabbitMq配置
思路如下:
使用RabbitMq自带的管理插件进行管理监控队列情况,由于默认情况下RabbitMq会生成一个账号以及密码都是guest的账号供使用,该账号相当于linux的root账号,有全部的权限所以一般情况下不能提供给客户端去使用。应该创建一个管理账号以及客户端使用的账号。前者用来通过管理插件日常管理队列状况,后者供客户端(生产者、消费者)使用。接着上面的流程进入到容器使用rabbitmqctl命令配置
#常用的rabbitmq命令如下
rabbitmq-defaults rabbitmq-env rabbitmq-queues rabbitmq-upgrade
rabbitmq-diagnostics rabbitmq-plugins rabbitmq-server rabbitmqctl#新增walter账号,密码是111111
rabbitmqctl add_user walter 111111#设置walter账号为监察员角色,用来在管理插件中进行监控使用的账号
rabbitmqctl set_user_tags walter monitoring#新增workman账号,密码是111111 这个用户主要是用来给客户端使用,不需要设置角色
rabbitmqctl add_user workman 111111#创建一个名为app1的虚拟主机用来放置业务队列等
rabbitmqctl add_vhost app1#给账号为新增的虚拟主机设置权限,不然操作不了这个虚拟主机下的资源(exchange、queues等)
rabbitmqctl set_permissions -p app1 guest .* .* .*
rabbitmqctl set_permissions -p app1 walter .* .* .*
rabbitmqctl set_permissions -p app1 workman .* .* .*#查看rabbitmq内置的插件列表,看看有没有我们要使用的rabbitmq_management
rabbitmq-plugins list
Listing plugins with pattern ".*" ...Configured: E = explicitly enabled; e = implicitly enabled| Status: * = running on rabbit@7528c5f95339|/
[ ] rabbitmq_amqp1_0 3.8.9
[ ] rabbitmq_auth_backend_cache 3.8.9
[ ] rabbitmq_auth_backend_http 3.8.9
[ ] rabbitmq_auth_backend_ldap 3.8.9
[ ] rabbitmq_auth_backend_oauth2 3.8.9
[ ] rabbitmq_auth_mechanism_ssl 3.8.9
[ ] rabbitmq_consistent_hash_exchange 3.8.9
[ ] rabbitmq_event_exchange 3.8.9
[ ] rabbitmq_federation 3.8.9
[ ] rabbitmq_federation_management 3.8.9
[ ] rabbitmq_jms_topic_exchange 3.8.9
[ ] rabbitmq_management 3.8.9
[e*] rabbitmq_management_agent 3.8.9
[ ] rabbitmq_mqtt 3.8.9
[ ] rabbitmq_peer_discovery_aws 3.8.9
[ ] rabbitmq_peer_discovery_common 3.8.9
[ ] rabbitmq_peer_discovery_consul 3.8.9
[ ] rabbitmq_peer_discovery_etcd 3.8.9
[ ] rabbitmq_peer_discovery_k8s 3.8.9
[E*] rabbitmq_prometheus 3.8.9
[ ] rabbitmq_random_exchange 3.8.9
[ ] rabbitmq_recent_history_exchange 3.8.9
[ ] rabbitmq_sharding 3.8.9
[ ] rabbitmq_shovel 3.8.9
[ ] rabbitmq_shovel_management 3.8.9
[ ] rabbitmq_stomp 3.8.9
[ ] rabbitmq_top 3.8.9
[ ] rabbitmq_tracing 3.8.9
[ ] rabbitmq_trust_store 3.8.9
[e*] rabbitmq_web_dispatch 3.8.9
[ ] rabbitmq_web_mqtt 3.8.9
[ ] rabbitmq_web_mqtt_examples 3.8.9
[ ] rabbitmq_web_stomp 3.8.9
[ ] rabbitmq_web_stomp_examples 3.8.9#开启rabbitmq_management插件,这个是提供web服务的插件。开启后我们可以通过web页面来管理rabbitmq上面的资源
rabbitmq-plugins enable rabbitmq_management
记住如果linux服务器的话要允许外部访问15672、5672这两个端口。然后在本地就可以通过15672端口来访问rabbitmq_management组件了
三、laravel5.8集成RabbitMq
具体参考laravel社区的教程
主要遇到的坑就是使用composer安装的时候有可能因为没有安装php的amqp扩展报错,尤其是直接使用教程中那种不带版本的命令来安装时。composer会默认使用最高版本,但是目前最高版本的要求是laravel8。当前由于我的laravel版本是5.8所以选择了v7.2.0版本的扩展库。其他的基本如教程一致
composer require vladimir-yuldashev/laravel-queue-rabbitmq:v7.2.0
下面是我的env文件的基本配置信息
config目录下的app.php配置文件中注册服务提供者LaravelQueueRabbitMQServiceProvider
config目录下queue配置文件在connections中新增rabbitmq配置(这设置只针对v7.2.0版本的laravel-queue-rabbitmq扩展库,最新的扩展库参考github中的说明)
'rabbitmq' => ['driver' => 'rabbitmq','dsn' => env('RABBITMQ_DSN', null),/** Could be one a class that implements \Interop\Amqp\AmqpConnectionFactory for example:* - \EnqueueAmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext* - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib* - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny*/'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class,'host' => env('RABBITMQ_HOST', '127.0.0.1'),'port' => env('RABBITMQ_PORT', 5672),'vhost' => env('RABBITMQ_VHOST', '/'),'login' => env('RABBITMQ_LOGIN', 'guest'),'password' => env('RABBITMQ_PASSWORD', 'guest'),'queue' => env('RABBITMQ_QUEUE', 'default'),'options' => ['exchange' => ['name' => env('RABBITMQ_EXCHANGE_NAME'),/** Determine if exchange should be created if it does not exist.*/'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),/** Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html*/'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),],'queue' => [/** Determine if queue should be created if it does not exist.*/'declare' => env('RABBITMQ_QUEUE_DECLARE', true),/** Determine if queue should be binded to the exchange created.*/'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),/** Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html*/'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),'durable' => env('RABBITMQ_QUEUE_DURABLE', true),'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),],],/** Determine the number of seconds to sleep if there's an error communicating with rabbitmq* If set to false, it'll throw an exception rather than doing the sleep for X seconds.*/'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', 5),/** Optional SSL params if an SSL connection is used* Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html*/'ssl_params' => ['ssl_on' => env('RABBITMQ_SSL', false),'cafile' => env('RABBITMQ_SSL_CAFILE', null),'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),],],
laravel中创建一个队列任务类、以及命令行类(使用laravel命令或者手动创建都行)
队列任务类
<?phpnamespace App\Jobs;use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;class RabbitJob implements ShouldQueue
{use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;private $data;/*** Create a new job instance.** @return void*/public function __construct($data){//$this->data = $data;}/*** Execute the job.** @return void*/public function handle(){try{echo json_encode(['code'=>200,'msg'=>'success','data'=>$this->data],JSON_UNESCAPED_UNICODE);}catch (\Exception $e){echo json_encode(['code'=>$e->getCode(),'msg'=>$e->getMessage(),'data'=>null],JSON_UNESCAPED_UNICODE);}}public function failed($exception = null){if ($exception instanceof \Exception){file_put_contents('c.log',$exception->getTraceAsString());}}
}
命令类
<?phpnamespace App\Console\Commands;use App\Jobs\RabbitJob;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\App;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;class Test extends Command
{/*** The name and signature of the console command.** @var string*/protected $signature = 'command:test';/*** The console command description.** @var string*/protected $description = '测试命令';/*** Create a new command instance.** @return void*/public function __construct(){parent::__construct();}/*** Execute the console command.** @return mixed*/public function handle(){$this->info('test start '.date('Y-m-d H:i:s'));//延时10秒推送到rabbit队列中RabbitJob::dispatch(['name'=>'walter','date'=>date('Y-m-d H:i:s'),'remark'=>'rabbit'])->onQueue('rabbit')->delay(10);//正常推送任务到default队列中RabbitJob::dispatch(['name'=>'walter','date'=>date('Y-m-d H:i:s'),'remark'=>'default']);$this->info('test finish '.date('Y-m-d H:i:s'));}
命令行是为了想快速的测试队列推送功能。直接执行laravel内部命令来执行上面的命令类。队列推送完毕
php artisan command:testtest start 2021-01-13 16:20:59
test finish 2021-01-13 16:20:59
下面登录到rabbitmq_management插件查看队列情况,可以看到队列列表里面有3个队列,其中default就是直接推送上来的队列。enqueue.rabbit.rabbit.10000.x.delay为上面的延时队列推送功能中,laravel-queue-rabbitmq扩展库通过rabbitmq的ttl过期机制创建的延时10秒的过度队列,最终在这个过度队列里面放入消息最后消息结束后再把消息放到我们最终的rabbit队列中。
在一个新的命令行中使用以下命令消费default队列
php artisan queue:work --tries=1[2021-01-13 16:30:07][5ffead6bb240b2.72849874] Processing: App\Jobs\RabbitJob
{"code":200,"msg":"success","data":{"name":"walter","date":"2021-01-13 16:20:59","remark":"default"}}[2021-01-13 16:30:07][5ffead6bb240b2.72849874] Proces
sed: App\Jobs\RabbitJob
default队列中ready的1条消息已经变成0,证明消息已经消费
命令会长期挂起不停的监控default队列中的消息,已有消息就会马上执行。至于rabbit队列的消费类似default的命令只是外加一个--queue=rabbit参数就行。记得一定不要忘了--tries参数,不然一旦消费的时候出现异常会触发消息重新推送到队列导致不停的消费消息的死循环。
php artisan queue:work --tries=1 --queue=rabbit[2021-01-13 16:34:06][5ffead6bb0c9a4.56093290] Processing: App\Jobs\RabbitJob
{"code":200,"msg":"success","data":{"name":"walter","date":"2021-01-13 16:20:59","remark":"rabbit"}}[2021-01-13 16:34:06][5ffead6bb0c9a4.56093290] Process
ed: App\Jobs\RabbitJob
laravel5.8接入RabbitMq队列服务相关推荐
- (四)RabbitMQ消息队列-服务详细配置与日常监控管理
(四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...
- 使用EasyNetQ组件操作RabbitMQ消息队列服务
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,是实现消息队列应用的一个中间件,消息队列中间件是分布式系统中重要的组件,主要解决应用耦合, ...
- 理论修炼之RabbitMQ,消息队列服务的稳健者
????欢迎点赞 :???? 收藏 ⭐留言 ???? 如有错误敬请指正,赐人玫瑰,手留余香! ????本文作者:由webmote 原创,首发于 [掘金] ????作者格言:生活在于折腾,当你不折腾生活 ...
- Day10-Python3基础-协程、异步IO、redis缓存、rabbitMQ队列
内容目录: Gevent协程 Select\Poll\Epoll异步IO与事件驱动 Python连接Mysql数据库操作 RabbitMQ队列 Redis\Memcached缓存 Paramiko S ...
- Spring接入RabbitMQ
在原有的Spring项目中接入RabbitMQ,使用的Spring集成RabbitMQ,这样 RabbitMQ的功能调用通过Spring封装,调用更加简洁. 首先pom 文件中引入依赖 <dep ...
- python rabitmq_python RabbitMQ队列使用
原博文 2019-01-17 21:17 − python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queu ...
- RabbitMQ 队列消息持久化
参考链接: https://www.cnblogs.com/Keep-Ambition/p/8044752.html 假如消息队列test里面还有消息等待消费者(consumers)去接收,但是这个时 ...
- [RabbitMQ]队列持久化
RabbitMQ持久化 概念 如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失.默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做. ...
- 云平台队列服务-Qbus实践
女主宣言 小编这篇文章从Qbus服务介绍.架构.使用场景开展,不涉及到队列底层具体实现原理,只讲把Qbus服务上到私有云的过程,希望给有相同需求的各位带来一点有价值的东西.下来就跟随作者一同学习下吧. ...
- java 结合redis队列_在 Java 中使用 redis 的消息队列服务
前言 关于 redis 我们前面已经讨论过了缓存.分布式锁.分布式唯一标识.LBS服务的用法,这里我们来谈谈利用 redis 来实现一个消息服务. 典型的消息服务是一个生产者和消费者模式的服务.一般是 ...
最新文章
- 新技能 Get,使用直方图处理进行颜色校正
- 第三代测序技术的方法原理及其在生物领域的应用
- 公文字体字号标准2020_党政机关公文格式(收藏)
- Linux 高级存储管理
- python从数分到数编(part2)--随机数及数组
- java调python画图_Python Matplotlib plot 绘图
- LED 将为我闪烁: 控帘 j发光二级管
- Python+django网页设计入门(12):使用Bootstrap和jQuery
- 实现1-2+3-4+5-6+…+99-100的算法的程序
- 项目Alpha冲刺(6/10)
- Spark提交任务到集群
- 电脑W7系统怎样安装鸿蒙系统,家用电脑升级win7系统的操作方法
- 清翔51单片机开发板及原理图-去年购买的
- 【Elasticsearch】 (搜索引擎如何做搜索推荐?) Elasticsearch中 使用 Suggesters 推荐查询
- Python3使用Xpath解析网易云音乐歌手页面
- 中兴B863AV3.2-M_安卓9.0系统_线刷包及教程
- 什么是802.11ax(Wi-Fi 6)
- 转贴:谁说我会画板?
- H323、H248(MGCP)、SIP三协议浅析
- 简述xss漏洞原理及危害?xss漏洞有哪些类型?xss漏洞哪个类型危害最大?如何防御xss漏洞