springboot集成rabbitMQ安装+代码
环境安装
本机电脑是mac,所以直接在终端下(无论在哪个目录下都不影响)输入 brew install erlang,先装erlang,不然rabbitmq装了跑不了,接下来就是等待的时刻。。。。
装好了后,再输入brew install rabbitmq,等待,这个会自动装在这个目录下:
/usr/local/Cellar/rabbitmq/版本号
启动的命令 brew services start rabbitmq
启动好进入控制台:进入控制台: http://localhost:15672/ 记住这里的15672是控制台的端口,而代码中rabbitmq启动端口是5672
上代码:
这是包结构
config下:
package com.cx.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;/*** @author 2019年6月25日11:04:21* rabbitmq配置类*/
@Configuration
public class RabbitConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;/*** Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,* Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。* Queue:消息的载体,每个消息都会被投到一个或多个队列。* Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.* Routing Key:路由关键字,exchange根据这个关键字进行消息投递。* vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。* Producer:消息生产者,就是投递消息的程序.* Consumer:消息消费者,就是接受消息的程序.* Channel:消息通道,在客户端的每个连接里,可建立多个channel.*/public static final String EXCHANGE_A = "my-mq-exchange_A";public static final String EXCHANGE_B = "my-mq-exchange_B";public static final String EXCHANGE_C = "my-mq-exchange_C";public static final String FANOUT_EXCHANGE = "my-mq-fanout_exchange";public static final String QUEUE_A = "QUEUE_A";public static final String QUEUE_B = "QUEUE_B";public static final String QUEUE_C = "QUEUE_C";public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);return connectionFactory;}/*** 必须是prototype类型* @return*/@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory());return template;}/*** 针对消费者配置* 1. 设置交换机类型* 2. 将队列绑定到交换机FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念HeadersExchange :通过添加属性key-value匹配DirectExchange:按照routingkey分发到指定队列TopicExchange:多关键字匹配*/@Beanpublic DirectExchange defaultExchange() {return new DirectExchange(EXCHANGE_A);}@Beanpublic DirectExchange exchangeB() {return new DirectExchange(EXCHANGE_B);}/*** 获取队列A* @return*/@Beanpublic Queue queueA() {// 队列持久return new Queue(QUEUE_A, true);}/*** 获取队列B* @return*/@Beanpublic Queue queueB() {// 队列持久return new Queue(QUEUE_B, true);}/*** 获取队列C* @return*/@Beanpublic Queue queueC() {// 队列持久return new Queue(QUEUE_C, true);}@Beanpublic Binding binding() {return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}@Beanpublic Binding bindingB() {return BindingBuilder.bind(queueB()).to(exchangeB()).with(RabbitConfig.ROUTINGKEY_B);}/*** 配置fanout_exchange* Fanout 就是我们熟悉的广播模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。* @return*/@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);}//把所有的队列都绑定到这个交换机上去@BeanBinding bindingExchangeA(Queue queueA,FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueA).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueB).to(fanoutExchange);}@BeanBinding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueC).to(fanoutExchange);}}
consumer下的:
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {private final Logger logger = LoggerFactory.getLogger(this.getClass());/*** 手动确认消息,假如不确认的话,消息一直会存在在队列当中,下次消费的时候,就会出现重复消费* @param content* @param channel* @param message*/@RabbitHandlerpublic void process(String content, Channel channel, Message message) {logger.info("接收处理队列A当中的消息: {}" , content);//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {//丢弃这条消息//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);e.printStackTrace();}}/*最简单的消息消费功能@RabbitHandlerpublic void process(String content) {logger.info("接收处理队列A当中的消息: {}" , content);}*/}
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author sj2* 准备了三个类MsgReceiverB_A,MsgReceiverB_B,MsgReceiverB_C,来消费队列B当中的消息,消费的顺序是负载均衡的* 消费的顺序是无序的,也就是不保证先进来的消息先被消费*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_B)
public class MsgReceiverB_A {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("处理器A接收处理队列B当中的消息: {}" , content);}}
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author sj2* 准备了三个类MsgReceiverB_A,MsgReceiverB_B,MsgReceiverB_C,来消费队列B当中的消息,消费的顺序是负载均衡的* 消费的顺序是无序的,也就是不保证先进来的消息先被消费*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_B)
public class MsgReceiverB_B {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("处理器B接收处理队列B当中的消息: {}" , content);}
}
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author sj2* 准备了三个类MsgReceiverB_A,MsgReceiverB_B,MsgReceiverB_C,来消费队列B当中的消息,消费的顺序是负载均衡的* 消费的顺序是无序的,也就是不保证先进来的消息先被消费*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_B)
public class MsgReceiverB_C {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("处理器C接收处理队列B当中的消息: {}" , content);}
}
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author sj2* 处理队列C当中的消息*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_C)
public class MsgReceiverC {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("接收处理队列C当中的消息: {}" , content);}
}
controller下:
package com.cx.rabbitmq.controller;import com.cx.rabbitmq.productor.MsgProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;/*** @author sj2*/
@Controller
public class RabbitController {@Autowiredprivate MsgProducer msgProducer;/*** 发送消息到队列A* @return*/@ResponseBody@RequestMapping("/rabbitmq/sendMsg")public String sendMsg(){int msgNum = 100;for(int i=0;i<msgNum;i++) {msgProducer.sendMsg("这是发送的第"+i+"条消息");}return "success";}/*** 发送消息到队列B* @return*/@ResponseBody@RequestMapping("/rabbitmq/sendMsgToQueueB")public String sendMsgToQueueB(){int msgNum = 100;for(int i=1;i<=msgNum;i++) {msgProducer.sendMsgToQueueB("这是发送的第"+i+"条消息");}return "success";}/*** 发送消息到队列B* @return*/@ResponseBody@RequestMapping("/rabbitmq/sendMsgAll")public String sendMsgAll(){int msgNum = 10;for(int i=1;i<=msgNum;i++) {msgProducer.sendAll("这是发送的第"+i+"条消息");}return "success";}
}
Productor下:
package com.cx.rabbitmq.productor;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {private final Logger logger = LoggerFactory.getLogger(this.getClass());/*** 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入*/private RabbitTemplate rabbitTemplate;/*** 构造方法注入rabbitTemplate*/@Autowiredpublic MsgProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;//rabbitTemplate如果为单例的话,那回调就是最后设置的内容rabbitTemplate.setConfirmCallback(this);}public void sendMsg(String content) {// CorrelationData 该数据的作用是给每条消息一个唯一的标识CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列ArabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);}public void sendMsgToQueueB(String content) {// CorrelationData 该数据的作用是给每条消息一个唯一的标识CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列ArabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_B, RabbitConfig.ROUTINGKEY_B, content, correlationId);}/*** 消息发送,这里不设置routing_key,因为设置了也无效,发送端的routing_key写任何字符都会被忽略。* @param content*/public void sendAll(String content) {CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"", content,correlationId);}/*** 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info(" 回调id:" + correlationData);if (ack) {logger.info("消息成功被发送到rabbitmq");} else {logger.info("消息发送到rabbitmq失败:" + cause);}}
}
application.properties
# 运行端口
server.port=8001
# 对于rabbitMQ的支持
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
运行即可
springboot集成rabbitMQ安装+代码相关推荐
- springboot 集成rabbitmq 实例
springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理 ...
- SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...
- Springboot集成RabbitMQ一个完整案例
springboot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,springboot 提供了 spring-boot-starter-amqp 对消息各种支持. 1.配置pom文 ...
- Springboot集成rabbitMQ之mandatory和备份交换机
Springboot集成rabbitMQ之mandatory和备份交换机 mandatory 之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认 ...
- RabbitMQ——SpringBoot集成RabbitMQ
文章目录: 1.创建一个SpringBoot工程--消息发送者 1.创建一个SpringBoot工程--消息接收者 3.测试结果 3.1 direct 3.2 fanout 3.3 topic 3.4 ...
- (需求实战_进阶_02)SSM集成RabbitMQ 关键代码讲解、开发、测试
接上一篇:(企业内部需求实战_进阶_01)SSM集成RabbitMQ 关键代码讲解.开发.测试 https://gblfy.blog.csdn.net/article/details/10419730 ...
- RabbitMq(八) SpringBoot整合RabbitMQ 生产者代码实现
在本章中我们将创建RabbitMQ的生产者工程,并实现生产者端代码实现. springboot整合RabbitMQ生产者工程步骤如下: 创建maven工程 引入springboot及RabbitMQ依 ...
- springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式
springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式 说明: RabbitMQ消息的可靠投递 在使用 RabbitMQ 的时 ...
- Springboot集成rabbitmq实现延时队列
Springboot集成rabbitmq实现延时队列 什么是延时队列? 列举几个使用场景: 常见的种类有: 延时任务-实现方式: 详细信息:[https://www.cnblogs.com/JonaL ...
最新文章
- PS切图篇(一)---界面设置
- python中cgi到底是什么_python cgi是什么
- 简单的活又谈何容易呢
- Linux IO多路复用之epoll网络编程(含源码)
- Kubernetes 的原理
- LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
- JavaScript的学习--生成二维码
- 嵌入式系统开发笔记100:使用FlyMcu下载程序到STM32单片机
- Newtonsoft 六个超简单又实用的特性,值得一试 !
- ipv6电视直播Android,关于高校电视直播(ipv4ipv6)
- vbs整人小程序集合
- java缓存Ehcache的使用
- Python实现数字转变为Excel的列
- python的反转_Python 反转
- 牛客每日练习----圆圈​​​​​​​,TaoTao要吃鸡,吐泡泡
- PPT中图片(形状)叠加时的透明效果
- 海龟如何保留米帝手机号
- GitHub使用教程详解(下)——Git的安装以及Git命令详解
- c语言中程序框图含义,关于高中数学《算法的含义、程序框图 》练习题
- 简洁的JS图片滚动代码