2019独角兽企业重金招聘Python工程师标准>>>

小弟 前段时间使用mq是因为要在Jfianl架构中使用,但Jfinal并不擅长,所以使用的是工具类创建的链接和通道。又写了消费者和生产者的公共方法。

现在有一个业务。对接银行的时候,因异步回调。导致客户在对一张A表操作 和银行回调对A表的操作产生并发。致使A表出现一个seq_no重复。余额也计算错误。领导要求集成MQ,小弟终于在3天后集成了一个基础的demo。现在记录一下:

首先 maven项目肯定要引入jar包的

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.4.5.RELEASE</version>
</dependency>

然后请看spring的配置:

<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:p="http://www.springframework.org/schema/p"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
><description>rabbitmq 连接服务配置</description><rabbit:connection-factory id="connectionFactory"username="${mq.name}" password="${mq.pwd}" host="${mq.url}" port="${mq.port}"/><rabbit:admin connection-factory="connectionFactory"/><!-- spring template声明--><rabbit:template exchange="${mq.user.bill.exchange.name}" id="amqpTemplate"  connection-factory="connectionFactory"message-converter="jsonMessageConverter" /><!-- 消息对象json转换类 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!-- 业务队列 --><rabbit:queue id="user_bill_queue" name="user_bill_queue" durable="true" auto-delete="false" exclusive="false"><!-- <rabbit:queue-arguments>&lt;!&ndash; 设置死信交换机 &ndash;&gt;<entry key="x-dead-letter-exchange"><value type="java.lang.String">dead_letter_userbill_exchange</value></entry>&lt;!&ndash; 设置死信交换机的路由键 &ndash;&gt;<entry key="x-dead-letter-routing-key"><value type="java.lang.String">userbill_queue_fail</value></entry></rabbit:queue-arguments>--></rabbit:queue><!-- 死信队列 --><!--<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />--><!-- 死信交换机配置 --><!--<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange"><rabbit:bindings><rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/></rabbit:bindings></rabbit:direct-exchange>--><!-- 正常交换机配置 --><rabbit:direct-exchange name="${mq.user.bill.exchange.name}" durable="true" auto-delete="false" id="${mq.user.bill.exchange.name}"><rabbit:bindings><rabbit:binding queue="user_bill_queue" key="${mq.user.bill.routing.key}"/></rabbit:bindings></rabbit:direct-exchange><!-- 配置监听 手动ack  prefetch="1" 表示消费一条--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" ><rabbit:listener queues="user_bill_queue" ref="queueListenter"/><!--<rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/>--></rabbit:listener-container></beans>

1、强调一下命名空间不多。够用就行。这里只单单配置了mq 与spring其他文件集成可使用import不要重复引用即可

2、这里粘出来的 有异常msg的处理。也就是死信队列。后面会提到

以上基本都是固定配置。获取链接,创建admin(在消息代理中如何利用协议来配置队列,交换和绑定。实现将自动声明在一个应用上下文的Queues,Exchanges,Bindings。具体功能我也不清楚。一直没搞懂) 创建生产者模板,创建队列。创建指定路由key的交换器 并绑定队列,消息对象转json的bean等等。

3、如果想要引入消息失效时间,需要在定义队列的地方添加属性<rabbit:queue-arguments>,并指定

<entry key="x-message-ttl"><value type="java.lang.Integer">60000</value>
</entry>

表示该队列中的信息失效时间为1min。

要引入队列的等级 需要的key=x-max-priority。

下面来说下 死信队列。当有消息再消费端处理失败时。如果要ackNack的话(true),会导致不断消费这个消息,一直产生错误,一个死循环。

这时,使用死信队列就可以处理。

1、定义业务队列的时候绑定一个死信交换机。并绑定一个路由key,注意x-dead-letter-exchange和x-dead-letter-routing-key是固定参数

