概述

从节点参与选举,选举结束,自身不为主,自动成为集群从节点。
从节点,一方面作为集群主节点的从节点,与其交互。
一方面,从节点可以作为集群观察者的主节点,与观察者交互。

与主交互

主干逻辑–followLeader

设置Zab状态为DISCOVERY
寻找主节点
向主节点注册包的类型为Leader.FOLLOWERINFO包的zxid为acceptedEpoch&0一个LearnerInfo对象,包含自身集群id,0x10000,自身集群版本发送包收取回复回复包的zxid中取得newEpoch如果回复包类型为Leader.LEADERINFO协议版本新的newEpoch大于自身acceptedEpoch自身acceptedEpoch采取newEpoch将自身currentEpoch设置到wrappedEpochBytes新的newEpoch和自身acceptedEpoch一致将-1设置到wrappedEpochBytes回复包包的类型Leader.ACKEPOCH自身数据实体得到的lastLoggedZxidepochBytesreturn ZxidUtils.makeZxid(newEpoch, 0)如果回复包的类型不是Leader.ACKEPOCH新的newEpoch大于自身acceptedEpoch自身acceptedEpoch采取newEpochreturn qp.getZxid();
设置Zab状态为SYNCHRONIZATION
与主节点同步设置Zab状态为BROADCAST
如果自身面向观察者监听端口配置了,om = new ObserverMaster(self, fzk, self.getObserverMasterPort());om.start();
循环迭代读取包处理包

处理包

包的类型为Leader.PING从zk服务对象得到touchTable序列化touchTable回复包包类型Leader.PING包zxid--和收取包中一致数据实体--序列化数据授权信息
包的类型为Leader.PROPOSAL反向序列化得到TxnLogEntry对象,包含事务头,事务体,事务摘要记录zxid到lastQueuedfzk.logRequest(hdr, txn, digest);Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());request.setTxnDigest(digest);if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request);}// 提议交由SyncProcessor--AckProcesssyncProcessor.processRequest(request);如果面向观察者充当观察者主节点om.proposalReceived(qp);
包的类型为Leader.COMMITfzk.commit(qp.getZxid());如果面向观察者充当观察者主节点om.proposalCommitted(qp.getZxid());
包的类型为Leader.UPTODATE错误日志
包的类型为Leader.REVALIDATEif (om == null || !om.revalidateLearnerSession(qp)) {revalidate(qp);}
包的类型为Leader.SYNCfzk.sync();

syncWithLeader–与主同步

读取包
包的类型为Leader.DIFF设置同步模式为DIFFif (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {snapshotNeeded = true;syncSnapshot = true;} else {snapshotNeeded = false;}
包的类型为Leader.SNAP设置同步模式为SNAP反向序列化执行快照恢复zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());syncSnapshot = true;
包的类型为Leader.TRUNC设置同步模式为TRUNCboolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
包的类型为其他未知类型报错
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
zk.createSessionTracker();
long lastQueued = 0;
boolean isPreZAB1_0 = true;
写快照,写事务日志是互斥的标志
循环迭代:读取包包的类型为Leader.PROPOSAL反向序列化得到PacketInFlight记录zxid到lastQueued加入到packetsNotCommitted包的类型为Leader.COMMIT从packetsNotCommitted查看首个需要快照zk.processTxn(packet.hdr, packet.rec);不需要快照packetsNotCommitted.add(packet);packetsCommitted.add(qp.getZxid());包的类型为Leader.UPTODATEisPreZAB1_0为true zk.takeSnapshot(syncSnapshot);self.setCurrentEpoch(newEpoch);self.setZooKeeperServer(zk);self.adminServer.setZooKeeperServer(zk);跳出循环包的类型为Leader.NEWLEADER需要快照zk.takeSnapshot(syncSnapshot);设置currentEpoch为newEpochwriteToTxnLog = true;isPreZAB1_0 = false;设置同步模式为NONEzk.startupWithoutServing();如果是从节点FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;for (PacketInFlight p : packetsNotCommitted) {fzk.logRequest(p.hdr, p.rec, p.digest);}packetsNotCommitted.clear();发送回复包的类型Leader.ACK包的zxid为newLeaderZxid
回复包包类型Leader.ACK包的zxid为newEpoch&0
zk.startServing();
self.updateElectionVote(newEpoch);
如果角色是集群从节点FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;for (PacketInFlight p : packetsNotCommitted) {fzk.logRequest(p.hdr, p.rec, p.digest);}for (Long zxid : packetsCommitted) {fzk.commit(zxid);}

