消息确认机制

消息发送出去后,如果想确定消息发送的状态信息,发送成功或者发送失败,rabbitmq中有一套完整的检测机制来检测这类问题,我们只需要调用即可,具体的检测机制我们后面再讨论,暂且看一下怎么用代码先实现:

public class Send {private static final String EXCHANGE_NAME = "test_exchange_confirm";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.confirmSelect();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//send a msgString msg = "hello comfirm";String routingKey = "confirm.save";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println(" ack");}@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("no ack");}});System.out.println("send:"+msg);
//        channel.close();
//        connection.close();  这里需要注释,否则就收不到ack}
}

消息发送成功后,发送端发送成功后会收到ack,发送失败后会收到 no ack

消费端写法和之前的没什么区别,我们这次用topic的路由机制来实现一下:

public class ReceiveOne {private static final String QUEUE_NAME = "receive1_queue";private static final String EXCHANGE_NAME = "test_exchange_confirm";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);String routingKey = "confirm.#";  //看上一节的topic路由机制channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [ConsumerOne is] Received '" + message + "'");};boolean autoAck = true;  //开启自动应答channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}
}

Return 机制

return Listener用于处理一些不可路由的消息(在某些情况下,如果我们发送消息的时候,当前的exchange或者routeKey路由不到的时候,这个时候如果我们需要监听这种不可到达的消息,就要使用Return Listener)


消费端自定义监听:

生产者:

public class Produces {private static final String EXCHANGE_NAME = "test_undedine_consumer";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routingKey = "consumer.save";String msg = "Hello undefine consumer";for(int i = 0; i<5; i++){channel.basicPublish(EXCHANGE_NAME,routingKey,true,null,msg.getBytes());}}

消费者:

public class Consumers {private static final String QUEUE_NAME = "test_consumer_queue";private static final String EXCHANGE_NAME = "test_undedine_consumer";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routineKey = "consumer.#";channel.exchangeDeclare(EXCHANGE_NAME,"topic",true,false,null);channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routineKey);channel.basicConsume(QUEUE_NAME,true,new Myconsumer(channel));}
}

自定义内容:

public class Myconsumer extends DefaultConsumer {public Myconsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("---------");System.out.println(consumerTag);}
}

消费端限流

生产者:(和上面的没什么区别)

public class Produces {private static final String EXCHANGE_NAME = "test_limit_flu";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routingKey = "qos.save";String msg = "limit fluency";for(int i = 0; i<5; i++){channel.basicPublish(EXCHANGE_NAME,routingKey,true,null,msg.getBytes());}}
}

消费者:

public class Consumers {private static final String QUEUE_NAME = "test_qos";private static final String EXCHANGE_NAME = "test_limit_flu";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routineKey = "qos.#";channel.exchangeDeclare(EXCHANGE_NAME,"topic",true,false,null);channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routineKey);boolean autoAck = false;channel.basicQos(0,1,false);channel.basicConsume(QUEUE_NAME,autoAck,new Myconsumer(channel));}
}

区别在自定义处理里面:

public class Myconsumer extends DefaultConsumer {private Channel channel;public Myconsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("---------");System.out.println(consumerTag);//channel.basicAck(envelope.getDeliveryTag(),false);//这里之所以是false,前面我们设置的是  channel.basicQos(0,1,false); 表示只接受一个,如果改为1以上,就要相应的改为true}
}

当我们把channel.basicAck(envelope.getDeliveryTag(),false); 这一段屏蔽时,我们在后台看到的是:

总共是五条消息,只有一条能被消费,其他的四条需要等这条消费完后发ack响应才能,继续向前传递。屏蔽的那段代码就是手动响应ACK。


重回队列

死信队列

当一个消息在队列中没人去消费它,它就会被重新publish到另外一个Exchange,这个Exchange就是死信队列。

死信队列有哪几种情况?

  • 当消息被拒绝(basic.nack),并且拒绝重回队列
  • TTL过期了
  • 队列达到最大的长度

下面用代码实现一下:

生产端:

public class Produces {private static final String EXCHANGE_NAME = "test_dlx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routingKey = "dlx.save";String msg = "hello dlx";channel.basicPublish(EXCHANGE_NAME,routingKey,true,null,msg.getBytes());}
}

消费端:

public class Consumers {private static final String QUEUE_NAME = "test_dlx.queue";private static final String EXCHANGE_NAME = "test_dlx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();String routineKey = "dlx.#";channel.exchangeDeclare(EXCHANGE_NAME,"topic",true,false,null);// 下面是配置死信队列Map<String,Object> arg = new HashMap<>();arg.put("x-dead-letter-exchange","dlx.exchange");channel.queueDeclare(QUEUE_NAME,true,false,false,arg); //这里表明了路由失败的话,转发到死信队列上channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routineKey);// 死信队列的声明channel.exchangeDeclare("dlx.exchange","topic",true,false,null);channel.queueDeclare("dlx.queue",true,false,false,null);channel.queueBind("dlx.queue","dlx.exchange","#");boolean autoAck = true;channel.basicQos(0,1,false);channel.basicConsume(QUEUE_NAME,autoAck,new Myconsumer(channel));}
}

