rabbitmq安装

rabbitmq安装 centos 7

简单参数说明

使用rabbitmq 需要新增交换机Exchange,队列Queue(队列中还要设置绑定路由关键字RoutingKey)

Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息的载体,每个消息都会被投到一个或多个队列

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。

Producer:消息生产者,就是投递消息的程序.

Consumer:消息消费者,就是接受消息的程序.

Channel:消息通道,在客户端的每个连接里,可建立多个channel.

rabbitmq管理页面设置

1.新增交换机 add a new exchange

2.新增队列 Queue

3.点击QUEUE_A,计入单个队列管理页面,进行 交换机(exchange)、队列(queue)、路由关键字(Routing Key)绑定

4.用户虚拟主机/权限设置,安装中有写

项目配置

1.依赖

<!-- rabbitmq --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.配置文件

#rabbitmq
spring.rabbitmq.host=192.168.0.117 #rabbitmq安装机的ip地址
spring.rabbitmq.port=5672 #这里是5672端口,15672端口是web管理页面的
spring.rabbitmq.username=admin #自己添加的用户,若是host为localhost 可以用guest
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/ #对应虚拟主机
#开启发送确认
spring.rabbitmq.publisher-confirms=true
#开启发送失败返回
spring.rabbitmq.publisher-returns=true
# 开启ack确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

SpringBoot集成RabbitMQ确认机制分为三种:none、auto(默认)、manual

Auto
1. 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
2. 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)
3. 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
4. 其他的异常,则消息会被拒绝,且 requeue = true,此时会发生死循环,可以通过 setDefaultRequeueRejected(默认是true)去设置抛弃消息
manual
设置成manual手动确认,一定要对消息做出应答,否则rabbit认为当前队列没有消费完成,将不再继续向该队列发送消息。
1.channel.basicAck(long,boolean); 确认收到消息,消息将被队列移除,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。
2.channel.basicNack(long,boolean,boolean); 确认否定消息,第一个boolean表示一个consumer还是所有,第二个boolean表示requeue是否重新回到队列,true重新入队。
3.channel.basicReject(long,boolean); 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列。
4.当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会又接收到这条消息,如果想消息进入队尾,须确认消息后再次发送消息。

3.rabbitmq具体配置类(启动项目之前,记得要加上@EnableRabbit注解,支持Rabbit)

(注:rabbitmqTemplate没有必须是prototype类型,rabbitTemplate是thread safe的,主要是channel不能共用,但是在rabbitTemplate源码里channel是threadlocal的,所以singleton没问题。但是rabbitTemplate要设置回调类,如果是singleton,回调类就只能有一个,所以如果想要设置不同的回调类,就要设置为prototype的scope)

