本文来说下RocketMQ,RocketMQ是现在使用的最多的MQ之一了

文章目录

  • 概述
  • 优点
  • 如何保证高可用
  • RocketMq消费者消费模式有几种
  • RocketMq的消息是有序的吗
  • RocketMq事务消息的实现机制
  • RocketMq会有重复消费的问题吗?如何解决
  • RocketMq延迟消息?如何实现的
  • RocketMq是推模型还是拉模型
  • RocketMq的负载均衡
  • RocketMq消息积压问题
  • 为什么要自己写NameServer而不用Zk呢
  • 四种集群方式
  • 在Broker扩容的时候会影响到其他的Broker使用吗
  • 单机版本中如何增加RocketMQ的吞吐量
  • 如何保证RocketMQ不丢失消息
  • 保证幂等性的几种方法

概述

RocketMQ是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统,满足线上海量消息堆积的需求, 在2016年底捐赠给Apache开源基金会成为孵化项目,经过不到一年时间正式成为了Apache顶级项目;早期阿里曾经基于ActiveMQ研发消息系统, 随着业务消息的规模增大,瓶颈逐渐显现,后来也考虑过Kafka,但因为在低延迟和高可靠性方面没有选择,最后才自主研发了RocketMQ, 各方面的性能都比目前已有的消息队列要好,RocketMQ和Kafka在概念和原理上都非常相似,所以也经常被拿来对比;RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。


优点

RocketMQ 是阿里巴巴开发的一款开源的消息中间件,具有集群消费、广播消费、消息积压能力强、防止消息丢失、顺序消息、事务型消息、保证高可用、高性能读写数据等优点。


如何保证高可用

1)master和slave 配合,master 支持读、写,slave 只读,producer 只能和 master 连接写入消息,consumer 可以连接 master 和 slave。

2)当 master 不可用或者繁忙时,consumer 会被自动切换到 slave 读。即使 master 出现故障,consumer 仍然可以从 slave 读消息,不受影响。

3)创建 topic 时,把 message queue 创建在多个 broker 组上(brokerName 一样,brokerId 不同),当一个 broker 组的 master 不可用后,其他组的 master 仍然可以用,producer 可以继续发消息。


RocketMq消费者消费模式有几种

集群消费

一条消息只会投递到一个 Consumer Group 下面的一个实例。

广播消费

消息将对一个Consumer Group 下的各个 Consumer 实例都投递一遍。即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。


RocketMq的消息是有序的吗

一个topic下有多个queue,为了保证发送有序,rocketmq提供了MessageQueueSelector队列选择机制

1)可使用hash取模法,让同一个订单发送到同一个queue中,再使用同步发送,只有消息A发送成功,再发送消息B

2)rocketmq的topic内的队列机制,可以保证存储满足FIFO,剩下的只需要消费者顺序消费即可

3)rocketmq仅保证顺序发送,顺序消费由消费者业务保证


RocketMq事务消息的实现机制

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址

RocketMQ第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

RocketMQ会定期扫描消息集群中的事务消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。


RocketMq会有重复消费的问题吗?如何解决

在网络中断的情况下可能出现,需要保证消费端处理消息的业务逻辑保持幂等性


RocketMq延迟消息?如何实现的

RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时。默认的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

Message msg = new Message(topic, tags, keys, body);msg.setDelayTimeLevel(3);

RocketMq是推模型还是拉模型

rocketmq不管是推模式还是拉模式底层都是拉模式,推模式也是在拉模式上做了一层封装.。

消息存储在broker中,通过topic和tags区分消息队列。producer在发送消息时不关心consumer对应的topic和tags,只将消息发送到对应broker的对应topic和tags中。

推模式中broker则需要知道哪些consumer拥有哪些topic和tags,但在consumer重启或更换topic时,broker无法及时获取信息,可能将消息推送到旧的consumer中。对应consumer主动获取topic,这样确保每次主动获取时他对应的topic信息都是最新的。


RocketMq的负载均衡

生产者负载均衡

从MessageQueue列表中随机选择一个(默认策略),通过自增随机数对列表大小取余获取位置信息,但获得的MessageQueue所在的集群不能是上次的失败集群。

集群超时容忍策略,先随机选择一个MessageQueue,如果因为超时等异常发送失败,会优先选择该broker集群下其他的messeagequeue进行发送。如果没有找到则从之前发送失败broker集群中选择一个MessageQueue进行发送,如果还没有找到则使用默认策略。

消费者负载均衡

1)平均分配策略(默认)(AllocateMessageQueueAveragely)

