一、流程图

本文内容主要围绕这个流程图展开,利用 RabbitMQ 消息队列,实现生产者与消费者解耦,所以有必要先贴出来,涵盖了 RabbitMQ 很多知识点,如:

  • 消息发送确认机制

  • 消费确认机制

  • 消息的重新投递

  • 消费幂等性, 等等

二、实现思路

  • 1.在虚拟机创建一个CentOS7上,并安装 RabbitMQ

  • 2.开放QQ邮箱或者其它邮箱授权码,用于发送邮件

  • 3.创建邮件发送项目并编写代码

  • 4.发送邮件测试

  • 5.消息发送失败处理

三、RabbitMQ安装

RabbitMQ 基于 erlang 进行通信,相比其它的软件,安装有些麻烦,不过本例采用rpm方式安装,任何新手都可以完成安装,过程如下!

3.1、安装前命令准备

输入如下命令,完成安装前的环境准备。

yum install lsof  build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim

3.2、下载 RabbitMQ、erlang、socat 的安装包

本次下载的是RabbitMQ-3.6.5版本,采用rpm一键安装,适合新手直接上手。

先创建一个rabbitmq目录,本例的目录路径为/usr/app/rabbitmq,然后在目录下执行如下命令,下载安装包!

  • 下载erlang

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
  • 下载socat

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
  • 下载rabbitMQ

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

最终目录文件如下:

3.3、安装软件包

下载完之后,按顺序依次安装软件包,这个很重要哦~

  • 安装erlang

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
  • 安装socat

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
  • 安装rabbitmq

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

安装完成之后,修改rabbitmq的配置,默认配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目录下。

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

修改loopback_users节点的值!

最后只需通过如下命令,启动服务即可!

rabbitmq-server start &

运行脚本之后,如果报错,例如下图!

解决办法如下:

vim /etc/rabbitmq/rabbitmq-env.conf

在文件里添加一行,如下配置!

NODENAME=rabbit@localhost

然后,再保存!再次以下命令启动服务!

rabbitmq-server start &

通过如下命令,查询服务是否启动成功!

lsof -i:5672

如果出现5672已经被监听,说明已经启动成功!

3.4、启动可视化的管控台

输入如下命令,启动控制台!

rabbitmq-plugins enable rabbitmq_management

用浏览器打开http://ip:15672,这里的ip就是 CentOS 系统的 ip,结果如下:

账号、密码,默认为guest,如果出现无法访问,检测防火墙是否开启,如果开启将其关闭即可!

登录之后的监控平台,界面如下:

四、邮箱授权码的获取

获取邮箱授权码的目的,主要是为了通过代码进行发送邮件,例如 QQ 邮箱授权码获取方式,如下图:

点击【开启】按钮,然后发送短信,即可获取授权码,该授权码就是配置文件spring.mail.password需要的密码!

五、项目介绍

  • springboot版本:2.1.5.RELEASE

  • RabbitMQ版本:3.6.5

  • SendMailUtil:发送邮件工具类

  • ProduceServiceImpl:生产者,发送消息

  • ConsumerMailService:消费者,消费消息,发送邮件

六、代码实现

6.1、创建项目

在 IDEA 下创建一个名称为smail的 Springboot 项目,pom文件中加入amqpmail

<dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--mail 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- commons-lang3 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.4</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency>
</dependencies>

6.2、配置rabbitMQ、mail

application.properties文件中,配置amqpmail

#rabbitmq
spring.rabbitmq.host=192.168.0.103
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100# mail
spring.mail.default-encoding=UTF-8
spring.mail.host=smtp.qq.com
spring.mail.username=1370887518@qq.com
spring.mail.password=获取的邮箱授权码
spring.mail.from=1370887518@qq.com
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

其中,spring.mail.password第四步中获取的授权码,同时usernamefrom要一致!

6.3、RabbitConfig配置类

@Configuration
@Slf4j
public class RabbitConfig {// 发送邮件public static final String MAIL_QUEUE_NAME = "mail.queue";public static final String MAIL_EXCHANGE_NAME = "mail.exchange";public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";@Autowiredprivate CachingConnectionFactory connectionFactory;@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(converter());// 消息是否成功发送到ExchangerabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息成功发送到Exchange");} else {log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);}});// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调rabbitTemplate.setMandatory(true);// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);});return rabbitTemplate;}@Beanpublic Jackson2JsonMessageConverter converter() {return new Jackson2JsonMessageConverter();}@Beanpublic Queue mailQueue() {return new Queue(MAIL_QUEUE_NAME, true);}@Beanpublic DirectExchange mailExchange() {return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);}@Beanpublic Binding mailBinding() {return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);}
}

6.4、Mail 邮件实体类

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Mail {@Pattern(regexp = "^([a-z0-9A-Z]+[-|\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\.)+[a-zA-Z]{2,}$", message = "邮箱格式不正确")private String to;@NotBlank(message = "标题不能为空")private String title;@NotBlank(message = "正文不能为空")private String content;private String msgId;// 消息id
}

