rabbitmq 手动提交_RabbitMQ 相关问题总结--RabbitMQ 如何确保消息发送和消费?
RabbitMQ 相关问题总结
HA 的RabbitMQ 集群架构:
一、RabbitMQ 如何高可用部署,如何确保集群不宕机?
RabbitMQ可用采用三种方式来部署集群:
1. cluster
a. 不支持跨网段,用于同一个网段内的局域网。
b.可用随意的或者动态的增加 或者减少。
c.节点之间需要运行相同版本的RabbitMQ 和Erlang
2.federation
部署在广域网,允许单台服务器上的交换机或者队列,接收发布到另外一台机器上的消息和队列。
3.sholve
和federation类似,工作在更低层次,可以应用于广域网
二、节点类型:
1.RAM Node
内存节点,将所有的队列,交换机, 绑定,用户 权限与vhost的元数据存储在内存中,可以让队列和交换机声明更加的便捷。
2. Disk Node
将元数据存储在磁盘中,单节点系统,只运行 磁盘类型的节点,防止重启RabbitMQ时,丢失系统的配置信息。
三、ErLang Cookie
ErLang Cookie 是保证不同节点之间的通讯,不同节点之间共享相同的Cookie,集群部署时候,需要copy这个数据到不同的节点,使得cookie一致。
四、RabbitMQ集群模式分类
1.普通模式:
默认的集群模式,假设集群上有两个节点 node1,node2, 消息实体只存在一个节点,如果生产者生产一个消息,丢向node1,但是消费者在node2进行消费,那么就需要node2将node1中的消息取出,并且发送给消费者。
2. 镜像模式:
需要将消费者的队列变成镜像队列,存在于多个节点。实现RabbitMQ的高可用,作用就是,消息实体会在队列之间同步。
RabbitMQ相关操作命令:
1. 单个节点的服务启动 停止rabbitmqctl stop
rabbitmq-server -detached
2.查询节点状态rabbitmqctl cluster_status
调研:1. 产品定位(能做什么,不能做什么)
2. 产品特性(适合做什么,不适合做什么)
3. 产品背景(社区活跃度,团队知名度,维护力度,口碑)
4. 产品架构(实现原理)
5. 产品安装(用户指南)
6. 产品维护(运维管理)
demo 验证
RabbitMQ 如何确保消息发送和消费:
从RabbitMQ 结构图来看,有如下几个 过程:
1. 消息从生产者Producer发送到交换机Exchange
2.交换机根据路由规则将消息转发到相应队列
3. 队列将消息进行存储
4.消费者订阅队列消息,并进行消费
第一个过程 消息 从生产者发送到交换机
a.中间网络断开怎么办?
rabbitmq 的解决方案:
1). 设置信道channel 为事务模式
通过channel.txSelect 开启事务,channel.txCommit 提交事务,channel.txRollback 用于事务回滚
如果在还没有提交事务之前,RabbitMQ抛出异常,我们可以 将其捕获,然后进行事务回滚。缺点是 事务模式会极大的消耗RabbitMQ的性能。
2). 设置信道confirm 模式
通过confirm.select开启confirm模式,如果设置了no_wait为 false的话,那么broker会返回confirm.select_ok,表示broker同意将信道设置为confirm模式。 但是这个优点是发送方确认是异步的,发送方可以不等到确认就发送下一条消息。当消息被brocker接收,broker
会发送basic.ack,生产者可以通过回调函数确认这个消息,如果因为RabbitMQ本身问题导致消息丢失,broker会发送basic.nack,生产者通过回调方法接收到nack后,可以考虑消息重发
要注意的是 事务模式 和confirm模式不能共存,是互斥的。@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
}
}
第二个过程 消息由交换机路由到消息队列
对于不能路由到消息队列的消息,如果设置了mandatory的话,basic.return 会在basic.ack或者basic.nack之前返回。
对于可以路由到消息队列的消息,在confirm模式下,如果返回basic.ack的话说明如下:
1.消息被接收到所有的队列中了 2.如果是镜像队列,说明被所有的镜像队列接收 3.如果是持久化的消息到持久化队列已经持久化到看硬盘。@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String msgId = "";
if (message.getMessageProperties().getCorrelationId() != null) {
msgId = new String(message.getMessageProperties().getCorrelationId());
}
System.out.println("return--message: msgId:" + msgId + ",msgBody:" + new String(message.getBody())
+ ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:"
+ routingKey);
}
}
当然如果不想在生产者实行return
函数,可以采用备份交换机的方式,当交换机没有路由到正确队列,会将消息转发到这个备份交换机上,前提是这个备份交换机绑定了正确队列,这样就可以被消费了。channel.exchangeDeclare方法的时候添加alternate-exchange参数声明备份交换机
第三个阶段, 队列消息持久化阶段
如果队列不设置持久化,即使消息是持久化的,消息依然不能保存,皮之不存,毛将焉附需要队列和消息都是持久化的,队列持久化只是保存其元数据,在重启,宕机后不丢失,但是要消息也持久化,需要保证消息是持久化的。并且如果是设置了confim模式的话,basic.ack
之前是会将数据进行落盘的, 并且RabbitMQ采用镜像队列,多个副本方式,Master 宕机,依然可以使用Slave 不影响
集群正常使用,保证高可用
第4个阶段 消费者消费消息
为了保证消息正常的到达消费者,RabbitMQ 提供了消息 acknowledgement来确认消息。
默认autoAck=None isAutoAck=false 来确认消息, autoAck = false
是表示,需要等到消费者发送显示的回复确认信号之后,消息才从内存中移除,但是如果acknowledgeMode设置了Auto
,那么isAutoAck= true ,这个其实是不安全的,fire-And-forget,
消息发送出去之后,但是可能还没到消费者,TCP连接就断了(由于配置了auto,只要消费发出去了,就删掉了),那么TCP连接断开了,这部分消息就丢失了,是不安全的,但是对应能一直keepup连接的,是可以提高吞吐量的。
还可以设置autoAck = manual ,isAutoack= false,那么就是
队列每次向消费者发送消息之后,需要消费者手动确认basc.ack,basic,nack等,rabbitmq才可以将消息删除或者重新入队。
isAutoack=false 情况下 ,一致没有收到 消费者的 basic.ack, RabbitMQ如果检测到 和消费者端口连接
端口,会重新发这条消息。
还有注意如果basic.nack basic.reject 只是简单的拒绝 ,而不是重新 requeue的话,那么消息是不会重新入队的
查看下basic.Nack说明/**
* Reject one or several received messages.
*
* Supply the deliveryTag
from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
最后一个参数设置true的话才会重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
查看源码:private void consumeFromQueue(String queue) throws IOException {
this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(), "", false, this.exclusive,
this.consumerArgs, this.consumer);
if (logger.isDebugEnabled()) {
logger.debug("Started on queue '" + queue + "': " + this);
}
}
还有个问题,消费者啥时候发送basic.ack呢?Object[] listenerArguments = buildListenerArguments(convertedMessage);
Object result = invokeListenerMethod(methodName, listenerArguments, message);
if (result != null) {
handleResult(result, message, channel);
} else {
logger.trace("No result object given - no result to handle");
}
、、、、、、、、、、、、、、rotected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message)
throws Exception {
RabbitResourceHolder resourceHolder = null;
Channel channelToUse = channel;
boolean boundHere = false;
try {
if (!isExposeListenerChannel()) {
// We need to expose a separate Channel.
resourceHolder = getTransactionalResourceHolder();
channelToUse = resourceHolder.getChannel();
/*
* If there is a real transaction, the resource will have been bound; otherwise
* we need to bind it temporarily here. Any work done on this channel
* will be committed in the finally block.
*/
if (isChannelLocallyTransacted(channelToUse) &&
!TransactionSynchronizationManager.isActualTransactionActive()) {
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
boundHere = true;
}
}
else {
// if locally transacted, bind the current channel to make it available to RabbitTemplate
if (isChannelLocallyTransacted(channel)) {
RabbitResourceHolder localResourceHolder = new RabbitResourceHolder(channelToUse, false);
localResourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
localResourceHolder);
boundHere = true;
}
}
// Actually invoke the message listener...
try {
listener.onMessage(message, channelToUse);
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
}
}
finally {
if (resourceHolder != null && boundHere) {
// so the channel exposed (because exposeListenerChannel is false) will be closed
resourceHolder.setSynchronizedWithTransaction(false);
}
ConnectionFactoryUtils.releaseResources(resourceHolder);
if (boundHere) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
if (!isExposeListenerChannel() && isChannelLocallyTransacted(channelToUse)) {
/*
* commit the temporary channel we exposed; the consumer's channel
* will be committed later. Note that when exposing a different channel
* when there's no transaction manager, the exposed channel is committed
* on each message, and not based on txSize.
*/
RabbitUtils.commitIfNecessary(channelToUse);
}
}
}
}
参考:https://blog.csdn.net/wangming520liwei/article/details/79523130
rabbitmq 手动提交_RabbitMQ 相关问题总结--RabbitMQ 如何确保消息发送和消费?相关推荐
- rabbitmq 手动提交_RabbitMQ系列(四)RabbitMQ事务和Confirm发送方消息确认——深入解读 - 王磊的博客 - 博客园...
RabbitMQ事务和Confirm发送方消息确认--深入解读 RabbitMQ系列文章 引言 根据前面的知识( 深入了解RabbitMQ工作原理及简单使用 . Rabbit的几种工作模式介绍与实践 ...
- rabbitmq 手动提交_第四章----SpringBoot+RabbitMQ发送确认和消费手动确认机制
1. 配置RabbitMQ # 发送确认 spring.rabbitmq.publisher-confirms=true # 发送回调 spring.rabbitmq.publisher-return ...
- RabbitMQ如何保证消息发送、消费成功
好记忆不如烂笔头,能记下点东西,就记下点,有时间拿出来看看,也会发觉不一样的感受. 目录 1.发送确认机制设置 2.消息丢失.非信任或失败 3.消息重复消费 4.消费成功通知 5.总结 消息因为其:削 ...
- python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset
spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...
- Kafka手动提交偏移量的作用到底是什么???
手动提交偏移量的原因 最近拜读了很多文章,都谈到为了保证消息的安全消费(避免消息丢失和消息重复读取),建议消费者客户端手动提交偏移量.具体如下: 1.当设置为自动提交时,当kafka消费者读取到消息后 ...
- java rabbitmq 工具类_RabbitMq通用管理工具类
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client ...
- rabbitmq订单模块_RabbitMQ播放模块! 构架
rabbitmq订单模块 RabbitMQ提供了具有可预测且一致的吞吐量和延迟的高可用性,可伸缩和便携式消息传递系统. RabbitMQ是AMQP (业务消息传递的开放标准)的领先实现 ,并且通过适配 ...
- RabbitMQ手动确认模式(项目开发常用模式)
RabbitMQ 手动确认模式(日常项目开发常用模式) 借鉴导言 架构及工作原理 项目开发使用(公司常用) 借鉴导言 此文借鉴多名CSDN用户博客,并将其博文中关于MQ常用的点,进行了归纳整理 借鉴博 ...
- Rabbitmq手动应答代码实现与测试
前言 rabbitmq作为现在流行的消息队列,它拥有流量削峰.应用解耦.异步处理等优点,使用数量也是较多的.其中重要的特性也就是手动应答避免消息丢失的特点更是使其更上一层楼.消息队列基础的处理流程是: ...
- rabbitmq 同步策略_RabbitMQ(三):消息持久化策略
一.前言 在正常的服务器运行过程中,时常会面临服务器宕机重启的情况,那么我们的消息此时会如何呢?很不幸的事情就是,我们的消息可能会消失,这肯定不是我们希望见到的结果.所以我们希望AMQP服务器崩溃了也 ...
最新文章
- ML基石_3_TypesOfLearning
- 敏捷嘉年华——敏捷之旅2012(上海站)
- 文件 图片 上传 及少许正则校验
- 马来西亚热情拥抱阿里巴巴 马云倡议的eWTP首次落地海外
- HDU 3486 Interviewe RMQ
- MAC报错:-bash: mysqlbinlog : command not found
- Android TV开发总结(七)构建一个TV app中的剧集列表控件
- Django静态文件的加载以及STATIC_URL、 STATIC_ROOT 、STATICFILES_DIRS的区别
- python画彩色螺旋线_解决python彩色螺旋线绘制引发的问题
- MFC动态链接库和WIN32动态链接库 及区别
- mysql 图像数据类型_MySQL数据类型
- linux命令行工具大全,七款极为实用的Linux命令行工具
- HBuilder开发app,扫描枪中,使用input输入框,然后点击扫描,获取不到条码!
- 对比线程,一个VCPU是什么
- 创业公司必备,20个提升团队工作效率的工具神器
- Photoshop CS4 CS5 CS6永久序列号全面整理
- 名片识别 java_基于JAVA的名片识别接口调用代码实例
- 情人节单身的你,是否用一张智能名片,进行表白
- PMI考试收获的学习思维
- 软件构造Lab2总结
热门文章
- html如何解决412问题,网站412怎么解决?
- 南理工计算机学院宋杰,周骏 - 计算机与信息科学学院 - Powered by 西南大学
- 体温枪PCBA设计生产流程
- android 实体 快捷键,as快捷键
- 信号与系统(关于流程框图的一个简单问题记录)
- SELinux audit2allow命令使用
- 她学术造假导致导师自杀,后将这段学术经历出书贩卖,一年收入上百万...
- JavaSE基础笔记——JOptionPane编写员工管理系统;GUI使用;写一个超级数组
- 基于蓝墨云班课的翻转课堂实践
- [转]c# 语音卡控制--语音卡电话呼叫系统