RabbitMQ消息确认以及return机制
消息确认机制
消息发送出去后,如果想确定消息发送的状态信息,发送成功或者发送失败,rabbitmq中有一套完整的检测机制来检测这类问题,我们只需要调用即可,具体的检测机制我们后面再讨论,暂且看一下怎么用代码先实现:
public class Send {private static final String EXCHANGE_NAME = "test_exchange_confirm";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.confirmSelect();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//send a msgString msg = "hello comfirm";String routingKey = "confirm.save";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println(" ack");}@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("no ack");}});System.out.println("send:"+msg);
// channel.close();
// connection.close(); 这里需要注释,否则就收不到ack}
}
消息发送成功后,发送端发送成功后会收到ack,发送失败后会收到 no ack
消费端写法和之前的没什么区别,我们这次用topic的路由机制来实现一下:
public class ReceiveOne {private static final String QUEUE_NAME = "receive1_queue";private static final String EXCHANGE_NAME = "test_exchange_confirm";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);String routingKey = "confirm.#"; //看上一节的topic路由机制channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [ConsumerOne is] Received '" + message + "'");};boolean autoAck = true; //开启自动应答channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}
}
Return 机制
return Listener用于处理一些不可路由的消息(在某些情况下,如果我们发送消息的时候,当前的exchange或者routeKey路由不到的时候,这个时候如果我们需要监听这种不可到达的消息,就要使用Return Listener)
消费端自定义监听:
生产者:
public class Produces {private static final String EXCHANGE_NAME = "test_undedine_consumer";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routingKey = "consumer.save";String msg = "Hello undefine consumer";for(int i = 0; i<5; i++){channel.basicPublish(EXCHANGE_NAME,routingKey,true,null,msg.getBytes());}}
消费者:
public class Consumers {private static final String QUEUE_NAME = "test_consumer_queue";private static final String EXCHANGE_NAME = "test_undedine_consumer";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routineKey = "consumer.#";channel.exchangeDeclare(EXCHANGE_NAME,"topic",true,false,null);channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routineKey);channel.basicConsume(QUEUE_NAME,true,new Myconsumer(channel));}
}
自定义内容:
public class Myconsumer extends DefaultConsumer {public Myconsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("---------");System.out.println(consumerTag);}
}
消费端限流
生产者:(和上面的没什么区别)
public class Produces {private static final String EXCHANGE_NAME = "test_limit_flu";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routingKey = "qos.save";String msg = "limit fluency";for(int i = 0; i<5; i++){channel.basicPublish(EXCHANGE_NAME,routingKey,true,null,msg.getBytes());}}
}
消费者:
public class Consumers {private static final String QUEUE_NAME = "test_qos";private static final String EXCHANGE_NAME = "test_limit_flu";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routineKey = "qos.#";channel.exchangeDeclare(EXCHANGE_NAME,"topic",true,false,null);channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routineKey);boolean autoAck = false;channel.basicQos(0,1,false);channel.basicConsume(QUEUE_NAME,autoAck,new Myconsumer(channel));}
}
区别在自定义处理里面:
public class Myconsumer extends DefaultConsumer {private Channel channel;public Myconsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("---------");System.out.println(consumerTag);//channel.basicAck(envelope.getDeliveryTag(),false);//这里之所以是false,前面我们设置的是 channel.basicQos(0,1,false); 表示只接受一个,如果改为1以上,就要相应的改为true}
}
当我们把channel.basicAck(envelope.getDeliveryTag(),false);
这一段屏蔽时,我们在后台看到的是:
总共是五条消息,只有一条能被消费,其他的四条需要等这条消费完后发ack响应才能,继续向前传递。屏蔽的那段代码就是手动响应ACK。
重回队列
死信队列
当一个消息在队列中没人去消费它,它就会被重新publish到另外一个Exchange,这个Exchange就是死信队列。
死信队列有哪几种情况?
- 当消息被拒绝(basic.nack),并且拒绝重回队列
- TTL过期了
- 队列达到最大的长度
下面用代码实现一下:
生产端:
public class Produces {private static final String EXCHANGE_NAME = "test_dlx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routingKey = "dlx.save";String msg = "hello dlx";channel.basicPublish(EXCHANGE_NAME,routingKey,true,null,msg.getBytes());}
}
消费端:
public class Consumers {private static final String QUEUE_NAME = "test_dlx.queue";private static final String EXCHANGE_NAME = "test_dlx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routineKey = "dlx.#";channel.exchangeDeclare(EXCHANGE_NAME,"topic",true,false,null);// 下面是配置死信队列Map<String,Object> arg = new HashMap<>();arg.put("x-dead-letter-exchange","dlx.exchange");channel.queueDeclare(QUEUE_NAME,true,false,false,arg); //这里表明了路由失败的话,转发到死信队列上channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routineKey);// 死信队列的声明channel.exchangeDeclare("dlx.exchange","topic",true,false,null);channel.queueDeclare("dlx.queue",true,false,false,null);channel.queueBind("dlx.queue","dlx.exchange","#");boolean autoAck = true;channel.basicQos(0,1,false);channel.basicConsume(QUEUE_NAME,autoAck,new Myconsumer(channel));}
}
这里需要说明的是我们定义了一个死信队列 “dlx.queue”,和dlx.exchange绑定在一起,也就是说消息没人消费就会路由到死信队列中去。
public class Myconsumer extends DefaultConsumer {private Channel channel;public Myconsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// channel.basicAck(envelope.getDeliveryTag(),false);//这里之所以是false,前面我们设置的是 channel.basicQos(0,1,false); 表示只接受一个,如果改为1以上,就要相应的改为trueSystem.out.println(new String(body));}
}
可以看到上面的test_dlx.queue后面有个DLX的标记,也就是说,它里面的无人消费的消息都会直接转到dlx.queue中。我们可以在dlx.queue中设置监听来处理这些消息。
RabbitMQ消息确认以及return机制相关推荐
- RabbitMQ消息确认机制
文章目录 1. 事务机制 2. Confirm模式 2.1 生产者 2.1.1 普通Confirm模式 2.1.2 批量Confirm模式 2.1.3 异步Confirm模式 2.2 消费者 3. 其 ...
- RabbitMQ消息确认机制和消息重发机制
一.机制 首先我们要知道一条消息的传递过程. 生产者 -> 交换机 -> 队列 我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上.这其中我们可能在 ...
- RabbitMQ消息确认机制-可靠抵达
消息发送到被消费的流程: JAVA的生产端的发送数据----->Broker(消息服务器)-------->达到Exchange交换机------------->通过路由键到达Que ...
- rabbitmq消息确认机制及死信队列的使用
关于rabbitmq的基本概念和相关的理论这里就不做过多介绍了,在之前的篇幅中有过相应的介绍,也可以查询一些资料详细了解一下rabbitmq的基础知识,下面要介绍的点主要包括两个方面, 1.rabbi ...
- RabbitMQ消息确认机制-07
在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题, 除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正确到达 Rabbit 服务器呢?如果不做出处理 ...
- RabbitMQ 消息确认机制 以及 原理解析
https://www.cnblogs.com/DBGzxx/p/10091070.html
- RabbitMQ 消息确认机制confirm代码编写
- 使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)
1.首先是rabbitmq的配置文件: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns ...
- springboot + rabbitmq 用了消息确认机制,感觉掉坑里了
最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...
最新文章
- 面试官问:ZooKeeper 一致性协议 ZAB 原理
- SGU 332 Largest Circle(凸包内接圆半径nlogn)
- Android开源库介绍:AndLinker-Android 上的 IPC 库
- c语言用一维数组求字符串,c语言一维数组练习题.doc
- 美国WH在明尼苏达州最大光伏阵列完工
- 【转】Linux写时拷贝技术(copy-on-write)
- Asp.net中的时区
- matlab 等高线_MATLAB作图实例:39:更改等高线图的填充颜色
- 课堂笔记——Ubiquitous Computing
- 互联网又一个变态条款 “奋斗者协议”
- document.referrer已经可以用于统计搜索来源
- 一个简单示例 利用jawin完成调用window中dll的调用
- 斑斓中国BlenderCN项目库
- 一个基于SpringBoot的在线教育系统「源码开源」
- cisco 2811路由器详细配置
- 【Qt for Python官方教程】使用pyside6-rcc引入.qrc文件
- PB 级数据即席查询基于 Flink 的实践
- 计算机内存储器和外存储器相比较,计算机中内存储器和外存储器有什么区别
- 伽罗瓦2^8域下模多项式求逆python(查表)实现
- Amazon ES现更名为Amazon OpenSearch Service并支持OpenSearch 1.0
热门文章
- PaperWeekly 第十一期
- SQL Server Profiler
- 再谈Weiphp公众平台开发——1、成语接龙插件
- cxf开发基于web的webservice项目(转载)
- GROUP BY,WHERE,HAVING之间的差别和使用方法
- Spark源码系列(四)图解作业生命周期
- 创建war类型的maven工程时报web.xml is missing and failOnMissingWebXml is set to true
- 移动端ios中click点击失效
- System Center 2012 R2 ——基础篇
- Excel2007数据透视表学习(一)