<rabbit:queue-arguments><!-- 设置死信交换机 --><entry key="x-dead-letter-exchange"><value type="java.lang.String">dead_letter_userbill_exchange</value></entry><!-- 设置死信交换机的路由键 --><entry key="x-dead-letter-routing-key"><value type="java.lang.String">userbill_queue_fail</value></entry>
</rabbit:queue-arguments>

2、设置一个死信队列,用来接收死信交换机转发来的异常信息(想要队列的其他属性可以自定义配置)

<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />

3、定义一个死信交换机,名称与业务队列中定义的一致,绑定死信队列和路由key(与业务队列中定义的死信交换机的路由key一致)

<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange"><rabbit:bindings><rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/></rabbit:bindings>
</rabbit:direct-exchange>

4、在监听器中将死信队列纳入监听 监听器中的ref bean 都是通过@Component注解注入的。

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" ><rabbit:listener queues="user_bill_queue" ref="queueListenter"/><rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/>
</rabbit:listener-container>

这样就完成了失败消息转发到死信队列中。在设计另一个消费者deadUserBillQueueListenter 进行消息处理即可,可设计,在处理一次失败就将期ackreject

这里要提醒一下,当设计有自定义交换机时,生产者传入的就不是队列名称 ,而是交换机名称和路由key,只有在使用默认交换机时才使用队列名称

生产者代码:

package com.qiantu.core.rabbitmq;/*** @Description: 给队列发送消息接口类* @Date: create in 2018-07-30 16:36* @Author:Reynold-白*/
public interface MQProducer {/*** 发送消息到指定队列* @param queueKey* @param object*/void sendDataToQueue(String exchangeName, String routingKey, Object object);}
package com.qiantu.core.rabbitmq;import com.alibaba.fastjson.JSON;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.GenericXmlApplicationContext;
import org.springframework.stereotype.Service;/*** @Description: 发送消息实现* @Date: create in 2018-07-30 16:37* @Author:Reynold-白*/
@Service("mqProducer")
public class MQProducerImpl implements MQProducer{private final static Logger log = Logger.getLogger(MQProducerImpl.class);@Autowiredprivate AmqpTemplate amqpTemplate;@Overridepublic void sendDataToQueue(String exchangeName, String routingKey, Object object) {try {log.info("========向MQ发送消息【开始】========消息:" + object.toString());amqpTemplate.convertAndSend(exchangeName, routingKey,object);log.info("========向MQ发送消息【完成】========消息:");} catch (Exception e) {log.error("=======发送消息失败======", e);e.printStackTrace();}}}

消费者代码:

package com.qiantu.core.rabbitmq;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.qiantu.core.constants.UserBillConstants;
import com.qiantu.core.model.RabbitMQConsumerFailData;
import com.qiantu.core.service.UserBillSerivce;
import com.qiantu.core.utils.IdGenerator;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;/*** @Description: userBill消息监听消费* @Date: create in 2018-07-30 17:08* @Author:Reynold-白*/
@Component
public class QueueListenter implements ChannelAwareMessageListener {protected static Logger log = Logger.getLogger(QueueListenter.class);@Autowiredprivate UserBillSerivce userBillSerivce;@Overridepublic void onMessage(Message message, Channel channel) {String msgStr = "";try{msgStr = new String(message.getBody(), "UTF-8");log.info("=====获取消息" + msgStr);Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {});boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams);if(result){//处理成功,响应队列,删除该条信息this.basicACK(message, channel);log.info("=======消息:" + msgStr + ",处理成功!");}else{RabbitMQConsumerFailData rmcfd = new RabbitMQConsumerFailData();rmcfd.setId(IdGenerator.randomUUID());rmcfd.setData(msgStr);rmcfd.setType("0");rmcfd.setCreateBy("admin");rmcfd.setCreateTime(new Date());userBillSerivce.insertRabbitMQFailData(rmcfd);//处理失败,拒绝数据this.basicReject(message, channel);log.info("=======消息:" + msgStr + ",处理失败。回退!");}}catch(Exception e){log.error("=======消息业务处理异常=====", e);this.basicReject(message, channel);e.printStackTrace();}}//正常消费通知private void basicACK(Message message,Channel channel){try{channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException e){log.error("通知服务器移除mq时异常,异常信息:"+e);}}//处理异常,消息回到异常处理队列总再处理private void basicReject(Message message,Channel channel){try {/*** 第一个参数:该消息的index* 第二个参数:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。* 第三个参数:被拒绝的是否重新入队列*/
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {try {log.error(new String(message.getBody(), "utf-8") + "重新进入服务器时出现异常,异常信息:", e);} catch (UnsupportedEncodingException e1) {e1.printStackTrace();}e.printStackTrace();}}}

