1、组件结构图

HAService:主从同步入口类

AcceptSockerService:HA Master端监听客户端连接实现类

GroupTransferService:主从同步通知实现类

HAClient:HA Client端实现类

HAConnection:HA Master服务端HA连接对象的封装,与Broker从服务器的网络读写交互

ReadSocketService:HA Master网络读实现类

WriteSocketService:HA Master网络写实现类

2、原理

主服务器启动,并在特定端口上监听从服务器的连接,从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接。从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器。从服务器保存消息并继续发送新的消息同步请求。

2.1 启动

监听从服务器的连接。启动socket监听线程及组数据转移线程,启动ha客户端。

public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();}

2.2 AccpetSocketServcie

socketAddressListen:监听套接字

serverSocketChannel:服务端Socket通道

selector:事件选择器

2.2.1 beginAccept

public void beginAccept() throws Exception {this.serverSocketChannel = ServerSocketChannel.open();this.selector = RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true);this.serverSocketChannel.socket().bind(this.socketAddressListen);this.serverSocketChannel.configureBlocking(false);this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);}
创建ServerSocketChannel,selector,设置地址重用,绑定监听器端口,设置为非阻塞,注册ACCEPT事件

2.2.2 run

public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.selector.select(1000);Set<SelectionKey> selected = this.selector.selectedKeys();if (selected != null) {for (SelectionKey k : selected) {if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();if (sc != null) {HAService.log.info("HAService receive new connection, "+ sc.socket().getRemoteSocketAddress());try {HAConnection conn = new HAConnection(HAService.this, sc);conn.start();HAService.this.addConnection(conn);} catch (Exception e) {log.error("new HAConnection exception", e);sc.close();}}} else {log.warn("Unexpected ops in select " + k.readyOps());}}selected.clear();}} catch (Exception e) {log.error(this.getServiceName() + " service has exception.", e);}}log.info(this.getServiceName() + " service end");}

选择器每1s处理一次连接就绪事件,连接事件就绪后,调用ServerSocketChannel的accept方法创建SocketChannel。然后为每个连接创建一个HAConnection对象,负责主从数据同步。

2.3 GroupTransferService

2.3.1 putRequest

public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {synchronized (this.requestsWrite) {this.requestsWrite.add(request);}this.wakeup();}

将请求放入到requestWrite列表中,同时唤醒当前线程

2.3.2 run

public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.waitForRunning(10);this.doWaitTransfer();} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info(this.getServiceName() + " service end");}

每隔10ms,执行等待复制完成。如果push2SlaveMaxOffset大于等于请求的偏移,则说明复制已经完成。如果小于并且当前时间差小于存储配置的同步刷新超时时间,则等待直到超时或者复制完成。

private void doWaitTransfer() {synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {for (CommitLog.GroupCommitRequest req : this.requestsRead) {boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {this.notifyTransferObject.waitForRunning(1000);transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();}if (!transferOK) {log.warn("transfer messsage to slave timeout, " + req.getNextOffset());}req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}this.requestsRead.clear();}}}

2.4 HAClient

masterAddress:Master broker的地址

reportOffset:从向主发起主从同步的拉取偏移量

socketChannel:网络传输通道

selector:NIO事件选择器

lastWriteTimestamp:上次上报是写入传输通道的时间

currentReportedOffset:反馈slave当前的复制进度,commitlog文件最大偏移量

dipatchPosition:本次已处理读缓存的指针

byteBufferRead:读缓存区,大小为4M

byteBufferBackup:读缓存区备份,与byteBufferRead进行交换。

2.4.1 connectMaster

private boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {String addr = this.masterAddress.get();if (addr != null) {SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);if (socketAddress != null) {this.socketChannel = RemotingUtil.connect(socketAddress);if (this.socketChannel != null) {this.socketChannel.register(this.selector, SelectionKey.OP_READ);}}}this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();this.lastWriteTimestamp = System.currentTimeMillis();}return this.socketChannel != null;}

slave服务器连接master服务器,如果socketChannel为空,则尝试连接master。如果master地址为空,则返回false。如果master地址不为空,则建立到master的TCP连接,然后注册OP_READ,初始化currentReportedOffset为commitlog文件的最大偏移量,lastWriteTimestamp为当前时间戳。

2.4.2 isTimeToReportOffset

private boolean isTimeToReportOffset() {long interval =HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();return needHeart;}
判断是否需要向master反馈当前待拉取偏移量,master与slave的ha心跳发送间隔为5s。

2.4.3 reportSlaveMaxOffset

private boolean reportSlaveMaxOffset(final long maxOffset) {this.reportOffset.position(0);this.reportOffset.limit(8);this.reportOffset.putLong(maxOffset);this.reportOffset.position(0);this.reportOffset.limit(8);for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {try {this.socketChannel.write(this.reportOffset);} catch (IOException e) {log.error(this.getServiceName()+ "reportSlaveMaxOffset this.socketChannel.write exception", e);return false;}}lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();return !this.reportOffset.hasRemaining();}

向master服务器反馈拉取偏移量。

2.4.4 processReadEvent

