作者:WilsonHe

juejin.im/post/5ea159e4f265da47f0794da5

前言

MQ的主要特点为解耦、异步、削峰,该文章主要记录与分享个人在实际项目中的RocketMQ削峰用法,用于减少数据库压力的业务场景,其中RocketMQ的核心组件概念如下:

  • Producer:生产发送消息

  • Broker:存储Producer发送过来的消息

  • Consumer:从Broker拉取消息并进行消费

  • NameServer:为Producer或Consumer路由到Broker

其中消费流程有以下几点是必须注意的:

  • RocketMQ的Consumer获取消息是通过向Broker发送拉取请求获取的,而不是由Broker发送Consumer接收的方式。

  • Consumer每次拉取消息时消息都会被均匀分发到消息队列再进行传输,所以RocketMQ中的很多参数都是针对队列而不是Topic的(这个是重点,顺便吐槽下源码的文档讲的真不清晰,很多都需要自己试错,但Dashboard做得很好),其中每个Broker消息队列(ConsumeQueue)的数量都可以通过RocketMQ DashBoard实时更改调整。

rocketmq-spring-boot-starter 用法简介

当开发中需要快速集成RocketMQ时可以考虑使用 rocketmq-spring-boot-starter 搭建RocketMQ的集成环境,但该框架并不完全具备RocketMQ所有的配置简化,如需批量消费消息便需要自定义一个DefaultMQPushConsumer bean去消费了。

个人在开发中常用的rocketmq-spring-boot-starter相关类:

  • RocketMQListener接口:消费者都需实现该接口的消费方法onMessage(msg)。

  • RocketMQPushConsumerLifecycleListener接口:当@RocketMQMessageListener中的配置不足以满足我们的需求时,可以实现该接口直接更改消费者类DefaultMQPushConsumer配置

  • @RocketMQMessageListener:被该注解标注并实现了接口RocketMQListener的bean为一个消费者并监听指定topic队列中的消息,该注解中包含消费者的一些常用配置(大部分按默认即可),一般只需更改consumerGroup(消费组)与topic。

RocketMQMessageListener中的属性配置是可以使用Placeholder(占位符)从配置文件或配置中心获取的,如下图:

业务案例

有一个点赞业务,不限制用户的点赞数只需进行记录(产品需求,开发提议无效),当每个用户都进行x连击享受数量猛增的快感时如果数据库都需要进行x个点赞数据的插入,数据库毫无疑问会塞死导致崩溃。

于是想到可以尝试下MQ削峰,比如每秒来了5000消息但数据库只能承受2000,那我消费时每次只拉取消费1600就好了,剩下的放在Broker堆积慢慢消费就好。由于之前的消息中心也在用RocketMQ,于是确认使用RocketMQ来进行削峰。

环境配置

文章例子环境:1NameServer + 2Broker + 1Consumer

添加maven依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

application.yml配置

rocketmq:name-server: 127.0.0.1:9876producer:group: praise-group
server:port: 10000spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: tigerurl: jdbc:mysql://localhost:3306/wilson
swagger:docket:base-package: io.rocket.consumer.controller

点赞接口

PraiseRecord(点赞记录):

@Data
public class PraiseRecord implements Serializable {private Long id;private Long uid;private Long liveId;private LocalDateTime createTime;
}

MessageController(简单的测试接口):

RestController
@RequestMapping("/message")
public class MessageController {@Resourceprivate RocketMQTemplate rocketMQTemplate;@PostMapping("/praise")public ServerResponse praise(@RequestBody PraiseRecordVO vo) {rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());return ServerResponse.success();}// ......}

由于用户可以连续点赞,所以考虑可以在点赞消息的处理上宽松一点(容许消息丢失)以追求更高的性能,因此选择使用sendOneyWay()进行消息发送。Java知音公众号内回复“面试题聚合”,送你一份面试题宝典

RocketMQ的消息发送方式主要含syncSend()同步发送、asyncSend()异步发送、sendOneWay()三种方式,sendOneWay()也是异步发送,区别在于不需等待Broker返回确认,所以可能会存在信息丢失的状况,但吞吐量更高,具体需根据业务情况选用。

