2019独角兽企业重金招聘Python工程师标准>>>

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。

AMQP协议介绍

RabbitMQ 的模型架构是基于AMQP协议,生产者将消息发送给交换器,队列通过RoutingKey(路由键)绑定对应的交换机 。当生产者发送消息时携带RoutingKey,交换机根据RoutingKey找到对应的队列,将消息存入到该队列,然后消费者通过订阅该队列来获取消息。RabbitMQ 中的交换器、队列、路由键等都是遵循的 AMQP 议中相应的概念。

其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定

  • 虚拟主机本质上就是一个mini版的mq服务器,有自己的队列、交换器和绑定,最重要的,自己的权限机制。Vhost提供了逻辑上的分离,可以将众多客户端进行区分,又可以避免队列和交换器的命名冲突。Vhost必须在连接时指定,rabbitmq包含缺省vhost:“/”,通过缺省用户和口令guest进行访问。

    rabbitmq里创建用户,必须要被指派给至少一个vhost,并且只能访问被指派内的队列、交换器和绑定。Vhost必须通过rabbitmq的管理控制工具创建。

  • 交换机:Exchange 用于转发消息,但是它不会做存储。这里有一个比较重要的概念:路由键 。消息到交换机的时候,交换机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • 绑定:队列需要绑定到交换机。

常见问题

1、如果消息达到无人订阅的队列会怎么办?

消息会一直在队列中等待,RabbitMq默认队列是无限长度的。

2、多个消费者订阅到同一队列怎么办?

消息以循环的方式发送给消费者,每个消息只会发送给一个消费者。

3、消息路由到了不存在的队列怎么办?

一般情况下,凉拌,RabbitMq会忽略,当这个消息不存在,也就是这消息丢了。

Exchange类型有以下几种:

​ Fanout:广播,将消息交给所有绑定到交换机的队列

​ Direct:定向,把消息交给符合指定routing key 的队列。

​ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

基本消息模型

每个amqp的实现都必须有一个direct交换器,包含一个空白字符串名称的默认交换器。声明一个队列时,会自动绑定到默认交换器,并且以队列名称作为路由键。

点击,可以进行查看该交换机的详情。

默认的 exchange 是一个由 broker 预创建的匿名的(即名字是空字符串) direct exchagne. 对于简单的程序来说, 默认的 exchange 有一个实用的属性: 如果没有显示地绑定 Exchnge, 那么创建的每个 queue 都会自动绑定到这个默认的 exchagne 中, 并且此时这个 queue 的 route key 就是这个queue 的名字.

下面将介绍一个例子体会一下这个最基础的消息模型

首先需要引入依赖

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.0.0</version></dependency>

创建一个连接的工具类ConnectionUtil,添加一个获取连接的方法

public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("182.168.6.133");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/hello");factory.setUsername("suzhe");factory.setPassword("suzhe");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}}

生产者Producer

/*** 生产者*/
@Slf4j
public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建一个信道,意味着每个线程单独一个信道Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Less is more";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());log.debug("send message:{}", message);//关闭通道和连接channel.close();connection.close();}
}

运行后可以看到一条消息已经发送到了rabbitmq,并且从队里的bindings信息可以看到该队列绑定到了默认的交换机。

消费者Consumer:

/*** 消费者*/
@Slf4j
public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建一个信道,意味着每个线程单独一个信道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);log.debug("消费消息:{}",msg);}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}

运行消费者,可以看到收到了消息。

在这个例子中, 我们并没有定义 exchange, 也没有显示地将 queue 绑定到 exchange 中, 因此 queue "hello" 就自动绑定到默认的 exchange 中了, 并且在默认的 exchange 中, 其 route key 和 queue 名一致, 都为"hello"。

由于这个原因, 我们就可以使用:

        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

来发送消息。

调用 channel.basicPublish 时, 第一个参数是 exchange 名, 为空就是默认的 exchange, 第二个参数是 route key, 和 queue 名相同,第三个参数AMQP.BasicProperties 提供了一个构造器,可以通过builder() 来设置一些属性,比如

Map<String, Object> headers = new HashMap<String, Object>();headers.put("hello", "world");headers.put("aaa", "bbb");AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2) // 传送方式.contentEncoding("UTF-8") // 编码方式.expiration("10000") // 过期时间.headers(headers) //自定义属性.build();

信道

在上面的程序中,从连接中获取一个信道。

 Channel channel = connection.createChannel();

信道,概念:信道是生产消费者与rabbitmq通信的渠道,生产者publish或是消费者subscribe一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接。什么意思呢?就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbitmq都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用。

疑问:为什么不建立多个TCP连接呢?

原因是rabbitmq保证性能,系统为每个线程开辟一个TCP是非常消耗性能,每秒成百上千的建立销毁TCP会严重消耗系统。所以rabbitmq选择建立多个信道(建立在tcp的虚拟连接)连接到rabbitmq上。

本篇文章api总结:

发布消息:只用在生产者

channel.basicPublish(String exchange, //路由器的名字,即将消息发到哪个路由器String routingKey, //路由键,即发布消息时,该消息的路由键是什么BasicProperties props, //指定消息的基本属性byte[] body)//消息体,也就是消息的内容,是字节数组

BasicProperties props:指定消息的基本属性,如deliveryMode为2时表示消息持久,2以外的值表示不持久化消息

//BasicProperties介绍
String corrId = "";
String replyQueueName = "";
Integer deliveryMode = 2;
String contentType = "application/json";
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).deliveryMode(deliveryMode).contentType(contentType).build();