package cn.penghf.lovemaster.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;@Configuration
@Slf4j
public class RabbitConfiguration {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;@Value("${spring.rabbitmq.publisher-confirms}")private boolean publisherConfirms;/** Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列 **/public static final String EXCHANGE_A = "exchange_A";public static final String EXCHANGE_B = "exchange_B";public static final String EXCHANGE_C = "exchange_C";/** Queue:消息的载体,每个消息都会被投到一个或多个队列 **/public static final String QUEUE_A = "QUEUE_A";public static final String QUEUE_B = "QUEUE_B";public static final String QUEUE_C = "QUEUE_C";/** Routing Key:路由关键字,exchange根据这个关键字进行消息投递 **/public static final String ROUTINGKEY_A = "routingKey_A";public static final String ROUTINGKEY_B = "routingKey_B";public static final String ROUTINGKEY_C = "routingKey_C";@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(publisherConfirms);//开启发送消息确认connectionFactory.setPublisherReturns(true);//开启发送失败退回return connectionFactory;}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//设置多个回调类时,设置为prototype原型模式public RabbitTemplate rabbitTemplate(){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());return rabbitTemplate;}/*** 针对消费者配置* 1.设置交换机类型* 2.将队列绑定到交换机* FanoutExchange 将消息分发到所有的绑定队列,无routingKey的概念* HeadersExchange 通过添加属性key-value 匹配* DirectExchange 按照routingKey分发到制定队列* TopicExchange 多关键字匹配* @return*/@Beanpublic DirectExchange defaultExchange(){return new DirectExchange(EXCHANGE_A);}@Beanpublic DirectExchange exchangeB(){return new DirectExchange(EXCHANGE_B);}/*** 获取队列* @return*/@Beanpublic Queue queueA(){// 队列持久return new Queue(QUEUE_A,true); //new Queue(QUEUE_A)默认durable 为 true}@Beanpublic Queue queueB(){return new Queue(QUEUE_B,true);}@Beanpublic Queue queueC(){return new Queue(QUEUE_C,true);}/*** 交换机 消息队列 绑定 ,通过指定路由关键字进行消息投递* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(queueA()).to(defaultExchange()).with(ROUTINGKEY_A);}@Beanpublic Binding bindingB(){return BindingBuilder.bind(queueB()).to(exchangeB()).with(ROUTINGKEY_B);}/*** 广播 , 给fanout交换机发送消息,所有绑定了这个交换机的队列都会收到消息*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(EXCHANGE_C );}@Beanpublic Binding bindingFanoutA(){return BindingBuilder.bind(queueA()).to(fanoutExchange());}@Beanpublic Binding bindingFanoutB(){return BindingBuilder.bind(queueB()).to(fanoutExchange());}@Beanpublic Binding bindingFanoutC(){return BindingBuilder.bind(queueC()).to(fanoutExchange());}}

4.生产者

package cn.penghf.lovemaster.service.rabbitmq;import cn.penghf.common.utils.PKGenerator;
import cn.penghf.lovemaster.config.RabbitConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.SerializationUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{//由于rabbitmqTemplate设置为prototype 所以不能自动注入private RabbitTemplate rabbitTemplate;@Autowiredpublic MsgProducer(RabbitTemplate rabbitTemplate){this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this::confirm);rabbitTemplate.setReturnCallback(this::returnedMessage);}public void sendMsg(String content){CorrelationData correlationId = new CorrelationData(PKGenerator.generateId());//把消息放入 ROUTINGKEY_A 对应的队列中去, 对应的队列ArabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE_A, RabbitConfiguration.ROUTINGKEY_A, content, correlationId);}/*** 回调* @param data 回调id* @param ack 消息发送确认* @param cause 发送失败原因// 实现ConfirmCallback// ACK=true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中// ACK=false标示消息由于Broker处理错误,消息并未处理成功*/@Overridepublic void confirm(CorrelationData data, boolean ack, String cause) {log.info("回调id:" + data);if (ack){ //确认消息已消费log.info("消息成功发送:" + ack);}else {log.info("消息发送失败:" + cause);}}/*** 消息发送失败退回// 实现ReturnCallback// 当消息发送出去找不到对应路由队列时,将会把消息退回// 如果有任何一个路由队列接收投递消息成功,则不会退回消息*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息主体: " + SerializationUtils.deserialize(message.getBody()));System.out.println("应答码: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);}
}

5.消费者

package cn.penghf.lovemaster.service.rabbitmq;import cn.penghf.lovemaster.config.RabbitConfiguration;
import cn.penghf.lovemaster.exception.LoveAuthorizeException;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.omg.CORBA.SystemException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
//@RabbitListener(queues = RabbitConfiguration.QUEUE_A)
@Slf4j
public class MsgReceiver {@RabbitListener(queues = RabbitConfiguration.QUEUE_A)@RabbitHandlerpublic void process(String content, Channel channel, Message message) throws IOException{log.info("处理器-0-接收处理队列A当中的消息: {}" , content);try {//模拟异常String s=null;s.toString();//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("0-----处理成功-----{}",content);} catch (Exception e) {this.confirmException(message,channel,content,e);}}@RabbitListener(queues = RabbitConfiguration.QUEUE_A)@RabbitHandlerpublic void process1(String content, Channel channel, Message message) throws IOException{log.info("处理器-1-接收处理队列A当中的消息: {}" , content);try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("1-----处理成功-----{}",content);} catch (Exception e) {this.confirmException(message,channel,content,e);}}@RabbitListener(queues = RabbitConfiguration.QUEUE_A)@RabbitHandlerpublic void process2(String content, Channel channel, Message message) throws IOException{log.info("处理器-2-接收处理队列A当中的消息: {}" , content);try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("2-----处理成功-----{}",content);} catch (Exception e) {this.confirmException(message,channel,content,e);}}private void confirmException(Message message, Channel channel, String content, Exception e) throws IOException{/*if (message.getMessageProperties().getRedelivered()) {System.out.println("消息已重复处理失败,拒绝再次接收 " + content);// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {*/System.out.println("消息即将再次返回队列处理 " + content);// requeue为是否重新回到队列,true重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//}//丢弃这条消息//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);//log.error("receiver fail",e);}
}

6.测试或者controller(自行解决)

    @RequestMapping("/recieveMsg")public String recieveMsg(String msg, Integer i){for (Integer integer = 0; integer < i; integer++) {msgProducer.sendMsg(msg + integer);}return "OK";}

rabbitmq发送确认返回和消费确认参考:https://www.jianshu.com/p/fae8fca98522

将@RabbitListen注解加在类上报出异常:https://blog.csdn.net/u013358378/article/details/86495962

springboot 2.1 整合 rabbitmq 踩坑 砰砰砰相关推荐

