文章目录

  • Exchange 交换机
    • 交换机属性
    • Direct Exchange
      • Producer
      • Consumer
    • Topic Exchange
      • Producer
      • Consumer
    • Fanout Exchange
      • Producer
      • Consumer
    • Headers Exchange
  • Binding-绑定
  • Queue-消息队列
  • Message-消息
    • Message-其他属性
    • 代码演示
      • Producer
      • Consumer
  • Virtual Host-虚拟主机
  • 相关链接

Exchange 交换机

  • Exchange : 接收消息, 并根据路由键转发消息所绑定的队列

交换机属性

  • Name : 交换机名称
  • Type : 交换机类型, direct, topic, fanout, headers
  • Durability : 是否需要持久化, true为持久化
  • Auto Delete : 当最后一个绑定到Exchange上的队列删除后, 自动删除该Exchange
  • Internal : 当前Exchange是否用于RabbitMQ内部使用, 默认为False, 这个属性很少会用到
  • Arguments : 扩展参数, 用于扩展AMQP协议制定化使用

Direct Exchange

  • Direct Exchange : 所有发送到Direct Exchange的消息被转发到RoutingKey中指定的Queue

    注意 : Direct模式可以使用RabbitMQ自带的Exchange(default Exchange), 所以不需要将Exchange进行任何绑定(binding)操作, 消息传递时, RoutingKey必须完全匹配才会被队列接收, 否则该消息会被抛弃

Producer

package com.qiyexue.exchange.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** Direct模式的生产者** @author 七夜雪* @create 2018-12-13 22:00*/
public class ProducerByDirect {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂, 设置属性ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.72.138");factory.setPort(5672);factory.setVirtualHost("/");// 2. 获取连接Connection connection = factory.newConnection();// 3. 创建channelChannel channel = connection.createChannel();// 4. 声明String exchangeName = "test_direct_exchange";// Direct模式必须和消费者保持一致才能发送消息, 不然消息会被丢弃String routingKey = "test.direct";// 5. 发送消息String msg = "Hello RabbitMQ By Direct";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());// 6. 关闭连接channel.close();connection.close();}}

Consumer

package com.qiyexue.exchange.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** Direct模式消费者** @author 七夜雪* @create 2018-12-13 22:01*/
public class ConsumerByDirect {public static void main(String[] args) throws Exception {// 1. 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.72.138");factory.setPort(5672);factory.setVirtualHost("/");// 2. 获取连接Connection connection = factory.newConnection();// 3. 创建channelChannel channel = connection.createChannel();// 4. 声明// 交换机名称String exchangeName = "test_direct_exchange";// 交换机类型String exchangeType = "direct";String queueName = "test_direct_queue";// Direct模式RoutingKey必须和生产者保持一致才能消费String routingKey = "test.direct";// 表示声明了一个交换机, 后面几个参数分别为durable, autoDelete, internal, argumentschannel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);// 声明一个队列, 后面四个参数分别为durable, exclusive, autoDelete, arguments// durable : 是否持久化消息channel.queueDeclare(queueName, false, false, false, null);// 建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);QueueingConsumer consumer = new QueueingConsumer(channel);// 参数 : 队列名称, autoAck:是否自动确认, consumerchannel.basicConsume(queueName, true, consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息 : " + msg);}}}

Topic Exchange

  • 所有发送到Topic Exchange的消息将被转发到所有关心RoutingKey中指定Topic的Queue上

  • Exchange将RoutingKey和某个Topic进行模糊匹配, 此时队列需要绑定一个Topic

    • 可以使用通配符进行模糊匹配
    • “#” : 匹配一个或多个词
    • “*” : 匹配一个词

Producer

package com.qiyexue.exchange.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** Topic模式生产者** @author 七夜雪* @create 2018-12-14 8:07*/
public class ProducerByTopic {public static void main(String[] args) throws IOException, TimeoutException {// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.72.138");factory.setPort(5672);factory.setVirtualHost("/");// 创建连接Connection connection = factory.newConnection();// 创建channelChannel channel = connection.createChannel();// 声明String exchangeName = "test_topic_exchange";String routingKey1 = "tingxuelou.biluo";String routingKey2 = "tingxuelou.hongchen";String routingKey3 = "tingxuelou.hufa.zimo";String msg = "test topic By routingKey : ";channel.basicPublish(exchangeName, routingKey1, null, (msg + routingKey1).getBytes());channel.basicPublish(exchangeName, routingKey2, null, (msg + routingKey2).getBytes());channel.basicPublish(exchangeName, routingKey3, null, (msg + routingKey3).getBytes());// 关闭连接channel.close();connection.close();}}

Consumer

package com.qiyexue.exchange.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;/*** topic模式消费者** @author 七夜雪* @create 2018-12-14 20:10*/
public class ConsumerByTopic {public static void main(String[] args) throws Exception {// 创建工厂// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.72.138");factory.setPort(5672);factory.setVirtualHost("/");// 创建连接Connection connection = factory.newConnection();// 创建channelChannel channel = connection.createChannel();// 声明ExchangeString exchangeName = "test_topic_exchange";String exchangetype = "topic";// tingxuelou.#String routingKey = "tingxuelou.*";channel.exchangeDeclare(exchangeName, exchangetype);// 声明队列String queueName = "test_topic_queue";channel.queueDeclare(queueName, false, false, false, null);// 绑定队列channel.queueBind(queueName, exchangeName, routingKey);// 创建消费者QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();System.out.println(new String(delivery.getBody()));}}}

Fanout Exchange

