环境安装

本机电脑是mac,所以直接在终端下(无论在哪个目录下都不影响)输入 brew install erlang,先装erlang,不然rabbitmq装了跑不了,接下来就是等待的时刻。。。。

装好了后,再输入brew install rabbitmq,等待,这个会自动装在这个目录下:

/usr/local/Cellar/rabbitmq/版本号
启动的命令
brew services start rabbitmq

启动好进入控制台:进入控制台: http://localhost:15672/ 记住这里的15672是控制台的端口,而代码中rabbitmq启动端口是5672

上代码:

这是包结构

config下:

package com.cx.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;/*** @author 2019年6月25日11:04:21* rabbitmq配置类*/
@Configuration
public class RabbitConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;/*** Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,* Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。* Queue:消息的载体,每个消息都会被投到一个或多个队列。* Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.* Routing Key:路由关键字,exchange根据这个关键字进行消息投递。* vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。* Producer:消息生产者,就是投递消息的程序.* Consumer:消息消费者,就是接受消息的程序.* Channel:消息通道,在客户端的每个连接里,可建立多个channel.*/public static final String EXCHANGE_A = "my-mq-exchange_A";public static final String EXCHANGE_B = "my-mq-exchange_B";public static final String EXCHANGE_C = "my-mq-exchange_C";public static final String FANOUT_EXCHANGE = "my-mq-fanout_exchange";public static final String QUEUE_A = "QUEUE_A";public static final String QUEUE_B = "QUEUE_B";public static final String QUEUE_C = "QUEUE_C";public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);return connectionFactory;}/*** 必须是prototype类型* @return*/@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(connectionFactory());return template;}/*** 针对消费者配置* 1. 设置交换机类型* 2. 将队列绑定到交换机FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念HeadersExchange :通过添加属性key-value匹配DirectExchange:按照routingkey分发到指定队列TopicExchange:多关键字匹配*/@Beanpublic DirectExchange defaultExchange() {return new DirectExchange(EXCHANGE_A);}@Beanpublic DirectExchange exchangeB() {return new DirectExchange(EXCHANGE_B);}/*** 获取队列A* @return*/@Beanpublic Queue queueA() {// 队列持久return new Queue(QUEUE_A, true);}/*** 获取队列B* @return*/@Beanpublic Queue queueB() {// 队列持久return new Queue(QUEUE_B, true);}/*** 获取队列C* @return*/@Beanpublic Queue queueC() {// 队列持久return new Queue(QUEUE_C, true);}@Beanpublic Binding binding() {return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}@Beanpublic Binding bindingB() {return BindingBuilder.bind(queueB()).to(exchangeB()).with(RabbitConfig.ROUTINGKEY_B);}/*** 配置fanout_exchange* Fanout 就是我们熟悉的广播模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。* @return*/@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);}//把所有的队列都绑定到这个交换机上去@BeanBinding bindingExchangeA(Queue queueA,FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueA).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueB).to(fanoutExchange);}@BeanBinding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueC).to(fanoutExchange);}}

consumer下的:

package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {private final Logger logger = LoggerFactory.getLogger(this.getClass());/*** 手动确认消息,假如不确认的话,消息一直会存在在队列当中,下次消费的时候,就会出现重复消费* @param content* @param channel* @param message*/@RabbitHandlerpublic void process(String content, Channel channel, Message message) {logger.info("接收处理队列A当中的消息: {}" , content);//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {//丢弃这条消息//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);e.printStackTrace();}}/*最简单的消息消费功能@RabbitHandlerpublic void process(String content) {logger.info("接收处理队列A当中的消息: {}" , content);}*/}
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author sj2* 准备了三个类MsgReceiverB_A,MsgReceiverB_B,MsgReceiverB_C,来消费队列B当中的消息,消费的顺序是负载均衡的* 消费的顺序是无序的,也就是不保证先进来的消息先被消费*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_B)
public class MsgReceiverB_A {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("处理器A接收处理队列B当中的消息: {}" , content);}}
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author sj2* 准备了三个类MsgReceiverB_A,MsgReceiverB_B,MsgReceiverB_C,来消费队列B当中的消息,消费的顺序是负载均衡的* 消费的顺序是无序的,也就是不保证先进来的消息先被消费*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_B)
public class MsgReceiverB_B {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("处理器B接收处理队列B当中的消息: {}" , content);}
}
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author sj2* 准备了三个类MsgReceiverB_A,MsgReceiverB_B,MsgReceiverB_C,来消费队列B当中的消息,消费的顺序是负载均衡的* 消费的顺序是无序的,也就是不保证先进来的消息先被消费*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_B)
public class MsgReceiverB_C {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("处理器C接收处理队列B当中的消息: {}" , content);}
}
package com.cx.rabbitmq.consumer;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author sj2* 处理队列C当中的消息*/
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_C)
public class MsgReceiverC {private final Logger logger = LoggerFactory.getLogger(this.getClass());@RabbitHandlerpublic void process(String content) {logger.info("接收处理队列C当中的消息: {}" , content);}
}

controller下:

