分布式消息队列RocketMQ

一、RocketMQ简介

RocketMQ(火箭MQ) 出自于阿里,后开源给apache成为apache的顶级开源项目之一,顶住了淘宝10年的 双11压力 是电商产品的不二选择 (略微有点夸张)

1、MQ概述

Message Queue,是一种提供消息队列服务的中间件,也成为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统

2、MQ用途

(1)、限流削峰

系统A每秒只能处理50请求 一般来讲如过收到请求大于处理请求,则多余请求会舍去。如果加入MQ 多出来的请求就会存储在MQ中,每秒向系统A发送50请求

(2)、异步解耦

用户请求上游系统模块,上游再去调用下游,在下游做出回应之前,一直处于等待状态,这也是同步调用。

加入MQ后则不用等待一条下单链结束后才返回结果给用户,让库存系统和邮件系统可以同时执行。

可以看出原本需要 50+50+50 = 150ms的下单链 现在只需要 50+50 = 100ms就可以解决

(3)、数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为(用户点了哪里,看了什么。。)等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台必备的计数。通过MQ完成此类数据收集是最好的选择

3、常见的MQ

(1)、ActiveMQ

使用java开发的一款MQ产品。早些年公司项目都在使用,现在社区活跃度低。现在的项目中已经很少使用了

(2)、RabbitMQ

使用Erlang开发的一款MQ产品。吞吐量较kafka和RocketMQ较低,由于不是使用java语言开发。因此对其定制化开发较难

(3)、kafaka

使用Erlang/java开发的一款MQ产品。其最大的特点就是吞吐率高,常用于大数据领域实时计算、日志采集等场景。没有遵循任何的MQ协议,而是使用自研协议

(4)、RocketMQ

使用java开发的一款MQ产品。是阿里基于kafka开发而来。经过数年阿里双11考研,性能与稳定性非常高,没有遵循任何常见MQ协议(与基于kafka开发有关系),而是使用自研协议

4、常见MQ协议

因主要学习的 kafaka/RocketMQ 并不遵循常见的MQ协议 因此先放一放。

二、RocketMQ安装

1、基本概念

(1)、消息(Message)

消息是指消息系统所传输信息的无力再提,生产和消费数据的最小单位,每条消息必须属于一个主题

(2)、主题(Topic)

Topic(主题)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。

Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。
一个 Topic 也可以被 0个、1个、多个消费者订阅。

(3)、标签(Tag)

Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag

(4)、队列(Queue)

存储消息的物理实体。一个 Topic可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区

一个 Topic 中 一个 Queue 的消息只能被一个 消费者组 中的消费者消费

分片(Sharding)不同于分区。在RocketMQ中,分片指的是存放相应 Topic 的 Broker(主机)。每个分片中会创建出相应数量的分区,即 Queue,每个Queue的大小都是相同的

(5)、消息标识(MessageID/Key)

RocketMQ中每个消息拥有MessageID,且可以携带具有业务标识的Key,以便对消息的查询。MessageID有两个:在生产者 send() 消息时会自动生成一个MessageID(msgID),当消息到达Broker后,Broker也会自动生成一个MessageID(offsetMsgID)。msgID、offsetMsgID与key都成为消息标识

msgID:由producer生成,规则为:

producerIP + 进程pid + MessageClientIdSetter类的ClassLoder的hashcode + 当前时间 + AutomicInteger自增计数器

offsetMsgID:由Broker端生成,规则为:

brokerIP + 物理分区的offset

key:由用户指定的业务相关的唯一标识

2、系统架构

(1)、Producer

消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
  • RocketMQ 提供了三种方式发送消息:同步、异步和单向
  • 同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
  • 异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
  • 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

(2)、Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。消费时会均分。

  • Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费广播消息,提供实时的消息订阅机制
  • Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
  • Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。
  • 消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。反之,一个Topic类型的消息可以被多个消费者组消费

(3)、Name Server

①、功能介绍

NameServer是一个Broker与Topic路由的注册中心,支持Broker动态注册与发现。

②、路由注册

NameServer是无状态的。在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着一个Broker列表,用来动态存储Broker的信息。以此保证节点中的数据同步。(zk等与之相反,通知一个让其内部自己同步。麻烦,注册是简单了,如果扩容NameServer需要修改Broker指定新加的NameServer)

Broker节点会每30s发送一次心跳,将最新的的信息以心跳包的方式上报NameServer。心跳包含BrokerID、Broker地址(Ip+Port)、Broker名称、Broker所属集群名称…,NameServer接受心跳包后,会更新心跳时间戳,记录Broker最新存活时间