这里需要说明的是我们定义了一个死信队列 “dlx.queue”,和dlx.exchange绑定在一起,也就是说消息没人消费就会路由到死信队列中去。

public class Myconsumer extends DefaultConsumer {private Channel channel;public Myconsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// channel.basicAck(envelope.getDeliveryTag(),false);//这里之所以是false,前面我们设置的是  channel.basicQos(0,1,false); 表示只接受一个,如果改为1以上,就要相应的改为trueSystem.out.println(new String(body));}
}


可以看到上面的test_dlx.queue后面有个DLX的标记,也就是说,它里面的无人消费的消息都会直接转到dlx.queue中。我们可以在dlx.queue中设置监听来处理这些消息。

RabbitMQ消息确认以及return机制相关推荐

  1. RabbitMQ消息确认机制

    文章目录 1. 事务机制 2. Confirm模式 2.1 生产者 2.1.1 普通Confirm模式 2.1.2 批量Confirm模式 2.1.3 异步Confirm模式 2.2 消费者 3. 其 ...

  2. RabbitMQ消息确认机制和消息重发机制

    一.机制 首先我们要知道一条消息的传递过程. 生产者 -> 交换机 ->  队列 我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上.这其中我们可能在 ...

  3. RabbitMQ消息确认机制-可靠抵达

    消息发送到被消费的流程: JAVA的生产端的发送数据----->Broker(消息服务器)-------->达到Exchange交换机------------->通过路由键到达Que ...

  4. rabbitmq消息确认机制及死信队列的使用

    关于rabbitmq的基本概念和相关的理论这里就不做过多介绍了,在之前的篇幅中有过相应的介绍,也可以查询一些资料详细了解一下rabbitmq的基础知识,下面要介绍的点主要包括两个方面, 1.rabbi ...

  5. RabbitMQ消息确认机制-07

    在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题, 除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正确到达 Rabbit 服务器呢?如果不做出处理 ...

  6. RabbitMQ 消息确认机制 以及 原理解析

    https://www.cnblogs.com/DBGzxx/p/10091070.html

  7. RabbitMQ 消息确认机制confirm代码编写

  8. 使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)

    1.首先是rabbitmq的配置文件: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns ...

  9. springboot + rabbitmq 用了消息确认机制,感觉掉坑里了

    最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...

最新文章

  1. 面试官问:ZooKeeper 一致性协议 ZAB 原理
  2. SGU 332 Largest Circle(凸包内接圆半径nlogn)
  3. Android开源库介绍:AndLinker-Android 上的 IPC 库
  4. c语言用一维数组求字符串,c语言一维数组练习题.doc
  5. 美国WH在明尼苏达州最大光伏阵列完工
  6. 【转】Linux写时拷贝技术(copy-on-write)
  7. Asp.net中的时区
  8. matlab 等高线_MATLAB作图实例:39:更改等高线图的填充颜色
  9. 课堂笔记——Ubiquitous Computing
  10. 互联网又一个变态条款 “奋斗者协议”
  11. document.referrer已经可以用于统计搜索来源
  12. 一个简单示例 利用jawin完成调用window中dll的调用
  13. 斑斓中国BlenderCN项目库
  14. 一个基于SpringBoot的在线教育系统「源码开源」
  15. cisco 2811路由器详细配置
  16. 【Qt for Python官方教程】使用pyside6-rcc引入.qrc文件
  17. PB 级数据即席查询基于 Flink 的实践
  18. 计算机内存储器和外存储器相比较,计算机中内存储器和外存储器有什么区别
  19. 伽罗瓦2^8域下模多项式求逆python(查表)实现
  20. Amazon ES现更名为Amazon OpenSearch Service并支持OpenSearch 1.0

热门文章

  1. PaperWeekly 第十一期
  2. SQL Server Profiler
  3. 再谈Weiphp公众平台开发——1、成语接龙插件
  4. cxf开发基于web的webservice项目(转载)
  5. GROUP BY,WHERE,HAVING之间的差别和使用方法
  6. Spark源码系列(四)图解作业生命周期
  7. 创建war类型的maven工程时报web.xml is missing and failOnMissingWebXml is set to true
  8. 移动端ios中click点击失效
  9. System Center 2012 R2 ——基础篇
  10. Excel2007数据透视表学习(一)