目录

  • 1. 初识RabbitMQ
  • 2. AMQP
  • 3.RabbitMQ的极速入门
  • 4. Exchange(交换机)详解
    • 4.1 Direct Exchange
    • 4.2 Topic Exchange
    • 4.3 Fanout Exchange
  • 5. Message 消息

1. 初识RabbitMQ

RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用 Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的

RabbitMQ的优点:

  • 开源、性能优秀、稳定性保障
  • 提供可靠性消息投递模式(confirm)、返回模式(return)
  • 与SpringAMQP完美的整合、API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提下做到高可靠性、可用性

RabbitMQ官网

RabbitMQ的整体架构:

RabbitMQ的消息流转:

2. AMQP

AMQP全称: Advanced Message Queuing Protocol

AMQP翻译: 高级消息队列协议

AMQP定义: 是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

AMQP核心概念:

  • Server:又称Broker,接受客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
  • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体的内容
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。同一个Virtual Host里面不能有相同名称的Exchange或Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

3.RabbitMQ的极速入门

后台启动: ./rabbitmq start &

关闭: ./rabbitmqctl stop

节点状态: ./rabbitmqctl status

管控台: http://ip:15672

RabbitMQ生产消费快速入门:

环境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依赖配置)

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency></dependencies>
public class Procuder {public static void main(String[] args) throws Exception {//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:指定交换机 不指定 则默认 (AMQP default交换机) 通过routingkey进行匹配 * props 消息属性* body 消息体*///4.通过Channel发送数据for(int i = 0; i < 5; i++){System.out.println("生产消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", null, msg.getBytes());}//5.记得关闭相关的连接channel.close();connection.close();}
}
public class Consumer {public static void main(String[] args) throws Exception{//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();//4. 声明创建一个队列String queueName = "test";/*** durable 是否持久化* exclusive 独占的  相当于加了一把锁*/channel.queueDeclare(queueName,true,false,false,null);//5.创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.设置channel/*** ACK: 当一条消息从生产端发到消费端,消费端接收到消息后会马上回送一个ACK信息给broker,告诉它这条消息收到了* autoack: * true  自动签收 当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。* false 手动签收 当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了* */channel.basicConsume(queueName, true, queueingConsumer);//7.获取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端:" + msg);//Envelope envelope = delivery.getEnvelope();}}
}

4. Exchange(交换机)详解

Exchange: 接收消息,并根据路由键转发消息所绑定的队列

交换机属性:

  • Name: 交换机名称
  • Type: 交换机类型 diect、topic、fanout、headers
  • Durability: 是否需要持久化,true为持久化
  • AutoDelete: 当最后一个绑定到Exchange的队列删除后,自动删除该Exchange
  • Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为false (百分之99的情况默认为false 除非对Erlang语言较了解,做一些扩展)
  • Arguments: 扩展参数, 用于扩展AMQP协议可自定化使用

4.1 Direct Exchange

所有发送到Direct Exchange的消息被转发到RouteKey指定的Queue

注意:Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RoutingKey必须完全匹配才会被队列接收,否则该消息会被抛弃

public class ProducerDirectExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_direct_exchange";String routingKey = "test.direct";//5.发送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());}
}
public class ConsumerDirectExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_direct_exchange";String exchangeType = "direct";String queueName = "test_direct_queue";String routingKey = "test.direct";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

4.2 Topic Exchange

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上

Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

注意:可以使用通配符进行匹配

符号 # 匹配一个或多个词

符号 * 匹配不多不少一个词

例如: "log.#" 能够匹配到 “log.info.oa”

​ "log.*" 只会匹配到 "log.err"

public class ProducerTopicExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_topic_exchange";String routingKey1 = "user.save";String routingKey2 = "user.update";String routingKey3 = "user.delete.abc";//5.发送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());}
}
public class ConsumerTopicExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_topic_exchange";String exchangeType = "topic";String queueName = "test_topic_queue";String routingKey = "user.#";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

4.3 Fanout Exchange

不处理路由键,只需要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
所以Fanout交换机转发消息是最快的

public class ProducerFanoutExchange {public static void main(String[] args) throws Exception {//1.创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.创建ConnectionConnection connection = connectionFactory.newConnection();//3.创建ChannelChannel channel = connection.createChannel();//4.声明String exchangeName = "test_fanout_exchange";//5.发送for(int i = 0; i < 10 ; i++){String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, "", null, msg.getBytes());}channel.close();connection.close();}
}
public class ConsumerFanoutExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//声明String exchangeName = "test_fanout_exchange";String exchangeType = "fanout";String queueName = "test_topic_queue";//无需指定路由key String routingKey = "";//表示声明了一个交换机channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示声明了一个队列channel.queueDeclare(queueName,false,false,false,null);//建立一个绑定关系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//参数:队列名称,是否自动ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循环获取消息while(true){//获取消息,如果没有消息,这一步将会一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

5. Message 消息

服务器与应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成

常用属性:delivery mode、headers (自定义属性)

其他属性:content_type、content_encoding、priority、expiration

消息的properties属性用法示例:

public class Procuder {public static void main(String[] args) throws Exception {//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();Map<String,Object> headers = new HashMap<>();headers.put("my1", "111");headers.put("my2", "222");//10秒不消费 消息过期移除消息队列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();//4.通过Channel发送数据for(int i = 0; i < 5; i++){System.out.println("生产消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", properties, msg.getBytes());}//5.记得关闭相关的连接channel.close();connection.close();}
}
public class Consumer {public static void main(String[] args) throws Exception{//1.创建一个ConnectionFactory 并进行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();//3.通过Connection 创建一个 ChannelChannel channel = connection.createChannel();//4. 声明创建一个队列String queueName = "test";channel.queueDeclare(queueName,true,false,false,null);//5.创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.设置channelchannel.basicConsume(queueName, true, queueingConsumer);//7.获取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端:" + msg);Map<String, Object> headers = delivery.getProperties().getHeaders();System.err.println("headers value:" + headers.get("my1"));}}
}

转载于:https://www.cnblogs.com/dwlovelife/p/10982735.html

RabbitMQ 从入门到精通 (一)相关推荐

