​01 前言

关于 RabbitMQ 服务器的安装,本章节不做介绍,请培养个人动手能力,自行百度解决。RabbitMQ 成功安装后(win 版),浏览器输入:localhost:15672,则可以进入 登录账户(默认账户密码都是 guest)RabbitMQ 管理平台。

02 基本(简单)消息模型

基本消息模型如下图:

官方简化模型:

  • P(Producer/Publisher):生产者,一个发送消息的应用程序。
  • C(Consumer):消费者,一个主要用来等待接收消息的用户应用程序。

队列(Queue)(红色区域):相当于 RbbitMQ 内部的邮箱。尽管消息经过 RabbitMQ 和你的应用程序,但它们只能存储在队列中。队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲区。许多消费者可以尝试从一个队列接收数据。

总之,基本消息模型流程是:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

连接 RabbitMQ 服务器工具类

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtil {public static Connection getConnection() throws IOException, TimeoutException {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//端口factory.setPort(5672);//设置账号:用户名、密码(不设置为默认账户密码)//factory.setUsername("guest");//factory.setPassword("guest");//通过工厂获取连接Connection connection = factory.newConnection();return connection;}}

生产者发送消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();try {Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");} catch (Exception e) {e.printStackTrace();}finally {connection.close();}}}

执行消息发送程序,向 RabbitMQ 发送一条消息。

进入 RabbitMQ 管理界面,选择 Queues ,可以看到一个 "hello" 队列,并且Ready 有一条消息,说明成功发送消息进入 RabbitMQ 队列。

消费者接收消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}

运行消费者程序,接收 RabbitMQ 传递的消息:Hello World!

进入 RabbitMQ 消息管理界面,Ready 状态消息为 0,消息已被消费者进行消费。

以上就是 RabbitMQ 简单的消息模型和代码实现。

03 RabbitMQ 消息确认机制

RabbitMQ 消息确认机制:RabbitMQ 消息确认机制,RabbitMQ 当把信息传递给消费者,队列中的消息会被马上标记为删除。当某个消费者意外停止工作,则这个消费者也把 RabbitMQ 所传递给它的消息丢失,这是自动消息确认机制所要面临的问题。为了确保消息永远不会消失,RabbitMQ 支持消息确认,消费者通过发送一个 ACK(nowledgement)来告知 RabbitMQ,某个特定消息已经被接收和处理,RabbitMQ 可以进行自由地对消息进行标记删除。当消费者在不发送 ACK 情况下意外停止工作(如通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未被完全处理,对消息进行排队,如果有其他消费者在线,则重新将消息发送给另一个消费者。

原理图流程图:

消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知 RabbitMQ 消息已被确认消费,ACK 回执分两种情况:

  • 自动 ACK:消息一旦接收,消费者自动发送 ACK

  • 手动 ACK:消费者接收后,不会发送 ACK,需手动确认

修改消息生产者的消息队列:Send.java

 private final static String QUEUE_NAME = "ack_demo";

消息自动确认机制

自动 ACK, AutoAck.java

import com.aflypig.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class AutoAck {private final static String QUEUE_NAME = "ack_demo";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");//模拟消费者意外停止工作int i = 10 / 0;System.out.println(" [x] Received '" + message + "'");};//自动 ACKchannel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}

重点代码:

//自动 ACK
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

通过设置 basicConsume 函数的第二个参数值为 true,设置消息确认机制为自动发送 ACK。

生产者发送消息,进入 RabbitMQ 管理界面,该条消息处于 Ready 状态。

运行 AutoAck 消费程序:消费者抛出异常,同时发现控制台没有接收到 RabbitMQ 消息,说明因消费者异常,丢失了 RabbitMQ 传递的消息。

进入到 RabbitMQ 管理界面:可见当消费端发生异常,但队列消息依然被消费。

手动消息确认机制

手动 ACK:ManualAck.java

import com.aflypig.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ManualAck {private final static String QUEUE_NAME = "ack_demo";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//手动 ACKchannel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });}}

重点代码:

//手动 ACK
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });

通过设置 basicConsume 函数的第二个参数值为 false,设置消息确认机制为自动发送 ACK。

生产者发送消息,进入 RabbitMQ 管理界面:

运行手动 ACK 消费端 :ManualAck

查看 RabbitMQ 管理界面:看到一条没确认消费的消息,说明虽然手动确认消费者 ManualAck 接收到 RabbitMQ 传递给它的消息,但消息还处于 Unacked 状态。

关闭 ManualAck 消费端,查看 RabbitMQ 界面,消息再次处于 Ready 状态。

当消费者设置手动确认机制,当代码没有进行消息确认,消费者停止,消息状态会再次处于 Ready 状态。

修改 ManualAck.java加入手动 ACK 代码:

 //手动 ACK 代码段channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

运行手动 ACK 消费端 :ManualAck

RabbitMQ 管理界面:

这时,消息已被确认消费。

手动消息确认机制的问题

当设置手动 ACK 机制时,忘记手动 ACK 告知 RbbitMQ 消息消费情况,是一个常见的错误,甚至会造成严重后果,当消费者客户端退出,消息将会重新进入 Ready 状态,消息会在下次进行重新发送,同时,因RabbitMQ 无法释放任何未被确认消费的信息,会因此消耗更多的内存。

RabbitMQ 消息持久化

消息持久化:通过了解 RabbitMQ 消息确认机制,学会了如何在消费者进程意外停止,消息也不会丢失。但当 RabbitMQ 服务器停止,所有的消息也会丢失,因此,为了确保 RabbitMQ 意外崩溃或退出时,需要进行两件事:1.队列持久化;2.消息持久化

1.队列持久化:

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

通过声明 queueDeclare 方法的第二个参数为 true,可以将队列持久化。通过官方文档说明,此种命令本身虽然是正确的,但 RabbitMQ 并不允许使用不同的参数定义重新定义现有队列模式,当开发者尝试这样做, RabbitMQ 将会返回一个错误:

java.io.IOExceptionat com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962)at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)at com.aflypig.ack.Send.main(Send.java:18)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_demo' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_demo' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)at java.lang.Thread.run(Thread.java:748)

