一、简介

RocketMq是阿里开发出来的一个消息中间件,后捐献给Apache。官网上是这样介绍的:

Apache RocketMQ™ is a unified messaging engine, lightweight data processing platform.

RocketMQ是一个统一的处理消息引擎,轻量级的数据处理平台。

  • 低延迟,在高压下,1毫秒内的响应延迟超过99.6%。
  • 高可用,具有跟踪和审核功能
  • 万亿级消息容量保证
  • 自最新的4.1版本以来,使用新的开放式分布式消息传递和流媒体标准
  • 批量传输,多功能集成,以提高吞吐量
  • 如果空间足够,可以在不损失性能的情况下存盘

二、基本概念

1、 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

2、 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

3、 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

4、 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

5、 代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

6、 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

7、 拉取式消费(Pull Consumer)

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

8、 推动式消费(Push Consumer)

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

9、 生产者组(Producer Group)

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

10、 消费者组(Consumer Group)

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

11、 集群消费(Clustering)

集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

12、 广播消费(Broadcasting)

广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

13、 普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

14、 严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

15、 消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

16、 标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

三、架构设计

1、技术架构

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
  1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
  3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

2、部署架构

RocketMQ 网络部署特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

四、示例

1、 基本样例

在基本样例中我们提供如下的功能场景:

  • 使用RocketMQ发送三种类型的消息:同步消息、异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
  • 使用RocketMQ来消费接收到的消息。1.1 加入依赖:
    maven:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.0</version>
</dependency>

gradle:

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

1.2 消息发送

1、Producer端发送同步消息 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

public class SyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

2、发送异步消息 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

public class AsyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;// 根据消息数量实例化倒计时计算器final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);for (int i = 0; i < messageCount; i++) {final int index = i;// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}// 等待5scountDownLatch.await(5, TimeUnit.SECONDS);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

3、单向发送消息 这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息,没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

1.3 消费消息

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("TopicTest", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started.%n");}
}

1.4 发送延时消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}

1.5 批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {producer.send(messages);
} catch (Exception e) {e.printStackTrace();//处理error
}

复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:

public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) { this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size(); }@Override public List<Message> next() { int startIndex = getStartIndex();int nextIndex = startIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex); int tmpSize = calcMessageSize(message);if (tmpSize + totalSize > SIZE_LIMIT) {break; } else {totalSize += tmpSize; }}List<Message> subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex;return subList;}private int getStartIndex() {Message currMessage = messages.get(currIndex); int tmpSize = calcMessageSize(currMessage); while(tmpSize > SIZE_LIMIT) {currIndex += 1;Message message = messages.get(curIndex); tmpSize = calcMessageSize(message);}return currIndex; }private int calcMessageSize(Message message) {int tmpSize = message.getTopic().length() + message.getBody().length(); Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length(); }tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节return tmpSize; }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();//处理error}
}

1.6 过滤消息

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

1 基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

2、生产者样例 发送消息时,你能通过putUserProperty来设置消息的属性

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",tag,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);producer.shutdown();

3、消费者样例 用MessageSelector.bySql来使用sql筛选消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();

五、集群部署

1 集群搭建

1.1 单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

1)启动 NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)启动 Broker

### 启动Broker
$ nohup sh bin/mqbroker -n localhost:9876 &### 验证Name Server 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log
The broker[broker-a, 192.169.1.2:10911] boot success...

1.2 多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

1)启动NameServer

NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:

### 首先启动Name Server
$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &...

如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876。

1.3 多Master多Slave模式-异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

1)启动NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

1.4 多Master多Slave模式-同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

1)启动NameServer

### 首先启动Name Server
$ nohup sh mqnamesrv &### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2)启动Broker集群

### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上Broker与Slave配对是通过指定相同的BrokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数。另外一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。

2 mqadmin管理工具

注意:

  • 执行命令方法:./mqadmin {command} {args}
  • 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
  • 几乎所有命令都可以通过-h获取帮助
  • 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行命令,如果不配置Broker地址,则对集群中所有主机执行命令,只支持一个Broker地址。-b格式为ip:port,port默认是10911
  • 在tools下可以看到很多命令,但并不是所有命令都能使用,只有在MQAdminStartup中初始化的命令才能使用,你也可以修改这个类,增加或自定义命令
  • 由于版本更新问题,少部分命令可能未及时更新,遇到错误请直接阅读相关命令源码

六、FAQ

1、新的consumer从哪里消费?

  • 如果topic里面的消息是三天以内的,新consumer从所有消息的第一条开始消费
  • 如果有超过三天的消息,新consumer从往前递推第三天的那条消息开始
  • 如果是consumer重启,从它上次的位置继续

2、消费失败了怎么处理?

如果返回 ReconsumerLater,null或者抛出异常,这条消息会重试,最多16次

3、失败的消息怎么查询?

  • 根据topic和时间片查询
  • 根据topic和messageId查询
  • 根据topic和messageKey查询

4、消息只传递一次吗?

rocketMQ能保证所有的消息至少执行一次,在大部分情况下,消息不会重复消费,但不是绝对,要自己做幂等设计

5、每条消息保存多久?

3天

6、message body的大小限制?

256kb

7、怎么设置consumer线程数?

consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);

8、常见错误

  • 一个jvm程序只能有一个生产者/消费者实例;
  • fastjson版本不能太低;
  • topic要提前新建好,并保证有权限。

9、为什么是RocketMQ?