  1. RabbitMQ从入门到精通

    From:http://blog.csdn.net/column/details/rabbitmq.html RabbitMQ 介绍 历史 RabbitMQ是一个由erlang开发的AMQP(Adva ...

  2. RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等

    RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...

  3. SpringBoot从入门到精通教程

    SpringBoot从入门到精通教程 一.来自ImportNew公众号的SpringBoot教程系列,可参考学习 SpringBoot (一) :入门篇--http://mp.weixin.qq.co ...

  4. Iaas-openstack从入门到精通

    学习课程连接: openstack从入门到精通 第二章 openstack核心组件详细介绍 2-1openstack和kubernetes联系和区别 2-2准备安装openstack需要实验环境 虚拟 ...

  5. ActiveMQ从入门到精通(全)

    目录 前言 1. 定义 2. 安装及配置 2.1 控制台访问 2.2 新建项目 2.3 配置文件 3. 队列案例 3.1 JMS编码 3.2 消息生产者 3.3 消息消费者 3.4 监听式消费者 3. ...

  6. SpringCloud Alibaba 从入门到精通(精选)

    SpringCloud Alibaba 从入门到精通 一. 课程介绍 1.1 课程导学 1.2 项目环境搭建 二. SpringBoot基础 2.1 本章概述 2.2 Spring Boot是什么?能 ...

  7. 超硬核Java工程师学习指南,真正的从入门到精通,众多粉丝亲测已拿offer!

    最近有很多小伙伴来问我,Java小白如何入门,如何安排学习路线,每一步应该怎么走比较好.原本我以为之前的几篇文章已经可以解决大家的问题了,其实不然,因为我之前写的文章都是站在Java后端的全局上进行思 ...

  8. java架构师入门教程,java技术架构师入门到精通高薪就业教程百度云下载

    java技术架构师入门到精通高薪就业视频教程百度云 课程目录: JAVA架构课开班典礼 JVM性能调优专题 JVM整体结构深度解析 JVM内存分配机制详解(此视频作废) JVM字节码文件结构深度剖析 ...

  9. Elasticsearch7从入门到精通(简介、部署、原理、开发、ELK)

    Elasticsearch7从入门到精通(简介.部署.原理.开发.ELK) 第1章.Elasticsearch简介 1-1.Elasticsearch介绍 Elasticsearch官方网站:http ...

最新文章

  1. 大厂首发!尚硅谷docker高级
  2. iOS 项目经验以及APP上架流程 _Dylan
  3. JAVA和遮掩_JAVA 你不知道的秘密 覆写,重载,隐藏,遮蔽,遮掩
  4. 【2019icpc南京站网络赛 - H】Holy Grail(最短路,spfa判负环)
  5. 点钞机语音怎么打开_原来微信语音一样能转发? 居然还有人不知道
  6. 从前M个字母中取N个的无重复排列(回溯)
  7. linux下的c语言编程实验4,实验四-Linux下的C语言编程
  8. 数据结构——动态链表
  9. 为何大富连续三天彻夜未眠!
  10. C语言SM2算法实现(基于GMSSL)
  11. 解决kafka传输超大图片消费者接收失败问题
  12. 0704函数的递归调用
  13. 中国有多少个省?多少个地级市?多少个县?多少个乡镇?一张统计表全部搞定。多关注民政部的信息吧^_^
  14. SaaS后台管理系统
  15. 学习Mac开发第四弹 通过NSImageView加载图片
  16. 数据库中文字段按拼音排序
  17. Delphi官方下载地址
  18. js知识点 掘金_掘金js
  19. 高通发布了全球最领先的5G基带芯片,不过华为将很快反超
  20. CentOS7下载安装nginx

热门文章

  1. QWidget::size()和QResizeEvent::size()不一定相同!
  2. Python3 学习系列 丨 博客目录索引
  3. win7中能对窗口的排列方法是_win7系统窗口排列方式怎么修改?修改窗口排列方式方法...
  4. java中针对数字怎么判断_java如何对输入的数字进行判断
  5. json qbytearray 串 转_JSON数据采集网关,json转Modbus RTU串IO口RS485转4~20mA边缘计算智能终端...
  6. *【CodeForces - 859C 】Pie Rules (博弈dp,时光倒流)
  7. 【牛客 - 210A】游戏(思维,脑洞)
  8. 机器学习笔记(十二):聚类
  9. matlab语言 列车平稳性指标,铁道车辆平稳性分析报告.docx
  10. oracle修改某个数据类型,Oracle 修改某个字段的数据类型三种方式