RocketMQ中的主从复制
1、组件结构图
HAService:主从同步入口类
AcceptSockerService:HA Master端监听客户端连接实现类
GroupTransferService:主从同步通知实现类
HAClient:HA Client端实现类
HAConnection:HA Master服务端HA连接对象的封装,与Broker从服务器的网络读写交互
ReadSocketService:HA Master网络读实现类
WriteSocketService:HA Master网络写实现类
2、原理
主服务器启动,并在特定端口上监听从服务器的连接,从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接。从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器。从服务器保存消息并继续发送新的消息同步请求。
2.1 启动
监听从服务器的连接。启动socket监听线程及组数据转移线程,启动ha客户端。
public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();}
2.2 AccpetSocketServcie
socketAddressListen:监听套接字
serverSocketChannel:服务端Socket通道
selector:事件选择器
2.2.1 beginAccept
public void beginAccept() throws Exception {this.serverSocketChannel = ServerSocketChannel.open();this.selector = RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true);this.serverSocketChannel.socket().bind(this.socketAddressListen);this.serverSocketChannel.configureBlocking(false);this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);}
创建ServerSocketChannel,selector,设置地址重用,绑定监听器端口,设置为非阻塞,注册ACCEPT事件
2.2.2 run
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.selector.select(1000);Set<SelectionKey> selected = this.selector.selectedKeys();if (selected != null) {for (SelectionKey k : selected) {if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();if (sc != null) {HAService.log.info("HAService receive new connection, "+ sc.socket().getRemoteSocketAddress());try {HAConnection conn = new HAConnection(HAService.this, sc);conn.start();HAService.this.addConnection(conn);} catch (Exception e) {log.error("new HAConnection exception", e);sc.close();}}} else {log.warn("Unexpected ops in select " + k.readyOps());}}selected.clear();}} catch (Exception e) {log.error(this.getServiceName() + " service has exception.", e);}}log.info(this.getServiceName() + " service end");}
选择器每1s处理一次连接就绪事件,连接事件就绪后,调用ServerSocketChannel的accept方法创建SocketChannel。然后为每个连接创建一个HAConnection对象,负责主从数据同步。
2.3 GroupTransferService
2.3.1 putRequest
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {synchronized (this.requestsWrite) {this.requestsWrite.add(request);}this.wakeup();}
将请求放入到requestWrite列表中,同时唤醒当前线程
2.3.2 run
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.waitForRunning(10);this.doWaitTransfer();} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info(this.getServiceName() + " service end");}
每隔10ms,执行等待复制完成。如果push2SlaveMaxOffset大于等于请求的偏移,则说明复制已经完成。如果小于并且当前时间差小于存储配置的同步刷新超时时间,则等待直到超时或者复制完成。
private void doWaitTransfer() {synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {for (CommitLog.GroupCommitRequest req : this.requestsRead) {boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {this.notifyTransferObject.waitForRunning(1000);transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();}if (!transferOK) {log.warn("transfer messsage to slave timeout, " + req.getNextOffset());}req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}this.requestsRead.clear();}}}
2.4 HAClient
masterAddress:Master broker的地址
reportOffset:从向主发起主从同步的拉取偏移量
socketChannel:网络传输通道
selector:NIO事件选择器
lastWriteTimestamp:上次上报是写入传输通道的时间
currentReportedOffset:反馈slave当前的复制进度,commitlog文件最大偏移量
dipatchPosition:本次已处理读缓存的指针
byteBufferRead:读缓存区,大小为4M
byteBufferBackup:读缓存区备份,与byteBufferRead进行交换。
2.4.1 connectMaster
private boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {String addr = this.masterAddress.get();if (addr != null) {SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);if (socketAddress != null) {this.socketChannel = RemotingUtil.connect(socketAddress);if (this.socketChannel != null) {this.socketChannel.register(this.selector, SelectionKey.OP_READ);}}}this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();this.lastWriteTimestamp = System.currentTimeMillis();}return this.socketChannel != null;}
slave服务器连接master服务器,如果socketChannel为空,则尝试连接master。如果master地址为空,则返回false。如果master地址不为空,则建立到master的TCP连接,然后注册OP_READ,初始化currentReportedOffset为commitlog文件的最大偏移量,lastWriteTimestamp为当前时间戳。
2.4.2 isTimeToReportOffset
private boolean isTimeToReportOffset() {long interval =HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();return needHeart;}
判断是否需要向master反馈当前待拉取偏移量,master与slave的ha心跳发送间隔为5s。
2.4.3 reportSlaveMaxOffset
private boolean reportSlaveMaxOffset(final long maxOffset) {this.reportOffset.position(0);this.reportOffset.limit(8);this.reportOffset.putLong(maxOffset);this.reportOffset.position(0);this.reportOffset.limit(8);for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {try {this.socketChannel.write(this.reportOffset);} catch (IOException e) {log.error(this.getServiceName()+ "reportSlaveMaxOffset this.socketChannel.write exception", e);return false;}}lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();return !this.reportOffset.hasRemaining();}
向master服务器反馈拉取偏移量。
2.4.4 processReadEvent
private boolean processReadEvent() {int readSizeZeroTimes = 0;while (this.byteBufferRead.hasRemaining()) {try {int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {readSizeZeroTimes = 0;boolean result = this.dispatchReadRequest();if (!result) {log.error("HAClient, dispatchReadRequest error");return false;}} else if (readSize == 0) {if (++readSizeZeroTimes >= 3) {break;}} else {log.info("HAClient, processReadEvent read socket < 0");return false;}} catch (IOException e) {log.info("HAClient, processReadEvent read socket exception", e);return false;}}return true;}
处理网络请求,从master服务器传加的消息数据,判断readByteBuffer是否有剩余空间,如果有,则调用read,将通道中的数据读入到缓存区中。如果读取到的字节数大于0,重置读取到0字节的次数,调用dispatchReadRequest将读取到的所有消息全部追加到消息内存映射文件 中,然后再次返回拉取进度给服务器。如果连续3次从网络通道读取到0个字节,则结束本次读,返回true。如果读取到的字节数小于0或发生io异常,则返回false.
2.5 HAConnection
slaveRequestOffset:从服务器请求拉取数据的偏移量
slaveAckOffset:从服务器反馈已拉取完成的数据偏移量。
byteBufferRead:网络读写缓存区,默认为1M
processPosition:当前处理指针
lastReadTimestamp:上次读取数据的时间戳。
RocketMQ中的主从复制相关推荐
- 如何在MySQL中设置主从复制
原作者:Etel Sverdlov 转载&翻译:https://www.digitalocean.com/community/tutorials/how-to-set-up-master-sl ...
- RocketMQ中消息的优先级
2019独角兽企业重金招聘Python工程师标准>>> 本文主要介绍RocketMQ中消息优先级的设置方法. 一.JMS中JMSPriority JMSPriority 头字段包含了 ...
- 深入理解RocketMQ中的NameServer
本文来说下RocketMQ中的NameServer 文章目录 NameServer介绍 NameServer的作用 为什么要使用NameServer NameServer如何保证数据的最终一致 路由注 ...
- mysql 1593_Linux中MySQL主从复制中出现1593错误码的低级错误
主从复制小编介绍过不少的文章了,但在使用过程中经常会有一些问题出现了,今天我们来看关于Linux中MySQL主从复制中出现1593错误码的低级错误问题解决办法. 今天测试shell脚本自动配置MySQ ...
- RocketMQ中的消息类型种类(二)
消息种类 按照发送的特点分 同步消息 异步消息 单向消息 按照使用功能特点分 顺序消息 广播模式 延迟消息 批量消息 过滤消息 事务消息 按照发送的特点分 同步消息 同步发送是指消息发送方发出数据后, ...
- RocketMQ 中Topic、Tag、GroupName基本概念介绍
本文主要介绍RocketMQ中Topic.Tag.GroupName的概念.设计初衷以及使用方法. 一.Topic 首先看看官方的定义: Topic是生产者在发送消息和消费者在拉取消息的类别.Topi ...
- RocketMQ中主从复制
生产者把消息发送到master,不会发送到slave 消费者可以从maste也可以从slave消费消息 如果master挂了,那就从slave消费数据,那么slave怎么拿到master中的数据 这就 ...
- rocketmq中的broker设计与实现
1.broker启动 启动逻辑在BrokerStartup和BrokerController中. 监听端口是1091. 默认存储目录是System.getProperty("user.hom ...
- rocketmq中producer设计与实现
1.类层次结构 2.producer的启动 首先设置组,及NameServer,然后调用start启动.启动关键逻辑主要在MQClientInstance中. (1)启动NettyRemotingCl ...
最新文章
- 计算机视觉系统学习书籍/综述汇总
- shell脚本字符串截取
- rocketmq 初探(三)
- AIX HACMP集群切换测试实际案例解析
- HTTP相关知识的总结
- Hurst exponent(赫斯特指数)代码与R/S值计算——python
- sql2005通用分页存储过程
- 关于bacula网络备份软件的安装以及配置1
- iOS最新验证电话号码与手机号码的正则方法
- 我的凸优化学习之路(转)
- [LeetCode]Patching Array
- iPhone 二手手机到底去哪了
- 腾讯云 短信验证码 php,php腾讯云短信验证码
- 基于数字证书的windows安全登录
- win10计算机打印机共享怎么设置方法,Win10系统怎么设置打印机共享?Win10系统打印机共享设置教程...
- 遗传算法之扇贝的进化(python代码实现)
- csdn博客使用反馈,bug
- @程序员,对抗 996,你真的准备好了吗?| 文末有彩蛋
- 动态规划之子序列以及子数组类型的问题
- 华为鲲鹏HCIA考试-练习08