package com.cx.rabbitmq.controller;import com.cx.rabbitmq.productor.MsgProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;/*** @author sj2*/
@Controller
public class RabbitController {@Autowiredprivate MsgProducer msgProducer;/*** 发送消息到队列A* @return*/@ResponseBody@RequestMapping("/rabbitmq/sendMsg")public String sendMsg(){int msgNum = 100;for(int i=0;i<msgNum;i++) {msgProducer.sendMsg("这是发送的第"+i+"条消息");}return "success";}/*** 发送消息到队列B* @return*/@ResponseBody@RequestMapping("/rabbitmq/sendMsgToQueueB")public String sendMsgToQueueB(){int msgNum = 100;for(int i=1;i<=msgNum;i++) {msgProducer.sendMsgToQueueB("这是发送的第"+i+"条消息");}return "success";}/*** 发送消息到队列B* @return*/@ResponseBody@RequestMapping("/rabbitmq/sendMsgAll")public String sendMsgAll(){int msgNum = 10;for(int i=1;i<=msgNum;i++) {msgProducer.sendAll("这是发送的第"+i+"条消息");}return "success";}
}

Productor下:

package com.cx.rabbitmq.productor;import com.cx.rabbitmq.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {private final Logger logger = LoggerFactory.getLogger(this.getClass());/*** 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入*/private RabbitTemplate rabbitTemplate;/*** 构造方法注入rabbitTemplate*/@Autowiredpublic MsgProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;//rabbitTemplate如果为单例的话,那回调就是最后设置的内容rabbitTemplate.setConfirmCallback(this);}public void sendMsg(String content) {// CorrelationData  该数据的作用是给每条消息一个唯一的标识CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列ArabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);}public void sendMsgToQueueB(String content) {// CorrelationData  该数据的作用是给每条消息一个唯一的标识CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列ArabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_B, RabbitConfig.ROUTINGKEY_B, content, correlationId);}/*** 消息发送,这里不设置routing_key,因为设置了也无效,发送端的routing_key写任何字符都会被忽略。* @param content*/public void sendAll(String content) {CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"", content,correlationId);}/*** 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info(" 回调id:" + correlationData);if (ack) {logger.info("消息成功被发送到rabbitmq");} else {logger.info("消息发送到rabbitmq失败:" + cause);}}
}

application.properties

# 运行端口
server.port=8001
# 对于rabbitMQ的支持
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

运行即可

springboot集成rabbitMQ安装+代码相关推荐

  1. springboot 集成rabbitmq 实例

    springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理 ...

  2. SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门

    1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...

  3. Springboot集成RabbitMQ一个完整案例

    springboot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,springboot 提供了 spring-boot-starter-amqp 对消息各种支持. 1.配置pom文 ...

  4. Springboot集成rabbitMQ之mandatory和备份交换机

    Springboot集成rabbitMQ之mandatory和备份交换机 mandatory 之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认 ...

  5. RabbitMQ——SpringBoot集成RabbitMQ

    文章目录: 1.创建一个SpringBoot工程--消息发送者 1.创建一个SpringBoot工程--消息接收者 3.测试结果 3.1 direct 3.2 fanout 3.3 topic 3.4 ...

  6. (需求实战_进阶_02)SSM集成RabbitMQ 关键代码讲解、开发、测试

    接上一篇:(企业内部需求实战_进阶_01)SSM集成RabbitMQ 关键代码讲解.开发.测试 https://gblfy.blog.csdn.net/article/details/10419730 ...

  7. RabbitMq(八) SpringBoot整合RabbitMQ 生产者代码实现

    在本章中我们将创建RabbitMQ的生产者工程,并实现生产者端代码实现. springboot整合RabbitMQ生产者工程步骤如下: 创建maven工程 引入springboot及RabbitMQ依 ...

  8. springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式

    springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式 说明: RabbitMQ消息的可靠投递 在使用 RabbitMQ 的时 ...

  9. Springboot集成rabbitmq实现延时队列

    Springboot集成rabbitmq实现延时队列 什么是延时队列? 列举几个使用场景: 常见的种类有: 延时任务-实现方式: 详细信息:[https://www.cnblogs.com/JonaL ...

最新文章

  1. PS切图篇(一)---界面设置
  2. python中cgi到底是什么_python cgi是什么
  3. 简单的活又谈何容易呢
  4. Linux IO多路复用之epoll网络编程(含源码)
  5. Kubernetes 的原理
  6. LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  7. JavaScript的学习--生成二维码
  8. 嵌入式系统开发笔记100:使用FlyMcu下载程序到STM32单片机
  9. Newtonsoft 六个超简单又实用的特性,值得一试 !
  10. ipv6电视直播Android,关于高校电视直播(ipv4ipv6)
  11. vbs整人小程序集合
  12. java缓存Ehcache的使用
  13. Python实现数字转变为Excel的列
  14. python的反转_Python 反转
  15. 牛客每日练习----圆圈​​​​​​​,TaoTao要吃鸡,吐泡泡
  16. PPT中图片(形状)叠加时的透明效果
  17. 海龟如何保留米帝手机号
  18. GitHub使用教程详解(下)——Git的安装以及Git命令详解
  19. c语言中程序框图含义,关于高中数学《算法的含义、程序框图 》练习题
  20. 简洁的JS图片滚动代码

热门文章

  1. ELECTRA:超越BERT,2019年最佳NLP预训练模型
  2. 刘铁岩:AI打通关键环节,加快物流行业数字化转型
  3. 70亿美金!英伟达欲竞购这家以色列芯片公司!
  4. 阿里云凌晨大规模宕机,华北部分网站陷入瘫痪
  5. 一文掌握常用的机器学习模型(文末福利)
  6. 最全技术剖析:百度视觉团队获世界最大规模目标检测竞赛冠军
  7. 活动推荐 | 百千万人才工程创新大讲堂开启报名
  8. 公开课 | 人脸识别的最新进展以及工业级大规模人脸识别实践探讨
  9. 关于Java 获取时间戳的方法,我和同事争论了半天
  10. 我同事说我写代码像写诗