面向外部客户提供服务

请求处理链

FollowerRequestProcessor
CommitProcessor
FinalRequestProcessor一个独立与以上体系的
SyncRequestProcessor
SendAckRequestProcessor

FollowerRequestProcessor–线程体

循环迭代从queuedRequests获取请求对象if (skipLearnerRequestToNextProcessor && request.isFromLearner()) {ServerMetrics.getMetrics().SKIP_LEARNER_REQUEST_TO_NEXT_PROCESSOR_COUNT.add(1);} else {nextProcessor.processRequest(request);}如果请求类型为OpCode.synczks.pendingSyncs.add(request);zks.getFollower().request(request);---将请求打包发送给主节点如果请求类型为OpCode.create/OpCode.create2/OpCode.createTTL/OpCode.createContainer:OpCode.delete/OpCode.deleteContainer/OpCode.setData/OpCode.reconfig/OpCode.setACL:OpCode.multi/OpCode.check:zks.getFollower().request(request);如果请求类型为OpCode.createSession/OpCode.closeSession:if (!request.isLocalSession()) {zks.getFollower().request(request);}

processRequest—处理来自其下观察者发来的请求,处理来自外部客户发来的请求

if (checkForUpgrade)
{Request upgradeRequest = null;try {upgradeRequest = zks.checkUpgradeSession(request);} catch (KeeperException ke) {if (request.getHdr() != null) {request.getHdr().setType(OpCode.error);request.setTxn(new ErrorTxn(ke.code().intValue()));}request.setException(ke);LOG.warn("Error creating upgrade request", ke);} catch (IOException ie) {LOG.error("Unexpected error in upgrade", ie);}if (upgradeRequest != null) {queuedRequests.add(upgradeRequest);}
}queuedRequests.add(request);

}

CommitProcessor–线程体

循环迭代阻塞等待committedRequests或queuedRequests非空
循环迭代如果queuedRequests为空,或达到批处理次数,跳出从queuedRequests取出请求对象请求需要提交,则加入pendingRequests。存入sid--sid下顺序等待请求对象集合。请求所属sid,有请求在pendingRequests中,存入sid--sid下顺序等待请求对象集合。请求不需提交,且所属sid无请求在pendingRequests中,直接将请求发给下一处理阶段处理等待没有请求处于提交中。
循环迭代:从committedRequests中取出请求A如果queuedWriteRequests非空,且queuedWriteRequests中首个请求的sid,cxid均与取出请求A匹配pendingRequests中取出A所属会话的排队请求容器B对排队请求容器B中需要提交的请求,取出此请求,queuedWriteRequests移除首个请求queuesToDrain加入此请求的sidcommittedRequests移除请求A对请求A执行processWrite对queuesToDrain中每个sid取得sid下所有排队等待请求,从头顺序对每个读请求,提交给下一级处理

sendToNextProcessor

临时增加numRequestsProcessing
请求排队待处理
实际处理为:流水线下一级处理递减numRequestsProcessing减少为0,执行唤醒

processWrite

流水线下一级处理

committedRequests来源

来自主节点

本级请求处理

queuedRequests中加入请求
若请求需要提交queuedWriteRequests中加入请求

FinalRequestProcessor

