Java服务MQ消息队列容灾方案
背景介绍
在前一段时间线上出现过一次事故,一个非常重要的消息生产者服务,由于MQ出现问题,消息大量积压,导致了该服务线程被打满,外部请求返回502,服务采用Springboot搭建,使用Springboot的Tomcat容器。
原因分析
消息的生产者服务是一个高并发量的服务,接受外部方的接口调用,并将消息推送至MQ,调用流程示意图如下:
而事故当天的情况是,MQ消息大量积压,基本等同于MQ挂掉,
大量的请求积压在推送消息到MQ的地方,导致外部的大量的请求在Tomcat的线程池积压,当Tomcat的线程池全部被打满后,服务不能再接受新的请求进入,导致抛出大量的502错误。
容灾方案分析
一、发送消息超时时间设定
首先可以想到的是,在MQ发送消息处,设置推送消息的超时时间,超过超时时间,认为消息发送失败,将消息写入文件中,当时这个方案并没有根本上解决,如果MQ挂掉,Tomcat不被打满的问题,虽然可以解决目前的生产场景的情况,但是当后续请求量更大时候,不能保证Tomcat不被打满,同时,消息推送的超时时间的设定也不好进行把握,如果由于网络波动或其他情况,导致消息推送慢,但是是可以推送成功的,但是万一超过了超时时间,消息直接不会发送,反而会影响目前的业务逻辑。
二、调整Tomcat线程池大小
事故的起因是因为Tomcat被打满,那调整Tomcat的线程池大小,调整大一些不就可以了吗?但是这是一种治标不治本的方法,并没有根本上解决MQ挂掉后,Tomcat被打满的情况,只能是延迟了被打满的时间,但是根据目前线上机器配置的情况,Tomcat增大线程池大小并不是一个合适的选择。
三、启用异步
问题的根本是MQ挂掉,主线程全部卡在MQ发送消息的部分,那么是否可以考虑,将MQ发送消息的这个操作异步化,让Tomcat主线程不在此等待,而是转由异步线程执行发送消息的操作?这个方案看来还是比较靠谱的,这里我首先考虑引入线程池,进行异步化处理,
OK,方案确认,那么线程池的参数设置需要进行考虑,一般常规的线程池线程数设置为:CPU core * 2 +1,也有其他的线程池估算算法:估算线程池数目大小 ,这里我采用传统的设置方式,初始化线程池核心数为 CPU core * 2 +1,最大线程数:4 * (CPU core * 2 +1),阻塞队列:1000。
由于我们采用了线程池,那么对于线程池的监控是必须的,这里我设置为线程数达到最大线程数的80%会进行告警,因为这时候说明MQ推送消息可能已经出现堆积的情况了,下面给出代码的实例:
消息推送异步化:
@Component
public class MessageProducer InitializingBean {@Autowiredprivate MqService mqService;private static ThreadPoolExecutor pool = null;@Overridepublic void sendToMessageBus(String message) {//线程池异步处理try {pool.execute(() -> {try {//推送消息mqService.send(message);monitorThreadPool();} catch (Exception e) {log.error("send message to message bus error, cause : {}", e);handleFailMessage(message);}});} catch (Exception e) {log.error("commit send message to thread pool error, prepare to save message in file......");handleFailMessage(message);}}@Overridepublic void afterPropertiesSet() throws Exception {pool = new ThreadPoolExecutor(5, 20, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1000),new ThreadPoolExecutor.AbortPolicy());//启动消息失败处理线程FailMessageHandlerProducer republishThread = new FailMessageHandlerProducer();republishThread.setName("republish-thread");republishThread.start();}public static void shutdown() {if (pool != null) {pool.shutdown();}}private void handleFailMessage(String message) {JSONObject jsonObject = new JSONObject();jsonObject.put("message", message);FailMessageHandlerProducer.pushEvent(jsonObject);}private void monitorThreadPool() {try {//一级告警,线程池当前活动线程数大于阈值if (pool.getActiveCount() > 16) {//告警处理}//二级告警,线程池阻塞队列当前对象数大于阈值if (pool.getQueue().size() > 100) {//告警处理}} catch (Exception e) {log.error("monitorThreadPool alarm error, cause : {}", e);}}
}
失败消息处理:
public class FailMessageHandlerProducer extends Thread {private static LinkedBlockingQueue<JSONObject> queue = new LinkedBlockingQueue<>(10000);private static volatile boolean terminate = false;private long timeout = 10;public FailMessageHandlerProducer() {}public static synchronized void pushEvent(JSONObject republishData) {try {queue.put(republishData);} catch (Exception e) {log.error("FailMessageHandlerProducer push error", e);}}@Overridepublic void run() {while (!terminate) {try {JSONObject republishData = queue.poll(timeout, TimeUnit.SECONDS);if (republishData != null) {//写入文件} else {//停顿2sThread.sleep(2 * 1000);}} catch (Exception e) {log.error("republish message error, cause : {}", e);}}}public static void setTerminate() {terminate = false;}
}
这里我采用了线程池进行异步化发送消息,当MQ挂掉或者推送消息特别慢的时候,线程池中的线程首先会进行积压,直到线程池最大线程数,在之后进入的线程会进入阻塞队列,当阻塞队列被打满后,线程池会抛出异常,捕获异常后将消息写入文件。关于线程池的机制可以看一下我的另一篇博文:Java ThreadPoolExecutor线程池概述
压力测试
压力测试的工具,我使用是Jmeter,Jmeter使用,在PC环境下的压力测试数据如下,环境Intel 八代i5 4核 + 16G,
条件 | 并发 | 请求次数 | 每秒吞吐 |
---|---|---|---|
使用线程池(初始化5,最大线程数20,阻塞队列1000) | 1000并发/1s | 2000 | 220.8 |
使用线程池(初始化5,最大线程数20,阻塞队列1000) | 1000并发/1s | 2000 | 257.2 |
使用线程池(初始化5,最大线程数20,阻塞队列1000) | 1000并发/1s | 2000 | 217.5 |
不使用线程池 | 1000并发/1s | 2000 | 16.6 |
使用线程池(初始化50,最大线程数100,阻塞队列1000) | 2000并发/1s | 10000 | 181 |
使用线程池(初始化30,最大线程数80,阻塞队列1000) | 2000并发/1s | 10000 | 159 |
不使用线程池 | 2000并发/1s | 6000 | 16.8 |
可以看到,在高并发场景下,没有线程池的场景,吞吐量差距非常巨大,但是也可以看到,线程池的线程数并不是越大越大的,需要根据服务器的配置情况,设定好合适的线程池配置。
结语
本文结合我自己遇见的一次线上事故,采取的容灾方案,这个方案肯定不是很完美或者或者说设计的很好的,因为当MQ挂掉后,很多消息会写入文件,将这部分消息重新处理也是一个比较麻烦的事情,本文就是一个抛砖引玉的给出一个大概思路,如果你有更好的方案,欢迎留言我们讨论!
Java服务MQ消息队列容灾方案相关推荐
- java使用mq教程,Java语言快速实现简单MQ消息队列服务
使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ) 主要角色 首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色 Producer Broker Consu ...
- 微信技术分享:微信的海量IM聊天消息序列号生成实践(容灾方案篇)
1.引言 在本文的上篇<微信技术分享:微信的海量IM聊天消息序列号生成实践(算法原理篇)>中介绍了微信的消息序列号生成器 seqsvr 的算法原理.架构核心思想,以及 seqsvr 随着业 ...
- 多维度对比5款主流分布式MQ消息队列,妈妈再也不担心我的技术选型了
1.引言 对于即时通讯网来说,所有的技术文章和资料都在围绕即时通讯这个技术方向进行整理和分享,这一次也不例外.对于即时通讯系统(包括IM.消息推送系统等)来说,MQ消息中件间是非常常见的基础软件,但市 ...
- Spring Boot:使用Rabbit MQ消息队列
综合概述 消息队列 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息,对消息队列有读权限的进程则可以 ...
- IM开发基础知识补课(五):通俗易懂,正确理解并用好MQ消息队列
1.引言 消息是互联网信息的一种表现形式,是人利用计算机进行信息传递的有效载体,比如即时通讯网坛友最熟悉的即时通讯消息就是其具体的表现形式之一. 消息从发送者到接收者的典型传递方式有两种: 1)一种我 ...
- MQ消息队列的使用场景
一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...
- MQ消息队列使用场景
一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...
- MQ消息队列之前置知识
目录 一.前置知识 二.MQ前置知识 1.MQ作用 2.MQ对比 3.MQ的缺点 4.产品选型 一.前置知识 当今互联网项目微服务架构成为主流,使用微服务架构,在高并发场景下,服务之间的通信问题是我们 ...
- 阿里云ACE共创空间——MQ消息队列产品测试
一.产品背景 消息队列是阿里巴巴集团自主研发的专业消息中间件. 产品基于高可用分布式集群技术,提供消息订阅和发布.消息轨迹查询.定时(延时)消息.资源统计.监控报警等一系列消息云服务,是企业级互联网架 ...
- MQ消息队列产品测试
2019独角兽企业重金招聘Python工程师标准>>> 一.产品背景 消息队列是阿里巴巴集团自主研发的专业消息中间件. 产品基于高可用分布式集群技术,提供消息订阅和发布.消息轨迹查询 ...
最新文章
- 【Python】趣学Python变量和赋值:大师兄和二师兄教的好~
- 了解和使用类库(47)
- 求 1000 以内的完数
- P3768 简单的数学题 [狄利克雷卷积,杜教筛,莫比乌斯反演]
- 工作288:根据时间戳处理接口
- 计算机技能大赛初赛主持稿,职业技能大赛开幕式主持词
- android开发——图文并茂
- 《图说VR入门》——360全景视频
- 人民币对美元汇率中间价报6.7343元 上调13个基点
- 激活win10专业版,桌面设置我的电脑,测试过可行
- Git如何合并分支到主干及合并主干到分支
- 两台计算机的ip地址怎么配置,同一台电脑如何设置两个IP地址?电脑配置双ip地址图文教程...
- 手机拍照打卡活动制作方案,通过拍照不聚集活动,函数参数(Function parameters)是在函数定义中所列的名称。
- 使用YOLOX进行物体检测
- java中根据权重随机获取数据
- 【转】 中国老话大全
- java使用httpclient发送POST请求【java基础】
- Ripple Labs和R3联盟在XRP代币诉讼中达成和解
- 语料库数据处理个案实例(计算机搭配强度、删除表中的停用词、词料检索的KWIC实现)
- Ubuntu 安装QQ for Linux 笔记