AMQP的四个主要概念

1、虚拟主机(virtual host)或(vhost

2、交换机(exchange

3、队列(queue

4、绑定器(bind

什么是虚拟主机?

  一组交换机、队列和绑定器 被称为 虚拟主机(vhost)。

为什么要用虚拟主机?

  RabbitMQ server 可以说就是一个消息队列服务器实体(Broker),Broker当中可以有多个用户(增加用户的命令),而用户只能在虚拟主机的粒度进行权限控制,所以RabbitMQ中需要多个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。

 队列(queue)

  队列是消息载体,每个消息都会被投入到一个或多个队列。试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求。(接收消息的实体)。

把消息放进队列前,我们还需要使用另一个东西:交换机。

交换机(exchange)

  它指定消息按什么规则,路由到哪个队列。它可以被理解成具有路由表的路由程序。(发送消息的实体)

交换机可以存在多个,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。

交换机如何判断要把消息送到哪个队列?这是我们需要路由规则,也就需要绑定器了。

绑定器(bind)

  它的作用就是把exchange和queue按照路由规则绑定起来。(将交换器和队列连接起来,并且封装消息的路由信息)

channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);

每个消息都有一个称为路由关键字(routingKey)的属性,exchange根据这个关键字进行消息投递,其实就是一个简单的字符串。

(绑定操作就可以理解成:exchange将具有路由关键字 “X” 的消息投递到到名为“business”的队列当中去。) 具体实践请看下文。

从而一个绑定就可以概括为:一个基于路由键交换机队列连接起来的路由规则。

需要注意:由Exchange,Queue,RoutingKey三个,才能决定一个从Exchange到Queue的唯一的线路。

更多参考:http://www.ltens.com/article-6.html

程序中连接与消息使用的两个关键概念

连接(Connection):

  与RabbitMQ Server建立的一个连接,由ConnectionFactory创建,每个connection只与一个物理的Server进行连接,此连接是基于Socket进行连接的。AMQP一般使用TCP。

通道 (Channel):

  消息通道(主要进行相关定义,发送消息,获取消息,事务处理等),在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

Channel在.net的客户端程序里应该是叫“Model”,采用IModel CreateModel()创建的,但是其他语言的客户端都叫Channel。需要注意:一个Connection可以有多个Channel。

为什么设计中引入Channel概念?

  一个比较普遍的需求:客户端程序有时候会是一个多线程程序,每一个线程都想要和RabbitMQ进行连接,但是又不想共享一个连接。

因为一个Connection就是一个TCP链接,RabbitMQ在设计的时候不希望与每一个客户端保持多个TCP连接,但这确实是有些客户端的需求,每一个Channel之间没有任何联系,是完全分离的。

建立在Connection基础上的一个Channel,相对于Connection来说,它是轻量级的。Channel可以在多线程中使用,但是在必须保证任何时候只有一个线程执行命令。

交换机类型

  有4种:direct【默认的类型】,fanout,topic,headers。其中headers不常用,本篇不做介绍,其他三种类型,会做详细介绍。

Exchange与队列进行绑定后,消息根据exchang的类型,按照不同的绑定规则分发消息到消息队列中,可以是一个消息被分发给多个消息队列,也可以是一个消息分发到一个消息队列。具体请看下文。

介绍之初还要说下RoutingKey,这是个什么玩意呢?他是exchange与消息队列绑定中的一个标识。有些路由类型会按照标识对应消息队列,有些路由类型忽略routingkey。

1、Fanout: 广播模式,会忽略路由键Routingkey,将消息广播给绑定到该交换机的所有队列。 不论消息的路由关键字是什么,这条消息都会被路由到所有与该交换器绑定的队列中。

广播式交换器类型的工作方式如下:

不使用任何参数将消息队列与交换器绑定在一起。

发布者(直接式交换器类型描述中的producer变成了publisher,已经隐含了二种交换器类型的区别)向交换器发送一条消息。 消息被无条件的传递到所有和这个交换器绑定的消息队列中。

2、Direct: 根据路由键和交换器来找队列的,对消息路径进行全文匹配。消息路由键 "sunshine" 只能匹配 "sunshine" 绑定,不匹配 "sunshine.warm" 这类绑定。

通过精确匹配消息的路由关键字,将消息路由到零个或者多个队列中,绑定关键字用来将队列和交换器绑定到一起。这让我们可以构建经典的点对点队列消息传输模型,不过和任何已定义的交换器类型一样,当消息的路由关键字与多个绑定关键字匹配时,消息可能会被发送到多个队列中。

在direct模式下还可以实现多路绑定,即一个exchange和多个queue绑定时,具有同样的bindkey,如下图:

3、Topic: 主题模式,处理路由键,按模式匹配路由键。

模式符号:

"#" 表示一个或多个单词,"*" 仅匹配一个单词。

如 "wood.#" 可匹配 "wood.palm.redwood",但 "wood.*" 只匹配 "wood.deadwood"。

主题式交换器类型提供了这样的路由机制:通过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中。这种路由器类型可以被用来支持经典的发布/订阅消息传输模型——使用主题名字空间作为消息寻址模式,将消息传递给那些部分或者全部匹配主题模式的多个消费者。

主题交换器类型的工作方式如下:

绑定关键字用零个或多个标记构成,每一个标记之间用“.”字符分隔。绑定关键字必须用这种形式明确说明,并支持通配符:“*”匹配一个词组,“#零个或多个词组
因此绑定关键字“*.dask.#”匹配路由关键字“class.dask”和“eur.dask.tab”,但是不匹配“dask.rho”。

更多参考:RabbitMQ交换器Exchange介绍与实践

消息持久化

问题及方案描述

1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

这种情况可以使用RabbitMQ提供的消息队列的持久化机制。

相关理论描述

  RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我个人觉得大多数开发人员都会选择持久化。

队列和交换机有一个创建时候指定的标志durabledurable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。

消息队列持久化包括3个部分:

1、exchange持久化,在声明时指定durable => true
2、queue持久化,在声明时指定durable => true
3、消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

注意:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建

生产者

class Producter{const string ExchangeName = "eric.exchange";const string QueueName = "eric.queue";static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);string message = "Eric is very handsome";var body = Encoding.UTF8.GetBytes(message);//将队列设置为持久化之后,还需要将消息也设为可持久化的var props = channel.CreateBasicProperties();props.SetPersistent(true);channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body);Console.WriteLine("Producter Sent: {0}", message);Console.ReadKey();}}}

View Code

注:ack是 acknowledgments 的缩写,noAck 是("no manual acks")

程序运行结果:

消费者

class Recevice{const string ExchangeName = "eric.exchange";const string QueueName = "eric.queue";public static void Main(){var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" };using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);//NoAck:true 告诉RabbitMQ立即从队列中删除消息,另一个非常受欢迎的方式是从队列中删除已经确认接收的消息,可以通过单独调用BasicAck 进行确认://BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);var msgContent = Encoding.UTF8.GetString(msgResponse.Body);Console.WriteLine("The received content:"+msgContent);channel.BasicAck(msgResponse.DeliveryTag, multiple: false);//使用BasicAck方式来告之是否从队列中移除该条消息//需要额外注意,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。
                Console.ReadKey();}}}

View Code

接受消息还有一种方法,就是通过基于推送的事件订阅。可以使用内置的 QueueingBasicConsumer 提供简化的编程模型,允许在共享队列上阻塞,直到收到一条消息。

var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue();
var msgContent = Encoding.UTF8.GetString(msgResponse.Body);

程序运行结果:

消费者消息的确认

1、消息队列的消费

Note:如果一个消息队列中有大量消息等待操作时,我们可以用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。

2、为保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供了ack应答机制,来实现这一功能。

ack应答有两种方式:1、自动应答,2、手动应答。具体实现如下。

public static void Consumer(){try{var qName = "lhtest1";var exchangeName = "fanoutchange1";var exchangeType = "fanout";//topic、fanoutvar routingKey = "*";var uri = new Uri("amqp://xxxx:5672/");var factory = new ConnectionFactory{UserName = "123",Password = "123",RequestedHeartbeat = 0,Endpoint = new AmqpTcpEndpoint(uri)};using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.ExchangeDeclare(exchangeName, exchangeType);channel.QueueDeclare(qName, true, false, false, null);channel.QueueBind(qName, exchangeName, routingKey);//定义这个队列的消费者QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);//false为手动应答,true为自动应答channel.BasicConsume(qName, false, consumer);while (true){BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                           byte[] bytes = ea.Body;var messageStr = Encoding.UTF8.GetString(bytes);var message = DoJson.JsonToModel<QMessage>(messageStr);Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title);//如果是自动应答,下下面这句代码不用写啦。if ((Convert.ToInt32(message.Title) % 2) == 1){channel.BasicAck(ea.DeliveryTag, false);}}}}}catch (Exception ex){Console.WriteLine(ex.Message);}}

View Code

转载于:https://www.cnblogs.com/peterYong/p/10235215.html

RabbitMQ核心概念相关推荐

  1. RabbitMQ核心概念及基础API应用

    RabbitMQ核心概念及基础API应用 1 主流中间件介绍 衡量消息中间件的指标:服务性能,数据存储,集群架构. 1.ActiveMQ:Apache,支持JMS规范最完整的. 2.RocketMQ ...

  2. RabbitMQ核心概念和AMQP协议(二)

    RabbitMQ是什么? RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议,在完全不同的应用之间共享数据,RabbirMQ是使用Erlang语言来编写的,并且RabbitMQ是基于A ...

  3. 入门RabbitMQ核心概念

    这里mark一下 发送者开启 confirm 确认机制 spring.rabbitmq.publisher-confirm-type=correlated 发送者开启 return 确认机制 spri ...

  4. 消息中间件Rabbitmq核心概念讲解

    概述 Rabbitmq是消息中间件的一种落地开源实现,使用Erlang语言编写,基于AMQP消息协议. 核心概念 Message:消息是不具名的,由消息头和消息体组成,消息体是不透明的,也就是可以设置 ...

  5. Go秒杀系统——RabbitMQ核心概念与工作模式

    前言

  6. [RabbitMQ]RabbitMQ概念_四大核心概念

    RabbitMQ RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收 ...

  7. Kafka:Kafka核心概念

    1 消息系统简介 1.1 为什么要用消息系统 ? 解耦 各位系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在: 冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险: 灵活性 ...

  8. SpringBoot高级消-息-RabbitMQ基本概念简介

    首先对RabbitMQ做一个简单的介绍,和快速入门,RabbitMQ是我们的AMQP,就是我们高级消息队列的一个实现产品,他的稳定性和可靠性呢非常高,也是我们现在一个非常流行的一个消息中间件,那么我们 ...

  9. RabbitMQ核心功能介绍

    RabbitMQ核心功能 一.MQ的概念与功能介绍 二.RabbitMQ的介绍和入门案例 三.RabbitMQ的工作队列 四.RabbitMQ的工作模式 五.RabbitMQ的发布确认 六.Rabbi ...

最新文章

  1. 机器学习算法学习---模型融合和提升的算法(五)
  2. ENBM内部测试试题 组建与维护企业网络试题
  3. python好找工作吗2017-你为什么不来了解一下Python?
  4. 数据库中的范式和反范式详解!
  5. ASP.NET Core【在线教育系统】功能要求
  6. 基于 CODING 的 Spring Boot 持续集成项目 1
  7. RS编码的matlab仿真
  8. db9针232接口波特率标准_RS232串口通信:接口定义、标准接法详细说明,一看就懂了...
  9. 一般系统论的基本概念
  10. 【网易互娱模拟笔试】解题记录
  11. 马云的电影,丁磊的饭局
  12. 下载Chrome浏览器crx文件插件最简单方法
  13. 原创 基于微信场地预约小程序 毕业设计 毕设 源码 源代码 欣赏 - 可用于羽毛球、篮球、乒乓、网球等预约小程序
  14. matlab画三维图如何更改颜色,matlab画三维图像的示例代码(附demo)
  15. 网页设计有难题?12款网页设计模板给你灵感!
  16. 2个25Ge网口做bond(mode4)后,iperf2压测不到50Gb
  17. 汉字区位码\国标码\机内码之间的换算
  18. 农村老教师的爱情故事
  19. linux终端怎么打开计算器,如何通过终端启动计算器
  20. 密码必须符合复杂性要求

热门文章

  1. python嵌套循环效率_Python嵌套循环数组比较优化的可能性?
  2. 特斯拉 model3 没有信号_Model 3在北京失控撞人,特斯拉:未发现任何系统故障
  3. Java:代码验证 StringBuffer 线程安全,StringBuilder 非线程安全
  4. java length 使用方法 例题_java.util.BitSet.length()方法实例
  5. 什么电线适合在面包板上布线?
  6. 基于STM8H1K08的ISP HUB控制器
  7. python自动化框架pytest接口关联串联_基于python接口自动化框架搭建_pytest+jenkins+allure...
  8. rust python扩展_Rust语言优化Python性能案例
  9. drools规则中调用其它规则_简化机器学习中的关联规则
  10. 使用wordpress 搭建 文档索引服务器