对请求执行applyRequest
获取请求的ServerCnxn
获取数据实体lastProcessedZxid
如果请求的类别为OpCode.ping最后操作记录为PING更新状态【请求,最后op,lastZxid】通过ServerCnxn发送回复回复类别为ClientCnxn.PING_XID回复zxid为lastProcessedZxid
如果请求的类别为OpCode.createSession最后操作记录为SESS更新状态【请求,最后op,lastZxid】完成会话初始化,体现为发送回复包含版本,会话超时,会话id,会话密码
如果请求的类别为OpCode.multi最后操作记录为MULT动态构造MultiResponse对象对rc.multiResult中每个ProcessTxnResult如果ProcessTxnResult的类别为OpCode.check,构造新的CheckResult对象如果ProcessTxnResult的类别为OpCode.create,构造新的CreateResult(subTxnResult.path);对象如果ProcessTxnResult的类别为OpCode.create2/OpCode.createTTL/OpCode.createContainer构造新的CreateResult(subTxnResult.path, subTxnResult.stat);对象如果ProcessTxnResult的类别为OpCode.delete/OpCode.deleteContainer构造新的DeleteResult对象如果ProcessTxnResult的类别为OpCode.setData构造新的SetDataResult(subTxnResult.stat)对象如果ProcessTxnResult的类别为OpCode.error构造新的new ErrorResult(subTxnResult.err);((MultiResponse) rsp).add(subResult);
如果请求的类别为OpCode.multiRead最后操作记录为MLTR动态构造新的new MultiOperationRecord()反向序列化填充MultiOperationRecord对象动态构造rsp = new MultiResponse();对象对multiReadRecord的每个Op如果类型为OpCode.getChildrenrec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());如果类型为OpCode.getDatarec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);GetDataResponse gdr = (GetDataResponse) rec;subResult = new GetDataResult(gdr.getData(), gdr.getStat());
如果请求的类别为OpCode.create最后操作记录为CREA动态构造new CreateResponse(rc.path)requestPathMetricsCollector.registerRequest(request.type, rc.path);
如果请求的类别为OpCode.create2/OpCode.createTTL/OpCode.createContainer最后操作记录为CREA动态构造new Create2Response(rc.path, rc.stat)requestPathMetricsCollector.registerRequest(request.type, rc.path)
如果请求的类别为OpCode.delete/OpCode.deleteContainer最后操作记录为DELErequestPathMetricsCollector.registerRequest(request.type, rc.path);
如果请求的类别为OpCode.setData最后操作记录为SETD动态构造new SetDataResponse(rc.stat)requestPathMetricsCollector.registerRequest(request.type, rc.path);
如果请求的类别为OpCode.reconfig最后操作记录为RECO动态构造new GetDataResponse(((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(UTF_8), rc.stat);
如果请求的类别为OpCode.setACL最后操作记录为SETA动态构造new SetACLResponse(rc.stat)requestPathMetricsCollector.registerRequest(request.type, rc.path);
如果请求的类别为OpCode.closeSession最后操作记录为CLOS
如果请求的类别为OpCode.sync最后操作记录为SYNC动态构造new SyncRequest()反向序列化rsp = new SyncResponse(syncRequest.getPath());requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
如果请求的类别为OpCode.check最后操作记录为CHEC动态构造rsp = new SetDataResponse(rc.stat);
如果请求的类别为OpCode.exists最后操作记录为EXIS动态构造new ExistsRequest()反向序列化Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);rsp = new ExistsResponse(stat);requestPathMetricsCollector.registerRequest(request.type, path);如果请求的类别为OpCode.getData最后操作记录为GETD动态构造new GetDataRequest()反向序列化path = getDataRequest.getPath();rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);requestPathMetricsCollector.registerRequest(request.type, path);如果请求的类别为OpCode.setWatches最后操作记录为SETW动态构造new SetWatches()反向序列化long relativeZxid = setWatches.getRelativeZxid();zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), Collections.emptyList(), Collections.emptyList(), cnxn);如果请求的类别为OpCode.setWatches2最后操作记录为STW2动态构造new SetWatches2()反向序列化long relativeZxid = setWatches.getRelativeZxid();zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), setWatches.getPersistentWatches(), setWatches.getPersistentRecursiveWatches(), cnxn);如果请求的类别为OpCode.addWatch最后操作记录为ADDW动态构造new AddWatchRequest()反向序列化zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());rsp = new ErrorResponse(0);如果请求的类别为OpCode.getACL略如果请求的类别为OpCode.getChildren最后操作记录为GETC动态构造new GetChildrenRequest()反向序列化path = getChildrenRequest.getPath();rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);requestPathMetricsCollector.registerRequest(request.type, path);如果请求的类别为OpCode.getAllChildrenNumber最后操作记录为GETACN动态构造new GetAllChildrenNumberRequest();反向序列化path = getAllChildrenNumberRequest.getPath();DataNode n = zks.getZKDatabase().getNode(path);if (n == null) {throw new KeeperException.NoNodeException();}zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null);int number = zks.getZKDatabase().getAllChildrenNumber(path);rsp = new GetAllChildrenNumberResponse(number)如果请求的类别为OpCode.getChildren2最后操作记录为GETC动态构造new GetChildren2Request()反向序列化Stat stat = new Stat();path = getChildren2Request.getPath();DataNode n = zks.getZKDatabase().getNode(path);if (n == null) {throw new KeeperException.NoNodeException();}zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null);List<String> children = zks.getZKDatabase().getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);rsp = new GetChildren2Response(children, stat);requestPathMetricsCollector.registerRequest(request.type, path);
如果请求的类别为OpCode.removeWatches最后操作记录为REMW动态构造new RemoveWatchesRequest()反向序列化WatcherType type = WatcherType.fromInt(removeWatches.getType());path = removeWatches.getPath();boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);if (!removed) {String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);throw new KeeperException.NoWatcherException(msg);}    requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
如果请求的类别为OpCode.whoAmI最后操作记录为HOMIrsp = new WhoAmIResponse(AuthUtil.getClientInfos(request.authInfo));
如果请求的类别为OpCode.getEphemerals最后操作记录为GETEGetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);String prefixPath = getEphemerals.getPrefixPath();Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);List<String> ephemerals = new ArrayList<>();if (prefixPath == null || prefixPath.trim().isEmpty() || "/".equals(prefixPath.trim())) {ephemerals.addAll(allEphems);} else {for (String p : allEphems) {if (p.startsWith(prefixPath)) {ephemerals.add(p);}}}rsp = new GetEphemeralsResponse(ephemerals);ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());updateStats(request, lastOp, lastZxid);try {if (path == null || rsp == null) {responseSize = cnxn.sendResponse(hdr, rsp, "response");} else {int opCode = request.type;Stat stat = null;switch (opCode) {case OpCode.getData : {GetDataResponse getDataResponse = (GetDataResponse) rsp;stat = getDataResponse.getStat();responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);break;}case OpCode.getChildren2 : {GetChildren2Response getChildren2Response = (GetChildren2Response) rsp;stat = getChildren2Response.getStat();responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);break;}default:responseSize = cnxn.sendResponse(hdr, rsp, "response");}}if (request.type == OpCode.closeSession) {cnxn.sendCloseSession();}} catch (IOException e) {LOG.error("FIXMSG", e);} finally {ServerMetrics.getMetrics().RESPONSE_BYTES.add(responseSize);}

