场景:微服务中,会遇到这样的案例:用户申请提现,总后台(后台服务)审核通过,通知资金服务 更新数据;

1.安装 composer 包

composer requires php-amqplib/php-amqplib ^2.12

2. env 追加配置

RABBITMQ_HOST=xb_rabbitmq
RABBITMQ_PORT=5672
#通过15672创建的rabbitmq虚拟主机,默认是'/'
RABBITMQ_VHOST=/
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
#通过15672创建的rabbitmq队列
RABBITMQ_QUEUE=withdrawal-queue
RABBITMQ_EXCHANGE=withdrawal-exchangeQUEUE_CONNECTION=rabbitmq # 更新

3.追加配置 config/queue.php connections 下 后 执行 php artisan config:cache

'rabbitmq' => ['driver'                => 'rabbitmq','host'                  => env('RABBITMQ_HOST', '127.0.0.1'),'port'                  => env('RABBITMQ_PORT', 5672),'vhost'                 => env('RABBITMQ_VHOST', '/'),'login'                 => env('RABBITMQ_LOGIN', 'guest'),'password'              => env('RABBITMQ_PASSWORD', 'guest'),'queue'                 => env('RABBITMQ_QUEUE'), // name of the default queue,'exchange_name'         => env('RABBITMQ_QUEUE'),'exchange_declare'      => env('RABBITMQ_EXCHANGE_DECLARE', true), // create the exchange if not exists'queue_declare_bind'    => env('RABBITMQ_QUEUE_DECLARE_BIND', true), // create the queue if not exists and bind to the exchange'queue_params'          => ['passive'           => env('RABBITMQ_QUEUE_PASSIVE', false),'durable'           => env('RABBITMQ_QUEUE_DURABLE', true),'exclusive'         => env('RABBITMQ_QUEUE_EXCLUSIVE', false),'auto_delete'       => env('RABBITMQ_QUEUE_AUTODELETE', false),],'exchange_params' => ['name'        => env('RABBITMQ_EXCHANGE_NAME', null),'type'        => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // more info at http://www.rabbitmq.com/tutorials/amqp-concepts.html'passive'     => env('RABBITMQ_EXCHANGE_PASSIVE', false),'durable'     => env('RABBITMQ_EXCHANGE_DURABLE', true), // the exchange will survive server restarts'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),],],

4.审核通过方法

    public function adopt(){//我将用户提现申请,审核通过 我生产一条消息 等待资金服务消费去做资金改动$withdrawalId = 100;//提现idevent(new WithdrawalEvent(['withdrawal_id' => $withdrawalId]));}

5.创建 监听事件 Event

<?phpnamespace App\Events;use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;class WithdrawalEvent
{use Dispatchable, InteractsWithSockets, SerializesModels;public $withdrawal;public function __construct($withdrawal){$this->withdrawal= $withdrawal;}/*** Get the channels the event should broadcast on.** @return \Illuminate\Broadcasting\Channel|array*/public function broadcastOn(){return new PrivateChannel('channel-name');}
}

5.创建 监听事件 Listener

<?phpnamespace App\Listeners;use App\Events\WithdrawalEvent;
use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;class WithdrawalListener
{protected $config = [];public function __construct(){$this->config = config('queue.connections.rabbitmq');}/*** Handle the event.** @param  WithdrawalEvent  $event* @return void*/public function handle(WithdrawalEvent $event){try {$connect = new AMQPStreamConnection( //建立生产者与mq之间的连接$this->config['host'],$this->config['port'],$this->config['login'],$this->config['password'], '/');$channel = $connect->channel(); //在已连接基础上建立生产者与mq之间的通道$channel->exchange_declare($this->config['exchange_name'], 'direct', false, true, false); //声明初始化交换机$channel->queue_declare($this->config['queue'], false, true, false, false); //声明初始化一条队列$channel->queue_bind($this->config['queue'], $this->config['exchange_name']); //将队列与某个交换机进行绑定,并使用路由关键字$msgBody = json_encode($event->withdrawal);$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息$channel->basic_publish($msg, $this->config['exchange_name']); //推送消息到某个交换机$channel->close();$connect->close();}catch (\Exception $exception){Log::info($exception->getMessage());}}
}

6.创建服务层

<?phpnamespace App\Services;use Illuminate\Support\Facades\Log;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;class RmqClientService
{/*** @var object $instance 单例对象*/private static $instance = null;/*** @var object $connection 队列连接对象*/private $connection = null;/*** @var object $channel 队列通道对象*/private $channel = null;/*** @var object $message 队列消息对象*/private $message = null;/*** 构造函数**/private function __construct(){//dd(config('queue.connections.rabbitmq.vhost'));$this->connection = new AMQPStreamConnection(config('queue.connections.rabbitmq.host'),config('queue.connections.rabbitmq.port'),config('queue.connections.rabbitmq.login'),config('queue.connections.rabbitmq.password'),config('queue.connections.rabbitmq.vhost'));$this->channel = $this->connection->channel();$this->message = new AMQPMessage('', ['content_type' => 'json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);}/*** FunctionName:__clone* Description:克隆* Author:lwl*/private function __clone(){}/*** 析构函数*/public function __destruct(){$this->channel->close();$this->connection->close();self::$instance = null;}/*** FunctionName:getInstance* Description:单例实例化入口* Author:lwl* @return RmqClientService|object|null*/public static function getInstance(){if (!self::$instance instanceof self) {self::$instance = new self();}return self::$instance;}/*** FunctionName:consumer* Description:消费队列* Author:lwl* @param string $queue 队列名称* @param boolean $bForceDelete 是否取后即删* @return AMQPMessage|null*/public function consumer(string $queue, bool $bForceDelete = false){try {// 取数据// 声明队列// 不检测同名队列,持久化,不允许其他队列访问,不自动删除队列$this->channel->queue_declare($queue, false, true, false, false);$message = $this->channel->basic_get($queue);if ($message && $bForceDelete) {// 回复确认信息$this->channel->basic_ack($message->delivery_info['delivery_tag']);}} catch (\Exception $exception) {Log::info($exception->getMessage());}return $message;}/*** FunctionName:ack* Description:* Author:lwl* @param $nTag 消息传递标签* @return mixed*/public function ack($nTag){try {$this->channel->basic_ack($nTag);} catch (\Exception $e) {dd($e->getMessage());}return null;//success 个人响应}
}

7.创建 Command

<?php
namespace App\Console\Commands;use App\Services\RmqClientService;
use Illuminate\Console\Command;class WithdrawalConsumerCommand extends Command
{protected $signature = 'withdrawal:consumer';protected $description = '消费提现审核通过后的消息';public function handle(){while (true) {$service = RmqClientService::getInstance();$queue = config('queue.connections.rabbitmq.queue');$response = $service->consumer($queue, true);if ($response) {$result = json_decode($response->body,1);dd($result);//资金服务后续操作}dd('service error');}}
}

8.测试

1.通过 ‘提现审核通过‘ 的路由 生产消息http://test.test/api/user/adopt2.执行 php artisan withdrawal:consumer #如下图 生产环境使用 Supvervisor 等进程管理 常驻监听 (请查看 10)

规则 说明
direct 精准推送
fanout 广播 推送到绑定到此交换机下的所有队列
topic 组播 比如上面我绑定的关键字是sms_send,那么他可以推送到*.sms_send的所有队列
headers 这个目前不知道是如何推送的

9.在创建交换机和队列的时候各个常用参数说明 地址

name: $queue    // should be unique in fanout exchange. [队列名称]passive: false  // don't check if a queue with the same name exists [是否检测同名队列]durable: false // the queue will not survive server restarts [是否开启队列持久化]exclusive: false // the queue might be accessed by other channels [队列是否可以被其他队列访问]auto_delete: true //the queue will be deleted once the channel is closed. [通道关闭后是否删除队列]name: $exchange [交换机名称]type: direct [路由类型]passive: false []durable: true [交换机是否开启持久化]auto_delete: false //the exchange won't be deleted once the channel is closed.

10.Supvervisor 守护进程 消费 ,进入容器内

10-1.安装 supervisor

apt-get install supervisor

10-2.切换到配置目录

cd /etc/supervisor/conf.d

10-3.写入配置 vim mq.conf

[program:withdrawal_consumer]                                       #管理进程的命名command=php artisan withdrawal:consumer     #执行的命令stderr_logfile=/var/log/supervisor/error.log      #错误日志输出路径stdout_logfile=/var/log/supervisor/supervisor.log   #日志输出路径directory=/workspace/xiaoba/oldLiuCms      #命令执行的工作空间autostart=true                 #自动启动user=root                   #指定用户autorestart=true                #自动重启

10-4.重新加载

service  supervisor   force-reload

10-5.启动进程

 service  supervisor   start withdrawal_consumer

Laravel 8.63.0 之 RabbitMQ 生产消费案例相关推荐

  1. Ubuntu20.04裸机上配置单机 Pulsar2.7.0 成功并生产消费消息

    一.系统说明: 本机是Window10系统,开启虚拟机Hyper-V后,创建Ubuntu20.04的系统,并在系统上安装JDK1.8之后,成功启动单机模式Pulsar,并在单机集群上消费生产消息: w ...

  2. rabbitmq订单消费案例

    需求: 订单provide: @Service public class OrderService extends BaseApiService implements RabbitTemplate.C ...

  3. rocketmq 消息指定_SpringBoot 整合 RocketMQ 如何实现消息生产消费?

    有时候我们在使用消息队列的时候,往往需要能够保证消息的顺序消费,而RocketMQ是可以支持消息的顺序消费的. RocketMQ在发送消息的时候,是将消息发送到不同的队列中,然后消费端从多个队列中读取 ...

  4. Java实现生产消费模型的5种方式

    ** 前言 ** 生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储 ...

  5. kafka生产消费原理笔记

    一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...

  6. 1月10日科技资讯|支付宝集五福下周一开始;iPhone 面世 13 周年;Laravel 6.10.0 发布

    一分钟速览新闻点 周鸿祎回应年会特等奖「免裁券」:编的自黑段子 搜狐迟到一次罚款 500,回应:希望员工对工作有激情 华为否认将推出消费级台式机:只提供芯片,不做整机 微信放开5000人好友上限,但新 ...

  7. Java程序创建Kafka Topic,以及数据生产消费,常用的命令

    转自: Java程序创建Kafka Topic,以及数据生产消费,常用的命令_Zyy_z_的博客-CSDN博客_java kafka创建topicKafka简介: Kafka是一个分布式发布--订阅消 ...

  8. java生产消费线程小例子

    生产消费线程型是理解多线程的一个好例子,实际上,准确说应该是"生产者-消费者-仓储"模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确一下几点: 1.生产 ...

  9. 支付宝集五福下周一开始;iPhone 面世 13 周年;Laravel 6.10.0 发布 | 极客头条

    整理 | 屠敏 快来收听极客头条音频版吧,智能播报由标贝科技提供技术支持. 「极客头条」-- 技术人员的新闻圈! CSDN 的读者朋友们早上好哇,「极客头条」来啦,快来看今天都有哪些值得我们技术人关注 ...

  10. 深入理解Linux 条件变量2:使用条件变量实现[生产-消费]框架

    前言 在上一篇文章<深入理解Linux 条件变量1:使用场景.接口说明>我们简单介绍了条件变量的使用场景以及相关接口,正如大神linus所说:talk is cheap,show me t ...

最新文章

  1. iOS逆向之旅(进阶篇) — 工具(LLDB)
  2. 【架构零】大型网站的架构的目标与挑战
  3. Codeforces Round #486 (Div. 3) C Equal Sums (map+pair)
  4. C++类中使用new及delete小例子
  5. sqlserver的@@IDENTITY在oracle中怎样实现
  6. 初识Notification
  7. 表弟面试被虐,我教他缓存连招,借机蹭了波奈雪的茶
  8. 将samba加入到windows域《转载》
  9. 数据上报java_通过Jaeger上报Java应用数据
  10. Flink的窗口聚合操作(Time\Count Window)
  11. Ubuntu——系统扩容(加硬盘)的学习笔记
  12. 哪些因素影响大数据的发展
  13. leetcode(85)最大矩形
  14. qtvs添加qchart_如何使用Qt Designer在窗体中插入QChartView?
  15. Cannot add task ‘wrapper‘ as a task with that name already exists.
  16. java 随机数性能优化
  17. python爬公众号文章_python爬取指定微信公众号文章
  18. matlab2017b安装之后点桌面图标黑框闪退
  19. LoadRunner教程(15)-LoadRunner 初识Analysis
  20. ACM程序设计之马拉松竞赛

热门文章

  1. pycharm条件判断
  2. 解决ImportError: cannot import name ‘_validate_lengths‘
  3. 轻量级数据格式 —— JSON
  4. MateMask连接本地私有链节点ganache
  5. HTML特殊符号表示方法
  6. 九酷音乐真实地址解析
  7. JAVA在线考试系统毕业设计 开题报告
  8. 造梦西游4手游服务器维护,造梦西游4手机版常见问题解答 新手看过来
  9. 人脸识别翼闸使用规范_介绍市场上最常用的通道翼门和人脸识别的使用
  10. 微软输入法数字有间隔_Word 2016中使用微软拼音输入法,换行后输入数字、标点时会没有输入进去,需要再输入一次...