使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)
1、首先是rabbitmq的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"><!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都没有,要用跟jar包匹配的版本 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><rabbit:connection-factory id="connectionFactory"host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"publisher-confirms="true" /><rabbit:admin connection-factory="connectionFactory" /><!-- 给模板指定转换器 --><!-- mandatory必须设置true,return callback才生效 --><rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener"return-callback="returnCallBackListener" mandatory="true" /><rabbit:queue name="CONFIRM_TEST" /><rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" ><rabbit:bindings><rabbit:binding queue="CONFIRM_TEST" /></rabbit:bindings></rabbit:direct-exchange><!-- 配置consumer, 监听的类和queue的对应关系 --><rabbit:listener-containerconnection-factory="connectionFactory" acknowledge="manual" ><rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" /></rabbit:listener-container></beans>
2、发送方:
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;@Service("publishService") public class PublishService {@Autowired private AmqpTemplate amqpTemplate; public void send(String exchange, String routingKey, Object message) { amqpTemplate.convertAndSend(exchange, routingKey, message);} }
3、消费方:
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;@Service("receiveConfirmTestListener") public class ReceiveConfirmTestListener implements ChannelAwareMessageListener { @Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch(Exception e){e.printStackTrace();//TODO 业务处理channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);}} }
4、确认后回调方:
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service;@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);} }
5、失败后return回调:
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.stereotype.Service;@Service("returnCallBackListener") public class ReturnCallBackListener implements ReturnCallback{@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);} }
6、测试类:
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import com.dingcheng.confirms.publish.PublishService; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:application-context.xml"}) public class TestConfirm { @Autowired private PublishService publishService; private static String exChange = "DIRECT_EX";@Test public void test1() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test1---message:"+message);//exchange,queue 都正确,confirm被回调, ack=truepublishService.send(exChange,"CONFIRM_TEST",message); Thread.sleep(1000);} @Test public void test2() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test2---message:"+message);//exchange 错误,queue 正确,confirm被回调, ack=falsepublishService.send(exChange+"NO","CONFIRM_TEST",message); Thread.sleep(1000);} @Test public void test3() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test3---message:"+message);//exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTEpublishService.send(exChange,"",message); // Thread.sleep(1000); } @Test public void test4() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test4---message:"+message);//exchange 错误,queue 错误,confirm被回调, ack=falsepublishService.send(exChange+"NO","CONFIRM_TEST",message); Thread.sleep(1000);} }
7、测试结果:
test1---message:currentTime:1483786948506 test2---message:currentTime:1483786948532 consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506 test3---message:currentTime:1483786948536 confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) confirm--:correlationData:null,ack:false,cause:Channel closed by application [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey: confirm--:correlationData:null,ack:true,cause:null test4---message:currentTime:1483786948546 confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
8、总结如下:
如果消息没有到exchange,则confirm回调,ack=false
如果消息到达exchange,则confirm回调,ack=true
exchange到queue成功,则不回调return
exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
备注:需要说明,spring-rabbit和原生的rabbit-client ,表现是不一样的。测试的时候,原生的client,exchange错误的话,直接就报错了,是不会到confirmListener和returnListener的
源码地址:https://github.com/qq315737546/spring-rabbit
全文地址请点击:https://blog.csdn.net/qq315737546/article/details/54176560
使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)相关推荐
- 【夏目鬼鬼分享】rabbitmq消息队列发送于接收
1. 创建连接的工具类 *** Created by wzy on 2020/12/3* 用于创建连接的工具类*/ public class ConnectionUtil {public static ...
- 【Spring Boot】Spring Boot之整合RabbitMQ并实现消息的发送和接收
一.项目配置 1)引入maven坐标 <!--amqp--><dependency><groupId>org.springframework.boot</gr ...
- IBM MQ 搭建测试环境并测试消息的发送和接收
一,说明 MQ基于Linux环境的安装比较简单,读者可以去网上自行搜索. 本文主要在于MQ队列管理器相关的部分. OS Version: Red Hat Enterprise Linux Server ...
- 探索 OpenStack 之(15):oslo.messaging 和 Cinder 中 MessageQueue 消息的发送和接收
前言:上一篇文章 只是 RabbitMQ 的科普,本文将仔细分析 Cinder 中 RabbitMQ 的各组件的使用.消息的发送和接收等.由于各流程步骤很多,本文只会使用若干流程图来加以阐述,尽量做到 ...
- java activeMQ消息的发送与接收
java activeMQ消息的发送与接收 activemq是我们经常用到的消息队列之一,比如说速度快,对spring的很好的支持,支持多种协议等等,今天我们就来看一下activeMQ消息的发送与接收 ...
- mfc 开启指定服务器,用MFC实现消息的发送和接收(含服务器)
<用MFC实现消息的发送和接收(含服务器)>由会员分享,可在线阅读,更多相关<用MFC实现消息的发送和接收(含服务器)(33页珍藏版)>请在人人文库网上搜索. 1.精品好资料学 ...
- java kafka消息的发送与接收
java kafka消息的发送与接收 消息队列在java EE级开发是很常用到的工具之一,在众多消息队列当中,active mq与kafka相对比较受开发者的喜爱,那么kafka是怎样实现消息的发送与 ...
- 【Unity】文字游戏制作插件Fungus教程(6)碰撞触发和消息的发送和接收
如果在开发3D项目的时候 我们还可以用到Fungus插件来丰富我们游戏的内容 比如游戏的操作方法等提示信息我们完全可以使用碰撞和消息的接收和发送来触发提示语句 我举一个例子比如开发一个没有地图指示的探 ...
- java mq发送sdk_【转载】java实现rabbitmq消息的发送接受
本文不介绍amqp和rabbitmq相关知识,请自行网上查阅 本文是基于spring-rabbit中间件来实现消息的发送接受功能 see http://www.rabbitmq.com/tutoria ...
最新文章
- R语言层次聚类:通过内平方和(Within Sum of Squares, WSS)选择最优的聚类K值、以内平方和(WSS)和K的关系并通过弯头法(elbow method)获得最优的聚类个数
- filter vue 循环_Vue - 基础
- Java进阶学习路线
- IOS开发之格式化日期时间的使用 编程中常见问题
- 第4章 第三节 内核同步
- springboot 上传文件解析入库_SpringBoot + easyexcel + WebUploader 实现文件上传并解析
- php导出csv插件,PHP导出CSV,EXCEL
- java 数组大数乘法_java – 在数组中查找3个数字的最大乘积
- Maven-Eclipse使用maven创建HelloWorld Java项目,maven常用的命令解析
- 计算机驱动空间的c盘不足怎么办,C盘磁盘空间不足怎么解决
- 大量大数据如何进行查询
- 入门物联网还得靠嵌入式
- Mave概念及其配置
- Win8系统如何设置时间自动同步方法 电脑系统时间不能同步怎么设置
- android组件圆角,Android实现圆角控件
- !!!---1588|Sum of All Odd Length Subarrays(新)
- SAP JCo 功能
- 地图上如何量方位角_地图投影怎么做到按条件(等角、等面积、等距)投影的?...
- HTML浏览器解析位置错误,各浏览器对CSS错误解析规则的差异及CSS hack.pdf
- 靶机渗透练习90-Grotesque:1.0.1
热门文章
- spark如何解决文件不存在_Spark Read.json无法找到文件
- idc网站html源码,40个网页常用小代码
- python 列表 换行_python基础语法学习——参考Python Crash Course
- 三人表决器逻辑表达式与非_机器学习 | 关于参数模型与非参数模型研究
- ajax中itemtexts,从Jquery Ajax调用CodeMirror textarea的值设置
- python怎么输出小数部分_python 输出小数控制
- c#如何跳出一个函数_C# mysql 学生信息管理系统
- 嵌入式linux实时化技术,嵌入式Linux实时化技术
- vue学习:v-text,v-html, v-model, {{}}之间的异同
- SQL判断是否“存在“,还在用 count 操作?