一、介绍

1、什么是MQ

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

2、简介

RocketMQ是阿里巴巴旗下一款开源的MQ框架,2016年底捐赠给Apache开源基金会成为孵化项目,2017年正式成为了Apache顶级项目,作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

 常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、RocketMQ

3、RocketMQ 基本概念

RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分。

 与其余组件说明如下:

  • Producer: 消息生产者,负责产生消息,一般由业务系统负责产生消息
  • Producer Group:消息生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者
  • Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
  • Consumer Group:消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组
  • Topic:主题,用于将消息按主题做划分,Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息
  • Message:消息,每个message必须指定一个topic,Message 还有一个可选的 Tag 设置,以便消费端可以基于 Tag 进行过滤消息
  • Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息
  • Broker:Broker是RocketMQ的核心模块,负责接收并存储消息,同时提供Push/Pull接口来将消息发送给Consumer。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer
    • push:推模式: 消息到达消息服务器之后,主动推送给消费者
    • pull:拉模式: 是消费端发起请求,主动向消息服务器(Broker)拉取消息
  • Queue:Topic和Queue是1对多的关系,一个Topic下可以包含多个Queue,主要用于负载均衡,Queue数量设置建议不要比消费者数少。发送消息时,用户只指定Topic,Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时,会根据负载均衡策略决定订阅哪些Queue的消息
  • Offset:RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数
  • NameServer:NameServer可以看作是RocketMQ的注册中心,它管理两部分数据:集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息;各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。

4、消费模式

广播模式

一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer都消费一次。

集群模式

一个Consumer Group中的所有Consumer平均分摊消费消息(一个消息只会被一个消费者消费)

5、消息类型

RocketMq消息类型分为三种:普通消息、顺序消息、事务消息

  • 普通消息:有三种发送方式

    • 单向发送:单向发送是指发送方只负责发送消息,不等待服务器回应,且没有回调函数触发。即只发送请求而不管响应。
    • 同步发送:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才会发送下一个数据包的通讯方式。
    • 异步发送:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下一个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
  • 顺序消息

一般情况下,每个主题(topic)都会有多个消息队列(message queue),假设投递了同一个主题的十条消息,那么这十条消息会分散在不同的队列中。对于消费者而言,每个消息队列是等价的,就不能确保消息总体的顺序。而顺序消息的方案就是把这十条消息都投递到同一个消息队列中。顺序消息与普通消息同样有三种发送方式。

  • 事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。

        事务消息发送步骤:

  1. 发送方将半事务消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成“暂不能投递”状态。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

        事务消息回查机制:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

二、集群搭建

1、集群模式

  • 单个Master

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。

  • 多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master
单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

  • 多Master多Slave模式,异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明,不需要人工干预。性能同多Master模式几乎一样。
缺点:Master宕机,磁盘损坏情况,会丢失少量消息。

  • 多Master多Slave模式,同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功。
优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
缺点:性能比异步复制模式略低,大约低10%左右。

**此次我们选择双主双从集群搭建~

2、搭建双主双从同步双写模式

服务器环境

序号 IP 角色 架构模式
1 192.168.116.128 nameserver、brokerserver Master1、Slave2
2 192.168.116.132 nameserver、brokerserver Master2、Slave1

两台服务器各启动一个nameserver,各启动两个broker~

步骤一(两台服务器):上传文件

上传rocketmq压缩包并解压

解压后移动至/usr/local/rocketmq/下,没有则创建上级目录

环境变量配置,加入如下:

vim /etc/profile
#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.5.1-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
source /etc/profile

步骤二(128服务器):broker配置

① master1

vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=192.168.116.128:9876;192.168.116.132:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/master
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/master/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/master/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/master/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/master/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

② slave2

vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=192.168.116.128:9876;192.168.116.132:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/slave
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/slave/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/slave/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

步骤三(132服务器):broker配置

① master2

vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=192.168.116.128:9876;192.168.116.132:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/master
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/master/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/master/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/master/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/master/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

② slave1

vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=192.168.116.128:9876;192.168.116.132:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/slave
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/slave/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/slave/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

步骤四(两台服务器):修改启动脚本

vi /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/runbroker.sh

找到如下修改:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/runserver.sh

找到如下修改:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

步骤五(两台服务器):启动nameserver

切换到rocketmq目录bin目录下执行:

nohup sh mqnamesrv &

查看日志:

tail -f ~/logs/rocketmqlogs/namesrv.log

步骤六:启动broker

① 128服务器

nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log

② 132服务器

nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties &
tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log

启动完成,通过jps查看java进程

关闭命令:

关闭namesrv服务:sh bin/mqshutdown namesrv
关闭broker服务 :sh bin/mqshutdown broker

三、安装可视化平台RocketMq-Dashboard

1、下载RocketMq-Dashboard源码

Git地址:https://github.com/apache/rocketmq-dashboard

