初探 RabbitMQ 消息队列

rabbitmq基础概念常见应用场景导入依赖属性配置具体编码定义队列实体类控制器消息消费者主函数测试总结说点什么


SpringBoot 是为了简化 Spring 应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可以轻易的搭建出一个 WEB 工程

MQ全称(Message Queue)又名消息队列,是一种异步通讯的中间件。可以将它理解成邮局,发送者将消息传递到邮局,然后由邮局帮我们发送给具体的消息接收者(消费者),具体发送过程与时间我们无需关心,它也不会干扰我进行其它事情。常见的MQ有kafka、activemq、zeromq、rabbitmq 等等,各大MQ的对比和优劣势可以自行Google

rabbitmq

RabbitMQ是一个遵循AMQP协议,由面向高并发的erlanng语言开发而成,用在实时的对可靠性要求比较高的消息传递上,支持多种语言客户端。支持延迟队列(这是一个非常有用的功能)….

基础概念

Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把exchangequeue按照路由规则绑定起来
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

基于Centos7.x安装请参考: http://blog.battcn.com/2017/08/20/linux/linux-centos7-ribbitmq/

常见应用场景

  1. 邮箱发送:用户注册后投递消息到rabbitmq中,由消息的消费方异步的发送邮件,提升系统响应速度

  2. 流量削峰:一般在秒杀活动中应用广泛,秒杀会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。用于控制活动人数,将超过此一定阀值的订单直接丢弃。缓解短时间的高流量压垮应用。

  3. 订单超时:利用rabbitmq的延迟队列,可以很简单的实现订单超时的功能,比如用户在下单后30分钟未支付取消订单

  4. 还有更多应用场景就不一一列举了…..

导入依赖

在 pom.xml 中添加 spring-boot-starter-amqp的依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.46</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

属性配置

在 application.properties 文件中配置rabbitmq相关内容,值得注意的是这里配置了手动ACK的开关

spring.rabbitmq.username=battcn
spring.rabbitmq.password=battcn
spring.rabbitmq.host=192.168.0.133
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual

具体编码

定义队列

如果手动创建过或者RabbitMQ中已经存在该队列那么也可以省略下述代码…

package com.battcn.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ配置** @author Levin* @since 2018/4/11 0011*/
@Configuration
public class RabbitConfig {public static final String DEFAULT_BOOK_QUEUE = "dev.book.register.default.queue";public static final String MANUAL_BOOK_QUEUE = "dev.book.register.manual.queue";@Beanpublic Queue defaultBookQueue() {// 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理return new Queue(DEFAULT_BOOK_QUEUE, true);}@Beanpublic Queue manualBookQueue() {// 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理return new Queue(MANUAL_BOOK_QUEUE, true);}
}

实体类

创建一个Book

public class Book implements java.io.Serializable {private static final long serialVersionUID = -2164058270260403154L;private String id;private String name;// 省略get set ...
}

控制器

编写一个Controller类,用于消息发送工作

package com.battcn.controller;import com.battcn.config.RabbitConfig;
import com.battcn.entity.Book;
import com.battcn.handler.BookHandler;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author Levin* @since 2018/4/2 0002*/
@RestController
@RequestMapping(value = "/books")
public class BookController {private final RabbitTemplate rabbitTemplate;@Autowiredpublic BookController(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}/*** this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerAutoAck}* this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerManualAck}*/@GetMappingpublic void defaultMessage() {Book book = new Book();book.setId("1");book.setName("一起来学Spring Boot");this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book);this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book);}
}

消息消费者

默认情况下 spring-boot-data-amqp 是自动ACK机制,就意味着 MQ 会在消息消费完毕后自动帮我们去ACK,这样依赖就存在这样一个问题:如果报错了,消息不会丢失,会无限循环消费,很容易就吧磁盘空间耗完,虽然可以配置消费的次数但这种做法也有失优雅。目前比较推荐的就是我们手动ACK然后将消费错误的消息转移到其它的消息队列中,做补偿处理

package com.battcn.handler;import com.battcn.config.RabbitConfig;
import com.battcn.entity.Book;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** BOOK_QUEUE 消费者** @author Levin* @since 2018/4/11 0011*/
@Component
public class BookHandler {private static final Logger log = LoggerFactory.getLogger(BookHandler.class);/*** <p>TODO 该方案是 spring-boot-data-amqp 默认的方式,不太推荐。具体推荐使用  listenerManualAck()</p>* 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK* 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完* 解决方案:手动ACK,或者try-catch 然后在 catch 里面讲错误的消息转移到其它的系列中去* spring.rabbitmq.listener.simple.acknowledge-mode=manual* <p>** @param book 监听的内容*/@RabbitListener(queues = {RabbitConfig.DEFAULT_BOOK_QUEUE})public void listenerAutoAck(Book book, Message message, Channel channel) {// TODO 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉final long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("[listenerAutoAck 监听的消息] - [{}]", book.toString());// TODO 通知 MQ 消息已被成功消费,可以ACK了channel.basicAck(deliveryTag, false);} catch (IOException e) {try {// TODO 处理失败,重新压入MQchannel.basicRecover();} catch (IOException e1) {e1.printStackTrace();}}}@RabbitListener(queues = {RabbitConfig.MANUAL_BOOK_QUEUE})public void listenerManualAck(Book book, Message message, Channel channel) {log.info("[listenerManualAck 监听的消息] - [{}]", book.toString());try {// TODO 通知 MQ 消息已被成功消费,可以ACK了channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {// TODO 如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列}}
}

主函数

package com.battcn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author Levin*/
@SpringBootApplication
public class Chapter11Application {public static void main(String[] args) {SpringApplication.run(Chapter11Application.class, args);}
}

测试

完成准备事项后,启动Chapter11Application 访问 http://localhost:8080/books 将会看到如下内容,就代表一切正常….

2018-05-22 19:04:26.708  INFO 23752 --- [cTaskExecutor-1] com.battcn.handler.BookHandler           : [listenerAutoAck 监听的消息] - [com.battcn.entity.Book@77d8be18]
2018-05-22 19:04:26.709  INFO 23752 --- [cTaskExecutor-1] com.battcn.handler.BookHandler           : [listenerManualAck 监听的消息] - [com.battcn.entity.Book@8bb452]

总结

目前很多大佬都写过关于 SpringBoot 的教程了,如有雷同,请多多包涵,本教程基于最新的 spring-boot-starter-parent:2.0.2.RELEASE编写,包括新版本的特性都会一起介绍…

初探 RabbitMQ 消息队列相关推荐

