数据同步

在leader和follower启动期交互过程中,我们分析到整个集群完成leader选举后,learner会向leader服务器进行注册,当过半的learner服务器向leader完成注册后,就进入数据同步环节。简单讲,数据同步过程就是leader服务器将那些没有在learner服务器上提交过的事务请求同步给learner服务器。

获取Learner状态

在注册learner的最后阶段,learner服务器会发送给leader服务器一个ACKEPOCH数据包,leader会从这个数据包中解析出该learner的currentEpoch和lastZxid。
LearnerHandler.java

                QuorumPacket ackEpochPacket = new QuorumPacket();ia.readRecord(ackEpochPacket, "packet");if (ackEpochPacket.getType() != Leader.ACKEPOCH) {LOG.error(ackEpochPacket.toString()+ " is not ACKEPOCH");return;}ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());leader.waitForEpochAck(this.getSid(), ss);}//从这个数据包中解析出该learner的currentEpoch和lastZxidpeerLastZxid = ss.getLastZxid();

数据同步初始化

首先,leader会从zookeeper的内存数据库中提取出事务请求对应的提议缓存队列:proposals,同时完成对以下三个ZXID值的初始化。

  • peerLastZxid:该learner服务器最后处理的ZXID。
  • minCommittedLog:leader服务器提议缓存队列committedLog中的最小ZXID。
  • maxCommittedLog:leader服务器提议缓存队列committedLog中的最大ZXID。

    LearnerHandler.java 接上面代码

            peerLastZxid = ss.getLastZxid();//默认发送一个SNAP包,要求follower同步数据int packetToSend = Leader.SNAP;long zxidToSend = 0;long leaderLastZxid = 0;/** the packets that the follower needs to get updates from **/long updates = peerLastZxid;ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();ReadLock rl = lock.readLock();try {rl.lock();        final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();LOG.info("Synchronizing with Follower sid: " + sid+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

数据同步通常分为四类,分别是直接差异化同步(DIFF同步),先回滚在差异化同步(TRUNC+DIFF同步),仅回滚同步(TRUNC同步)和全量同步(SNAP同步)。会根据leader和learner服务器间的数据差异情况来决定最终的数据同步方式。

直接差异化同步

场景:peerLastZxid介于minCommittedLog和maxCommittedLog间。
leader首先会向这个learner发送一个DIFF指令,用于通知“learner即将把一些proposal同步给自己”。实际同步过程中,针对每个proposal,leader都会通过发送两个数据包来完成,分别是PROPOSAL内容数据包和COMMIT指令数据包——这和zookeeper运行时leader和follower间的事务请求的提交过程是一致的。
举例,某时刻leader的提议缓存队列对应的ZXID依次是:
0x500000001,0x500000002,0x500000003,0x500000004,0x500000005
而learner最后处理的ZXID为0x500000003,于是leader依次将0x500000004和0x500000005两个提议同步给learner。

先回滚在差异化同步

场景:A,B,C三台机器,某一时刻B是leader,此时leader_epoch为5,同时当前已被集群大部分机器都提交的ZXID包括:0x500000001,0x500000002。此时leader正处理ZXID:0x500000003,并且已经将事务写入到了leader本地的事务日志中去——就在leader恰好要将该proposal发给其他follower进行投票时,leader挂了,proposal没被同步出去。此时集群进行新一轮leader选举,假设此次选的leader为A,leader_epoch变更为6,之后A和C又提交了0x600000001,0x600000002两个事务。此时B再次启动并开始数据同步。
简单讲,上面场景就是leader在已经将事务记录到本地事务日志中,但没有成功发起proposal流程时就挂了。
当leader发现某个learner包含一条自己没的事务记录,就让该learner进行事务回滚——回滚到leader上存在的,最接近peerLastZxid的ZXID,上面例子中leader会让learner回滚到ZXID为0x500000002的事务记录。

仅回滚同步

场景:peerLastZxid大于maxCommittedLog
这种场景就是上述先回滚再差异化同步的简化模式,leader会要求learner回滚到ZXID值为maxCommitedLog对应的事务操作。

全量同步(SNAP同步)

场景1:peerLastZxid小于minCommittedLog
场景2:leader上没有提议缓存队列,peerLastZxid不等于lastProcessedZxid(leader服务器数据恢复后得到的最大ZXID)

这两种场景下,只能进行全量同步。leader首先向learner发送一个SNAP指令,通知learner进行全量同步,随后leader会从内存数据库中获取到全量的数据节点和会话超时时间记录器,将它们序列化后传输给learner,learner接收到后对其反序列化后再入内存数据库中。

