RocketMQ源码解析-Broker的HA实现
以master异步复制为例子。
在rocketmq的slave broker机子当中,会在DefaultMessageStore的启动当中启动自己的HaService来进行自己的ha服务。
public void start() {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();
}
针对slave可以直接看haClinet的start()方法。
@Override
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStoped()) {try {if (this.connectMaster()) {// 先汇报最大物理Offset || 定时心跳方式汇报if (this.isTimeToReportOffset()) {boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);if (!result) {this.closeMaster();}}// 等待应答this.selector.select(1000);// 接收数据boolean ok = this.processReadEvent();if (!ok) {this.closeMaster();}// 只要本地有更新,就汇报最大物理Offsetif (!reportSlaveMaxOffsetPlus()) {continue;}// 检查Master的反向心跳long interval =HAService.this.getDefaultMessageStore().getSystemClock().now()- this.lastWriteTimestamp;if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress+ "] expired, " + interval);this.closeMaster();log.warn("HAClient, master not response some time, so close connection");}}else {this.waitForRunning(1000 * 5);}}catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);this.waitForRunning(1000 * 5);}}log.info(this.getServiceName() + " service end");
}
首先会在connectMaster()方法中,尝试与master进行连接。
private boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {String addr = this.masterAddress.get();if (addr != null) {SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);if (socketAddress != null) {this.socketChannel = RemotingUtil.connect(socketAddress);if (this.socketChannel != null) {this.socketChannel.register(this.selector, SelectionKey.OP_READ);}}}// 每次连接时,要重新拿到最大的Offsetthis.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();this.lastWriteTimestamp = System.currentTimeMillis();}
可以看到,在slave成功与master连上网络连接之后,会取得在本机上消息存储的最大offset,以便后续对master报告自己的存储位置。而关于最大offset的取得,则会在MapedFileQueue当中 取得文件逻辑队列最后一个的消费位置返回给haClient。
public long getMaxOffset() {try {this.readWriteLock.readLock().lock();if (!this.mapedFiles.isEmpty()) {int lastIndex = this.mapedFiles.size() - 1;MapedFile mapedFile = this.mapedFiles.get(lastIndex);return mapedFile.getFileFromOffset() + mapedFile.getWrotePostion();}}catch (Exception e) {log.error("getMinOffset has exception.", e);}finally {this.readWriteLock.readLock().unlock();}return 0;
}
在连接成功master并且取得当前slave的最大消费位置之后,通过isTImeToReportOffset()方法来判断当前时间与上一次写时间是否宪相隔超过所配置的时间间隔,如果超过,则会通过repirtSlaveMaxOffset()方法向master报告当前最大offset并作为心跳数据。
private boolean reportSlaveMaxOffset(final long maxOffset) {this.reportOffset.position(0);this.reportOffset.limit(8);this.reportOffset.putLong(maxOffset);this.reportOffset.position(0);this.reportOffset.limit(8);for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {try {this.socketChannel.write(this.reportOffset);}catch (IOException e) {log.error(this.getServiceName()+ "reportSlaveMaxOffset this.socketChannel.write exception", e);return false;}}return !this.reportOffset.hasRemaining();
}
在这里通过3次将八字节的偏移数据向master传递,现在看master是如何处理这一请求的。
在master当中通过haConnection启动了ReadSocketService一直监听来自slave的汇报最大偏移量的消息。(每一个haconnection对应一个来自slave的请求,也就是说每一个haconnection的保存一个slave的最大偏移量)
@Override
public void run() {HAConnection.log.info(this.getServiceName() + " service started");while (!this.isStoped()) {try {this.selector.select(1000);boolean ok = this.processReadEvent();if (!ok) {HAConnection.log.error("processReadEvent error");break;}// 检测心跳间隔时间,超过则强制断开long interval =HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()- this.lastReadTimestamp;if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr+ "] expired, " + interval);break;}}catch (Exception e) {HAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}this.makeStop();// 避免内存泄露haService.removeConnection(HAConnection.this);// 只有读线程需要执行HAConnection.this.haService.getConnectionCount().decrementAndGet();SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();}catch (IOException e) {HAConnection.log.error("", e);}HAConnection.log.info(this.getServiceName() + " service end");
}
在得到消息之后,会调用processReadEven()方法对slave的offset数据进行处理。
private boolean processReadEvent() {int readSizeZeroTimes = 0;if (!this.byteBufferRead.hasRemaining()) {this.byteBufferRead.flip();this.processPostion = 0;}while (this.byteBufferRead.hasRemaining()) {try {int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {readSizeZeroTimes = 0;this.lastReadTimestamp =HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();// 接收Slave上传的offsetif ((this.byteBufferRead.position() - this.processPostion) >= 8) {int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);long readOffset = this.byteBufferRead.getLong(pos - 8);this.processPostion = pos;// 处理Slave的请求HAConnection.this.slaveAckOffset = readOffset;if (HAConnection.this.slaveRequestOffset < 0) {HAConnection.this.slaveRequestOffset = readOffset;log.info("slave[" + HAConnection.this.clientAddr + "] request offset "+ readOffset);}// 通知前端线程HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);}}else if (readSize == 0) {if (++readSizeZeroTimes >= 3) {break;}}else {log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");return false;}}catch (IOException e) {log.error("processReadEvent exception", e);return false;}}return true;
}
可以看到这里master会接收来自slave的8字节偏移量,将其保存作为slave所汇报的当前最大偏移量保存,在同步master当中将会尝试唤醒正在等待同步写操作结果的master以便继续下面的操作。
public void notifyTransferSome(final long offset) {for (long value = this.push2SlaveMaxOffset.get(); offset > value;) {boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);if (ok) {this.groupTransferService.notifyTransferSome();break;}else {value = this.push2SlaveMaxOffset.get();}}
}
在notifyTrransferSome()方法当中会判断当前slave报告的偏移量是否已经达到了master的要求,如果达到唤醒前端的groupTransferService线程确认。(同步master)异步master并没有这一步。
至于这里有什么作用,可以稍后再说。
在master前面接收到了slave所报高的最大偏移量之后,将会在writeSocketService将数据传送回给slave。
@Override
public void run() {HAConnection.log.info(this.getServiceName() + " service started");while (!this.isStoped()) {try {this.selector.select(1000);if (-1 == HAConnection.this.slaveRequestOffset) {Thread.sleep(10);continue;}// 第一次传输,需要计算从哪里开始// Slave如果本地没有数据,请求的Offset为0,那么master则从物理文件最后一个文件开始传送数据if (-1 == this.nextTransferFromWhere) {if (0 == HAConnection.this.slaveRequestOffset) {long masterOffset =HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();masterOffset =masterOffset- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());if (masterOffset < 0) {masterOffset = 0;}this.nextTransferFromWhere = masterOffset;}else {this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;}log.info("master transfer data from " + this.nextTransferFromWhere + " to slave["+ HAConnection.this.clientAddr + "], and slave request "+ HAConnection.this.slaveRequestOffset);}if (this.lastWriteOver) {// 如果长时间没有发消息则尝试发心跳long interval =HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()- this.lastWriteTimestamp;if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {// 向Slave发送心跳// Build Headerthis.byteBufferHeader.position(0);this.byteBufferHeader.limit(HEADER_SIZE);this.byteBufferHeader.putLong(this.nextTransferFromWhere);this.byteBufferHeader.putInt(0);this.byteBufferHeader.flip();this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}}// 继续传输else {this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}// 传输数据,// selectResult会赋值给this.selectMapedBufferResult,出现异常也会清理掉SelectMapedBufferResult selectResult =HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if (selectResult != null) {int size = selectResult.getSize();if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {size =HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();}long thisOffset = this.nextTransferFromWhere;this.nextTransferFromWhere += size;selectResult.getByteBuffer().limit(size);this.selectMapedBufferResult = selectResult;// Build Headerthis.byteBufferHeader.position(0);this.byteBufferHeader.limit(HEADER_SIZE);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();this.lastWriteOver = this.transferData();}else {// 没有数据,等待通知HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);}}catch (Exception e) {// 只要抛出异常,一般是网络发生错误,连接必须断开,并清理资源HAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}// 清理资源if (this.selectMapedBufferResult != null) {this.selectMapedBufferResult.release();}this.makeStop();// 避免内存泄露haService.removeConnection(HAConnection.this);SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();}catch (IOException e) {HAConnection.log.error("", e);}HAConnection.log.info(this.getServiceName() + " service end");
}
首先会判断该connection所对应的slave是否存有数据,通过判断slave所回报的偏移量可以得出。如果该slave没有数据,将会直接从最后一个物理文件的最后一个开始传输数据。否则,将会从上一次master所接受到的偏移量作为开始传输的起点。
如果该master长时间没有与该slave发送过消息,将会先发送一次心跳消息。
然后通过transferData()方法传输心跳数据。
之后将会根据计算物理文件的大小与准备发送数据的偏移量得到具体要传送消息的位置。
public SelectMapedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {int mapedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();MapedFile mapedFile = this.mapedFileQueue.findMapedFileByOffset(offset, returnFirstOnNotFound);if (mapedFile != null) {int pos = (int) (offset % mapedFileSize);SelectMapedBufferResult result = mapedFile.selectMapedBuffer(pos);return result;}return null;
}
这样具体要传送的消息得到,并且根据要传送的数据大小得出下次要发送的消息偏移量位置,可以传送给slave了。如果这里并没有取到数据,readSocketService则将会等到直到唤醒。
这个时候我们又可以回到slave的haclient的run()方法,看到得到消息master回复之后,将会调用processReadEvent()方法来对消息进行存储。
private boolean processReadEvent() {int readSizeZeroTimes = 0;while (this.byteBufferRead.hasRemaining()) {try {int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();readSizeZeroTimes = 0;boolean result = this.dispatchReadRequest();if (!result) {log.error("HAClient, dispatchReadRequest error");return false;}}else if (readSize == 0) {if (++readSizeZeroTimes >= 3) {break;}}else {// TODO ERRORlog.info("HAClient, processReadEvent read socket < 0");return false;}}catch (IOException e) {log.info("HAClient, processReadEvent read socket exception", e);return false;}}return true;
}
这里在得到master的消息之后调用dispatchReadRequest()方法,解析数据。
private boolean dispatchReadRequest() {final int MSG_HEADER_SIZE = 8 + 4; // phyoffset + sizeint readSocketPos = this.byteBufferRead.position();while (true) {int diff = this.byteBufferRead.position() - this.dispatchPostion;if (diff >= MSG_HEADER_SIZE) {long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();// 发生重大错误if (slavePhyOffset != 0) {if (slavePhyOffset != masterPhyOffset) {log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "+ slavePhyOffset + " MASTER: " + masterPhyOffset);return false;}}// 可以凑够一个请求if (diff >= (MSG_HEADER_SIZE + bodySize)) {byte[] bodyData = new byte[bodySize];this.byteBufferRead.position(this.dispatchPostion + MSG_HEADER_SIZE);this.byteBufferRead.get(bodyData);// TODO 结果是否需要处理,暂时不处理HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);this.byteBufferRead.position(readSocketPos);this.dispatchPostion += MSG_HEADER_SIZE + bodySize;if (!reportSlaveMaxOffsetPlus()) {return false;}continue;}}if (!this.byteBufferRead.hasRemaining()) {this.reallocateByteBuffer();}break;}return true;
}
在这里可见消息的头数据有12字节,首先判断该消息是否超过了12字节。前八字节保存消息的偏移量,后四字节保存消息的具体size。当消息头开始解析完,则调用appendToCommitLog将消息写入commitLog。
public boolean appendMessage(final byte[] data) {int currentPos = this.wrotePostion.get();// 表示有空余空间if ((currentPos + data.length) <= this.fileSize) {ByteBuffer byteBuffer = this.mappedByteBuffer.slice();byteBuffer.position(currentPos);byteBuffer.put(data);this.wrotePostion.addAndGet(data.length);return true;}return false;
}
最后在commitLog就是将bodyData通过字节流写入具体的物理文件内,更新新的文件写入位置,以便之后计算下一次更新偏移量大小。
Rocketmq的主从复制就是以上流程。
RocketMQ源码解析-Broker的HA实现相关推荐
- RocketMQ源码解析-Broker部分之Broker启动过程
目录 broker启动流程 broker启动可配置参数 启动入口`BrokerStartup` 1.创建brokerController 2.`BrokerController`构造函数 3.Brok ...
- RocketMQ源码解析-Broker的消息存储
Broker接收得到的来自provider的消息在sendMessageProcessor的sendMessage()方法当中处理. 在sendMessage()方法当中会直接将所接收得到的消息封装为 ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析
深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...
- RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】
基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...
- RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码
转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...
- RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic
此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...
- RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】
详细介绍了Broker启动加载消息文件以及恢复数据源码. 此前我们学习了Broker的启动源码:RocketMQ源码(3)-Broker启动流程源码解析[一万字],Broker的启动过程中,在Defa ...
最新文章
- 怎么提高单片机编程水平?
- 谷歌宣布在北京成立AI中国中心:李飞飞和李佳共同领导
- vbs结束进程代码_物联网学习教程—Linux系统编程之进程控制
- Xcode环境变量,Build Settings参数
- linux的gromacs模拟分子运动,分子动力学技术交流---gromacsamber
- 页面上有两个元素id相同,js中如何取值
- 3级调度 fpga_FPGA的软核、硬核、固核
- ios realm 文件_关于ios:具有后台进程的Realm实例会丢失数据
- Android开发之Handler
- DCL双检查锁机制实现线程安全的单例设计模式
- AndroidStudio安卓原生开发_android.view.WindowManager$BadTokenException: Unable to add---Android原生开发工作笔记129
- 小心SAP环境中的8大安全错误!快来对照、改正!
- 网站在微信中提示从浏览器打开
- 干净卸载sqlserver2019 亲测有效!
- 双系统移动硬盘备份方案:macOS Catalina 10.15.5, Win10
- SAP QM 特性导出及定性定量
- 全网最完整金融时间序列模型+动态模型
- 理解java接口和抽象类
- H5 css标签选择器
- 计算机软件系统崩溃,系统崩溃了怎么办 如何快速还原崩溃的系统【步骤方法】...