RabbitMQ 简述#

RabbitMQ是一个消息代理:它接受并转发消息。 您可以将其视为邮局:当您将要把寄发的邮件投递到邮箱中时,您可以确信Postman 先生最终会将邮件发送给收件人。 在这个比喻中,RabbitMQ是一个邮箱,邮局和邮递员,用来接受,存储和转发二进制数据块的消息。

队列就像是在RabbitMQ中扮演邮箱的角色。 虽然消息经过RabbitMQ和应用程序,但它们只能存储在队列中。 队列只受主机的内存和磁盘限制的限制,它本质上是一个大的消息缓冲区。 许多生产者可以发送到一个队列的消息,许多消费者可以尝试从一个队列接收数据。

producer即为生产者,用来产生消息发送给队列。consumer是消费者,需要去读队列内的消息。producer,consumer和broker(rabbitMQ server)不必驻留在同一个主机上;确实在大多数应用程序中它们是这样分布的。

简单队列#

简单队列是最简单的一种模式,由生产者、队列、消费者组成。生产者将消息发送给队列,消费者从队列中读取消息完成消费。

在下图中,“P”是我们的生产者,“C”是我们的消费者。 中间的框是队列 - RabbitMQ代表消费者的消息缓冲区。

java 方式#

生产者#

Copypackage com.anqi.mq.nat;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class MyProducer {    private static final String QUEUE_NAME = "ITEM_QUEUE";    public static void main(String[] args) throws Exception {        //1. 创建一个 ConnectionFactory 并进行设置        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        factory.setVirtualHost("/");        factory.setUsername("guest");        factory.setPassword("guest");        //2. 通过连接工厂来创建连接        Connection connection = factory.newConnection();        //3. 通过 Connection 来创建 Channel        Channel channel = connection.createChannel();        //实际场景中,消息多为json格式的对象        String msg = "hello";        //4. 发送三条数据        for (int i = 1; i <= 3 ; i++) {            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());            System.out.println("Send message" + i +" : " + msg);        }        //5. 关闭连接        channel.close();        connection.close();    }}
Copy    /**     * Declare a queue     * @param queue the name of the queue     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)     * @param arguments other properties (construction arguments) for the queue     * @return a declaration-confirm method to indicate the queue was successfully declared     * @throws java.io.IOException if an error is encountered     */    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;    /**     * Publish a message     * @see com.rabbitmq.client.AMQP.Basic.Publish     * @param exchange the exchange to publish the message to     * @param routingKey the routing key     * @param props other properties for the message - routing headers etc     * @param body the message body     * @throws java.io.IOException if an error is encountered     */    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;    /**     * Start a non-nolocal, non-exclusive consumer, with     * a server-generated consumerTag.     * @param queue the name of the queue     * @param autoAck true if the server should consider messages     * acknowledged once delivered; false if the server should expect     * explicit acknowledgements     * @param callback an interface to the consumer object     * @return the consumerTag generated by the server     * @throws java.io.IOException if an error is encountered     * @see com.rabbitmq.client.AMQP.Basic.Consume     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)     */    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

消费者#

Copypackage com.anqi.mq.nat;import com.rabbitmq.client.*;import java.io.IOException;public class MyConsumer {    private static final String QUEUE_NAME = "ITEM_QUEUE";    public static void main(String[] args) throws Exception {        //1. 创建一个 ConnectionFactory 并进行设置        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        factory.setVirtualHost("/");        factory.setUsername("guest");        factory.setPassword("guest");        //2. 通过连接工厂来创建连接        Connection connection = factory.newConnection();        //3. 通过 Connection 来创建 Channel        Channel channel = connection.createChannel();        //4. 声明一个队列        channel.queueDeclare(QUEUE_NAME, true, false, false, null);        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        /*           true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费           false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一           直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,           直到该消费者反馈。        */        //5. 创建消费者并接收消息        Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body, "UTF-8");                System.out.println(" [x] Received '" + message + "'");            }        };        //6. 设置 Channel 消费者绑定队列        channel.basicConsume(QUEUE_NAME, true, consumer);    }}
CopySend message1 : helloSend message2 : helloSend message3 : hello [*] Waiting for messages. To exit press CTRL+C [x] Received 'hello' [x] Received 'hello' [x] Received 'hello'

当我们启动生产者之后查看RabbitMQ管理后台可以看到有一条消息正在等待被消费。

当我们启动消费者之后再次查看,可以看到积压的一条消息已经被消费。

总结#

  • 队列声明queueDeclare的参数:第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数。
  • basicConsume的第二个参数autoAck: 应答模式,true:自动应答,即消费者获取到消息,该消息就会从队列中删除掉,false:手动应答,当从队列中取出消息后,需要程序员手动调用方法应答,如果没有应答,该消息还会再放进队列中,就会出现该消息一直没有被消费掉的现象。
  • 这种简单队列的模式,系统会为每个队列隐式地绑定一个默认交换机,交换机名称为" (AMQP default)",类型为直连 direct,当你手动创建一个队列时,系统会自动将这个队列绑定到一个名称为空的 Direct 类型的交换机上,绑定的路由键 routing key 与队列名称相同,相当于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");虽然实例没有显式声明交换机,但是当路由键和队列名称一样时,就会将消息发送到这个默认的交换机中。这种方式比较简单,但是无法满足复杂的业务需求,所以通常在生产环境中很少使用这种方式。
  • The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默认交换机隐式绑定到每个队列,其中路由键等于队列名称。不可能显式绑定到,或从缺省交换中解除绑定。它也不能被删除。​ ——引自 RabbitMQ 官方文档​

spring-amqp方式#

引入 Maven 依赖

Copy        com.rabbitmq            amqp-client            5.6.0org.springframework.amqp            spring-rabbit            2.1.5.RELEASE

spring 配置文件

Copy

使用测试

Copyimport org.springframework.amqp.core.AmqpTemplate;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class Main {    public static void main(String[] args) {        ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml");        AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class);        amqpTemplate.convertAndSend("MY-QUEUE", "Item");        String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE");        System.out.println(msg);    }}

参考方法

Copy/** * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange * with a specific routing key. * * @param exchange the name of the exchange * @param routingKey the routing key * @param message a message to send * @throws AmqpException if there is a problem */void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;/** * Receive a message if there is one from a specific queue and convert it to a Java * object. Returns immediately, possibly with a null value. * * @param queueName the name of the queue to poll * @return a message or null if there is none waiting * @throws AmqpException if there is a problem */@NullableObject receiveAndConvert(String queueName) throws AmqpException;

作者: 海向

出处:https://www.cnblogs.com/haixiang/p/10826710.html

本站使用「CC BY 4.0」创作共享协议,转载请在文章明显位置注明作者及出处。

获取rabbitmq连接对象_RabbitMQ——简单队列相关推荐

  1. Spring JDBC-使用Spring JDBC获取本地连接对象以及操作BLOB/CLOB类型数据

    概述 如何获取本地数据连接 示例从DBCP数据源中获取Oracle的本地连接对象 相关接口操作 LobCreator LobHandler 插入LOB类型的数据 以块数据的方式读取LOB数据 以流数据 ...

  2. rabbitmq java实例_RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)

    转载http://blog.csdn.net/u013142781 一.消息队列使用场景或者其好处 消息队列一般是在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式 ...

  3. rabbitmq java 测试_RabbitMQ 简单测试

    RabbitMQ 测试 RabbitMQ 基于Erlang 实现, 客户端可以用Python | Java | Ruby | PHP | C# | Javascript | Go等语言来实现.这里做个 ...

  4. rabbitmq连接认证_RabbitMQ配置使用SSL加密通信

    测试环境ubuntu 16.04 rabbitmq-server 3.5.7 openssl 1.0.2g 安装openssl1sudo apt-get install openssl 生成证书 从网 ...

  5. ConnectionPool-数据库连接池的简单实现

    利用JDBC操作数据库的常用方法,首先加载数据库的驱动(Driver),获取数据库的连接对象(Connection),然后由连接对象创建操作数据库的语句对象(Statement),利用语句对象对数据库 ...

  6. rabbitmq几种工作模式_RabbitMQ六种队列模式-简单队列模式

    在官网的教程中,描述了如上六类工作队列模式: 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列.也称为点对点模式. 工作模式:一个消息生产者,一个交换器,一个消息队列,多个 ...

  7. RabbitMQ 一二事 - 简单队列使用

    消息队列目前流行的有三种 1. RabbitMQ 2. ActiveMQ 3. Kafka 这三种都非常强大,RabbitMQ目前用的比较多,也比较流行,阿里也在用 ActiveMQ是阿帕奇出品,但是 ...

  8. RabbitMQ简单队列模式

    简单队列模式 红色:队列 P:消息的生产者 C:消息的消费者 生产者,将消息发送到队列 消费者,从队列中获取消息 配置依赖 导入RabbitMQ客户端依赖 <dependency>< ...

  9. RabbitMQ六种队列模式-简单队列模式

    前言 RabbitMQ六种队列模式-简单队列 [本文] RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

最新文章

  1. 初窥runtime的作用
  2. Element-UI 的基本使用||基于图形化界面自动安装
  3. 《JavaScript 每周导读》【第二期】
  4. SAP Commerce Cloud 的代码仓库
  5. 如何寻回xp盘符丢失的数据
  6. PMO在组织结构中的作用
  7. [Turn]C# 强制关闭当前程序进程(完全Kill掉不留痕迹)
  8. 大工17春计算机文化基础,大工17春《计算机文化基础》在线测试
  9. python辗转相除法求最小公倍数_Python实现利用最大公约数求三个正整数的最小公倍数示例...
  10. 夯实Java基础(十七)——注解(Annotation)
  11. 【福利】BAT架构师分享最全Java架构师学习技能图谱:包含Java编程+网络+设计模式+数据库+分布式等
  12. WorkTool(一)企业微信群管理机器人实现
  13. 【蓝桥杯单片机组】两种外设访问方式:IO编程和MM编程
  14. Redis 官方推出可视化工具,颜值爆表,功能真心强大!这是不给其他工具活路啊!...
  15. 支持嵌入的手机号码识别sdk软件
  16. c语言arg是什么函数,arg函数(arg辐角公式)
  17. 基于gensim实现word2vec模型(附案例实战)
  18. Shell脚本:变量和运算符
  19. Vitamio直播框架的简单使用
  20. redis服务器 本地连接

热门文章

  1. BUUCTF-misc另外一个世界 8个二进制数为一组转ASC码
  2. mysql mtq_MySQL基础知识 - osc_r3mtqivi的个人空间 - OSCHINA - 中文开源技术交流社区
  3. python单例模式的五种实现方式
  4. python教程:可变长参数(*args、**kwargs)、返回值(return)
  5. Python中的构造方法
  6. window mysql proxy_window下mysql-proxy简单使用
  7. linux ntptime(Network Time Protocol 网络时间协议)
  8. pycharm shadows name 'xxxx' from outer scope 警告
  9. 克隆仓库时HTTPS和SSH方式的区别和使用
  10. python sklearn.datasets.fetch_mldata MNIST手写数字数据集无法获取, 报错 Function fetch_mldata is deprecated 的解决办法