  1. SpringBoot 2 快速整合 RabbitMQ

    前言 本文介绍了通过最简单方法使用 SpringBoot 2 整合 RabbitMQ,带你快速上手 RabbitMQ 的操作. 操作前需要先安装 RabbitMQ 服务.Windows 系统可以参考 ...

  2. springboot admin整合spingcloud踩坑

    一般来说springbootadmin会分为服务端,和客户端 服务端需要依赖spring-boot-admin-starter-server 客户端需要依赖spring-boot-admin-star ...

  3. idea 包拆分_idea springboot项目拆分多模块踩坑记(1)

    先说一下项目背景吧,我这个项目主要分为后台管理和前台应用两部分,后台管理用的是ruoyi,基于springboot的,前台是自己搭的一套springcloud,连接的数据库是同一个,所以就出现了前后端 ...

  4. Php amqp 效率低,php 使用rabbitmq 踩坑记录

    环境 1.安装rabbit php使用 安装包,直接安装composer 安装php-amqplib/php-amqplib 这个包 生产着 连接mq server require_once __DI ...

  5. SpringBoot+SQL Server2000驱动版本踩坑

    由于高版本驱动不支持SQL Server2000,因此需要下载低版本驱动,SQL Server2000需要使用 sqljdbc4-3.0版本,其它版本无法启动 <dependency>&l ...

  6. SpringBoot整合RabbitMQ消息队列

    RabbitMQ 一.RabbitMQ介绍 1.1 现存问题 服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用Sprin ...

  7. 踩坑日记一:idea 启动项目报错 java:程序包xxx不存在解决方案

    愿我所踩的坑,都是你们路上的灯 这两天开始学习idea搭建springboot项目的时候,踩坑不停,自己搭建项目碰到java:程序包xxx不存在的问题,但是项目中确实是存在的,为此问题头疼了半天,最后 ...

  8. 【Java笔记+踩坑】SpringBoot基础3——开发。热部署+配置高级+整合NoSQL/缓存/任务/邮件/监控

      导航: [黑马Java笔记+踩坑汇总]JavaSE+JavaWeb+SSM+SpringBoot+瑞吉外卖+SpringCloud/SpringCloudAlibaba+黑马旅游+谷粒商城 目录 ...

  9. nginx实现对websocket加密协议wss协议的反向代理以及websocket整合springboot入门(踩坑)

    出发点 最近公司需要有一个推送前端消息的需求 首选websocket进行推送,也做过客服系统接触过,最近看了一点websocket的相关内容记录总结一下. 什么是websocket websocket ...

最新文章

  1. OpenCV安装好了,show第一个例子,找找成就感~~~
  2. Javasc中发出HTTP请求最常用的方法
  3. docker 搜寻 包 版本_ubuntu16.04下安装docker容器
  4. JQuery实现ajax跨域
  5. Linux:文件格式化与相关处理及sed工具
  6. 【GDB调试学习笔记】GDB调试入门
  7. Web程序设计-客户端表单验证
  8. 【算法】什么是OJ系统?
  9. 行业专家对2021年的云计算发展趋势的预测
  10. python 探索性分析_python中的探索性文本分析
  11. 技术分享| 如何使用FFmpeg命令处理音视频
  12. java mqc系统开源代码_android PDF阅读器(开源) 暴强哦
  13. MySQL与Oracle主键Query性能测试结果
  14. 倪光南院士 你该检讨一下了
  15. 小孩近视用白炽灯好吗?使用护眼台灯有啥好处?
  16. C++电话号码转对应字母
  17. 线性表的练习--学生成绩统计
  18. 糯米团—重制“iPhone团购信息客户端”(零)源代码与跳的那些坑和思考
  19. Android开发五年,分享一些行业经验
  20. 黑大汇文楼计算机研究生,谈一谈我在黑龙江大学的复试经历

热门文章

  1. 用Java模拟乐观锁和悲观锁
  2. 伯乐在线优秀翻译博客
  3. 小米9android系统耗电量大,小米9品控翻车,大批手机出现耗电异常情况!
  4. SpringBoot快速入门(IDEA联网版)
  5. 1346. 检查整数及其两倍数是否存在
  6. lapack 在window下编译
  7. 医院信息化-1 信息系统概况
  8. Codeforces - Tidying Up
  9. SSL 资源发布实验
  10. CF——1611D,图论构造题打卡