  1. RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列

    上一篇已经讲了Rabbitmq如何在Windows平台安装,不懂请移步:RabbitMQ学习系列一:windows下安装RabbitMQ服务 一.理论: .net环境下,C#代码调用RabbitMQ消 ...

  2. RabbitMQ消息队列(十三)-VirtualHost与权限管理

    像mysql有数据库的概念并且可以指定用户对库和表等操作的权限.那RabbitMQ呢?RabbitMQ也有类似的权限管理.在RabbitMQ中可以虚拟消息服务器VirtualHost,每个Virtua ...

  3. rabbitMQ消息队列 – 面板介绍及简单demo

    首先rabbit安装好之后,运维会给一个控制面板. 默认账号密码为guest 登入以后可以看到具体界面. 在此鸣谢百度翻译给予的大力支持.. ###写一个简单的demo 编写之前..虽然说可以直接用底 ...

  4. 大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列

    QuartZ定时任务+RabbitMQ消息队列 一 .QuartZ定时任务解决订单系统遗留问题 情景分析: 在电商项目中 , 订单生成后 , 数据库商品数量-1 , 但是用户迟迟不进行支付操作 , 这 ...

  5. RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

    在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会 ...

  6. (四)RabbitMQ消息队列-服务详细配置与日常监控管理

    (四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...

  7. 使用EasyNetQ组件操作RabbitMQ消息队列服务

    RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,是实现消息队列应用的一个中间件,消息队列中间件是分布式系统中重要的组件,主要解决应用耦合, ...

  8. 基于RabbitMQ消息队列的分布式事务解决方案 - MQ分布式消息中间件实战

    基于RabbitMQ消息队列的分布式事务解决方案 - MQ分布式消息中间件实战 参考文章: (1)基于RabbitMQ消息队列的分布式事务解决方案 - MQ分布式消息中间件实战 (2)https:// ...

  9. 消息队列——RabbitMQ消息队列集群

    RabbitMQ消息队列集群 消息队列/中间件 RabbitMQ详解 RabbitMQ单机部署 RabbitMQ集群部署 消息队列/中间件 一.前言 在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中 ...

最新文章

  1. python使用imbalanced-learn的ClusterCentroids方法进行下采样处理数据不平衡问题
  2. spring boot 集成 mybatis,数据库为mysql
  3. 如何跟随有三从零进阶中级CV算法工程师
  4. 信息学奥赛一本通(1137:加密的病历单)
  5. 「附身」马云、恶搞特朗普,AI新应用助你上演「大咖模仿秀」
  6. OAuth2.0 微信授权机制
  7. Ubuntu14.04安装中文输入法以及解决Gedit中文乱码问题
  8. javascript 中Math函数在勾弦定理计算上的错误
  9. argb纯黑色_设计师应避免使用纯黑色字体,但应使用哪种深灰色字体
  10. 新浪开发者平台 android 签名,新浪微博接入流程
  11. QQ连连看外挂源码-VB源代码
  12. 【Java项目-飞翔的小鸟】附源码
  13. 计算机网络三层交换机配置,综述三层交换机配置实例 附详细命令解释
  14. 从0到1:开启商业与未来的秘密
  15. Simulink代码生成: 使能子系统及其代码
  16. Linux中update和upgrade的区别
  17. php中getopt函数的使用
  18. 在CygWin中设置proxy
  19. Python VTK计算曲面的高斯曲率和平均曲率
  20. Java中POJO及其细分XO、DAO的概念

热门文章

  1. STM32F103_DDWG窗口看门狗
  2. python制作一个简单的udp聊天器
  3. mini2440驱动分析之ADC
  4. php的list函数的使用,php list函数如何使用
  5. mysql索引 实验_“索引”实验小例
  6. Web测试常用的链接测试工具
  7. Python3 局部变量与全局变量作用域
  8. jieba库分词词频统计
  9. 正则双重过滤 /// splitKey1 第一个正则式匹配 /// splitKey2 匹配结果中再次匹配进行替...
  10. 关于spring security自定义sessionRegistry