③、路由剔除

如果NameServer没有收到Broker的心跳,NameServer可能会将其从Broker中剔除

NameServer中有一个定时任务,每隔10s会扫描Broker表,查看每一个Broker最新心跳时间距离当前是否超过120s,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。(RocketMQ没有自我保护机制)

若要停掉Broker工作,需将Broker读写权限禁用,Client等向Broker发送请求会受到NO_PERMISSION响应,然后Client会进行对其他Broker重试。

④、路由发现

RocketMQ采用的是Pull模型。当Topic路由信息发生变化时,NameServer不会主动推送给客户端,而是客户端定时拉去主题最新的路由。默认科幻段每30s拉取一次最新的路由。

1、push模型:实时性好,是一个发布订阅模型,需要维护一个长连接。耗费资源。

2、pull模型:实时性较差

3、Long Polling模型:长轮询模型。整合pull和push,维护一个长连接指定时间再去释放长连接

⑤、客户端NameServer选择策略

客户端首先生成一个随机数,在与NameServer节点数量取模,此时得到就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用round-robin,逐个尝试去连接其他节点。首先采用随即策略,失败后采用轮询

(4)、Broker

①、功能介绍

消息中转角色,负责存储消息,转发消息。

  • Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。
  • Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
  • 官网上有数据显示:具有上亿级消息堆积能力,同时可严格保证消息的有序性

②、模块构成

**Remoting Module:**整个Broker的尸体,负责处理来自Clients端的请求。而这个Broker实体则由以下模块构成。

**Client Manager:**客户端管理器。负责接受、解析客户端(Producer/Consumer)请求,管理客户端。例如:维护Consumer的Topuc订阅信息

Store Service:存储服务。提供方便简单的API接口,处理消息存储到物理硬盘 和 消息查询 功能

HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能

**Index Service:**索引服务。根据特定的Message key,投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能

③、集群部署

主备集群,Master挂掉后会启动Slave。Master与Slave的对应关系是通过指定相同的BrokerName、不同的BrokerId来确定。BrokerId为0表示Master,非0表示Slave。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer。

(5)、工作流程

1、启动NameServer,启动后开始监听端口,等待Broker,Producer,Consumer连接

2、启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每隔30s向NameServer发送心跳包

3、收发消息前,可先创Topic,创建时需要指定该Topic要存储在那些Broker上,在次高压部分华北Topic时也会将Topic与Broker的关系写到NameServer中。 此步骤时可选的,也可以在发送消息时自动创建Topic

4、Producer发送消息,启动时先跟NameServer集群中其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic的Queue与Broker地址(Ip+Port)的映射关系。然后根据算法策略从队列选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30s从NamesServer更新一次路由信息。

5、Consumer与Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer再获取到路由信息后,同样也会每30s从NameServer更新一次路由信息。不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。

Topic创建模式

Topic手动创建模式有两种

集群模式:该模式下创建的Topic在该集群中,所有Broker中的Queue数量相同。

**Broker模式:**该模式下创建的Topic在该集群中,每个Broker中的Queue数量可以不同。

Topic自动创建模式有一种

默认采用的是**Broker模式:**会为每个Broker默认创建4个Queue(配置文件中取的4)

读写队列

读写队列可以不同,例:可写队列设置为16,可读队列写为14,那么生产者可以把消息卸载0-15的队列中,而消费者只能消费0-14中队列的消息。

为何如此设计?

因为这么搞可以方便缩容,例:可写可读队列都为16,现在只保存8个队列,可以先改可写队列为8,待消费者消费完可读队列中8-15的消息后在调整可读队列为8,这样一来整个缩容过程没有任何消息丢失。

三、单机的安装与启动

1、安装RocketMQ

在官网下载安装RocketMQ和JDK并解压

注:RocketMQ是zip 需要安装unzip才能解压。

官网

2、修改配置

注:RocketMQ适配于J8,J8以上需要修改配置文件

例如:RocketMQ4.9.2(2021-1)

修改RocketMQ的bin目录下的 runserver.sh、runbroker.sh、tools.sh

(1)、runserver.sh

删除:

  • -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy
  • JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
  • JAVA_OPT="JAVAOPT−Djava.ext.dirs={JAVA_OPT} -Djava.ext.dirs=JAVAO​PT−Djava.ext.dirs={JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"(删除带有Djava哪一行就行不同版本不一样)