2.消息持久化:一般来说,消息持久化的前提是队列要进行持久化。现在消息被标记为 persistent——通过将 MessageProperties(实现了 BasicProperties)设置为 PERSISTENT_TEXT_PLAIN。

channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

RabbitMQ 基本消息模型和消息确认机制相关推荐

  1. 最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制。 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订

    最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制. 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订阅队列 3 ...

  2. Rabbitmq消息发送事务与确认机制

    2019独角兽企业重金招聘Python工程师标准>>> 默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器.如果在消息到达 ...

  3. 消息消费端的确认机制

    RocketMQ提供了ack机制,以保证消息能够被正常消费.发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功.中途断电,抛出异常等都不会认为成功 con ...

  4. RabbitMQ消息模型之FanoutExchange消息模型实战

    前面我们学习了RabbitMQ的核心基础组件,了解了基本消息模型由队列.交换机.路由构成.而在RabbitMQ的核心组件体系中,主要有4种消息模型:基于HeadersExchange.DirectEx ...

  5. RabbitMQ(03)——RabbitMQ的Fanout消息模型

    RabbitMQ--RabbitMQ的Fanout消息模型 Fanout消息模型结构 P:生产者,向Exchange发送消息 X: Exchange(交换机),接收生产者的消息 C:消费者,领取消息并 ...

  6. 消息队列---消息模型及使用场景

    消息队列   消息对列是一个存放消息的容器,当我们需要消息的时候就从消息队列中取出消息使用.消息队列是分布式系统中重要的组件,使用消息队列的目的是为了通过异步处理提高系统的性能和削峰值,降低系统的耦合 ...

  7. 【分布式】Rabbitmq死信队列模型、实战场景---订单延迟30min支付处理

    分布式 内容管理 死信队列 死信队列demo 死信队列消息模型 平台订单支付超时 --- 演示 业务分析 代码实现 RabbitMQ 死信队列/ 延迟队列 - 延迟业务逻辑 最近可能分布式进入Redi ...

  8. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

  9. springboot + rabbitmq 用了消息确认机制,感觉掉坑里了

    最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...

最新文章

  1. golang中图片转base64_golang base64编码
  2. 怎么学python-初学者如何学习Python?掌握这17个实用小技巧快速入门!
  3. debug工具_Jupyter官方神器:可视化 Debug 工具!
  4. Cloudera-Manager-agent 误删恢复
  5. pycharm 的live_template的使用
  6. 计算机械效率的公式四种,物理计算公式;
  7. qq表情包html插件,jQuery QQ表情插件jquery.qqFace.js
  8. 计算机专业毕业论文题目大全集
  9. QT开发(九)—— Qt实现应用内动态切换语言,使用Qt语言家编译字体包
  10. TF-IDF算法总结
  11. 5d4的白平衡模式_佳能5D4怎样调整白平衡?
  12. html css ps切图教程,CSS切图学习之认识PHOTOSHOP(PS)
  13. SolidPlant材料清单
  14. 0基础转行软件测试,月薪6000和11000的必备技能,截然不同...
  15. Abaqus中C3D8R单元和C3D8I单元的区别
  16. R语言VaR市场风险计算方法与回测、用LOGIT逻辑回归、PROBIT模型信用风险与分类模型...
  17. Android 7.0新特性——依然范特西
  18. 【AI TOP 10】马化腾:AI技术沦为网络黑产新工具;网易区块链项目被传夭折; 人工智能可以让狗跟人说话
  19. Installshield 打包安装包心得
  20. Linux 下载sublime

热门文章

  1. java实现word(docx)在线编辑(word转html,html转word)——解读document.xml结构
  2. Gambler's Ruin(赌徒破产问题 概率论)
  3. 为何硅谷第一性感女人也没能拯救雅虎?
  4. 某金融企业核心存储POC测试及选型经验
  5. bzoj2563阿狸和桃子的游戏
  6. 雷达信号处理基础-历史和源来
  7. 交叉验证中cv=? 与 cv=KFold(n_splits=?)的区别
  8. iPhone13外部拨打电话总是暂时无法接通解决方案
  9. 知识共享许可协议 Creative Commons Licenses
  10. 生物特征识别学科发展报告