概述:

在上一章节介绍的工作模式中,我们的消费会进行轮询发送给所有的消息消费者,每个消费者接受消息之和为全部消息。本章节介绍的订阅发布者模式则为:将消息传递给所有的消息消费者,每个消费者都能接受到全部的消息。并且在订阅发布章节我们将新引入一个新的概念,交换机(Exchange)概念。

一、pom.xml 依赖

只需要引入rabbitMq的依赖即可

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>RabbitMQ</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency></dependencies></project>

二、发布者(生产者)代码实现过程:

  1. 创建RabbitMQ连接对象Connection;
  2. 创建信道Channel;
  3. 使用Channel声明交换机信息(名称、类型),交换机类型有四种 direct,topic,headers 和fanout。本示例使用fanout;
  4. 声明所有的消息队列;
  5. 将消息队列与交换机进行绑定;
  6. 发送消息;
  7. 关闭信道和mq连接;

java代码实现如下:

package com.xiaohui.rabbitmq.publishSubscribe;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {//交换机名称public static final String FANOUT_EXCHANGE = "fanout_exchange";//队列1public static final String FANOUT_QUEUE_1 = "fanout_queue_1";//队列2public static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//创建信道Channel channel = connection.createChannel();//声明交换机/*** 参数一:交换机名称* 参数二:交换机类型:fanout \topic \direct*/channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);/*** 声明队列* durabe:持久化的消息,mq重启后消息仍在。* exclusive :独占的,一个消息队列独占一个连接*/channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null);//队列绑定交换机channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");for (int i = 1; i < 5; i++) {String msg = "订阅发布模式小兔子来啦........"+i;channel.basicPublish(Producer.FANOUT_EXCHANGE,"",null,msg.getBytes());}channel.close();connection.close();}
}

三、订阅者代码实现步骤

  1. 创建消费端链接
  2. 创建消费端渠道
  3. 声明交换机
  4. 声明消费队列
  5. 队列绑定到交换机
  6. 监听消息回调处理
  7. 消息监听开启

java代码实现如下:

package com.xiaohui.rabbitmq.publishSubscribe;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer1 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);//声明消费队列channel.queueDeclare(Producer.FANOUT_QUEUE_1, true,false,false,null);//队列绑定到交换机channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================订阅消费者1开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================订阅消费者1结束==================");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.FANOUT_QUEUE_1,true,consumer);}
}

订阅者2 代码实现:

package com.xiaohui.rabbitmq.publishSubscribe;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer2 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);//声明消费队列channel.queueDeclare(Producer.FANOUT_QUEUE_2, true,false,false,null);//队列绑定到交换机channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================订阅消费者2开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================订阅消费者2结束==================");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.FANOUT_QUEUE_2,true,consumer);}
}

运行打印(两个控制台一样):

==================订阅消费者1开始===================
路由的key为:
交换机为:fanout_exchange
消息ID为:10
收到的消息为:订阅发布模式小兔子来啦........1
===================订阅消费者1结束==================
==================订阅消费者1开始===================
路由的key为:
交换机为:fanout_exchange
消息ID为:11
收到的消息为:订阅发布模式小兔子来啦........2
===================订阅消费者1结束==================
==================订阅消费者1开始===================
路由的key为:
交换机为:fanout_exchange
消息ID为:12
收到的消息为:订阅发布模式小兔子来啦........3
===================订阅消费者1结束==================
==================订阅消费者1开始===================
路由的key为:
交换机为:fanout_exchange
消息ID为:13
收到的消息为:订阅发布模式小兔子来啦........4
===================订阅消费者1结束==================

