RocketMq部署与使用
官网文档参考地址:http://rocketmq.apache.org/docs/motivation/
安装包下载地址:https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip
https://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip
1,什么是 RocketMQ?
Apache RocketMQ是由阿里巴巴开源的可支撑万亿级数据洪峰的分布式消息和流计算平台,于2016年捐赠给Apache Software Foundation,2017年9月25日成为Apache 顶级项目。由于其高稳定性、低延时、高吞吐量等特点,被大规模应用于金融、互联网、物流公司的核心交易支付、实时位置追踪、大数据分析等场景,同时也被电力、交通、汽车、零售等十几个行业的数万家企业广泛使用,是企业数字化转型的核心基础性软件。
如图所示为RocketMQ基本的部署结构,主要分为NameServer集群、Broker集群、Producer集群和Consumer集群四个部分。
NameServer集群
NameServer的作用是注册中心,类似于Zookeeper,但又有区别于它的地方。每个NameServer节点互相之间是独立的,没有任何信息交互,也就不存在任何的选主或者主从切换之类的问题,因此NameServer与Zookeeper相比更轻量级。单个NameServer节点中存储了活跃的Broker列表(包括master和slave),这里活跃的定义是与NameServer保持有心跳。
Broker集群
Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。
Broker中分master和slave两种角色,每个master可以对应多个slave,但一个slave只能对应一个master,master和slave通过指定相同的Brokername,不同的BrokerId (master为0)成为一个组。master和slave之间的同步方式分为同步双写和异步复制,异步复制方式master和slave之间虽然会存在少量的延迟,但性能较同步双写方式要高出10%左右。
Producer集群
与nameserver的关系:
单个Producer和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。与nameserver之间没有心跳。
与broker的关系:
单个Producer和与其关联的所有broker保持长连接,并维持心跳。默认情况下消息发送采用轮询方式,会均匀发到对应Topic的所有queue中。
最佳实践:
一个应用尽可能只使用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消
息创建索引(哈希索引),应用可以通过 Topic,key 来查询返条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。对于消息不可丢失应用,务必要有消息
重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。
Consumer集群
与nameserver的关系:
单个Consumer和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。与nameserver之间没有心跳。
与broker的关系:
单个Consumer和与其关联的所有broker保持长连接,并维持心跳,失去心跳后,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。
最佳实践:
Consumer 数量要小于等于queue的总数量,由于Topic下的queue会被相对均匀的分配给Consumer,如果 Consumer 超过queue的数量,那多余的 Consumer 将没有queue可以消费消息。
消费过程要做到幂等(即消费端去重),RocketMQ为了保证性能并不支持严格的消息去重。
尽量使用批量方式消费,RocketMQ消费端采用pull方式拉取消息,通过consumeMessageBatchMaxSize参数可以增加单次拉取的消息数量,可以很大程度上提高消费吞吐量。另外,提高消费并行度也可以通过增加Consumer处理线程的方式,对应参数consumeThreadMin和consumeThreadMax。
消息发送成功或者失败,要打印消息日志。
Topic和Queue
RocketMQ的Topic/Queue和JMS中的Topic/Queue概念有一定的差异,JMS中所有消费者都会消费一个Topic消息的副本,而Queue中消息只会被一个消费者消费;但到了RocketMQ中Topic只代表普通的消息队列,而Queue是组成Topic的更小单元,集群消费模式下一个消费者只消费该Topic中部分Queue中的消息,当一个消费者开启广播模式时则会消费该Topic下所有Queue中的消息。Topic和Queue的具体关系可以参考下图
2,为什么选择RocketMQ
特色功能
弹性扩缩:
Brokers, producers, consumers, name servers都采用了特殊的部署和处理方式,具备很强的横向扩展能力。
分布式事务:
RocketMQ实现类似X/Open XA的分布事务功能,以达到事务最终一致性状态。
快速存储和持久化:
RocketMQ充分利用了系统的内存cache,数据以同步或者异步刷盘的方式持久化到文件系统中。
消息过滤:
Apache RocketMQ支持灵活的语法表达式过滤消息,减少了对于consumer无用消息的网络传输。
回溯消费:
基于Apache RocketMQ的数据存储方式,consumer可以实现按照时间回溯消费,精确到毫秒,支持向前回溯和向后回溯。
定时消息:
Apache RocketMQ支持定时消息,但是不支持任意时间精度,具有特定的level。
下表是官网给出的RocketMQ,ActiveMQ和Kafka(根据awesome-java,Apache最流行的消息传递解决方案)之间的比较:
Messaging Product |
Client SDK |
Protocol and Specification |
Ordered Message |
Scheduled Message |
Batched Message |
BroadCast Message |
Message Filter |
Server Triggered Redelivery |
Message Storage |
Message Retroactive |
Message Priority |
High Availability and Failover |
Message Track |
Configuration |
Management and Operation Tools |
ActiveMQ |
Java, .NET, C++ etc. |
Push model, support OpenWire, STOMP, AMQP, MQTT, JMS |
Exclusive Consumer or Exclusive Queues can ensure ordering |
Supported |
Not Supported |
Supported |
Supported |
Not Supported |
Supports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB |
Supported |
Supported |
Supported, depending on storage,if using kahadb it requires a ZooKeeper server |
Not Supported |
The default configuration is low level, user need to optimize the configuration parameters |
Supported |
Kafka |
Java, Scala etc. |
Pull model, support TCP |
Ensure ordering of messages within a partition |
Not Supported |
Supported, with async producer |
Not Supported |
Supported, you can use Kafka Streams to filter messages |
Not Supported |
High performance file storage |
Supported offset indicate |
Not Supported |
Supported, requires a ZooKeeper server |
Not Supported |
Kafka uses key-value pairs format for configuration. These values can be supplied either from a file or programmatically. |
Supported, use terminal command to expose core metrics |
RocketMQ |
Java, C++, Go |
Pull model, support TCP, JMS, OpenMessaging |
Ensure strict ordering of messages,and can scale out gracefully |
Supported |
Supported, with sync mode to avoid message loss |
Supported |
Supported, property filter expressions based on SQL92 |
Supported |
High performance and low latency file storage |
Supported timestamp and offset two indicates |
Not Supported |
Supported, Master-Slave model, without another kit |
Supported |
Work out of box,user only need to pay attention to a few configurations |
Supported, rich web and terminal command to expose core metrics |
个人总结概括一下当下几个常用消息队列中间件的对比:
国内采用的MQ有:ActiveMQ、Kafka、RabbitMQ、RocketMQ,但现在ActiveMQ用的越来越少了。主要做其他三种的调研。
kafka:
- 1、开发语言: Scala开发
- 2、性能、吞吐量: 吞吐量所有MQ里最优秀,QPS十万级、性能毫秒级、支持集群部署
- 3、功能: 功能单一
- 4、缺点: 丢数据, 因为数据先写入磁盘缓冲区,未直接落盘。机器故障会造成数据丢失
- 5、应用场景: 适当丢失数据没有关系、吞吐量要求高、不需要太多的高级功能的场景,比如大数据场景。
RabbitMQ:
- 1、开发语言: Erlang开发
- 2、性能、吞吐量: 吞吐量比较低,QPS几万级、性能u秒级、主从架构
- 3、功能: 功能单一
- 4、缺点: Erlang小众语言开发,吞吐量低,集群扩展麻烦
- 5、应用场景: 中小公司对并发和吞吐量要求不高的场景。
RocketMQ:
- 1、开发语言: java开发
- 2、性能、吞吐量: 吞吐量高,QPS十万级、性能毫秒级、支持集群部署
- 3、功能: 支持各种高级功能,比如说延迟消息、事务消息、消息回溯、死信队列、消息积压等等
- 4、缺点: 官方文档相对简单可能是RocketMQ目前唯一的缺点
- 5、应用场景: 适当丢失数据没有关系、吞吐量要求高、不需要太多的高级功能的场景,比如大数据场景。
3,安装部署
从下载链接下载安装包:rocketmq-all-4.7.0-bin-release.zip 并解压
1) 单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
1 启动 NameServer:
$ nohup ${ROCKETMQ_HOME}/bin/mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.logThe Name Server boot success...
2 启动 Broker:
### 启动Broker
$ nohup $ROCKETMQ_HOME/bin/mqbroker -n localhost:9876 &
### 验证Name Server 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker.logThe broker[broker-a, 192.169.2.200:10911] boot success...
3 发送和接收消息测试:
在发送/接收消息之前,我们需要告诉客户端名称服务器的位置。 RocketMQ提供了多种方法来实现这一目标。 为简单起见,我们使用环境变量NAMESRV_ADDR
> export NAMESRV_ADDR=localhost:9876 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ... > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...
2 多Master模式(这种方式一般使用比较多)
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
- 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
1 启动NameServer
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:
### 首先启动Name Server $ nohup sh mqnamesrv & ### 验证Name Server 是否启动成功 $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
2 启动Broker集群
### 在机器A,启动第一个Master(broker-a),例如NameServer的IP为:192.168.2.200
修改配置文件:$ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties 增加如下配置项
maxMessageSize = 4194304 #默认允许的最大消息体默认4M
sendMessageThreadPoolNums = 20 # 服务端处理消息发送线程池数量 默认就1个
useReentrantLockWhenPutMessage = true #消息存储到commitlog文件时获取锁类型,如果为true使用ReentrantLock否则使用自旋锁
listenPort = 10911 #服务端监听端口,默认10911;如果在同一个机器启多个broker是这个必须修改为不一样
storePathRootDir = /app/merce/rocketmq/rocketmq/store/broker_a #broker存储目录 默认为用户的家目录/store;如果在同一个机器启多个broker是这个必须修改为不一样
storePathCommitLog =/app/merce/rocketmq/rocketmq/store/commitlog #storePathCommitLog 需要单独配置一次不然无法自动写入到${storePathRootDir }/commitlog 下,这个是4.7版本的一个bug
启动broker:
$ nohup ${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
### 在机器B,启动第二个Master(broker-b)也需要先修改配置文件$ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties,NameServer的IP为:localhost
$ nohup ${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876。
更多安装方式如多Master多Slave模式-异步复制,多Master多Slave模式-同步双写可参考官网文档。
配置文件修改:
上边所说的两种安装方式均是使用的系统缺省配置
注意:
1)启动nameServer 或brocker时如果不使用 -c 指定配置文件,则系统会使用参数的确实值;如果修改了相关配置文件必须在启动时指定通过 -c 指定配置文件。
2)RocketMq启动后输出日志和相关数据存储目录会默认放到当前用户家目录下:
- log: ~/logs/rocketmqlogs
如果需要修改logs路径,比如修改到/app/rocketmq/logs :
$ cd ${ROCKETMQ_HOME}/conf$ sed -i 's#${user.home}#/app/rocketmq/logs#g' *.xml
- data: ~/store
如果需要修改,比如修改到 /app/rocketmq/store ,需要在对应broker配置文件如 ${ROCKETMQ_HOME}/conf/2m-noslave/broker-a.properties中添加如下配置:storePathRootDir = /app/rocketmq/store
- storePathCommitLog: ~/store/commitlog
需要注意 rocketmq-4.7.0参数storePathCommitLog需要单独修改,如果只修改storePathRootDir 则commitlog的存储路径不会向文档里说的自动修改到${storePathRootDir}/commitlog。
修改配置 storePathCommitLog =/app/rocketmq/store/commitlog
3)默认情况下nameserver和brocker分配内存比较大,分别是4G和8G ;如果需要进行调整,需要分别修改下面两个启动脚本的参数:
NameServer : runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
Broker : runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
4) RocketMq运行发送的messageBody默认情况下超过4K会自动进行压缩,但压缩后最大默认情况下也不允许超过4M,如果需要调大改配置参数,需要在brock端和producer 端同时进行配置,
brocker端配置参数:maxMessageSize
producer端 我们platform已经预制了该参数,默认值4194304(4M) 如需修改可通过配置文件platform_home/conf/woven.properties 修改参数:rocketmq.producer.max-message-size 对应值即可.
4,RocketMq管理工具 rocketmq-console
rocketmq-console是apache/rocketmq-externals 下的一个子项目,可以通过图形界面便捷的监控管理RocketMQ的相关信息。
下载地址:
该工具需要自己下载代码进行编译:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
安装方式:
下载后是一个可直接启动的spring-boot jar,使用如下命令启动
nohup java -jar rocketmq-console-ng-1.0.1.jar --server.port=12581 --rocketmq.config.namesrvAddr=localhost:9876 &
启动后通过浏览器访问:http://ip-adderss:12581
特别说明:
目前Apache 官方提供的rocketmq-console源码里还不支持brocker端开启accessKey权限认证,所以如果想要支持可以通过 https://download.csdn.net/download/qq_31910941/12440083 下载,这个是我自己在源码里加入了该功能编译的jar;可通过如下命令进行运行:
java -jar rocketmq-console-ng-1.0.1.jar --server.port=12581 --rocketmq.config.namesrvAddr=localhost:9876 --rocketmq.config.accessKey=rocketmq --rocketmq.config.secretKey=87654321
RocketMq部署与使用相关推荐
- RocketMQ 部署不当导致磁盘空间不释放
背景 生产环境采用 RocketMQ 三主三从集群搭建,6 个实例部署在 3 台 Linux 服务器上(节省资源),每台服务器部署一主一从,生产上运行一段时间后,发现磁盘空间报警,发现df与du显示的 ...
- Apache RocketMQ部署文档
RocketMQ的前身是MetaQ,metaq基本上是参照apache kafka开发的,把scala转为采用java开发. rocketmq现在已由阿里提交给apache社区管理. RocketMQ ...
- RocketMQ部署之动态设置JVM启动参数
这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发.拒绝营销号,拒绝标题党 背景 线上的RocketMQ集群有运行一段时间了.比如测试环境和线上环境的RocketMQ集群部署的机 ...
- RocketMQ部署安装注意事项
1.下载 下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.z ...
- RocketMQ部署安装(非Docker安装)
1.下载 下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.z ...
- 启动rocketmq_RocketMQ 部署启动指南-Docker 版
最近学习使用 rocketmq,需要搭建 rocketmq 服务端,本文主要记录 rocketmq 搭建过程以及这个过程踩到的一些坑. 准备工作 在搭建之前,我们需要做一些准备工作,这里我们需要使用 ...
- rocketmq安装,内存配置,各种命令说明,windows下安装,控制台工具
1修改内存大小 以下只是设置实例: cd /home/bigdata/installed/rocketmq-all-4.2.0 (140机器) vim bin/runserver.sh (调整name ...
- 云原生消息、事件、流超融合平台——RocketMQ 5.0 初探
简介:今天分享的主题是云原生消息事件流超融合平台 RocketMQ 5.0 初探,内容主要分为三个部分: 首先,带大家回顾业务消息领域首选 RocketMQ 4 发展历史以及 4.x 版本的演进与发展 ...
- rocketmq 消息指定_进大厂必备的RocketMQ你会吗?
点击关注"故里学Java" 右上角"设为星标"好文章不错过 关于消息队列,相信大家都不陌生,现在的中大型项目中或多或少都有使用到消息队列,对于消息队列大家可能都 ...
最新文章
- 成本VS用户体验, 服务台互动语音应答(IVR)介绍
- php 长文本_php字符串太长怎么办
- css深入浅出 宽度和高度
- ElasticSearch大数据分布式弹性搜索引擎使用—从0到1
- IIS7 与 WCF 问题总结
- Windows 2008活动目录的安装和卸载
- macos安装python3.6_在CentOS 7/Ubuntu 16.04/Debian 9/macOS上安装Python 3.6的方法
- close_wait过多解决后TIME_WAIT过多
- Codeforces 837 简要题解
- 构建微服务体系结构的最佳实践
- JDE学习report和from总结
- k2p华硕系统怎么设置_【华硕 RT-AC68U 无线路由器使用感受】管理|系统|操作|模式_摘要频道_什么值得买...
- Laravel快速入门
- PostgreSQL12中文手册
- 最火大厂面试题、面试技巧汇总及简历编写(附简历模版下载)
- CHD-5.3.6集群上sqoop安装
- ROS的 sudo rosdep init 的报错终极解决方案
- 在私有云上创建虚拟机
- python添加文字水印中间旋转45度,Python添加pdf水印
- python期末试题汇总
热门文章
- 朴灵和阮一峰吵架的代码
- 计算机组成原理——计算机系统概述
- 计算机网络学习-003
- 带蒙版的安卓剪辑软件_抖音运营干货,9款手机剪辑软件APP,从此让你用手机轻松玩转剪辑...
- 南方cass简码识别大全_cass-简码识别详细分解.doc
- UniCode编码表,过滤不可见特殊字符
- abb和plcsocket通讯_abb与西门子plc通讯问题
- 解决联想小新air13pro笔记本电脑插入耳机没声音的问题
- 《posix多线程编程》笔记(四)
- armbian 斐讯n1_树莓派/斐讯N1/ARMBIAN/安装HOME ASSISTANT