更改

  • CLASSPATH为:BASEDIR/lib/rocketmq−broker−4.5.1.jar:{BASE_DIR}/lib/rocketmq-broker-4.5.1.jar:BASED​IR/lib/rocketmq−broker−4.5.1.jar:{BASE_DIR}/lib/*:BASEDIR/conf:{BASE_DIR}/conf:BASED​IR/conf:{CLASSPATH} rocketmq-broker-版本号.jar

4.9.2改好的

#!/bin/sh# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{echo "ERROR: $1 !!"exit 1
}[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=${BASE_DIR}/lib/rocketmq-broker-4.9.2.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}#===========================================================================================
# JVM Configuration
#===========================================================================================
# The RAMDisk initializing size in MB on Darwin OS for gc-log
DIR_SIZE_IN_MB=600choose_gc_log_directory()
{case "`uname`" inDarwin)if [ ! -d "/Volumes/RAMDisk" ]; then# create ram disk on Darwin systems as gc-log directoryDEV=`hdiutil attach -nomount ram://$((2 * 1024 * DIR_SIZE_IN_MB))` > /dev/nulldiskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/nullecho "Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS."fiGC_LOG_DIR="/Volumes/RAMDisk";;*)# check if /dev/shm exists on other systemsif [ -d "/dev/shm" ]; thenGC_LOG_DIR="/dev/shm"elseGC_LOG_DIR=${BASE_DIR}fi;;esac
}choose_gc_options()
{# Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...# '1' means releases befor Java 9JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; thenJAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"JAVA_OPT="${JAVA_OPT} -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8"JAVA_OPT="${JAVA_OPT} -verbose:gc -Xlog:gc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDateStamps"JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"elseJAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"fi
}choose_gc_log_directory
choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"$JAVA ${JAVA_OPT} $@

(2)、runbroker.sh

删除

  • -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy
  • JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
  • JAVA_OPT="JAVAOPT−Djava.ext.dirs={JAVA_OPT} -Djava.ext.dirs=JAVAO​PT−Djava.ext.dirs={JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"(删除带有Djava哪一行就行不同版本不一样)

修改

  • CLASSPATH为:BASEDIR/lib/rocketmq−broker−4.5.1.jar:{BASE_DIR}/lib/rocketmq-broker-4.5.1.jar:BASED​IR/lib/rocketmq−broker−4.5.1.jar:{BASE_DIR}/lib/*:BASEDIR/conf:{BASE_DIR}/conf:BASED​IR/conf:{CLASSPATH} rocketmq-broker-版本号.jar
  • -Xloggc:改成-Xlog:gc

4.9.2改好的

#!/bin/sh# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{echo "ERROR: $1 !!"exit 1
}[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=${BASE_DIR}/lib/rocketmq-broker-4.9.2.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}#===========================================================================================
# JVM Configuration
#===========================================================================================
# The RAMDisk initializing size in MB on Darwin OS for gc-log
DIR_SIZE_IN_MB=600choose_gc_log_directory()
{case "`uname`" inDarwin)if [ ! -d "/Volumes/RAMDisk" ]; then# create ram disk on Darwin systems as gc-log directoryDEV=`hdiutil attach -nomount ram://$((2 * 1024 * DIR_SIZE_IN_MB))` > /dev/nulldiskutil eraseVolume HFS+ RAMDisk ${DEV} > /dev/nullecho "Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS."fiGC_LOG_DIR="/Volumes/RAMDisk";;*)# check if /dev/shm exists on other systemsif [ -d "/dev/shm" ]; thenGC_LOG_DIR="/dev/shm"elseGC_LOG_DIR=${BASE_DIR}fi;;esac
}choose_gc_log_directoryJAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"numactl --interleave=all pwd > /dev/null 2>&1
if [ $? -eq 0 ]
thenif [ -z "$RMQ_NUMA_NODE" ] ; thennumactl --interleave=all $JAVA ${JAVA_OPT} $@elsenumactl --cpunodebind=$RMQ_NUMA_NODE --membind=$RMQ_NUMA_NODE $JAVA ${JAVA_OPT} $@fi
else$JAVA ${JAVA_OPT} $@
fi

(3)、tool.sh

删除

  • JAVA_OPT="JAVAOPT−Djava.ext.dirs={JAVA_OPT} -Djava.ext.dirs=JAVAO​PT−Djava.ext.dirs={BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext"(删除带有Djava哪一行就行不同版本不一样)

修改