性能:sendOneWay > asyncSend > syncSend

RocketMQTemplate的send()方法默认是同步(syncSend)的,更多可看源码实现。

PraiseListener:点赞消息消费者

@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {@Resourceprivate PraiseRecordService praiseRecordService;@Overridepublic void onMessage(PraiseRecordVO vo) {praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 每次拉取的间隔,单位为毫秒consumer.setPullInterval(2000);// 设置每次从队列中拉取的消息数为16consumer.setPullBatchSize(16);}
}

单次pull消息的最大数目受broker存储的MessageStoreConfig.maxTransferCountOnMessageInMemory(默认为32)值限制,即若想要消费者从队列拉取的消息数大于32有效(pullBatchSize>32)则需更改Broker的启动参数maxTransferCountOnMessageInMemory值。

在MQ削峰的配置参数里,以下几个DefaultMQPushConsumer的参数是需要注意一下的:

  • pullInterval:每次从Broker拉取消息的间隔,单位为毫秒

  • pullBatchSize:每次从Broker队列拉取到的消息数,该参数很容易让人误解,一开始我以为是每次拉取的消息总数,但测试过几次后确认了实质上是从每个队列的拉取数(源码上的注释文档真的很差,跟没有一样),即Consume每次拉取的消息总数如下:
    EachPullTotal=所有Broker上的写队列数和(writeQueueNums=readQueueNums) * pullBatchSize

  • consumeMessageBatchMaxSize:每次消费(即将多条消息合并为List消费)的最大消息数目,默认值为1,rocketmq-spring-boot-starter 目前不支持批量消费(2.1.0版本)

在消费者开始消息消费时会先从各队列中拉取一条消息进行消费,消费成功后再以每次pullBatchSize的数目进行拉取。

PraiseListener中设置了每次拉取的间隔为2s,每次从队列拉取的消息数为16,在搭建了2master broker且broker上writeQueueNums=readQueueNums=4的环境下每次拉取的消息理论数值为16 * 2 * 4 = 128,在第一次从各队列拉取1条消息(即共8条)后消费成功后会每次就会拉取最多128条消息进行消费,想验证下的可以把onMessage()insert()改为log.info("1")然后统计单位秒内打印的日志数是否为128。

根据以上配置单Conumer情况下每2s理论消费为128,即每2秒数据库新增的点赞数据大概为128条左右,有20%偏差都在个人可接受范围内,然后对点赞接口进行简单压测1s 2000请求校验MQ效果,根据消费配置理论上需要16次拉取即需32s才能消费完,压测后查看数据库校验效果:

由上图可以看出除第一次2s和最后一次2s外数据库每2s的插入数据数和一般都在128附近波动,也用了34s(因第一次拉取数较少所以比理论多花费一次拉取)消费的偏差大小可能会受每次拉取数pullBatchSize、Broker上的消息队列数、网络波动等情况影响,但需要的目的已经达到了。

我只想把单位时间内过多的数据库操作交给MQ做分隔成多个单位时间内的小批量操作,消息过多就堆积,当请求峰值过了后直到MQ堆积的消息消费完前数据库的插入数依旧会与峰值期的插入数相差不大,达到了MQ削峰填谷的效果。Java知音公众号内回复“面试题聚合”,送你一份面试题宝典

上线了但消费效率预估失误如何动态更改消费效率 ?

当把拉取数pullBatchSize设置Broker的默认最大传输值32了,线上又不想重启Broker更改maxTransferCountOnMessageInMemory参数,如有2个Broker且queue都为4,那么拉取消费效率才为32 * 2 * 4 = 256,如果想要动态调整,可以从Broker数或Broker队列数下手,可以将Broker的writeQueueNums、readQueueNums增大,如都改为8,那么效率就成了32 * 2 * 8 = 512

需要注意的是更改完queues后必须去Dashboard的Topic下的CONSUMER MANAGER查看新增的队列上是否都有Consumer成功注册上去了,因为遇到了在测试与生产上使用rocketmq-spring-boot-starter @RocketMQListener标注消费者不会自动注册到新队列上的情况,但没排除是不是RocketMQ版本的原因(个人本地的版本比环境上的高了一个小版本0.0.1,本地没出现没消费者注册到新队列上的问题),而是使用了自定义DefaultMQPushConsumer bean(原生的方式都是没有问题的)的备用方案。

当再启动新的消费者应用时CONSUMER MANAGER(下图)中就会出现 新Consumer数 * 各Broker队列数和的队列行。

如何使用RocketMQ批量消费 ?

虽然点赞业务使用MQ单条插入后TPS已经达到当前业务指标要求了,但考虑到如果后续要求在不添加机器数的情况下增加TPS,且数据量还没到分库分表的程度,个人就打算从批量消费下手,由一次插入一条点赞记录改为一次性插入多条(insertBatch)。

当然能满足现有需求能不做肯定不做的,过度优化过分碍事,但想多点方案不会坏事。rocketmq-spring-boot-starter并没有提供批量消费的功能,所以要批量消费消息需要自定义DefaultMQPushConsumer并配置其consumeMessageBatchMaxSize属性。

consumeMessageBatchMaxSize属性默认值为1,即每次只消费一条消息,需要注意的是该属性也会受pullBatchSize影响,如果consumeMessageBatchMaxSize为32但pullBatchSize只为12,那么每次批量消费的最大消息数也就只有12。

如下为个人测试批量消费Consumer的测试bean:

@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);consumer.setNamesrvAddr(nameServer);consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");// 设置每次消息拉取的时间间隔,单位毫秒consumer.setPullInterval(1000);// 设置每个队列每次拉取的最大消息数consumer.setPullBatchSize(24);// 设置消费者单次批量消费的消息数目上限consumer.setConsumeMessageBatchMaxSize(12);consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)-> {List<UserInfo> userInfos = new ArrayList<>(msgs.size());Map<Integer, Integer> queueMsgMap = new HashMap<>(8);msgs.forEach(msg -> {userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val);});log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos);/*处理批量消息,如批量插入:userInfoMapper.insertBatch(userInfos);*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();return consumer;
}