RabbitMQ(五) 订阅发布者模式介绍以及代码实现相关推荐

  1. RabbitMQ (五) 订阅者模式之分发模式 ( fanout )

    前面讲到了简单队列和工作队列. 这两种队列有个非常明显的缺点 : 生产者发送的消息,只能进入到一个队列. 消息只能进入到一个队列就意味着消息只能被一个消费者消费. 尽管工作队列模式中,一个队列中的消息 ...

  2. RabbitMQ五种工作模式

    RabbitMQ五种工作模式 1.简单队列 一个生产者对应一个消费者!! 2.work 模式 一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!! 轮询分发就是将消息队列中的消息,依次 ...

  3. 消息中间件的应用场景与 RabbitMQ的六种工作模式介绍

    消息中间件的应用场景与 RabbitMQ的六种工作模式介绍 消息中间件应用场景 异步处理 应用解耦 流量削峰 RabbitMQ的六种工作模式 简单模式 工作模式 发布订阅模式 路由模式 主题模式 PR ...

  4. 用订阅/发布者模式解决异步函数结果依赖的问题

    我们都知道node是基于事件无阻塞i/o模型的,所以说大部分函数都是以异步实现的,请看下面代码: db.query(sql1, function (err, data) {//code })db.qu ...

  5. 生产者消费者模式与订阅发布者模式的区别

    订阅发布者模式本质上也是一种生产者消费者模式,订阅者是消费者,发布者是生产者.如果一定要说个区别,就是抽象级别的区别吧. 订阅者肯定是个消费者,但消费者不一定是订阅者,发布者一定是个生产者,但生产者不 ...

  6. RabbitMq的工作模式 介绍+测试代码,以及三种Exchange模式介绍.

    RabbitMq的提供了六种模式分别是:简单模式,工作模式,发布\订阅模式,路由模式,通配符模式,RPC远程调用模式 下面将详细介绍常用的前五种模式,附上测试代码. 公共的代码---连接工具类: pu ...

  7. RabbitMQ七种队列模式介绍与应用场景(通俗易懂)

    七种模式介绍与应用场景 简单模式(Hello World) 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B 应用场景:将发送的电子邮件放到消息队列 ...

  8. RabbitMQ七种工作模式实现测试代码

    所有工作模式依赖都相同 <dependencies><!--RabbitMQ的客户端依赖--><dependency><groupId>com.rabb ...

  9. Spring Boot基础学习笔记25:RabbitMQ - 发布/订阅工作模式

    文章目录 零.学习目标 一.准备工作 (一)创建Spring Boot项目 - PublishSubscribeDemo (二)在应用属性文件里配置RabbitMQ 二.基于API进行消息发布和订阅 ...

最新文章

  1. 软件工程 之 动物世界
  2. 1.Spring Cloud Alibaba教程:简介
  3. 外贸常用术语_推荐必看!外贸、货代人订舱常用术语及订舱单中英对照!收藏备用...
  4. oracle alter_log,在线查看alter.log文件内容
  5. 【渝粤题库】广东开放大学 民事诉讼法学 形成性考核
  6. JQuery(三)-- AJAX的深入理解以及JQuery的使用
  7. CSAPP--整数的运算
  8. 目标检测——评价指标的学习笔记
  9. tidb 企业_TiDB,日均千万级数据存储方案选型
  10. Protel 99 SE 应用技术问答
  11. 全新ThihkPHP聚合支付系统源码+兼容全部易支付
  12. Premiere Pro 2022安装教程(附详细图文教程)
  13. php excel导出pdf文件,如何修复“无法加载PDF呈现库”使用PHPExcel TCPDF将Excel导出为PDF...
  14. 命令模式,升级版的灭霸响指
  15. “无须”与“无需”最简易区别法
  16. 生活中的数学:买几送几
  17. 计算机程序的灵魂,算法——抓住程序的灵魂
  18. WPS画报的电脑壁纸怎么下载
  19. 电脑重装系统怎么清理c盘空间
  20. Java笔记整理六(File类,递归,字节流IO,字符流IO,流中的异常处理,属性集Properties,缓冲流,转换流,序列化,打印流)

热门文章

  1. 从C语言开始的语言革命
  2. Spring自学日志06(Aop)
  3. Hive安装Version2.1.0
  4. JAVA设计模式 - 单例模式
  5. 【java】关于Map的排序性的一次使用,有序的Map
  6. windows服务器远程执行命令(PowerShell+WinRM)
  7. POJ 3154 Graveyard【多解,数论,贪心】
  8. leetcode 【 Sort List 】 python 实现
  9. 用C实现的一个Bash脚本
  10. Django搭建个人博客:文章标签功能