接收消息:只用在消费者

channel.basicConsume(String queue, //队列名字,即要从哪个队列中接收消息boolean autoAck, //是否自动确认,默认trueConsumer callback)//消费者,即谁接收消息

消费者中一般会有回调方法来消费消息

Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, //该消费者的标签Envelope envelope,//字面意思为信封:packaging data for the messageAMQP.BasicProperties properties, //message content header data byte[] body) //message bodythrows IOException {//获取消息示例String message = new String(body, "UTF-8");//接下来就可以根据消息处理一些事情}};

详细源码地址

https://github.com/suzhe2018/rabbitmq-item

转载于:https://my.oschina.net/suzheworld/blog/3002222

1、RabbitMQ初探相关推荐

  1. RabbitMq初探——安装

    rabbitmq Server安装 rabbitmq server安装很简单. 安装erlang环境 rpm -ihv erlang-18.1-1.el6.x86_64.rpm rpm -ihv ra ...

  2. RabbitMQ初探--用C#简单实现通信服务

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们. 消息传递指的是程序之 ...

  3. [Erlang 0079] RabbitMQ 初探

    最近在项目中实践RabbitMQ,比较幸运现在除了官方网站,还有一本非常棒的书可以读:RabbitMQ in Action;这本书目前还没有中文版或者影印版,但是从网上很容易找到PDF版本和epub ...

  4. RabbitMQ初探

    1.  MQ是什么 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们 ...

  5. RabbitMq初探——Hello World

    HelloWorld 前言 这里我们弱化broker内部构造.将整体分为三部分. P:producer.生产者. C:Consumer.消费者. queue:队列. 后面的代码都依赖于 the php ...

  6. 初探 RabbitMQ 消息队列

    初探 RabbitMQ 消息队列 rabbitmq基础概念常见应用场景导入依赖属性配置具体编码定义队列实体类控制器消息消费者主函数测试总结说点什么 SpringBoot 是为了简化 Spring 应用 ...

  7. 初探RabbitMQ与简单实现

    RabbitMQ 简介: RabbitMQ是一个由Erlang语言开发的AMQP的开源实现,高级消息队列协议即Advanced Message Queuing Protocal,是应用层协议的一个开放 ...

  8. 初探RabbitMQ

    官网 安装教程视频 安装 简单模式入门案例 环境搭建 生产者代码 消费者代码 测试 工作队列模式入门案例 创建公共类(复用代码) 消费者代码 生产者代码 测试 消息应答 自动应答 手动应答 消息自动重 ...

  9. 从壹开始微服务 [ DDD ] 之一 ║ D3模式设计初探 与 我的计划书

    缘起 哈喽大家周四好!又是开心的一天,时间过的真快,我们的 <从壹开始 .net core 2.1 + vue 2.5 >前后端分离系列共 34 篇已经完结了,当然以后肯定还会有更新和修改 ...

  10. 一个winform带你玩转rabbitMQ

    源码已放出 https://github.com/dubing/MaoyaRabbit 本章分3部分 一.安装部署初探 二.进阶 三.api相关 安装 部署 初探 先上图 一. 安装部署 下载 rab ...

最新文章

  1. 一网打尽,最全面的跨域解决方案来了!
  2. 开发中经常碰到的问题cookie和session问题,今天一并解决
  3. find the OPP in your life
  4. 移动搜索引擎-网页信息预处理
  5. 《思科数据中心I/O整合》一2.11 活动-活动连接(Active-Active)
  6. 自动将存储过程转成C#代码的过程[转]
  7. 恒大汽车自救进行时:恒驰5进入试产阶段,离量产只差道路准入许可
  8. HTML5 viewport 标签与 CSS3 background-size 属性 使图片完全适应区域内容
  9. ASCII控制字符在vi和notepadd++中的表示法
  10. 8.5 传输介质和网络应用
  11. 【数据结构】NOJ016—计算二叉树叶子结点数目
  12. 终极解决电脑缺失dll,应用程序无法正常启动0xc000007b
  13. c语言中invert什么意思_C语言中init 是什么意思?
  14. 微信小程序的开发方式有哪些
  15. SaaSBase:什么是艾盟赢销SCRM?
  16. Mem Reduct——最专一的电脑清理软件
  17. ssd1963初始化程序
  18. Ubuntu Desktop 安装谷歌拼音输入法
  19. 【PC】【MTU】PC查看和修改MTU的方法小结
  20. 【jvm】jvm 参数设置查看与设置

热门文章

  1. 算法:Valid Parentheses(有效的括号)
  2. scrapy 保存html页面,28.用配合scrapy的方式爬取本地保存的html
  3. Python连接presto
  4. 596. 超过5名学生的课
  5. How to Become a Better Learner
  6. python 直线检测_python hough变换检测直线的实现方法
  7. 蓝队应对攻击的常用策略二
  8. 常问的数据结构与算法
  9. 高等代数期末考试题库及答案_高等代数3学时试题题目及答案,课程2021最新期末考试题库,章节测验答案...
  10. eslint 换行_给 eslint 写一个插件