死信队列消费者:

package com.qiantu.core.rabbitmq;import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.qiantu.core.service.UserBillSerivce;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Map;/*** @Description: 失败信息再处理* @Date: create in 2018-08-02 15:00* @Author:Reynold-白*/
@Component
public class DeadUserBillQueueListenter implements ChannelAwareMessageListener {protected static Logger log = Logger.getLogger(QueueListenter.class);@Autowiredprivate UserBillSerivce userBillSerivce;@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String msgStr = "";try{msgStr = new String(message.getBody(), "UTF-8");log.info("=====获取消息" + msgStr);Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {});boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams);if(result){//处理成功,响应队列,删除该条信息this.basicACK(message, channel);log.info("=======deadUserBillQueue消息:" + msgStr + ",处理成功!");}else{//处理失败,抛弃数据this.basicNack(message, channel);log.info("=======deadUserBillQueue消息:" + msgStr + ",处理失败。回退!");}}catch(Exception e){log.error("=======deadUserBillQueue消息业务处理异常=====", e);this.basicNack(message, channel);e.printStackTrace();}}//正常消费通知private void basicACK(Message message,Channel channel){try{channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException e){log.error("deadUserBillQueue通知服务器移除mq时异常,异常信息:"+e);}}//处理异常,删除信息private void basicNack(Message message,Channel channel){try {/*** 第一个参数:该消息的index* 第二个参数:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。* 第三个参数:被拒绝的是否重新入队列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {log.error("deadUserBillQueue通知服务器移除mq时异常,异常信息:"+e);try {log.error(new String(message.getBody(), "utf-8") + "重新进入服务器时出现异常,异常信息:", e);} catch (UnsupportedEncodingException e1) {e1.printStackTrace();}e.printStackTrace();}}
}

亲测可实现错误消息转发,至于队列和消息的优先级可以根据队列的数据进行配置。与消息失效方式一致。

但要注意,队列和消息优先级需要 spring的版本较高至少要4.1以上(低版本主要是命名空间中的属性标签不支持),RabbitMQ3.5以上才能支持。

2018-08-09日补充:

以上demo在处理消息时还不够全面。首先如果消费端业务过于复杂导致消息 消费失败,这个时候可以使用死信队列保存(个人觉得),或者入库均可,但却无法保证 排除消息重发的这种现象。一旦消息重发,呗消费端消费,有涉及客户的小金库,那就玩完。。。通宵补数据都是轻的。

通过查阅资料得知,可以向异步接口那样,引用幂等概念进行控制。有两种方案。

1、通过MQ自身的msg-id来进行控制(这个id一直都没有找到在哪里获取);

2、可以在上游(生产端)生成一个唯一标识(类似流水号不重复的这种),在消费端进行验证。入库也好。缓存验证也行。目前采用这中方法。

以上 是个人的一点浅谈。。继续找那个msg-id去

转载于:https://my.oschina.net/u/2543341/blog/1924078