如果默认配置情况下log打印出的userInfo size恒为1,但由于设置了consumeMessageBatchMaxSize与pullBatchSize,且pullBatchSize较小,所以每次消费的消息数最大值为12,如下图:

附本文相关信息

确保mqnamesrv与mqbroker已启动成功,如该文章环境的启动:

mqnamesrv -n 127.0.0.1:9876
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-a.properties
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-b.properties

END

Java面试题专栏

【61期】MySQL行锁和表锁的含义及区别(MySQL面试第四弹)

【62期】解释一下MySQL中内连接,外连接等的区别(MySQL面试第五弹)

【63期】谈谈MySQL 索引,B+树原理,以及建索引的几大原则(MySQL面试第六弹)

【64期】MySQL 服务占用cpu 100%,如何排查问题? (MySQL面试第七弹)

【65期】Spring的IOC是啥?有什么好处?

【66期】Java容器面试题:谈谈你对 HashMap 的理解

【67期】谈谈ConcurrentHashMap是如何保证线程安全的?

【68期】面试官:对并发熟悉吗?说说Synchronized及实现原理

【69期】面试官:对并发熟悉吗?谈谈线程间的协作(wait/notify/sleep/yield/join)

【70期】面试官:对并发熟悉吗?谈谈对volatile的使用及其原理

我知道你 “在看”

