zookeeper leader和learner的数据同步
数据同步
在leader和follower启动期交互过程中,我们分析到整个集群完成leader选举后,learner会向leader服务器进行注册,当过半的learner服务器向leader完成注册后,就进入数据同步环节。简单讲,数据同步过程就是leader服务器将那些没有在learner服务器上提交过的事务请求同步给learner服务器。
获取Learner状态
在注册learner的最后阶段,learner服务器会发送给leader服务器一个ACKEPOCH数据包,leader会从这个数据包中解析出该learner的currentEpoch和lastZxid。
LearnerHandler.java
QuorumPacket ackEpochPacket = new QuorumPacket();ia.readRecord(ackEpochPacket, "packet");if (ackEpochPacket.getType() != Leader.ACKEPOCH) {LOG.error(ackEpochPacket.toString()+ " is not ACKEPOCH");return;}ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());leader.waitForEpochAck(this.getSid(), ss);}//从这个数据包中解析出该learner的currentEpoch和lastZxidpeerLastZxid = ss.getLastZxid();
数据同步初始化
首先,leader会从zookeeper的内存数据库中提取出事务请求对应的提议缓存队列:proposals,同时完成对以下三个ZXID值的初始化。
- peerLastZxid:该learner服务器最后处理的ZXID。
- minCommittedLog:leader服务器提议缓存队列committedLog中的最小ZXID。
maxCommittedLog:leader服务器提议缓存队列committedLog中的最大ZXID。
LearnerHandler.java 接上面代码
peerLastZxid = ss.getLastZxid();//默认发送一个SNAP包,要求follower同步数据int packetToSend = Leader.SNAP;long zxidToSend = 0;long leaderLastZxid = 0;/** the packets that the follower needs to get updates from **/long updates = peerLastZxid;ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();ReadLock rl = lock.readLock();try {rl.lock(); final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();LOG.info("Synchronizing with Follower sid: " + sid+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
数据同步通常分为四类,分别是直接差异化同步(DIFF同步),先回滚在差异化同步(TRUNC+DIFF同步),仅回滚同步(TRUNC同步)和全量同步(SNAP同步)。会根据leader和learner服务器间的数据差异情况来决定最终的数据同步方式。
直接差异化同步
场景:peerLastZxid介于minCommittedLog和maxCommittedLog间。
leader首先会向这个learner发送一个DIFF指令,用于通知“learner即将把一些proposal同步给自己”。实际同步过程中,针对每个proposal,leader都会通过发送两个数据包来完成,分别是PROPOSAL内容数据包和COMMIT指令数据包——这和zookeeper运行时leader和follower间的事务请求的提交过程是一致的。
举例,某时刻leader的提议缓存队列对应的ZXID依次是:
0x500000001,0x500000002,0x500000003,0x500000004,0x500000005
而learner最后处理的ZXID为0x500000003,于是leader依次将0x500000004和0x500000005两个提议同步给learner。
先回滚在差异化同步
场景:A,B,C三台机器,某一时刻B是leader,此时leader_epoch为5,同时当前已被集群大部分机器都提交的ZXID包括:0x500000001,0x500000002。此时leader正处理ZXID:0x500000003,并且已经将事务写入到了leader本地的事务日志中去——就在leader恰好要将该proposal发给其他follower进行投票时,leader挂了,proposal没被同步出去。此时集群进行新一轮leader选举,假设此次选的leader为A,leader_epoch变更为6,之后A和C又提交了0x600000001,0x600000002两个事务。此时B再次启动并开始数据同步。
简单讲,上面场景就是leader在已经将事务记录到本地事务日志中,但没有成功发起proposal流程时就挂了。
当leader发现某个learner包含一条自己没的事务记录,就让该learner进行事务回滚——回滚到leader上存在的,最接近peerLastZxid的ZXID,上面例子中leader会让learner回滚到ZXID为0x500000002的事务记录。
仅回滚同步
场景:peerLastZxid大于maxCommittedLog
这种场景就是上述先回滚再差异化同步的简化模式,leader会要求learner回滚到ZXID值为maxCommitedLog对应的事务操作。
全量同步(SNAP同步)
场景1:peerLastZxid小于minCommittedLog
场景2:leader上没有提议缓存队列,peerLastZxid不等于lastProcessedZxid(leader服务器数据恢复后得到的最大ZXID)
这两种场景下,只能进行全量同步。leader首先向learner发送一个SNAP指令,通知learner进行全量同步,随后leader会从内存数据库中获取到全量的数据节点和会话超时时间记录器,将它们序列化后传输给learner,learner接收到后对其反序列化后再入内存数据库中。
同步代码
//看看是否还有需要投的票 LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
//如果有,则处理这些投票 if (proposals.size() != 0) { //如果follower还没处理这个分布式事务,有可能down掉了又恢复,则继续处理这个事务 if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { ....... // If we are here, we can use committedLog to sync with // follower. Then we only need to decide whether to // send trunc or not packetToSend = Leader.DIFF; zxidToSend = maxCommittedLog; for (Proposal propose: proposals) { // skip the proposals the peer already has //这个已经被处理过了,无视 if (propose.packet.getZxid() <= peerLastZxid) { prevProposalZxid = propose.packet.getZxid(); continue; } else { // If we are sending the first packet, figure out whether to trunc // in case the follower has some proposals that the leader doesn't //发第一个事务之前先确认folloer是否比leader超前 if (firstPacket) { firstPacket = false; // Does the peer have some proposals that the leader hasn't seen yet //follower处理事务比leader多,则发送TRUNC包,让follower回滚到和leader一致 if (prevProposalZxid < peerLastZxid) { // send a trunc message before sending the diff packetToSend = Leader.TRUNC; zxidToSend = prevProposalZxid; updates = zxidToSend; } } //将事务发送到队列 queuePacket(propose.packet); //立马接一个COMMIT包 QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), null, null); queuePacket(qcommit); } } } //如果follower超前了,则发送TRUNC包,让其和leader同步 else if (peerLastZxid > maxCommittedLog) { LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}", Long.toHexString(maxCommittedLog), Long.toHexString(updates)); packetToSend = Leader.TRUNC; zxidToSend = maxCommittedLog; updates = zxidToSend; } else { LOG.warn("Unhandled proposal scenario"); } }
//如果follower和leader同步,则发送DIFF包,而不需要follower拉数据
else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) { ..... packetToSend = Leader.DIFF; zxidToSend = peerLastZxid; .......
//NEWLEADER包添加到发送队列 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null); if (getVersion() < 0x10000) { oa.writeRecord(newLeaderQP, "packet"); } else { queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); //Need to set the zxidToSend to the latest zxid if (packetToSend == Leader.SNAP) { zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); } //发送一个DIFF或SNAP包 oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); bufferedOutput.flush(); ...... // Start sending packets //启动一个异步发送线程 new Thread() { public void run() { Thread.currentThread().setName( "Sender-" + sock.getRemoteSocketAddress()); try { sendPackets(); } catch (InterruptedException e) { LOG.warn("Unexpected interruption",e); } } }.start(); /* * Have to wait for the first ACK, wait until * the leader is ready, and only then we can * start processing messages. */ //等待follower确认 qp = new QuorumPacket(); ia.readRecord(qp, "packet");
收尾阶段
leader在完成完差异数据后,就会将该learner加入到forwardingFollowers或observingLearners队列中,这俩队列在运行期间的事务请求处理过程中会使用到。随后leader发送一个NEWLEADER指令,用于通知learner已经将提议缓存队列中的proposal都同步给自己了。
当learner收到leader的NEWLEADER指令后会反馈给leader一个ack消息,表明自己完成了对提议缓存队列中proposal的同步。
leader收到来自learner的ack后,进入“过半策略”等待阶段,知道集群中有过半的learner机器响应了leader这个ack消息。
一旦满足“过半策略”后,leader会向所有已完成数据同步的learner发送一个UPTODATE指令,用来通知learner已完成数据同步,同时集群已有过半机器完成同步,集群已具有对外服务的能力了。
learner在收到leader的UPTODATE指令后,会终止数据同步流程,然后向leader再反馈一个ACK消息。
zookeeper leader和learner的数据同步相关推荐
- Apache ZooKeeper - ZooKeeper 集群中 Leader 与 Follower 的数据同步策略
文章目录 流程图 why ? How ? 何时触发数据同步的机制? 同步哪些数据 同步方式 DIFF 同步 TRUNC+DIFF 同步 TRUNC 同步 SNAP 同步 同步后的处理 源码分析 流程图 ...
- 不懂就问:ZooKeeper 集群如何进行数据同步?
本文作者:HelloGitHub-老荀 Hi,这里是 HelloGitHub 推出的 HelloZooKeeper 系列,免费开源.有趣.入门级的 ZooKeeper 教程,面向有编程基础的新手. 项 ...
- Apache ZooKeeper - Leader 选举 如何保证分布式数据的一致性
文章目录 Pre 流程图 Leader 的协调过程 ZK 是如何实现的 广播模式 恢复模式 源码实现 小结 Pre Apache ZooKeeper - 选举Leader源码流程深度解析 在 ZooK ...
- 集群没有leader_ZooKeeper 集群中 Leader 与 Follower 的4种数据同步策略
首先要声明一点,zk集群中,leader服务器有着比较重要的存在,Follower 服务器只是处理非事务性请求,leader服务器主要负责事务性请求,Follower 服务器在遇到事务性请求以后还是会 ...
- 18_clickhouse副本同步与高可用功能验证,分布式表与集群配置,数据副本与复制表,ZooKeeper整合,创建复制表,副本同步机制,数据原子写入与去重,负载平衡策略,案例(学习笔记)
24.副本同步与高可用功能验证 24.1.分布式表与集群配置 24.2.数据副本与复制表 24.3.ZooKeeper整合 24.4.创建复制表 24.5.副本同步机制 24.6.数据原子写入与去重 ...
- zookeeper的设计猜想-数据同步
接着上面那个结论再来思考,如果要满足这样的一个高性能集群,我们最直观的想法应该是,每个节点都能接收到请求,并且每个节点的数据都必须要保持一致.要实现各个节点的数据一致性,就势必要一个leader节点负 ...
- Apache ShenYu源码阅读系列-基于ZooKeeper的数据同步
Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关. 在ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中.Apac ...
- 根据已经commit的数据,进行leader和peon之间的同步
Leader Election基本设计 按照rank表示优先级解决冲突问题,为每个monitor预先分配了一个rank 只会接受优先级(rank)比自己高.epoch比上次已接受的epoch大的选举请 ...
- soul_admin之使用zookeeper数据同步
soul-admin修改成zookeeper的数据同步方式 参考的是项目文档https://dromara.org/zh-cn/docs/soul/user-dataSync.html 记得先启动zo ...
- TiDB 部署及数据同步
简介 TiDB 是 PingCAP 公司受 Google Spanner / F1 论文启发而设计的开源分布式 HTAP (Hybrid Transactional and Analytical Pr ...
最新文章
- laravel5.7的redis配置,一直报错Class 'Predis\Client' not found
- web项目不想放在tomcat的webapps目录下的
- Linux Shell常用技巧(二)
- 在centos6.7用yum安装redis
- Win7系统中必需记住的14个常用快捷键
- 一个情怀引发的生产事故
- 微服务和Java EE
- 任务调度及远端管理(基于Quartz.net)
- 自定义控件-----输入框
- 第三周课程总结&实验报告一
- python数字转中文大写_阿拉伯数字转换成大写汉字的Python代码
- 大型体检系统源码,PEIS医院体检管理系统源码
- 阿里云部署RSSHub踩坑笔记
- win10找不到网络里的计算机,Win10专业版找不到网络中的其他电脑
- 设计一个形状类(接口)Shape,方法:求周长和求面积形状类(接口)的子类(实现类):
- tmdb电影票房_TMDb Vue.js应用程序:电影数据库应用程序
- 怎么在oracle里执行sql语句,在Oracle中执行动态SQL的几种方法
- PTA 线性表 7-1 约瑟夫环(Josephus)问题(by Yan) (100分) 按出列次序输出每个人的编号
- 呼吸机缺关键零件,意大利小哥用3D打印救命!面临起诉风险,网友:意版“药神”?
- win10一键重装系统软件哪个好呢?