2)环形分配策略(AllocateMessageQueueAveragelyByCircle)

3)手动配置分配策略(AllocateMessageQueueByConfig)

4)机房分配策略(AllocateMessageQueueByMachineRoom)

5)一致性哈希分配策略(AllocateMessageQueueConsistentHash)

6)靠近机房策略(AllocateMachineRoomNearby)


RocketMq消息积压问题

提高消费并行读

同一个Consumer Group下,通过增加Consumer实例的数量来提高并行度,超过订阅队列数的Consumer实例无效。

提高单个Consumer的消费并行线程,通过修改Consumer的consumerThreadMin和consumerThreadMax来设置线程数。

批量方式消费

通过设置Consumer的consumerMessageBathMaxSize这个参数,默认是1,一次只消费一条消息,例如设置N,那么每次消费的消息条数小于等于N

丢弃非重要消息

当消息发生堆积时,如果消费速度跟不上生产速度,可以选择丢弃一些不重要的消息

优化消息消费的过程

对于消费消息的过程一般包括业务处理以及跟数据库的交互,可以试着通过一些其他的方法优化消费的逻辑。

临时解决方案:

新建一个topic,写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的queue中。临时用一部分机器来部署consumer,每一批consumer消费一个临时queue的数据。等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。


为什么要自己写NameServer而不用Zk呢

1、NameServer是自己写的,方便扩展,去中心化,只要有一个NameServer在,整个注册中心环境就可以用

2、Zk选举需要满足过半机制才可以使用


四种集群方式

四种集群方式

1、单个Master节点:负载压力非常大,如果宕机的话,数据可能会丢失

2、多个Master阶段:分摊存储数据,但是没有Slave节点的话,宕机的情况下数据可能会丢失

3、多Master和多Slave节点,同步形式实现主从数据同步,在生产者将消息存放到主再同步到备Broker中才返回ack确认消息投递成功

4、多Master和多Slave节点,异步形式实现主从数据同步,在生产者将消息存放到主,返回ack确认消息投递成功,异步同步到备Broker中,效率高,但是数据可能会丢失


在Broker扩容的时候会影响到其他的Broker使用吗

不会,因为生产者是通过NameServer中注册的节点数通过轮询来实现数据的存放,节点数没有写死。可以缩容,但是前提是Broker中的消息要被消费完。


单机版本中如何增加RocketMQ的吞吐量

只需要增加队列和消费者


如何保证RocketMQ不丢失消息

一条消息从生产到被消费,将会经历三个阶段


生产阶段,Producer 新建消息,然后通过网络将消息投递给 MQ Broker

存储阶段,消息将会存储在 Broker 端磁盘中

消费阶段, Consumer 将会从 Broker 拉取消息

以上任一阶段都可能会丢失消息,只要找到这三个阶段丢失消息的原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。

生产阶段

生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。发送消息的方式有同步和异步2种方式,不管是同步还是异步,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。

Broker存储阶段

默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。

这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。

若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

修改 Broker 端配置如下:

默认情况为 ASYNC_FLUSH
flushDiskType = SYNC_FLUSH

若Broker未在同步刷盘时间内(默认为5s)完成刷盘,将会返回 SendStatus.FLUSH_DISK_TIMEOUT 状态给生产者。

集群部署

为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。

默认方式下,消息写入 master 成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。

注:master 配置:flushDiskType = SYNC_FLUSH

此时若 master 突然宕机且不可恢复,那么还未复制到 slave 的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

同时这个过程我们还需要生产者配合,判断返回状态是否是 SendStatus.SEND_OK。若是其他状态,就需要考虑补偿重试。

虽然上述配置提高消息的高可靠性,但是会降低性能,生产实践中需要综合选择。

消费阶段

消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。

如果Broker未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,否则我们需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。

总结

虽然以上方法提高了消息的可靠性,但是可能导致消息重发,重复消费

所以对于消费客户端,需要注意保证幂等性


保证幂等性的几种方法

保证幂等性的几种方法:

1)可以在redis里面设置一个标记key,例如给已经下过的订单做个下单标记,每次下单时可以检查是否存在这个key,有就代表下过单了,直接返回;没有则继续下单。

2)另一种方法在数据库里做查询,在插入记录之前。或者使用数据库的唯一索引机制,保证唯一性。

3)使用zookeeper的目录节点等等,道理都是一样的,就是在业务执行前做标记检查(去重)。

