springboot 集成rabbitmq 实例

个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。

本文章共分为以下部分:

  • rabbitmq简介
  • springboot配置
  • rabbitmq生产者配置
  • rabbitmq消费者配置
  • 问题补充

一、rabbitmq简介

目前流程的消息队列主要有:ActivityMQ/kafka/redis/rabbitmq等,各有各自的应用场景,关于各个框架的介绍,大家可自行百度,网上很多文章介绍~其中rabbit因为其ack特性以及还算不错的性能被大多数公司采用。

概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列
  • 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
  • 队列 消息的寄存器,负责存放生产者发送的消息
  • 交换机 负责根据一定规则分发生产者产生的消息
  • 绑定 完成交换机和队列之间的绑定

模式:

  • direct
    直连模式,用于实例间的任务分发
  • topic
    话题模式,通过可配置的规则分发给绑定在该exchange上的队列
  • headers
    适用规则复杂的分发,用headers里的参数表达规则
  • fanout
    分发给所有绑定到该exchange上的队列,忽略routing key

安装

单机版安装很简单,大概步骤如下:


# 安装erlang包yum install erlang
# 安装socatyum install socat
# 安装rabbit    rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm
# 启动服务rabbitmq-server start
# 增加管理控制功能rabbitmq-plugins enable rabbitmq_management
# 增加用户:sudo rabbitmqctl add_user root passwordrabbitmqctl set_user_tags root administrator rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安装,可参考以下博客:
     
rabbitmq集群安装

二、springboot配置

废话少说直接上代码:
配置参数
application.yml:

spring:rabbitmq:addresses: 192.168.1.1:5672username: usernamepassword: passwordpublisher-confirms: truevirtual-host: /

java config读取参数

/*** RabbitMq配置文件读取类** @author chenhf* @create 2017-10-23 上午9:31**/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {@Value("${spring.rabbitmq.addresses}")private String addresses;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.publisher-confirms}")private Boolean publisherConfirms;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;// 构建mq实例工厂@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(addresses);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){return new RabbitAdmin(connectionFactory);}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(){RabbitTemplate template = new RabbitTemplate(connectionFactory());return template;}
}

三、rabbitmq生产者配置

主要配置了直连和话题模式,其中话题模式设置两个队列(queueTopicTest1、queueTopicTest2),此两个队列在和交换机绑定时分别设置不同的routingkey(.TEST.以及lazy.#)来验证匹配模式。


/*** 用于配置交换机和队列对应关系* 新增消息队列应该按照如下步骤* 1、增加queue bean,参见queueXXXX方法* 2、增加queue和exchange的binding* @author chenhf* @create 2017-10-23 上午10:33**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {/** logger */private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);/*** @Author:chenhf* @Description: 主题型交换机* @Date:下午5:49 2017/10/23* @param* @return*/@BeanTopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());rabbitAdmin.declareExchange(contractTopicExchange);logger.debug("完成主题型交换机bean实例化");return contractTopicExchange;}/*** 直连型交换机*/@BeanDirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());rabbitAdmin.declareExchange(contractDirectExchange);logger.debug("完成直连型交换机bean实例化");return contractDirectExchange;}//在此可以定义队列@BeanQueue queueTest(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());rabbitAdmin.declareQueue(queue);logger.debug("测试队列实例化完成");return queue;}//topic 1@BeanQueue queueTopicTest1(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());rabbitAdmin.declareQueue(queue);logger.debug("话题测试队列1实例化完成");return queue;}//topic 2@BeanQueue queueTopicTest2(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());rabbitAdmin.declareQueue(queue);logger.debug("话题测试队列2实例化完成");return queue;}//在此处完成队列和交换机绑定@BeanBinding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());rabbitAdmin.declareBinding(binding);logger.debug("测试队列与直连型交换机绑定完成");return binding;}//topic binding1@BeanBinding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());rabbitAdmin.declareBinding(binding);logger.debug("测试队列与话题交换机1绑定完成");return binding;}//topic binding2@BeanBinding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());rabbitAdmin.declareBinding(binding);logger.debug("测试队列与话题交换机2绑定完成");return binding;}}