applyRequest

ProcessTxnResult rc = zks.processTxn(request);
if (request.type == OpCode.closeSession && connClosedByClient(request))
{if (closeSession(zks.serverCnxnFactory, request.sessionId) || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {return rc;}
}

SyncRequestProcessor–执行体

将请求的事务头,事务体,事务摘要写入事务日志文件
满足生成快照条件时事务日志刷新到磁盘生成快照文件
toFlush.add(si)
满足刷新条件时,刷新事务日志文件内容到磁盘清空toFlush
如果请求不需写事务日志且没有等待刷新请求if (nextProcessor != null) {nextProcessor.processRequest(si);if (nextProcessor instanceof Flushable) {((Flushable) nextProcessor).flush();}}

刷新

    zks.getZKDatabase().commit();if (this.nextProcessor == null) {this.toFlush.clear();} else {while (!this.toFlush.isEmpty()) {final Request i = this.toFlush.remove();this.nextProcessor.processRequest(i);}if (this.nextProcessor instanceof Flushable) {((Flushable) this.nextProcessor).flush();}}

processRequest

queuedRequests.add(request);

SendAckRequestProcessor

processRequest

如果包类型不是OpCode.syncQuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);learner.writePacket(qp, false);

充当集群观察者的主节点–暂不分析观察者