  • 不处理路由键, 只需要简单的将队列绑定到交换机上

  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上

  • Fanout交换机转发消息是最快的

Producer

package com.qiyexue.exchange.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** fanout模式生产者** @author 七夜雪* @create 2018-12-14 20:36*/
public class ProducerByFanout {public static void main(String[] args) throws Exception {// 1. 创建连接工厂, 设置属性ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.72.138");factory.setPort(5672);factory.setVirtualHost("/");// 2. 获取连接Connection connection = factory.newConnection();// 3. 创建channelChannel channel = connection.createChannel();String exchangeName = "test_fanout_exchange";String routingKey = "无所谓";for (int i = 0; i < 5; i++) {String msg = "Fanout 模式消息..";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());}// 关闭连接channel.close();connection.close();}}

Consumer

package com.qiyexue.exchange.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;/*** Fanout模式消费者** @author 七夜雪* @create 2018-12-14 20:40*/
public class ConsumerByFanout {public static void main(String[] args) throws Exception {// 1. 创建连接工厂, 设置属性ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.72.138");factory.setPort(5672);factory.setVirtualHost("/");// 2. 获取连接Connection connection = factory.newConnection();// 3. 创建channelChannel channel = connection.createChannel();// 4. 声明ExchangeString exchangeName = "test_fanout_exchange";String exchangeType = "fanout";channel.exchangeDeclare(exchangeName, exchangeType);// 5. 声明消息队列String routingKey = "";String queueName = "test_fanout_queue";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);// 6. 创建消费者QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息 : " + msg);}}}

Headers Exchange

  • Headers Exchange不使用RoutingKey去绑定, 而是通过消息headers的键值对匹配
  • 这个Exchange很少会使用, 这里就不细说了

Binding-绑定

  • Exchange和Exchange, Queue之间的连接关系
  • 绑定中可以包含RoutingKey或者参数

Queue-消息队列

  • 消息队列, 实际存储消息数据

  • Durability : 是否持久化

    Durable : 是

    Transient : 否

  • Auto delete : 如选yes,代表当最后一个监听被移除之后, 该Queue会自动被删除

Message-消息

  • 服务和应用程序之间传送的数据
  • 本质上就是一段数据, 由Properties和Payload(Body)组成
  • 常用属性 : delivery mode, headers(自定义属性)

Message-其他属性

  • content_type, content_encoding, priority
  • correlation_id : 可以认为是消息的唯一id
  • replay_to : 重回队列设定
  • expiration : 消息过期时间
  • message_id : 消息id
  • timestamp, type, user_id, app_id, cluster_id

代码演示

Producer

package com.qiyexue.message;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.HashMap;
import java.util.Map;/*** 生产者** @author 七夜雪* @create 2018-12-13 20:43*/
public class Producer {public static void main(String[] args) throws Exception {// 1. 创建连接工厂, 设置属性ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.72.138");factory.setPort(5672);factory.setVirtualHost("/");// 2. 创建连接Connection connection = factory.newConnection();// 3. 使用connection创建channelChannel channel = connection.createChannel();// 4. 通过channel发送消息String msg = "hello rabbitmq!";AMQP.BasicProperties properties = new AMQP.BasicProperties();Map<String,Object> headers = new HashMap<String, Object>();headers.put("name", "七夜雪");properties = properties.builder()// 设置编码为UTF8.contentEncoding("UTF-8")// 设置自定义Header.headers(headers)// 设置消息失效时间.expiration("5000").build();for (int i = 0; i < 5; i++) {// 不指定exchange的情况下, 使用默认的exchange, routingKey与队列名相等channel.basicPublish("", "test01", properties, msg.getBytes());}// 5. 关闭连接channel.close();connection.close();}}

Consumer

package com.qiyexue.message;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;/*** 消费者** @author 七夜雪* @create 2018-12-13 20:57*/
public class Consumer {public static void main(String[] args) throws Exception {// 1. 创建连接工厂, 设置属性ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.72.138");factory.setPort(5672);factory.setVirtualHost("/");// 2. 创建连接Connection connection = factory.newConnection();// 3. 使用connection创建channelChannel channel = connection.createChannel();// 4. 声明(创建)一个队列String queueName = "test01";channel.queueDeclare(queueName,true, false, false, null);// 5. 创建消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 6. 设置channelchannel.basicConsume(queueName, true, consumer);while (true) {// 7. 获取消息Delivery delivery = consumer.nextDelivery();System.out.println(new String(delivery.getBody()));// 获取head中内容System.out.println(delivery.getProperties().getHeaders().get("name"));}}}

Virtual Host-虚拟主机