6.5、SendMailUtil邮件发送类

@Component
@Slf4j
public class SendMailUtil {@Value("${spring.mail.from}")private String from;@Autowiredprivate JavaMailSender mailSender;/*** 发送简单邮件** @param mail*/public boolean send(Mail mail) {String to = mail.getTo();// 目标邮箱String title = mail.getTitle();// 邮件标题String content = mail.getContent();// 邮件正文SimpleMailMessage message = new SimpleMailMessage();message.setFrom(from);message.setTo(to);message.setSubject(title);message.setText(content);try {mailSender.send(message);log.info("邮件发送成功");return true;} catch (MailException e) {log.error("邮件发送失败, to: {}, title: {}", to, title, e);return false;}}
}

6.6、ProduceServiceImpl 生产者类

@Service
public class ProduceServiceImpl implements ProduceService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic boolean send(Mail mail) {//创建uuidString msgId = UUID.randomUUID().toString().replaceAll("-", "");mail.setMsgId(msgId);//发送消息到rabbitMQCorrelationData correlationData = new CorrelationData(msgId);rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData);return true;}
}

6.7、ConsumerMailService 消费者类

@Component
@Slf4j
public class ConsumerMailService {@Autowiredprivate SendMailUtil sendMailUtil;@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)public void consume(Message message, Channel channel) throws IOException {//将消息转化为对象String str = new String(message.getBody());Mail mail = JsonUtil.strToObj(str, Mail.class);log.info("收到消息: {}", mail.toString());MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();boolean success = sendMailUtil.send(mail);if (success) {channel.basicAck(tag, false);// 消费确认} else {channel.basicNack(tag, false, true);}}
}

6.8、TestController 控制层类

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Autowiredprivate ProduceService testService;@PostMapping("send")public boolean sendMail(Mail mail) {return testService.send(mail);}
}

七、测试服务

启动 SpringBoot 服务之后,用 postman 模拟请求接口。

查看控制台信息。

查询接受者邮件信息。

邮件发送成功!

八、消息发送失败处理

虽然,上面案例可以成功的实现消息的发送,但是上面的流程很脆弱,例如:rabbitMQ 突然蹦了、邮件发送失败了、重启 rabbitMQ 服务器出现消息重复消费,应该怎处理呢?

很显然,我们需要对原有的逻辑进行升级改造,因此我们需要引入数据库来记录消息的发送情况。

8.1、创建消息投递日志表