实战 RocketMQ 流量削峰,怎么能错过这一篇!相关推荐

  1. springboot集成rabbitmq商品秒杀业务实战(流量削峰)

    消息队列如何实现流量削峰? 要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送 ...

  2. 超级简单的 RocketMQ 流量削峰实战

    知道的越多,不知道的就越多,业余的像一棵小草! 编辑:业余草 来源:https://www.xttblog.com/?p=4996 前言 MQ的主要特点为解耦.异步.削峰,该文章主要记录与分享个人在实 ...

  3. 消息中间件系列(六):什么是流量削峰?如何解决秒杀业务的削峰场景

    流量削峰的由来 主要是还是来自于互联网的业务场景,例如,马上即将开始的春节火车票抢购,大量的用户需要同一时间去抢购:以及大家熟知的阿里双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如: ...

  4. 高并发架构系列:什么是流量削峰?如何解决秒杀业务的削峰场景

    流量削峰的由来 主要是还是来自于互联网的业务场景,例如,马上即将开始的春节火车票抢购,大量的用户需要同一时间去抢购:以及大家熟知的阿里双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如: ...

  5. 怎样来实现流量削峰方案

    流量削峰的由来 主要是还是来自于互联网的业务场景,例如,马上即将开始的春节火车票抢购,大量的用户需要同一时间去抢购:以及大家熟知的阿里双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如: ...

  6. 秒杀系统流量削峰,这事应该怎么做?

    如果你看过秒杀系统的流量监控图的话,你会发现它是一条直线,就在秒杀开始那一秒是一条很 直很直的线,这是因为秒杀请求在时间上高度集中于某一特定的时间点.这样一来,就会导致一 个特别高的流量峰值,它对资源 ...

  7. java 最少使用(lru)置换算法_一篇文章学会如何基于LRU-K算法设计本地缓存实现流量削峰...

    专注于Java领域优质技术号,欢迎关注 作者:一个Java菜鸟 1.背景介绍 1.1.现象 QPS突然增长2倍以上(45w~60w每分钟) 将产生下面一些问题: 1)响应接口响应时长增加了5倍(qps ...

  8. 架构设计 | 高并发流量削峰,共享资源加锁机制

    本文源码:GitHub·点这里 || GitEE·点这里 一.高并发简介 在互联网的业务架构中,高并发是最难处理的业务之一,常见的使用场景:秒杀,抢购,订票系统:高并发的流程中需要处理的复杂问题非常多 ...

  9. 【商城秒杀项目】-- 流量削峰应该怎么做

    如果你看过秒杀系统的流量监控图的话,你会发现它是一条直线,在秒杀开始那一秒是一条很直很直的线,这是因为秒杀请求在时间上高度集中于某一特定的时间点.这样一来,就会导致一个特别高的流量峰值,它对资源的消耗 ...

最新文章

  1. MaxCompute 多行数据合并为一行数据
  2. 【BFS】【并查集】【Tarjan】【LCA】Gym - 101173H - Hangar Hurdles
  3. SLF4J with Logback in a Maven Project | Mograblog
  4. CBOW模型正向传播、矩阵乘积层实现
  5. 84-java版spark2.x读取es6.x
  6. Security+ 学习笔记25 硬件与数据安全
  7. H5混合开发二维码扫描以及调用本地摄像头
  8. 股票财务指标数据获取,附代码
  9. SCADA和三大工业控制系统PLC、DCS、FCS
  10. OpenWrt 18.06.1的ss-redir, 以及在乐视超4 X40上看Youtube
  11. 【干货分享】硬件测试工程师必备基本技能,看这一篇就够!
  12. A Self-paced Multiple-instance Learning Framework for Co-saliency Detection文章阅读
  13. Spark Steaming管理kafka的offset
  14. Gym - 101350E Competitive Seagulls——博弈
  15. 漫画:程序员独特的暖心瞬间
  16. 渗透bc网站教学_督导引领共交流 听课评课共成长 怀化锦溪小学开展课堂教学督导听课活动_都市新闻_新闻中心...
  17. AI虚拟人物 数字人直播,不用出镜,不用露脸的直播方式(附教程 软件)
  18. (转)视觉工程师笔试知识汇总
  19. 【Latex】Latex插入代码块
  20. 基于STM32的煤矿井下探测系统

热门文章

  1. “浴霸”改“花洒”?华为Mate 30最新保护壳谍照曝光...
  2. 疑似谷歌Pixel 4真机谍照曝光:边框宽到没朋友
  3. 特斯拉Model 3本周平均日产约900辆 7000辆周产量有望
  4. 高通芯片曾被发现一重大漏洞 影响骁龙845等30多款芯片
  5. 实习生使用微软雅黑致公司被起诉索赔千万?方正回应...
  6. json.dumps loads 终于区分出来了
  7. Java成神之路——ASM,Javassist,cglib区别。
  8. 常用加密算法之非对称加密算法
  9. java代码加载_java类中代码加载顺序
  10. python怎样画动态文字_Python制作动态字符图的实例