RabbitMQ与spring的集成,,基础。
2019独角兽企业重金招聘Python工程师标准>>>
小弟 前段时间使用mq是因为要在Jfianl架构中使用,但Jfinal并不擅长,所以使用的是工具类创建的链接和通道。又写了消费者和生产者的公共方法。
现在有一个业务。对接银行的时候,因异步回调。导致客户在对一张A表操作 和银行回调对A表的操作产生并发。致使A表出现一个seq_no重复。余额也计算错误。领导要求集成MQ,小弟终于在3天后集成了一个基础的demo。现在记录一下:
首先 maven项目肯定要引入jar包的
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.4.5.RELEASE</version> </dependency>
然后请看spring的配置:
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:p="http://www.springframework.org/schema/p"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" ><description>rabbitmq 连接服务配置</description><rabbit:connection-factory id="connectionFactory"username="${mq.name}" password="${mq.pwd}" host="${mq.url}" port="${mq.port}"/><rabbit:admin connection-factory="connectionFactory"/><!-- spring template声明--><rabbit:template exchange="${mq.user.bill.exchange.name}" id="amqpTemplate" connection-factory="connectionFactory"message-converter="jsonMessageConverter" /><!-- 消息对象json转换类 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!-- 业务队列 --><rabbit:queue id="user_bill_queue" name="user_bill_queue" durable="true" auto-delete="false" exclusive="false"><!-- <rabbit:queue-arguments><!– 设置死信交换机 –><entry key="x-dead-letter-exchange"><value type="java.lang.String">dead_letter_userbill_exchange</value></entry><!– 设置死信交换机的路由键 –><entry key="x-dead-letter-routing-key"><value type="java.lang.String">userbill_queue_fail</value></entry></rabbit:queue-arguments>--></rabbit:queue><!-- 死信队列 --><!--<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />--><!-- 死信交换机配置 --><!--<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange"><rabbit:bindings><rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/></rabbit:bindings></rabbit:direct-exchange>--><!-- 正常交换机配置 --><rabbit:direct-exchange name="${mq.user.bill.exchange.name}" durable="true" auto-delete="false" id="${mq.user.bill.exchange.name}"><rabbit:bindings><rabbit:binding queue="user_bill_queue" key="${mq.user.bill.routing.key}"/></rabbit:bindings></rabbit:direct-exchange><!-- 配置监听 手动ack prefetch="1" 表示消费一条--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" ><rabbit:listener queues="user_bill_queue" ref="queueListenter"/><!--<rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/>--></rabbit:listener-container></beans>
1、强调一下命名空间不多。够用就行。这里只单单配置了mq 与spring其他文件集成可使用import不要重复引用即可
2、这里粘出来的 有异常msg的处理。也就是死信队列。后面会提到
以上基本都是固定配置。获取链接,创建admin(在消息代理中如何利用协议来配置队列,交换和绑定。实现将自动声明在一个应用上下文的Queues,Exchanges,Bindings。具体功能我也不清楚。一直没搞懂) 创建生产者模板,创建队列。创建指定路由key的交换器 并绑定队列,消息对象转json的bean等等。
3、如果想要引入消息失效时间,需要在定义队列的地方添加属性<rabbit:queue-arguments>,并指定
<entry key="x-message-ttl"><value type="java.lang.Integer">60000</value> </entry>
表示该队列中的信息失效时间为1min。
要引入队列的等级 需要的key=x-max-priority。
下面来说下 死信队列。当有消息再消费端处理失败时。如果要ackNack的话(true),会导致不断消费这个消息,一直产生错误,一个死循环。
这时,使用死信队列就可以处理。
1、定义业务队列的时候绑定一个死信交换机。并绑定一个路由key,注意x-dead-letter-exchange和x-dead-letter-routing-key是固定参数
<rabbit:queue-arguments><!-- 设置死信交换机 --><entry key="x-dead-letter-exchange"><value type="java.lang.String">dead_letter_userbill_exchange</value></entry><!-- 设置死信交换机的路由键 --><entry key="x-dead-letter-routing-key"><value type="java.lang.String">userbill_queue_fail</value></entry> </rabbit:queue-arguments>
2、设置一个死信队列,用来接收死信交换机转发来的异常信息(想要队列的其他属性可以自定义配置)
<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />
3、定义一个死信交换机,名称与业务队列中定义的一致,绑定死信队列和路由key(与业务队列中定义的死信交换机的路由key一致)
<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange"><rabbit:bindings><rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/></rabbit:bindings> </rabbit:direct-exchange>
4、在监听器中将死信队列纳入监听 监听器中的ref bean 都是通过@Component注解注入的。
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" ><rabbit:listener queues="user_bill_queue" ref="queueListenter"/><rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/> </rabbit:listener-container>
这样就完成了失败消息转发到死信队列中。在设计另一个消费者deadUserBillQueueListenter 进行消息处理即可,可设计,在处理一次失败就将期ackreject
这里要提醒一下,当设计有自定义交换机时,生产者传入的就不是队列名称 ,而是交换机名称和路由key,只有在使用默认交换机时才使用队列名称
生产者代码:
package com.qiantu.core.rabbitmq;/*** @Description: 给队列发送消息接口类* @Date: create in 2018-07-30 16:36* @Author:Reynold-白*/ public interface MQProducer {/*** 发送消息到指定队列* @param queueKey* @param object*/void sendDataToQueue(String exchangeName, String routingKey, Object object);}
package com.qiantu.core.rabbitmq;import com.alibaba.fastjson.JSON; import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.support.GenericXmlApplicationContext; import org.springframework.stereotype.Service;/*** @Description: 发送消息实现* @Date: create in 2018-07-30 16:37* @Author:Reynold-白*/ @Service("mqProducer") public class MQProducerImpl implements MQProducer{private final static Logger log = Logger.getLogger(MQProducerImpl.class);@Autowiredprivate AmqpTemplate amqpTemplate;@Overridepublic void sendDataToQueue(String exchangeName, String routingKey, Object object) {try {log.info("========向MQ发送消息【开始】========消息:" + object.toString());amqpTemplate.convertAndSend(exchangeName, routingKey,object);log.info("========向MQ发送消息【完成】========消息:");} catch (Exception e) {log.error("=======发送消息失败======", e);e.printStackTrace();}}}
消费者代码:
package com.qiantu.core.rabbitmq;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.qiantu.core.constants.UserBillConstants; import com.qiantu.core.model.RabbitMQConsumerFailData; import com.qiantu.core.service.UserBillSerivce; import com.qiantu.core.utils.IdGenerator; import com.rabbitmq.client.Channel; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Date; import java.util.HashMap; import java.util.Map;/*** @Description: userBill消息监听消费* @Date: create in 2018-07-30 17:08* @Author:Reynold-白*/ @Component public class QueueListenter implements ChannelAwareMessageListener {protected static Logger log = Logger.getLogger(QueueListenter.class);@Autowiredprivate UserBillSerivce userBillSerivce;@Overridepublic void onMessage(Message message, Channel channel) {String msgStr = "";try{msgStr = new String(message.getBody(), "UTF-8");log.info("=====获取消息" + msgStr);Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {});boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams);if(result){//处理成功,响应队列,删除该条信息this.basicACK(message, channel);log.info("=======消息:" + msgStr + ",处理成功!");}else{RabbitMQConsumerFailData rmcfd = new RabbitMQConsumerFailData();rmcfd.setId(IdGenerator.randomUUID());rmcfd.setData(msgStr);rmcfd.setType("0");rmcfd.setCreateBy("admin");rmcfd.setCreateTime(new Date());userBillSerivce.insertRabbitMQFailData(rmcfd);//处理失败,拒绝数据this.basicReject(message, channel);log.info("=======消息:" + msgStr + ",处理失败。回退!");}}catch(Exception e){log.error("=======消息业务处理异常=====", e);this.basicReject(message, channel);e.printStackTrace();}}//正常消费通知private void basicACK(Message message,Channel channel){try{channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException e){log.error("通知服务器移除mq时异常,异常信息:"+e);}}//处理异常,消息回到异常处理队列总再处理private void basicReject(Message message,Channel channel){try {/*** 第一个参数:该消息的index* 第二个参数:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。* 第三个参数:被拒绝的是否重新入队列*/ // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {try {log.error(new String(message.getBody(), "utf-8") + "重新进入服务器时出现异常,异常信息:", e);} catch (UnsupportedEncodingException e1) {e1.printStackTrace();}e.printStackTrace();}}}
死信队列消费者:
package com.qiantu.core.rabbitmq;import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.qiantu.core.service.UserBillSerivce; import com.rabbitmq.client.Channel; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Map;/*** @Description: 失败信息再处理* @Date: create in 2018-08-02 15:00* @Author:Reynold-白*/ @Component public class DeadUserBillQueueListenter implements ChannelAwareMessageListener {protected static Logger log = Logger.getLogger(QueueListenter.class);@Autowiredprivate UserBillSerivce userBillSerivce;@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String msgStr = "";try{msgStr = new String(message.getBody(), "UTF-8");log.info("=====获取消息" + msgStr);Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {});boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams);if(result){//处理成功,响应队列,删除该条信息this.basicACK(message, channel);log.info("=======deadUserBillQueue消息:" + msgStr + ",处理成功!");}else{//处理失败,抛弃数据this.basicNack(message, channel);log.info("=======deadUserBillQueue消息:" + msgStr + ",处理失败。回退!");}}catch(Exception e){log.error("=======deadUserBillQueue消息业务处理异常=====", e);this.basicNack(message, channel);e.printStackTrace();}}//正常消费通知private void basicACK(Message message,Channel channel){try{channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException e){log.error("deadUserBillQueue通知服务器移除mq时异常,异常信息:"+e);}}//处理异常,删除信息private void basicNack(Message message,Channel channel){try {/*** 第一个参数:该消息的index* 第二个参数:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。* 第三个参数:被拒绝的是否重新入队列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {log.error("deadUserBillQueue通知服务器移除mq时异常,异常信息:"+e);try {log.error(new String(message.getBody(), "utf-8") + "重新进入服务器时出现异常,异常信息:", e);} catch (UnsupportedEncodingException e1) {e1.printStackTrace();}e.printStackTrace();}} }
亲测可实现错误消息转发,至于队列和消息的优先级可以根据队列的数据进行配置。与消息失效方式一致。
但要注意,队列和消息优先级需要 spring的版本较高至少要4.1以上(低版本主要是命名空间中的属性标签不支持),RabbitMQ3.5以上才能支持。
2018-08-09日补充:
以上demo在处理消息时还不够全面。首先如果消费端业务过于复杂导致消息 消费失败,这个时候可以使用死信队列保存(个人觉得),或者入库均可,但却无法保证 排除消息重发的这种现象。一旦消息重发,呗消费端消费,有涉及客户的小金库,那就玩完。。。通宵补数据都是轻的。
通过查阅资料得知,可以向异步接口那样,引用幂等概念进行控制。有两种方案。
1、通过MQ自身的msg-id来进行控制(这个id一直都没有找到在哪里获取);
2、可以在上游(生产端)生成一个唯一标识(类似流水号不重复的这种),在消费端进行验证。入库也好。缓存验证也行。目前采用这中方法。
以上 是个人的一点浅谈。。继续找那个msg-id去
转载于:https://my.oschina.net/u/2543341/blog/1924078
RabbitMQ与spring的集成,,基础。相关推荐
- Spring boot集成RabbitMQ(山东数漫江湖)
RabbitMQ简介 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写 ...
- Spring boot集成RabbitMQ
####RabbitMQ简介 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通 ...
- Spring集成基础知识
本文是我们名为" EAI的Spring集成 "的学院课程的一部分. 在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们. 接下来,您将深 ...
- 消息中间件系列四:RabbitMQ与Spring集成
一.RabbitMQ与Spring集成 准备工作: 分别新建名为RabbitMQSpringProducer和RabbitMQSpringConsumer的maven web工程 在pom.xml文 ...
- rabbitmq实战_RabbitMQ实战(四) - RabbitMQ amp; Spring整合开发
0 相关源码 1 你将学到 RabbitMQ 整合 Spring AMQP实战 RabbitMQ 整合 Spring Boot实战 RabbitMQ 整合 Spring Cloud实战 2 Sprin ...
- spring Boot 2 基础篇 。内含 整合一个spring boot 的 小案例
目录 springBoot2基础篇 前言与开发环境 一.快速创建Boot项目 1.使用spring提供的快速构建 2.基于maven的手动构建 3.在Idea中隐藏指定文件/文件夹 二.SpringB ...
- Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现
自Spring Cloud Alibaba发布第一个Release以来,就备受国内开发者的高度关注.虽然Spring Cloud Alibaba还没能纳入Spring Cloud的主版本管理中,但是凭 ...
- 6.3 Spring Boot集成mongodb开发
6.3 Spring Boot集成mongodb开发 本章我们通过SpringBoot集成mongodb,Java,Kotlin开发一个极简社区文章博客系统. 0 mongodb简介 Mongo 的主 ...
- springboot(十八):使用Spring Boot集成FastDFS
上篇文章介绍了如何使用Spring Boot上传文件,这篇文章我们介绍如何使用Spring Boot将文件上传到分布式文件系统FastDFS中. 这个项目会在上一个项目的基础上进行构建. 1.pom包 ...
- Spring Cloud Alibaba基础教程:Nacos配置的多文件加载与共享配置
<Spring Cloud Alibaba基础教程>连载中,关注我一起学期!前情回顾: <使用Nacos实现服务注册与发现> <支持的几种服务消费方式> <使 ...
最新文章
- python 设计模式 观察者_python设计模式之观察者模式
- 框架:@Bean注解
- IOS正则表达式的用法简介
- 部署SCCM 2012R2之一:了解功能篇
- 从零开始学android编程_小白也能学得会!谷歌推出免费的Kotlin和Android开发课程...
- 基于python3的Opencv(一)-打开摄像头显示图像
- LeetCode 259. 较小的三数之和(固定一点,内层双指针)
- 想玩转工业界机器学习?先学Spark吧
- 浅谈line-height
- 95-280-046-源码-资源管理-磁盘
- android unable to instantiate activity componentinfo
- Node JS环境设置– Node.js安装
- 使用hive计算每一年的最大气温的日期+温度
- Android ListView更改item背景颜色
- luis soares mysql,mysql访问报错如下
- 阿克曼函数的c语言,C语言,关于阿克曼函数非递归实现的一点拙见
- 对讲机在哪插卡?插卡对讲机是什么意思呢?5000公里对讲机的哪点事
- python-微信公众个性二维码生成-生成自己名片二维码-链接二维码【超酷】
- phpstudy+TP5隐藏入口文件
- RazorSQL Mac(SQL数据库查询工具)含激活码