加入idea运行修改yml如下:

 浏览器输入http://localhost:8080/

完结,本教程到这里就结束了,点赞关注(首席摸鱼师 微信同号)与我一起探索最新文章吧!

Linux教程:RocketMq介绍以及集群服务搭建(双主双从同步双写)并安装可视化平台RocketMq-Dashboard相关推荐

  1. [Eureka集群] 在linux上部署SpringCloudEureka的集群服务端(Dalston.SR5版本)

    搭配使用 logback日志配置: https://blog.csdn.net/a755199443/article/details/92208902 Eureka单机服务端配置: https://b ...

  2. 大数据介绍、集群环境搭建、Hadoop介绍、HDFS入门介绍

    大数据介绍.集群环境搭建.Hadoop介绍.HDFS入门介绍 文章目录 大数据介绍.集群环境搭建.Hadoop介绍.HDFS入门介绍 1.课前资料 2.课程整体介绍 3.大数据介绍 3.1 什么是大数 ...

  3. 学习笔记之-Kubernetes(K8S)介绍,集群环境搭建,Pod详解,Pod控制器详解,Service详解,数据存储,安全认证,DashBoard

    笔记来源于观看黑马程序员Kubernetes(K8S)教程 第一章 kubernetes介绍 应用部署方式演变 在部署应用程序的方式上,主要经历了三个时代: 传统部署:互联网早期,会直接将应用程序部署 ...

  4. zookeeper介绍及集群的搭建(利用虚拟机)

    ZooKeeper ​ ZooKeeper是一个分布式的,开放源码(apache)的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase.dubbox.kaf ...

  5. LVS负载均衡集群服务搭建详解(一)

    LVS概述 1.LVS:Linux Virtual Server 四层交换(路由):根据请求报文的目标IP和目标PORT将其转发至后端主机集群中的某台服务器(根据调度算法): 不能够实现应用层的负载均 ...

  6. LVS负载均衡集群服务搭建详解

    一.LVS概述  1.LVS:Linux Virtual Server 四层交换(路由):根据请求报文的目标IP和目标PORT将其转发至后端主机集群中的某台服务器(根据调度算法): 不能够实现应用层的 ...

  7. kafka介绍和集群环境搭建

    kafka概念:     kafka是一个高吞吐量的流式分布式消息系统,用来处理活动流数据.比方网页的訪问量pm,日志等,既可以实时处理大数据信息     也能离线处理.     特点:       ...

  8. 大数据系列(一)之hadoop介绍及集群搭建

    大数据系列(一)之hadoop介绍及集群搭建 文章最早发布来源,来源本人原创初版,同一个作者: https://mp.weixin.qq.com/s/fKuKRrpmHrKtxlCPY9rEYg 系列 ...

  9. Linux集群服务知识点总结及通过案例介绍如何实现高性能web服务

    转自:http://guodayong.blog.51cto.com/263451/1201101 一:集群相关概念及知识点介绍: LVS(Linux Virtual System) 本项目在1998 ...

最新文章

  1. 疫情过后人工智能是否能迎来春天?
  2. Ubuntu使用实录
  3. spark集群详细搭建过程及遇到的问题解决(三)
  4. 命令不识别_互助问答138期:GMM命令代码中如何识别年份国家及异方差检验问题...
  5. 郭明錤:iPhone 13 Pro系列将有1TB储存空间
  6. WIN8系统安装软件时提示“扩展属性不一致“的解决方法
  7. 源码方式安装最新版本snmp的过程
  8. 多元统计分析最短距离法_多元统计分析方法 -
  9. 选择率,基数计算公式
  10. macbook 连接不上蓝牙鼠标 m585 m590 解决方法
  11. Python Flask教程学习01
  12. STM32自学笔记15-步进电机驱动项目-磁编码器MT6816驱动
  13. shell-环境变量以及环境变量的配置文件
  14. 一文读懂大唐杯所有名词解释
  15. CorelDRAW X6+PhotoZoom这组合,无敌了啊!
  16. 如何实现windows自带输入法字体任意更改
  17. Retrofit流程及设计模式全解析
  18. 小程序将页面生成图片,并上传至阿里OSS
  19. 基于SSM的垃圾分类管理系统
  20. 金蝶k3界面,菜单栏只显示了按钮图标没显示出对应的文字

热门文章

  1. idea调试burpsuit插件
  2. C语言浮点数的发送和接收
  3. python urlopen函数的timeout_urllib.request.urlopen()函数详解
  4. 设计模式取舍之道:代码复杂度权衡
  5. 关于Google不能搜索之解决方案
  6. 2023年海外营销趋势:这6点跨境卖家必须了解
  7. 2022年全球及中国X射线源行业应用现状与十四五需求前景预测报告
  8. python走后端路线_python后端学习路线
  9. JavaScript大数据图表(堆积折线图)
  10. 字体分享Lucida Sans Typewriter