根据我们的研究,随着队列和虚拟主题的增加,ActiveMQ IO模块达到了一个瓶颈。我们试图通过节流、断路器或降质来解决这个问题,但效果并不理想。所以我们开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。

在这种情况下,我们决定发明一个新的消息传递引擎来处理更广泛的用例集,从传统的发布/订阅场景到高容量实时零损失容忍事务系统。

下表展示了RocketMQ、ActiveMQ和Kafka(根据awesome java,Apache最流行的消息传递解决方案)之间的比较:

欢迎关注公众号:丰极,更多技术学习分享。

参考网址:

http://rocketmq.apache.org/

http://rocketmq.apache.org/docs/quick-start/

rocketmq 组监听_最全的RocketMQ学习指南,程序员必备的中间件技能相关推荐

  1. java图形界面的监听_非专业码农 JAVA学习笔记 用户图形界面设计与实现-所有控件的监听事件...

    用户图形界面设计与实现-监听事件 System.applet.Applet (一)用户自定义成分 1.绘制图形 Public voit piant(Ghraphics g){  g.drawLine等 ...

  2. [html] HTML5如何监听video的全屏和退出全屏?

    [html] HTML5如何监听video的全屏和退出全屏? 监听fullscreenchange事件document.addEventListener('fullscreenchange', (ev ...

  3. vue动态监听窗口高度 - 全背景banner

    vue动态监听窗口高度 - 全背景banner 参考项目文件 src/hr/index.vue [结合下文:第一种方法] 第一种方法:[本文手写代码] data() {return {screenHe ...

  4. dcloud 5+ 监听安卓前后台切换状态 并后台运行程序

    dcloud 5+ 监听安卓前后台切换状态 并后台运行程序 监听安卓前后台切换状态 function monitorRunStatus() {// 部分型号监听"应用从前台切换到后台&quo ...

  5. Paddle_程序员必备的数学知识_转发

    程序员--必备数学知识 !!!Attention 本博客转发至百度aistudio的<深度学习7日入门-cv疫情检测>,课程非常棒!本人力推! 博客转发地址:https://aistudi ...

  6. javaweb不同用户需要几张表_程序员必备2020版:JavaWeb快速进阶全套教程

    Java Web应用由一组Servlet.HTML页.类.以及其它可以被绑定的资源构成.它可以在各种供应商提供的实现Servlet规范的Servlet容器中运行. JavaWeb项目简单来说就是一个应 ...

  7. python编程语言汇总-最全的编程语言汇总,程序员你可要存好了!

    原标题:最全的编程语言汇总,程序员你可要存好了! 编程语言(programming language),是用来定义计算机程序的形式语言.它是一种被标准化的交流技巧,用来向计算机发出指令.一种计算机语言 ...

  8. Sublime Text 2 - 性感无比的代码编辑器!程序员必备神器!跨平台支持Win/Mac/Linux,支持32与64位,支持各种流行编程语言的语法高亮、代码补全等...

    Sublime Text 2 - 性感无比的代码编辑器!程序员必备神器!跨平台支持Win/Mac/Linux,支持32与64位,支持各种流行编程语言的语法高亮.代码补全等-- 语法高亮.代码提示补全. ...

  9. 怎么调python界面颜色_手把手教你配置最漂亮的PyCharm界面,Python程序员必备!...

    高逼格超美的IDE界面,是每个程序员的梦想! 随着人工智能/机器学习的兴起,Python作为一门"漂亮的语言",再次获得广大程序员的关注.而JetBrains出品的PyCharm无 ...

最新文章

  1. Kotlin 数组的使用
  2. Mybatis:resultMap的万字使用总结
  3. 谷歌眼镜开发入门经典
  4. Android 使用加速度传感器实现摇一摇功能及优化
  5. docker 中部署一个springBoot项目
  6. .NET in Browser - Blazor
  7. foobar 添加歌词插件
  8. 计算机技能测试题九,计算机技能培训后人人过关测试试题九
  9. linux搜索一天内更新的所有文件,linux下怎样更新文件夹下所有文件的时间戳
  10. 【a701】旅行家的预算
  11. yoga710怎么进入bios_重装系统看不懂bios?超详细中英文翻译,教你1分钟识别bios各项...
  12. java cms 源码_PublicCMS开源Java系统 v4.0.190312
  13. 风尚云网学习-关于学习和前端
  14. 重新实现reuseport逻辑,实现一致性哈希
  15. Qt FlowLayout升级版
  16. 计算机总是提醒更新,联想电脑老提示更新怎么办啊
  17. 轻松完成销售业绩的6个技巧
  18. Unity开发者的C#内存管理(中篇)
  19. 深信服edr终端漏洞
  20. 2021年研究生入学考试总结和复试冲刺复习计划

热门文章

  1. 异常处理与MiniDump详解(3) SEH(Structured Exception Handling)
  2. Linux ISCSI配置
  3. TCP/IP基础概念及通信过程举例
  4. Android自定义ListView的Item无法响应OnItemClick的解决办法
  5. BZOJ 2190: [SDOI2008]仪仗队( 欧拉函数 )
  6. 32位汇编寄存器及汇编指令
  7. Windows Vista和XP系统功能大比拼
  8. MATLAB【四】 ————批量适配图片信息与excel/txt等文档信息,批量移动拷贝图片,批量存图片中点和方框
  9. 三维重建【一】——————(深度学习方式)
  10. JavaScript跨域总结与解决办法