  • 虚拟地址, 用于进行逻辑隔离, 最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue

相关链接

RabbitMQ入门与AMQP协议简介
RabbitMQ成员简介
RabbitMQ高级特性-消息可靠性投递
RabbitMQ高级特性-幂等性保障
RabbitMQ高级特性-Confirm确认消息
RabbitMQ高级特性-Return消息机制
RabbitMQ高级特性-消费端自定义监听
RabbitMQ高级特性-消费端限流
RabbitMQ高级特性-消费端ACK与重回队列
RabbitMQ高级特性-TTL队列/消息
RabbitMQ高级特性-死信队列(DLX)
Spring AMQP整合RabbitMQ
SpringBoot整合RabbitMQ
RabbitMQ集群架构模式介绍
从零开始搭建高可用RabbitMQ镜像模式集群
RabbitMQ集群恢复与故障转移
RabbitMQ-基础组件封装
Git代码地址


慕课网<RabbitMQ消息中间件技术精讲>学习笔记

RabbitMQ成员简介相关推荐

  1. RabbitMq Federation简介

    RabbitMq Federation简介 背景: broker1 broker2 exchanger2 mq2 federation提供了一个能力,让broke1在本地先创建一个exchange2, ...

  2. RabbitMQ Server简介和安装教程

    引言 什么是AMQP? AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间 ...

  3. monty python喜剧-Monty Python(蒙提·派森)的成员简介

    Monty Python(蒙提·派森)是英国六人喜剧团体,成员为以下六位: 约翰·克里斯 (John Cleese): John毕业于剑桥大学法律专业.他是一个非常理性,做事一丝不苟的人.2010年英 ...

  4. 团队-团队编程项目作业名称-成员简介及分工

    成员:郭依城 分工:程序设计,代码编写,程序测试. 转载于:https://www.cnblogs.com/guo961231/p/7494237.html

  5. RabbitMQ交换机简介

    介绍 RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列.实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中. 相反,生产者只能将消息发送到交换机(exchang ...

  6. .NET 使用 RabbitMQ 图文简介

    前言 最近项目要使用RabbitMQ,园里里面已经有很多优秀的文章,Rabbitmq官网也有.net实例.这里我尝试下图文并茂之形式记录下使用的过程. 安装 RabbitMQ是建立在erlang OT ...

  7. P4 类、对象、类成员简介

    本节内容 类(class)是显示世界事物的模型. 现实中的一架飞机========================>>>抽象为程序世界中的类 类与对象的关系 对象也叫做实例,是类经过 ...

  8. 《团队-团队编程项目作业名称-成员简介及分工》

    项目名称:学生成绩管理系统 成员: 郑月 负责:前端开发 转载于:https://www.cnblogs.com/cali/p/7568076.html

  9. Monty Python(蒙提·派森)的成员简介

    Monty Python(蒙提·派森)是英国六人喜剧团体,成员为以下六位: 约翰·克里斯 (John Cleese): John毕业于剑桥大学法律专业.他是一个非常理性,做事一丝不苟的人.2010年英 ...

最新文章

  1. cesium 渲染分析(以太阳为例),实现卫星任务规划中地球赤道平面绘制
  2. 变分自编码器VAE代码
  3. SpringBoot集成websocket(Spring方式)
  4. linux amd显卡下载,下载:AMD显卡Linux催化剂驱动9.10版
  5. C#中排序的多种实现方式
  6. django 基础知识 ~ forms详解
  7. C++ Primer 5th笔记(9)chapter9 顺序容器
  8. gitl更新最近代码_常见的蓝屏代码以及解决方法
  9. 使用C#开发交互式命令行应用
  10. VS2015+qt5.11入门(实现计算机的加法和登录操作)
  11. 缓存淘汰算法--LRU算法
  12. QoS中流量监管和流量整形详解
  13. QVideoWidget遇到的坑。
  14. ipv6-hosts
  15. 【智能优化算法-蝙蝠算法】基于混合粒子群和蝙蝠算法求解单目标优化问题附matlab代码
  16. PERT网络分析法(计划评估和审查技术)
  17. 【云服务器安全加固】
  18. 毕业实用统计模型(一)——时间序列
  19. android 英语单词音标获取
  20. MySQL8.0 OCP最新版1Z0-908认证考试题库整理-004

热门文章

  1. Windows下基于嵌入式Eclipse + GDB + JLink 对JZ2440/S3C2440/mini2440 进行裸机程序在SDRAM上的调试
  2. 全同态加密:CKKS
  3. Linux入门(13)——Ubuntu16.04下将图片和pdf互转
  4. 怎么将自己的头像p到特定的背景图_【后期修图】photoshop手把手教你制作属于自己的酷炫的微信头像...
  5. 【愚公系列】华为云云数据库MySQL的体验流程|【华为云至简致远】
  6. 合肥工业大学计算机考研试题,2016年合肥工业大学计算机考研真题及答案
  7. 计算机科学与技术导论课本推荐,比较好写的计算机科学与技术导论论文题目 计算机科学与技术导论论文题目哪个好...
  8. Hadoop-HA(高可用)架构原理
  9. 用一年时间读一本英文版书籍
  10. echarts折线图x轴不从0开始