CREATE TABLE `msg_log` (`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',`msg` text COMMENT '消息体, json格式化',`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',`next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_time` datetime DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`msg_id`),UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';

8.2、编写 MsgLog 相关服务类

public interface MsgLogService {/*** 插入消息日志* @param msgLog*/void insert(MsgLog msgLog);/*** 更新消息状态* @param msgId* @param status*/void updateStatus(String msgId, Integer status);/*** 查询消息* @param msgId* @return*/MsgLog selectByMsgId(String msgId);
}

8.3、改写服务逻辑

在生产服务类中,新增数据写入。

同时,在RabbitConfig服务配置,当消息发送成功之后,新增更新消息状态逻辑。

改造消费者ConsumerMailService,每次消费的时候,从数据库中查询,如果消息已经被消费,不用再重复发送数据!

这样即可保证,如果 rabbitMQ 服务器,即使重启之后重新推送消息,通过数据库判断,也不会重复消费进而发生业务异常!

8.4、利用定数任务对消息投递失败进行补偿

当 rabbitMQ 服务器突然挂掉之后,生成者就无法正常进行投递数据,此时因为消息已经被记录到数据库,因此我们可以利用定数任务查询出没有投递成功的消息,进行补偿投递。

利用定数任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功!

九、总结

本文主要是通过发送邮件这个业务案例,来讲解 Springboot 与 rabbitMQ 技术的整合和使用!

当然解决这个业务需求的技术方案还有很多,例如 Springboot 与 rocketMQ 也可以实现这个需求,这个会在后期的文章讲解!

同时,Springboot + rabbitMQ 这种架构方案更适合于集群应用,如果是单体应用,直接通过服务类操作即可实现邮件推送!

本篇主要是Springboot 与 rabbitMQ整合和基本使用教程,希望小伙伴能有所收获!

十、参考

1、简书 - wangzaiplus - springboot + rabbitmq发送邮件(保证消息100%投递成功并被消费)

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号

好文章,我在看❤️

利用SpringBoot+RabbitMQ,实现一个邮件推送服务相关推荐

  1. 如何利用springboot快速搭建一个消息推送系统

    最近在完善毕设的路上,由于是设计一个远程控制物联网系统,所以服务端到硬件我选用了MQTT协议.因为MQTT的发布/订阅模式很适合这种场景.接下来就来聊聊遇到的一些问题以及解决思路吧. 毕设技术栈:sp ...

  2. springboot开发基于阿里云的短信服务、邮件推送服务

    短信服务以及邮件推送服务使用的都是阿里云的产品,短信服务是收费的,所以需要选择短信服务的套餐并预充值才能使用.邮件推送服务是有200条的免费额度. 一:短信服务 1.申请短信签名.短信模板 到阿里云的 ...

  3. 解决.NET Core中MailKit无法使用阿里云邮件推送服务的问题

    在博问中(.net core怎么实现邮件发送)知道了MailKit无法使用阿里云邮件推送服务发送邮件的问题,自已实测也遇到同样的问题,而用自己搭建的邮件服务器没这个问题. 于是,向阿里云提交了工单.. ...

  4. 使用阿里云邮件推送服务架设自己邮件验证与推送体系

    提示:阅读本文需提前了解的相关知识 1.电子邮件协议(http://baike.baidu.com/view/2367542.htm) 2.阿里云邮件推送(https://www.aliyun.com ...

  5. 阿里云-邮件推送 java 代码 ,测试邮件推送服务,阿里云邮件推送,java

    阿里云-邮件推送Java 测试代码 1.阿里云-邮件推送 配置 购买域名 配置域名 2.创建 Access Key 2.1登录 Access Key 管理控制台. 2.2单击页面右上角按钮 创建 Ac ...

  6. 基于腾讯信鸽设计一个微型推送服务

    今日科技快讯 据印度媒体报道,对于旗下短视频应用抖音国际版TikTok在印度遭封杀,母公司字节跳动表现得似乎"非常乐观",并计划未来三年在印度投资10亿美元.在接受采访时,字节跳动 ...

  7. wordpress使用阿里云邮件推送服务实现发送邮件

    之前用腾迅云时,配置了wordpress是可以使用邮件服务的,然而到了阿里云,却无法使用了,有人说是因为阿里云关了25端口,但腾迅好像也关了. 百度看看有没有其他方法,最终让我找到个方法,可惜不是很完 ...

  8. PHPCMS 邮件发送 - 使用阿里云邮件推送服务的详细设置

    自从阿里云和腾讯云相继默认封闭了云主机的25端口(可申请解封) 我就使用了阿里云的邮件推送服务,感觉很是好用.把我使用PHPCMS做的网站的邮件发送也设置成了使用了阿里云的邮件推送. 我使用的邮箱是阿 ...

  9. 阿里云邮件推送服务配置

    前言 距博客评论.留言功能上线以来,虽然访问人数不多. 但是前段时间发现有人评论了文章,并留下了疑问.但是已经过去多时,所以打算完善评论.留言功能,并添加邮件推送以便第一时间知晓. 准备工作 本文采用 ...

最新文章

  1. cmake重新编译matlab,ubuntu系统下cmake 编译matlab中mex文件
  2. SAP smartforms之Zebra print control language
  3. oracle sql命令行中上下左右使用
  4. 区块链学堂(1):区块链引子
  5. hive kerberos java_Kerberos身份验证错误 - Sqoop通过Hive从SQL导入HDFS
  6. soul群聊显示服务器异常,soul群聊状态是什么
  7. 北京达内python价格_记录在北京达内学Python-day07
  8. Spring JdbcTemplate示例
  9. mycat集群执行带有join的sql语句时报错_can‘t find table define in schema_分片join---Linux运维工作笔记052
  10. lol最克制诺手的英雄_LOL“英雄恐惧症”,当你上单遇到诺手时,你会用什么英雄对线...
  11. Adobe Reader历史版本安装包下载
  12. 汇编指令大全(带注释)
  13. 2019区块链将走向何方?硅谷知名投资大咖如是说
  14. Lion Disk Maker让你一键制作Lion系统安装U盘
  15. java工程设计选题管理系统_基于javaee的毕设选题测试及管理系统的设计与实现 毕设.doc...
  16. 霍纳法则c语言算法代码,霍纳法则(Horner's rule)
  17. 我的组会内容分享(部分)CDR+CTLE+DFE
  18. RS485总线灵魂问答,看你知道几个?
  19. 浪涌电流Inrush Current产生原因以及解决方案
  20. DVWA全关教程手册

热门文章

  1. SQLite允许向一个integer型字段中插入字符串
  2. matlab调用c函数语言,MATLAB调用C/C++函数的方法
  3. java radio_java radioButton
  4. 三,位操作类指令:包括逻辑运算指令,测试指令和移位指令
  5. 操作系统之文件管理:5、文件物理结构(连续分配、链式(显式、隐式)分配、索引分配(链接、多层索引、混合索引))
  6. 计算机组成原理题目题型总结)第三章:存储器
  7. 全面介绍Windows内存管理机制及C++内存分配实例(一):进程空间
  8. 面试题55 - I. 二叉树的深度
  9. docker 设置国内镜像源(网易、ustc、中国科技大学、阿里云容器)
  10. shell获取路径文件的文件名