RabbitMQ与spring的集成,,基础。相关推荐

  1. Spring boot集成RabbitMQ(山东数漫江湖)

    RabbitMQ简介 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统  MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写 ...

  2. Spring boot集成RabbitMQ

    ####RabbitMQ简介 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通 ...

  3. Spring集成基础知识

    本文是我们名为" EAI的Spring集成 "的学院课程的一部分. 在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们. 接下来,您将深 ...

  4. 消息中间件系列四:RabbitMQ与Spring集成

    一.RabbitMQ与Spring集成  准备工作: 分别新建名为RabbitMQSpringProducer和RabbitMQSpringConsumer的maven web工程 在pom.xml文 ...

  5. rabbitmq实战_RabbitMQ实战(四) - RabbitMQ amp; Spring整合开发

    0 相关源码 1 你将学到 RabbitMQ 整合 Spring AMQP实战 RabbitMQ 整合 Spring Boot实战 RabbitMQ 整合 Spring Cloud实战 2 Sprin ...

  6. spring Boot 2 基础篇 。内含 整合一个spring boot 的 小案例

    目录 springBoot2基础篇 前言与开发环境 一.快速创建Boot项目 1.使用spring提供的快速构建 2.基于maven的手动构建 3.在Idea中隐藏指定文件/文件夹 二.SpringB ...

  7. Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现

    自Spring Cloud Alibaba发布第一个Release以来,就备受国内开发者的高度关注.虽然Spring Cloud Alibaba还没能纳入Spring Cloud的主版本管理中,但是凭 ...

  8. 6.3 Spring Boot集成mongodb开发

    6.3 Spring Boot集成mongodb开发 本章我们通过SpringBoot集成mongodb,Java,Kotlin开发一个极简社区文章博客系统. 0 mongodb简介 Mongo 的主 ...

  9. springboot(十八):使用Spring Boot集成FastDFS

    上篇文章介绍了如何使用Spring Boot上传文件,这篇文章我们介绍如何使用Spring Boot将文件上传到分布式文件系统FastDFS中. 这个项目会在上一个项目的基础上进行构建. 1.pom包 ...

  10. Spring Cloud Alibaba基础教程:Nacos配置的多文件加载与共享配置

    <Spring Cloud Alibaba基础教程>连载中,关注我一起学期!前情回顾: <使用Nacos实现服务注册与发现> <支持的几种服务消费方式> <使 ...

最新文章

  1. python 设计模式 观察者_python设计模式之观察者模式
  2. 框架:@Bean注解
  3. IOS正则表达式的用法简介
  4. 部署SCCM 2012R2之一:了解功能篇
  5. 从零开始学android编程_小白也能学得会!谷歌推出免费的Kotlin和Android开发课程...
  6. 基于python3的Opencv(一)-打开摄像头显示图像
  7. LeetCode 259. 较小的三数之和(固定一点,内层双指针)
  8. 想玩转工业界机器学习?先学Spark吧
  9. 浅谈line-height
  10. 95-280-046-源码-资源管理-磁盘
  11. android unable to instantiate activity componentinfo
  12. Node JS环境设置– Node.js安装
  13. 使用hive计算每一年的最大气温的日期+温度
  14. Android ListView更改item背景颜色
  15. luis soares mysql,mysql访问报错如下
  16. 阿克曼函数的c语言,C语言,关于阿克曼函数非递归实现的一点拙见
  17. 对讲机在哪插卡?插卡对讲机是什么意思呢?5000公里对讲机的哪点事
  18. python-微信公众个性二维码生成-生成自己名片二维码-链接二维码【超酷】
  19. phpstudy+TP5隐藏入口文件
  20. RazorSQL Mac(SQL数据库查询工具)含激活码

热门文章

  1. 计网习题总结,附答案
  2. 在matlab中如何求偏导数,求解 PDE 并计算偏导数
  3. 数据经济时代大数据四大发展趋势
  4. 计算机语言的正交性,什么是“正交性”?
  5. element-ui tamplate slot-scope 模板插槽的使用
  6. 今天是本学期的第几周的第几天? (15 分) C语言
  7. Latex 给参考文献添加doi号和超链接
  8. 三维扫描仪中投射模块/投影仪推荐
  9. 改名后火速递表,飞天云动冲刺国内元宇宙第一股,借力能否成功?
  10. Jedis的hget方法简单用法