RocketMQ常见面试题相关推荐

  1. RocketMQ 常见面试题

    RocketMQ 常见面试题 1.RocketMQ Broker中的消息被消费后会立即删除吗? 不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信 ...

  2. 「高级java工程师」常见面试题及其答案(持续更新)

    「java工程师」常见面试题及其答案请见: 「java工程师」常见面试题及其答案(持续更新)_好人老李的博客-CSDN博客 目录 java基础 常用的 jvm 调优方法? OOM的常见场景及其原因.解 ...

  3. 北大java面试,北大青鸟java 面试--常见面试题(下)

    在之前的两篇文章中,我们已经提到了java面试中的常见问题,还有部分内容,合肥北大青鸟合工大校区的袁老师在本文也给出,希望对大家的面试过程有些帮助.这是我总结的最后一部分常见面试题:分别是数据库,基础 ...

  4. 「java工程师」常见面试题及其答案(持续更新)

    「高级java工程师」常见面试题及其答案: 「高级java工程师」常见面试题及其答案(持续更新)_好人老李的博客-CSDN博客 目录 java基础 面向对象与面向过程的区别? JRE.JDK.JVM的 ...

  5. RabbitMQ消息队列常见面试题总结

    1.什么是消息队列: 1.1.消息队列的优点: (1)解耦:将系统按照不同的业务功能拆分出来,消息生产者只管把消息发布到 MQ 中而不用管谁来取,消息消费者只管从 MQ 中取消息而不管是谁发布的.消息 ...

  6. 300+ Java常见面试题总结【JavaPub版】

    点赞再看,养成习惯 答案解析见文末 我是JavaPub,专注于面试.副业,技术人的成长记录. 这份[Java常见面试题总结]我想准备很久了,前面做面试官,后来自己也面了很多一线二线互联网公司,希望通过 ...

  7. mysql关于时间的面试题,mysql时间设置默认值MySQL常见面试题

    1.limit(选出10 到20 条) select * from students order by id limit 9,10; 2.MySQL 会使用索引的操作符号 =,>,=,betwe ...

  8. java类型转换面试题_JavaSE:数据类型之间的转换(附常见面试题)

    数据类型之间的转换 分为以下几种情况: 1)低级到高级的自动类型转换: 2)高级到低级的强制类型转换(会导致溢出或丢失精度): 3)基本类型向类类型转换: 4)基本类型向字符串的转换: 5)类类型向字 ...

  9. Spring常见面试题及答案汇总1000道(春招+秋招+社招)

    Spring面试题以及答案整理[最新版]Spring高级面试题大全(2021版),发现网上很多Spring面试题都没有答案,所以花了很长时间搜集,本套Spring面试题大全,汇总了大量经典的Sprin ...

最新文章

  1. nacos如何做配置中心?自带自动刷新配置功能?这一篇文章让你明明白白!
  2. CF626E. Simple Skewness
  3. spring整合atomikos实现分布式事务的方法示例_分布式事务中的XA和JTA
  4. 今天,我要教妹子学会Spring:Aware、异步编程、计划任务
  5. spring boot jwt_springboot整合JWT
  6. flume存储到mysql_flume_实现自定义MysqlSink,写入mysql表
  7. Python数据结构与算法笔记(七):数据结构——队列,链表和哈希表
  8. ffmpeg视频裁剪,切割,crop裁剪相关
  9. Jmeter 线程数、Ramp-Up、循环次数 详解
  10. 互联网域名系统国家工程中心(ZDNS)正式运营“.ren”顶级域名
  11. ligerui中的一些知识点
  12. LeetCode1-580题汇总
  13. Html中文本域中加图片,如何在文本框中加图片
  14. vue渐进式框架的理解
  15. 广度优先搜索算法带图详解
  16. 我为 Redis 找到了一个新家——Redis 之父当年的困兽之斗
  17. Codeforces Round #767 (Div. 2)题解
  18. 怎么分析某个明星或者公众人物ins的数据?
  19. 休斯敦大学计算机学院网址,美国休斯敦大学Jiming Bao教授受邀到我院访问并做精彩报告...
  20. Java包名的命名规则

热门文章

  1. 我是不是在浪费生命?
  2. 双系统的电脑中如何完美系统其中一个操作系统
  3. 网络强制消费案例剖析
  4. IHttpModule IHttpHandler
  5. 感恩节快乐,PM2小窍门致NodeJS开发者!
  6. 新功能:阿里云负载均衡SLB支持HTTP/HTTPS超时时间自定义功能
  7. JDBC连接数据库:单线程、多线程、批处理插入数据的对比
  8. macOS下利用dSYM文件将crash文件中的内存地址转换为可读符号
  9. 用Dalvik指令集写个java类
  10. [转]Oracle中字符集的类型决定varchar2的字符长度