在这里用到枚举类:RabbitMqEnum

/*** 定义rabbitMq需要的常量** @author chenhf* @create 2017-10-23 下午4:07**/
public class RabbitMqEnum {/*** @param* @Author:chenhf* @Description:定义数据交换方式* @Date:下午4:08 2017/10/23* @return*/public enum Exchange {CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分发"),CONTRACT_TOPIC("CONTRACT_TOPIC", "消息订阅"),CONTRACT_DIRECT("CONTRACT_DIRECT", "点对点");private String code;private String name;Exchange(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定义队列名称* creat_user: chenhf* creat_date: 2017/10/31**/public enum QueueName {TESTQUEUE("TESTQUEUE", "测试队列"),TOPICTEST1("TOPICTEST1", "topic测试队列"),TOPICTEST2("TOPICTEST2", "topic测试队列");private String code;private String name;QueueName(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定义routing_key* creat_user: chenhf* creat_date: 2017/10/31**/public enum QueueEnum {TESTQUEUE("TESTQUEUE1", "测试队列key"),TESTTOPICQUEUE1("*.TEST.*", "topic测试队列key"),TESTTOPICQUEUE2("lazy.#", "topic测试队列key");private String code;private String name;QueueEnum(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}}

以上完成消息生产者的定义,下面封装调用接口
测试时直接调用此工具类,testUser类需自己实现

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);

/*** rabbitmq发送消息工具类** @author chenhf* @create 2017-10-26 上午11:10**/@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{/** logger */private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);private RabbitTemplate rabbitTemplate;@Autowiredpublic RabbitMqSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {logger.info("confirm: " + correlationData.getId());}/*** 发送到 指定routekey的指定queue* @param routeKey* @param obj*/public void sendRabbitmqDirect(String routeKey,Object obj) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info("send: " + correlationData.getId());this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);}/*** 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上* @param routeKey* @param obj*/public void sendRabbitmqTopic(String routeKey,Object obj) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info("send: " + correlationData.getId());this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);}
}

四、rabbitmq消费者配置

springboot注解方式监听队列,无法手动指定回调,所以采用了实现ChannelAwareMessageListener接口,重写onMessage来进行手动回调,详见以下代码,详细介绍可以在spring的官网上找amqp相关章节阅读

直连消费者
通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费