zookeeper源码解析--从节点相关推荐

  1. btcd源码解析——peer节点之间的区块数据同步 (3) —— 非headersFirstMode模式

    文章目录 1. 写在前面 2. 非headersFirstMode模式下的数据同步过程 2.1 peer A 发送"获取区块哈希"的请求 2.2 peer B 响应"获取 ...

  2. Zookeeper源码解析 -- 本地事务日志持久化之FileTxnLog

    序言 在各个分布式组件中,持久化数据到本地的思想并不少见,为的是能保存内存中的数据,以及重启后能够重载上次内存状态的值.那么如何行之有效的进行,内存数据持久化到磁盘,怎么样的落盘策略合适,怎么设计持久 ...

  3. Zookeeper源码解析-Leader/Follower节点的启动

    前言: 前一篇文章介绍了Leader节点的选举过程,选举完成之后,集群中的各节点根据选举结果设置当前结果为LEADER或FOLLOWING. 设置完成之后,根据各自的节点状态进行启动服务.本文主要介绍 ...

  4. 栈解析html文件,利用栈将html源码解析为节点树

    /// 如何利用栈将html解析成节点树 /// 首先html是由一个个节点组成,最大的节点为节点   她有两个子节点 和 /// 首先我们将压入栈中 再将 压入栈中 遇到出栈  压入栈中 遇到 出栈 ...

  5. Fabric v2.0 源码解析——排序节点(Orderer)运行机制

    本文的内容还需进一步丰富,有时间会继续完善. 文章目录 1. Orderer在Fabric网络中的作用 Handle函数 2. Orderer接收的交易类型 ProcessMessage函数 3. 共 ...

  6. Vue2源码解析 虚拟节点VNode

    目录 1  什么是VNode 2  VNode的作用 3  VNode的类型 3.1  注释节点 3.2  文本节点 3.3  克隆节点 3.4  元素节点 3.5  组件节点 3.6  函数式组件 ...

  7. zookeeper源码解析--请求处理--FinalRequestProcessor

    FinalRequestProcessor LOG.debug("Processing request:: {}", request); // 日志记录 if (LOG.isTra ...

  8. dubbo源码解析-zookeeper创建节点

    前言 在之前dubbo源码解析-本地暴露中的前言部分提到了两道高频的面试题,其中一道dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信吗?在上周的dubbo源码 ...

  9. Dubbo源码解析 —— Zookeeper 订阅

    作者:肥朝 原文地址:https://www.jianshu.com/p/73224a6c07bb 友情提示:欢迎关注公众号[芋道源码].????关注后,拉你进[源码圈]微信群和[肥朝]搞基嗨皮. 友 ...

最新文章

  1. 计算机视觉编程——图像聚类
  2. MongoDb 中 serverStatus was very slow 的原因分析
  3. 对象的多态(核心、困难、重点)
  4. java 导入导出 插件_Java最优的Excel导入/导出工具开发,你用过吗?
  5. STM32F103C8T6引脚功能分布
  6. 算法笔记_面试题_2.移动零(将数组的的0元素移到末尾)
  7. 机器学习数学基础(1)-回归、梯度下降
  8. 软考:数据库系统工程师
  9. 转载:《星际争霸》韩国三大Zerg点评
  10. tp5微信公众号开发(2) ---- 微信被动回复,图文回复,图片回复等 demo实例
  11. 关于主机的思维导图_关于开展思维导图培训的通知
  12. 驱动人生带你全方位领略微软Windows 11的魅力
  13. 【 malcolmcrum】基于Java后端与Typescript前端的代码自动生成
  14. 基于三维GIS技术的公路交通数字孪生系统
  15. mysql 从data文件恢复数据库
  16. mysql去重分组_mysql 分组 去重
  17. TSP(中国旅行商问题)
  18. 如何调用当前栏目的上级栏目名称
  19. Rsync 实现 Windows 与 CentOS 之间数据同步
  20. 思科 计算机网络 第四章测试考试答案

热门文章

  1. 汉字对应拼音的首字母简拼
  2. Unity3D 知识点总结
  3. python语法助手_GitHub - xingxiaohui/onmyoji_helper: 基于python开发的阴阳师护肝助手
  4. 通过例子学TLA+(一)-- HourClock
  5. 通过例子学TLA+(五)--FIFO Sequences
  6. 根据今天的日期或者传入的日期得到本日所在周的开始日期和结束日期
  7. 13-读《狼图腾》有感
  8. 爬虫案例学习平台即将来习
  9. Win10为将用户中文名修改为英文名而修改了注册表导致开机时电脑显示“无法登陆到你的账户”的问题简单解决方案
  10. 2017年7月8日 星期六 --出埃及记 Exodus 27:20