同步代码

//看看是否还有需要投的票  LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
//如果有,则处理这些投票  if (proposals.size() != 0) {  //如果follower还没处理这个分布式事务,有可能down掉了又恢复,则继续处理这个事务  if ((maxCommittedLog >= peerLastZxid)  && (minCommittedLog <= peerLastZxid)) {  .......  // If we are here, we can use committedLog to sync with  // follower. Then we only need to decide whether to  // send trunc or not  packetToSend = Leader.DIFF;  zxidToSend = maxCommittedLog;  for (Proposal propose: proposals) {  // skip the proposals the peer already has  //这个已经被处理过了,无视  if (propose.packet.getZxid() <= peerLastZxid) {  prevProposalZxid = propose.packet.getZxid();  continue;  } else {  // If we are sending the first packet, figure out whether to trunc  // in case the follower has some proposals that the leader doesn't  //发第一个事务之前先确认folloer是否比leader超前  if (firstPacket) {  firstPacket = false;  // Does the peer have some proposals that the leader hasn't seen yet  //follower处理事务比leader多,则发送TRUNC包,让follower回滚到和leader一致  if (prevProposalZxid < peerLastZxid) {  // send a trunc message before sending the diff  packetToSend = Leader.TRUNC;                                          zxidToSend = prevProposalZxid;  updates = zxidToSend;  }  }  //将事务发送到队列  queuePacket(propose.packet);  //立马接一个COMMIT包  QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),  null, null);  queuePacket(qcommit);  }  }  }   //如果follower超前了,则发送TRUNC包,让其和leader同步  else if (peerLastZxid > maxCommittedLog) {  LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",  Long.toHexString(maxCommittedLog),  Long.toHexString(updates));  packetToSend = Leader.TRUNC;  zxidToSend = maxCommittedLog;  updates = zxidToSend;  } else {  LOG.warn("Unhandled proposal scenario");  }  }
//如果follower和leader同步,则发送DIFF包,而不需要follower拉数据
else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {  .....  packetToSend = Leader.DIFF;  zxidToSend = peerLastZxid;  .......
//NEWLEADER包添加到发送队列  QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,  ZxidUtils.makeZxid(newEpoch, 0), null, null);  if (getVersion() < 0x10000) {  oa.writeRecord(newLeaderQP, "packet");  } else {  queuedPackets.add(newLeaderQP);  }  bufferedOutput.flush();  //Need to set the zxidToSend to the latest zxid  if (packetToSend == Leader.SNAP) {  zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();  }  //发送一个DIFF或SNAP包  oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");  bufferedOutput.flush();  ......  // Start sending packets  //启动一个异步发送线程  new Thread() {  public void run() {  Thread.currentThread().setName(  "Sender-" + sock.getRemoteSocketAddress());  try {  sendPackets();  } catch (InterruptedException e) {  LOG.warn("Unexpected interruption",e);  }  }  }.start();  /* * Have to wait for the first ACK, wait until  * the leader is ready, and only then we can * start processing messages. */  //等待follower确认  qp = new QuorumPacket();  ia.readRecord(qp, "packet");

收尾阶段

leader在完成完差异数据后,就会将该learner加入到forwardingFollowers或observingLearners队列中,这俩队列在运行期间的事务请求处理过程中会使用到。随后leader发送一个NEWLEADER指令,用于通知learner已经将提议缓存队列中的proposal都同步给自己了。
当learner收到leader的NEWLEADER指令后会反馈给leader一个ack消息,表明自己完成了对提议缓存队列中proposal的同步。

leader收到来自learner的ack后,进入“过半策略”等待阶段,知道集群中有过半的learner机器响应了leader这个ack消息。
一旦满足“过半策略”后,leader会向所有已完成数据同步的learner发送一个UPTODATE指令,用来通知learner已完成数据同步,同时集群已有过半机器完成同步,集群已具有对外服务的能力了。

learner在收到leader的UPTODATE指令后,会终止数据同步流程,然后向leader再反馈一个ACK消息。

zookeeper leader和learner的数据同步相关推荐

  1. Apache ZooKeeper - ZooKeeper 集群中 Leader 与 Follower 的数据同步策略

    文章目录 流程图 why ? How ? 何时触发数据同步的机制? 同步哪些数据 同步方式 DIFF 同步 TRUNC+DIFF 同步 TRUNC 同步 SNAP 同步 同步后的处理 源码分析 流程图 ...

  2. 不懂就问:ZooKeeper 集群如何进行数据同步?

    本文作者:HelloGitHub-老荀 Hi,这里是 HelloGitHub 推出的 HelloZooKeeper 系列,免费开源.有趣.入门级的 ZooKeeper 教程,面向有编程基础的新手. 项 ...

  3. Apache ZooKeeper - Leader 选举 如何保证分布式数据的一致性

    文章目录 Pre 流程图 Leader 的协调过程 ZK 是如何实现的 广播模式 恢复模式 源码实现 小结 Pre Apache ZooKeeper - 选举Leader源码流程深度解析 在 ZooK ...

  4. 集群没有leader_ZooKeeper 集群中 Leader 与 Follower 的4种数据同步策略

    首先要声明一点,zk集群中,leader服务器有着比较重要的存在,Follower 服务器只是处理非事务性请求,leader服务器主要负责事务性请求,Follower 服务器在遇到事务性请求以后还是会 ...

  5. 18_clickhouse副本同步与高可用功能验证,分布式表与集群配置,数据副本与复制表,ZooKeeper整合,创建复制表,副本同步机制,数据原子写入与去重,负载平衡策略,案例(学习笔记)

    24.副本同步与高可用功能验证 24.1.分布式表与集群配置 24.2.数据副本与复制表 24.3.ZooKeeper整合 24.4.创建复制表 24.5.副本同步机制 24.6.数据原子写入与去重 ...

  6. zookeeper的设计猜想-数据同步

    接着上面那个结论再来思考,如果要满足这样的一个高性能集群,我们最直观的想法应该是,每个节点都能接收到请求,并且每个节点的数据都必须要保持一致.要实现各个节点的数据一致性,就势必要一个leader节点负 ...

  7. Apache ShenYu源码阅读系列-基于ZooKeeper的数据同步

    Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关. 在ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中.Apac ...

  8. 根据已经commit的数据,进行leader和peon之间的同步

    Leader Election基本设计 按照rank表示优先级解决冲突问题,为每个monitor预先分配了一个rank 只会接受优先级(rank)比自己高.epoch比上次已接受的epoch大的选举请 ...

  9. soul_admin之使用zookeeper数据同步

    soul-admin修改成zookeeper的数据同步方式 参考的是项目文档https://dromara.org/zh-cn/docs/soul/user-dataSync.html 记得先启动zo ...

  10. TiDB 部署及数据同步

    简介 TiDB 是 PingCAP 公司受 Google Spanner / F1 论文启发而设计的开源分布式 HTAP (Hybrid Transactional and Analytical Pr ...

最新文章

  1. laravel5.7的redis配置,一直报错Class 'Predis\Client' not found
  2. web项目不想放在tomcat的webapps目录下的
  3. Linux Shell常用技巧(二)
  4. 在centos6.7用yum安装redis
  5. Win7系统中必需记住的14个常用快捷键
  6. 一个情怀引发的生产事故
  7. 微服务和Java EE
  8. 任务调度及远端管理(基于Quartz.net)
  9. 自定义控件-----输入框
  10. 第三周课程总结&实验报告一
  11. python数字转中文大写_阿拉伯数字转换成大写汉字的Python代码
  12. 大型体检系统源码,PEIS医院体检管理系统源码
  13. 阿里云部署RSSHub踩坑笔记
  14. win10找不到网络里的计算机,Win10专业版找不到网络中的其他电脑
  15. 设计一个形状类(接口)Shape,方法:求周长和求面积形状类(接口)的子类(实现类):
  16. tmdb电影票房_TMDb Vue.js应用程序:电影数据库应用程序
  17. 怎么在oracle里执行sql语句,在Oracle中执行动态SQL的几种方法
  18. PTA 线性表 7-1 约瑟夫环(Josephus)问题(by Yan) (100分) 按出列次序输出每个人的编号
  19. 呼吸机缺关键零件,意大利小哥用3D打印救命!面临起诉风险,网友:意版“药神”?
  20. win10一键重装系统软件哪个好呢?

热门文章

  1. IOS点击事件延迟300ms踩坑
  2. iOS 给三方日历加上农历
  3. git 将本地master分支代码提到远程develop分支
  4. 六.爬虫--京东登录破解(二)
  5. 实战丨Web云开发项目—TodoList待办事项
  6. 电脑能正常上网,但是显示无Internet
  7. 如何把vs2003转化成vs2005
  8. RabbitMQ, Kafka和Pulsar (一)
  9. cpython cython_Cython笔记
  10. ❁将xls批量转换成xlsx