/*** 消费者配置** @author chenhf* @create 2017-10-30 下午3:14**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {@Bean("testQueueContainer")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TESTQUEUE");container.setMessageListener(exampleListener());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("testQueueListener")public ChannelAwareMessageListener exampleListener() {return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());//通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费if ("2".equals(testUser.getUserName())){System.out.println(testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}if ("1".equals(testUser.getUserName())){System.out.println(testUser.toString());channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}};}}

topic消费者1

/*** 消费者配置** @author chenhf* @create 2017-10-30 下午3:14**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {@Bean("topicTest1Container")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TOPICTEST1");container.setMessageListener(exampleListener1());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("topicTest1Listener")public ChannelAwareMessageListener exampleListener1(){return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());System.out.println("TOPICTEST1:"+testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}};}}

topic消费者2


/*** 消费者配置** @author chenhf* @create 2017-10-30 下午3:14**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {@Bean("topicTest2Container")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TOPICTEST2");container.setMessageListener(exampleListener());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("topicTest2Listener")public ChannelAwareMessageListener exampleListener() {return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());System.out.println("TOPICTEST2:"+testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}};}}

问题补充

使用过程中可能出现的坑参考此篇文章
https://segmentfault.com/a/11...

springboot 集成rabbitmq 实例相关推荐

  1. SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门

    1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...

  2. Springboot集成RabbitMQ一个完整案例

    springboot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,springboot 提供了 spring-boot-starter-amqp 对消息各种支持. 1.配置pom文 ...

  3. springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式

    springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式 说明: RabbitMQ消息的可靠投递 在使用 RabbitMQ 的时 ...

  4. Springboot集成rabbitmq实现延时队列

    Springboot集成rabbitmq实现延时队列 什么是延时队列? 列举几个使用场景: 常见的种类有: 延时任务-实现方式: 详细信息:[https://www.cnblogs.com/JonaL ...

  5. Springboot集成rabbitMQ之mandatory和备份交换机

    Springboot集成rabbitMQ之mandatory和备份交换机 mandatory 之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认 ...

  6. RabbitMQ——SpringBoot集成RabbitMQ

    文章目录: 1.创建一个SpringBoot工程--消息发送者 1.创建一个SpringBoot工程--消息接收者 3.测试结果 3.1 direct 3.2 fanout 3.3 topic 3.4 ...

  7. springboot集成rabbitmq商品秒杀业务实战(流量削峰)

    消息队列如何实现流量削峰? 要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送 ...

  8. SpringBoot集成rabbitmq错误:org.springframework.amqp.AmqpConnectException: java.net.ConnectException的解决办法

    在集成rabbitmq后,运行项目,报错日志: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Co ...

  9. springboot集成rabbitMQ安装+代码

    环境安装 本机电脑是mac,所以直接在终端下(无论在哪个目录下都不影响)输入 brew install erlang,先装erlang,不然rabbitmq装了跑不了,接下来就是等待的时刻.... 装 ...

最新文章

  1. Silverlight中文件的生成操作与其对应的获取方法
  2. mybaits十二:使用collection嵌套结果集查询
  3. Machine Learning week 2 quiz: Linear Regression with Multiple Variables
  4. 数据库设计与查询语句的优化
  5. 时间序列 线性回归 区别_时间序列分析的完整介绍(带R)::线性过程I
  6. JPG PNG GIF BMP图片格式的区别
  7. 玩转Java注解:元注解、内置注解、自定义注解的原理和实现
  8. Zara精讲C#.Cache、它和Redis区别是什么???
  9. 考虑SOC蓄电池 双向DC/DC 充放电控制 matlab仿真模型 buck boost
  10. 美洽客服报表功能:用数据驱动企业业绩增长
  11. 玩转基因组浏览器之使用IGV查看基因结构信息
  12. KKB:二进制知识:15瓶水,其中只有一瓶水有毒,请问至少需要几只小白鼠可以一次性喝出来?
  13. ddr走线教程_DDR3 Fly By走线精讲
  14. 关于下载表格数据乱码的解决方案
  15. 论文翻译:混合维在庞加莱几何三维骨架的动作识别
  16. linux分区管理,Linux下磁盘分区管理
  17. 关于MySQL错误提示ERROR 1265 (01000)的问题分析及解决方案
  18. 皮肤结构走向运用仿制图章
  19. win11 / win10 彻底删除系统“快速访问”中自动添加的文件夹 - 不再自动添加
  20. C++ GDAL/OGR 图层求交集Intersection

热门文章

  1. 使用.NET中的XML注释(一) -- XML注释标签讲解
  2. 新版上线时发现的数据库优化问题
  3. java 构造函数抛出异常,构造函数抛出异常;嵌套异常是java.lang.NoClassDefFoundError:javax/servlet/ServletContext...
  4. 近期低分纯生信友好的期刊简介
  5. 毕业论文选题三步搞定!
  6. mysql gtid 5.7_MySQL5.7之GTID复制
  7. 获取map第一个的key和value_谁要是再敢用Map传参,我过去就是一JIO
  8. Ubuntu | ubuntu 中配置静态 IP
  9. SQLite学习手册(实例代码一)
  10. 在html中引入css内部样式表使用,CSS样式学习笔记(三)html文件引入CSS的方法(2)...