private boolean processReadEvent() {int readSizeZeroTimes = 0;while (this.byteBufferRead.hasRemaining()) {try {int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {readSizeZeroTimes = 0;boolean result = this.dispatchReadRequest();if (!result) {log.error("HAClient, dispatchReadRequest error");return false;}} else if (readSize == 0) {if (++readSizeZeroTimes >= 3) {break;}} else {log.info("HAClient, processReadEvent read socket < 0");return false;}} catch (IOException e) {log.info("HAClient, processReadEvent read socket exception", e);return false;}}return true;}

处理网络请求,从master服务器传加的消息数据,判断readByteBuffer是否有剩余空间,如果有,则调用read,将通道中的数据读入到缓存区中。如果读取到的字节数大于0,重置读取到0字节的次数,调用dispatchReadRequest将读取到的所有消息全部追加到消息内存映射文件 中,然后再次返回拉取进度给服务器。如果连续3次从网络通道读取到0个字节,则结束本次读,返回true。如果读取到的字节数小于0或发生io异常,则返回false.

2.5 HAConnection

slaveRequestOffset:从服务器请求拉取数据的偏移量

slaveAckOffset:从服务器反馈已拉取完成的数据偏移量。

byteBufferRead:网络读写缓存区,默认为1M

processPosition:当前处理指针

lastReadTimestamp:上次读取数据的时间戳。

RocketMQ中的主从复制相关推荐

  1. 如何在MySQL中设置主从复制

    原作者:Etel Sverdlov 转载&翻译:https://www.digitalocean.com/community/tutorials/how-to-set-up-master-sl ...

  2. RocketMQ中消息的优先级

    2019独角兽企业重金招聘Python工程师标准>>> 本文主要介绍RocketMQ中消息优先级的设置方法. 一.JMS中JMSPriority JMSPriority 头字段包含了 ...

  3. 深入理解RocketMQ中的NameServer

    本文来说下RocketMQ中的NameServer 文章目录 NameServer介绍 NameServer的作用 为什么要使用NameServer NameServer如何保证数据的最终一致 路由注 ...

  4. mysql 1593_Linux中MySQL主从复制中出现1593错误码的低级错误

    主从复制小编介绍过不少的文章了,但在使用过程中经常会有一些问题出现了,今天我们来看关于Linux中MySQL主从复制中出现1593错误码的低级错误问题解决办法. 今天测试shell脚本自动配置MySQ ...

  5. RocketMQ中的消息类型种类(二)

    消息种类 按照发送的特点分 同步消息 异步消息 单向消息 按照使用功能特点分 顺序消息 广播模式 延迟消息 批量消息 过滤消息 事务消息 按照发送的特点分 同步消息 同步发送是指消息发送方发出数据后, ...

  6. RocketMQ 中Topic、Tag、GroupName基本概念介绍

    本文主要介绍RocketMQ中Topic.Tag.GroupName的概念.设计初衷以及使用方法. 一.Topic 首先看看官方的定义: Topic是生产者在发送消息和消费者在拉取消息的类别.Topi ...

  7. RocketMQ中主从复制

    生产者把消息发送到master,不会发送到slave 消费者可以从maste也可以从slave消费消息 如果master挂了,那就从slave消费数据,那么slave怎么拿到master中的数据 这就 ...

  8. rocketmq中的broker设计与实现

    1.broker启动 启动逻辑在BrokerStartup和BrokerController中. 监听端口是1091. 默认存储目录是System.getProperty("user.hom ...

  9. rocketmq中producer设计与实现

    1.类层次结构 2.producer的启动 首先设置组,及NameServer,然后调用start启动.启动关键逻辑主要在MQClientInstance中. (1)启动NettyRemotingCl ...

最新文章

  1. 计算机视觉系统学习书籍/综述汇总
  2. shell脚本字符串截取
  3. rocketmq 初探(三)
  4. AIX HACMP集群切换测试实际案例解析
  5. HTTP相关知识的总结
  6. Hurst exponent(赫斯特指数)代码与R/S值计算——python
  7. sql2005通用分页存储过程
  8. 关于bacula网络备份软件的安装以及配置1
  9. iOS最新验证电话号码与手机号码的正则方法
  10. 我的凸优化学习之路(转)
  11. [LeetCode]Patching Array
  12. iPhone 二手手机到底去哪了
  13. 腾讯云 短信验证码 php,php腾讯云短信验证码
  14. 基于数字证书的windows安全登录
  15. win10计算机打印机共享怎么设置方法,Win10系统怎么设置打印机共享?Win10系统打印机共享设置教程...
  16. 遗传算法之扇贝的进化(python代码实现)
  17. csdn博客使用反馈,bug
  18. @程序员,对抗 996,你真的准备好了吗?| 文末有彩蛋
  19. 动态规划之子序列以及子数组类型的问题
  20. 华为鲲鹏HCIA考试-练习08

热门文章

  1. 3句话概括 PUT/POST 的区别
  2. SVN账号重新指定方法
  3. tomcat cpu占用过高,系统负载高问题跟踪
  4. Linux常用命令速查备忘(包括我)
  5. lucene.net 某些类的介绍
  6. python流程控制-实战案例手把手教你Python流程控制技巧
  7. python菜鸟工具-第一行Python代码之菜鸟逃离记
  8. python能做表格吗-python可以用来做excel吗
  9. python代码示例下载-Python下载网易云歌单歌曲的示例代码
  10. 如何用python画数据图-用Python如何画出数据可视化图呢?本文详解