一、前言

  前面分析了SyncReqeustProcessor,接着分析请求处理链中最后的一个处理器FinalRequestProcessor。

二、FinalRequestProcessor源码分析

  2.1 类的继承关系  

public class FinalRequestProcessor implements RequestProcessor {}

  说明:FinalRequestProcessor只实现了RequestProcessor接口,其需要实现processRequest方法和shutdown方法。

  2.2 类的属性 

public class FinalRequestProcessor implements RequestProcessor {private static final Logger LOG = LoggerFactory.getLogger(FinalRequestProcessor.class);// ZooKeeper服务器
    ZooKeeperServer zks;
}

  说明:其核心属性为zks,表示Zookeeper服务器,可以通过zks访问到Zookeeper内存数据库。

  2.3 类的构造函数

    public FinalRequestProcessor(ZooKeeperServer zks) {this.zks = zks;}

  2.4 核心函数分析

  1. processRequest 

    public void processRequest(Request request) {if (LOG.isDebugEnabled()) {LOG.debug("Processing request:: " + request);}// request.addRQRec(">final");long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;if (request.type == OpCode.ping) { // 请求类型为PINGtraceMask = ZooTrace.SERVER_PING_TRACE_MASK;}if (LOG.isTraceEnabled()) {ZooTrace.logRequest(LOG, traceMask, 'E', request, "");}ProcessTxnResult rc = null;synchronized (zks.outstandingChanges) { // 同步块while (!zks.outstandingChanges.isEmpty()&& zks.outstandingChanges.get(0).zxid <= request.zxid) { // outstandingChanges不为空且首个元素的zxid小于请求的zxid// 移除首个元素ChangeRecord cr = zks.outstandingChanges.remove(0);if (cr.zxid < request.zxid) { // 若Record的zxid小于请求的zxidLOG.warn("Zxid outstanding "+ cr.zxid+ " is less than current " + request.zxid);}if (zks.outstandingChangesForPath.get(cr.path) == cr) { // 根据路径得到Record并判断是否为cr// 移除cr的路径对应的记录
                    zks.outstandingChangesForPath.remove(cr.path);}}if (request.hdr != null) { // 请求头不为空// 获取请求头TxnHeader hdr = request.hdr;// 获取请求事务Record txn = request.txn;// 处理事务rc = zks.processTxn(hdr, txn);}// do not add non quorum packets to the queue.if (Request.isQuorum(request.type)) { // 只将quorum包(事务性请求)添加进队列
                zks.getZKDatabase().addCommittedProposal(request);}}if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) { // 请求头不为空并且请求类型为关闭会话ServerCnxnFactory scxn = zks.getServerCnxnFactory();// this might be possible since// we might just be playing diffs from the leaderif (scxn != null && request.cnxn == null) { // // calling this if we have the cnxn results in the client's// close session response being lost - we've already closed// the session/socket here before we can send the closeSession// in the switch block below// 关闭会话
                scxn.closeSession(request.sessionId);return;}}if (request.cnxn == null) { // 请求的cnxn为空,直接返回 return;}ServerCnxn cnxn = request.cnxn;String lastOp = "NA";zks.decInProcess();Code err = Code.OK;Record rsp = null;boolean closeSession = false;try {if (request.hdr != null && request.hdr.getType() == OpCode.error) {throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.txn).getErr()));}KeeperException ke = request.getException();if (ke != null && request.type != OpCode.multi) {throw ke;}if (LOG.isDebugEnabled()) {LOG.debug("{}",request);}switch (request.type) {case OpCode.ping: { // PING请求// 更新延迟
                zks.serverStats().updateLatency(request.createTime);lastOp = "PING";// 更新响应的状态
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,request.createTime, System.currentTimeMillis());// 设置响应cnxn.sendResponse(new ReplyHeader(-2,zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");return;}case OpCode.createSession: { // 创建会话请求// 更新延迟
                zks.serverStats().updateLatency(request.createTime);lastOp = "SESS";// 更新响应的状态
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,request.createTime, System.currentTimeMillis());// 结束会话初始化zks.finishSessionInit(request.cnxn, true);return;}case OpCode.multi: { // 多重操作
                lastOp = "MULT";rsp = new MultiResponse() ;for (ProcessTxnResult subTxnResult : rc.multiResult) { // 遍历多重操作结果
OpResult subResult ;switch (subTxnResult.type) { // 确定每个操作类型case OpCode.check: // 检查subResult = new CheckResult();break;case OpCode.create: // 创建subResult = new CreateResult(subTxnResult.path);break;case OpCode.delete: // 删除subResult = new DeleteResult();break;case OpCode.setData: // 设置数据subResult = new SetDataResult(subTxnResult.stat);break;case OpCode.error: // 错误subResult = new ErrorResult(subTxnResult.err) ;break;default: throw new IOException("Invalid type of op");}// 添加至响应结果集中
                    ((MultiResponse)rsp).add(subResult);}break;}case OpCode.create: { // 创建lastOp = "CREA";// 创建响应rsp = new CreateResponse(rc.path);err = Code.get(rc.err);break;}case OpCode.delete: { // 删除lastOp = "DELE";err = Code.get(rc.err);break;}case OpCode.setData: { // 设置数据lastOp = "SETD";rsp = new SetDataResponse(rc.stat);err = Code.get(rc.err);break;}case OpCode.setACL: { // 设置ACLlastOp = "SETA";rsp = new SetACLResponse(rc.stat);err = Code.get(rc.err);break;}case OpCode.closeSession: { // 关闭会话lastOp = "CLOS";closeSession = true;err = Code.get(rc.err);break;}case OpCode.sync: { // 同步lastOp = "SYNC";SyncRequest syncRequest = new SyncRequest();ByteBufferInputStream.byteBuffer2Record(request.request,syncRequest);rsp = new SyncResponse(syncRequest.getPath());break;}case OpCode.check: { // 检查lastOp = "CHEC";rsp = new SetDataResponse(rc.stat);err = Code.get(rc.err);break;}case OpCode.exists: { // 存在性判断lastOp = "EXIS";// TODO we need to figure out the security requirement for this!ExistsRequest existsRequest = new ExistsRequest();// 将byteBuffer转化为Record
                ByteBufferInputStream.byteBuffer2Record(request.request,existsRequest);String path = existsRequest.getPath();if (path.indexOf('\0') != -1) {throw new KeeperException.BadArgumentsException();}Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);rsp = new ExistsResponse(stat);break;}case OpCode.getData: { // 获取数据lastOp = "GETD";GetDataRequest getDataRequest = new GetDataRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getDataRequest);DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}Long aclL;synchronized(n) {aclL = n.acl;}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),ZooDefs.Perms.READ,request.authInfo);Stat stat = new Stat();byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);rsp = new GetDataResponse(b, stat);break;}case OpCode.setWatches: { // 设置watchlastOp = "SETW";SetWatches setWatches = new SetWatches();// XXX We really should NOT need this!!!!
                request.request.rewind();ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);long relativeZxid = setWatches.getRelativeZxid();zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(),setWatches.getChildWatches(), cnxn);break;}case OpCode.getACL: { // 获取ACLlastOp = "GETA";GetACLRequest getACLRequest = new GetACLRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getACLRequest);Stat stat = new Stat();List<ACL> acl = zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);rsp = new GetACLResponse(acl, stat);break;}case OpCode.getChildren: { // 获取子节点lastOp = "GETC";GetChildrenRequest getChildrenRequest = new GetChildrenRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getChildrenRequest);DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}Long aclG;synchronized(n) {aclG = n.acl;}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG), ZooDefs.Perms.READ,request.authInfo);List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);rsp = new GetChildrenResponse(children);break;}case OpCode.getChildren2: {lastOp = "GETC";GetChildren2Request getChildren2Request = new GetChildren2Request();ByteBufferInputStream.byteBuffer2Record(request.request,getChildren2Request);Stat stat = new Stat();DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());if (n == null) {throw new KeeperException.NoNodeException();}Long aclG;synchronized(n) {aclG = n.acl;}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG), ZooDefs.Perms.READ,request.authInfo);List<String> children = zks.getZKDatabase().getChildren(getChildren2Request.getPath(), stat, getChildren2Request.getWatch() ? cnxn : null);rsp = new GetChildren2Response(children, stat);break;}}} catch (SessionMovedException e) {// session moved is a connection level error, we need to tear// down the connection otw ZOOKEEPER-710 might happen// ie client on slow follower starts to renew session, fails// before this completes, then tries the fast follower (leader)// and is successful, however the initial renew is then // successfully fwd/processed by the leader and as a result// the client and leader disagree on where the client is most// recently attached (and therefore invalid SESSION MOVED generated)
            cnxn.sendCloseSession();return;} catch (KeeperException e) {err = e.code();} catch (Exception e) {// log at error level as we are returning a marshalling// error to the userLOG.error("Failed to process " + request, e);StringBuilder sb = new StringBuilder();ByteBuffer bb = request.request;bb.rewind();while (bb.hasRemaining()) {sb.append(Integer.toHexString(bb.get() & 0xff));}LOG.error("Dumping request buffer: 0x" + sb.toString());err = Code.MARSHALLINGERROR;}long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();ReplyHeader hdr =new ReplyHeader(request.cxid, lastZxid, err.intValue());zks.serverStats().updateLatency(request.createTime);cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,request.createTime, System.currentTimeMillis());try {cnxn.sendResponse(hdr, rsp, "response");if (closeSession) {cnxn.sendCloseSession();}} catch (IOException e) {LOG.error("FIXMSG",e);}}

View Code

  说明:对于processRequest函数,进行分段分析  

        if (LOG.isDebugEnabled()) {LOG.debug("Processing request:: " + request);}// request.addRQRec(">final");long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;if (request.type == OpCode.ping) { // 请求类型为PINGtraceMask = ZooTrace.SERVER_PING_TRACE_MASK;}if (LOG.isTraceEnabled()) {ZooTrace.logRequest(LOG, traceMask, 'E', request, "");}

  说明:可以看到其主要作用是判断是否为PING请求,同时会根据LOG的设置确定是否进行日志记录,接着下面代码

synchronized (zks.outstandingChanges) { // 同步块while (!zks.outstandingChanges.isEmpty()&& zks.outstandingChanges.get(0).zxid <= request.zxid) { // outstandingChanges不为空且首个元素的zxid小于等于请求的zxid// 移除首个元素ChangeRecord cr = zks.outstandingChanges.remove(0);if (cr.zxid < request.zxid) { // 若Record的zxid小于请求的zxidLOG.warn("Zxid outstanding "+ cr.zxid+ " is less than current " + request.zxid);}if (zks.outstandingChangesForPath.get(cr.path) == cr) { // 根据路径得到Record并判断是否为cr// 移除cr的路径对应的记录
                    zks.outstandingChangesForPath.remove(cr.path);}}if (request.hdr != null) { // 请求头不为空// 获取请求头TxnHeader hdr = request.hdr;// 获取请求事务Record txn = request.txn;// 处理事务rc = zks.processTxn(hdr, txn);}// do not add non quorum packets to the queue.if (Request.isQuorum(request.type)) { // 只将quorum包(事务性请求)添加进队列
                zks.getZKDatabase().addCommittedProposal(request);}}

  说明:同步块处理,当outstandingChanges不为空且其首元素的zxid小于等于请求的zxid时,就会一直从outstandingChanges中取出首元素,并且对outstandingChangesForPath做相应的操作,若请求头不为空,则处理请求。若为事务性请求,则提交到ZooKeeper内存数据库中。对于processTxn函数而言,其最终会调用DataTree的processTxn,即内存数据库结构的DataTree的处理事务函数,而判断是否为事务性请求则是通过调用isQuorum函数,会改变服务器状态的(事务性)请求就是Quorum。之后调用addCommittedProposal函数将请求添加至ZKDatabase的committedLog结构中,方便follower快速同步。

  接下来会根据请求的类型进行相应的操作,如对于PING请求而言,其处理如下  

            case OpCode.ping: { // PING请求// 更新延迟
                zks.serverStats().updateLatency(request.createTime);lastOp = "PING";// 更新响应的状态
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,request.createTime, System.currentTimeMillis());// 设置响应cnxn.sendResponse(new ReplyHeader(-2,zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");return;}

  说明:其首先会根据请求的创建时间来更新Zookeeper服务器的延迟,updateLatency函数中会记录最大延迟、最小延迟、总的延迟和延迟次数。然后更新响应中的状态,如请求创建到响应该请求总共花费的时间、最后的操作类型等。然后设置响应后返回。而对于创建会话请求而言,其处理如下  

            case OpCode.createSession: { // 创建会话请求// 更新延迟
                zks.serverStats().updateLatency(request.createTime);lastOp = "SESS";// 更新响应的状态
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,request.createTime, System.currentTimeMillis());// 结束会话初始化zks.finishSessionInit(request.cnxn, true);return;}

  说明:其首先还是会根据请求的创建时间来更新Zookeeper服务器的延迟,然后设置最后的操作类型,然后更新响应的状态,之后调用finishSessionInit函数表示结束会话的初始化。其他请求与此类似,之后会根据其他请求再次更新服务器的延迟,设置响应的状态等,最后使用sendResponse函数将响应发送给请求方,其处理流程如下 

        // 获取最后处理的zxidlong lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();// 响应头ReplyHeader hdr =new ReplyHeader(request.cxid, lastZxid, err.intValue());// 更新服务器延迟
        zks.serverStats().updateLatency(request.createTime);// 更新状态
        cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,request.createTime, System.currentTimeMillis());try {// 返回响应cnxn.sendResponse(hdr, rsp, "response");if (closeSession) {// 关闭会话
                cnxn.sendCloseSession();}} catch (IOException e) {LOG.error("FIXMSG",e);}

三、总结

  本篇博文分析了请求处理链的FinalRequestProcessor,其通常是请求处理链的最后一个处理器,而对于请求处理链部分的分析也就到这里,还有其他的处理器再使用时再进行分析,也谢谢各位园友观看~

转载于:https://www.cnblogs.com/leesf456/p/6472496.html

【Zookeeper】源码分析之请求处理链(四)之FinalRequestProcessor相关推荐

  1. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  2. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  3. Spring AOP 源码分析 - 拦截器链的执行过程

    1.简介 本篇文章是 AOP 源码分析系列文章的最后一篇文章,在前面的两篇文章中,我分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在我们的得 ...

  4. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

  5. Zookeeper源码分析(二) ----- zookeeper日志

    zookeeper源码分析系列文章: Zookeeper源码分析(一) ----- 源码运行环境搭建 原创博客,纯手敲,转载请注明出处,谢谢! 既然我们是要学习源码,那么如何高效地学习源代码呢?答案就 ...

  6. zookeeper源码分析之恢复事务日志

    zookeeper源码分析之恢复事务日志 前言 源码分析 查看事务日志命令 总结 前言 本文是基于zookeeper集群启动过程分析(https://blog.csdn.net/weixin_4244 ...

  7. 【集合框架】JDK1.8源码分析之IdentityHashMap(四)

    一.前言 前面已经分析了HashMap与LinkedHashMap,现在我们来分析不太常用的IdentityHashMap,从它的名字上也可以看出来用于表示唯一的HashMap,仔细分析了其源码,发现 ...

  8. zookeeper源码分析之leader选举

    zookeeper提供顺序一致性.原子性.统一视图.可靠性保证服务 zookeeper使用的是zab(atomic broadcast protocol)协议而非paxos协议 zookeeper能处 ...

  9. ZooKeeper源码分析之完整网络通信流程(一)

    文章目录 2021SC@SDUSC 前言 ZooKeeper中网络通信执行流程 总结 2021SC@SDUSC 前言 接下来将进入源码世界来一步一步分析客户端与服务端之间是如何通过ClientCnxn ...

最新文章

  1. day34 异常处理、断言、socket之ftp协议
  2. bootstrap 导航学习
  3. DL之yolov3:使用yolov3算法时需要对Ubuntu系统进行配置的简介、过程步骤之详细攻略
  4. eclipse中ast_JavaParser中AST节点的观察者
  5. 总结 | 深度学习之Pytorch入门教程
  6. plsql developer如何创建新用户(users)
  7. Visual studio 的教程
  8. Velox将在Pangolin上启动其算法交易机器人,并计划推出更多DeFi解决方案
  9. java实现格拉布斯准则_格拉布斯准则(java代码)
  10. 笔记本电脑装机详细步骤图文教程
  11. Github TOP100 Android开源,flutter与android混合开发
  12. Android studio开发Android图灵智能聊天机器人,课程设计报告
  13. python word文档转html
  14. 企查查爬虫python实现(一)整体方法
  15. 机智的技术童鞋,你能解开这个贺岁彩蛋吗?
  16. 测试工程师需要掌握哪些软技能?
  17. 消息钩子使用教程(转)
  18. 傅里叶变换(真正的通俗易懂)
  19. VIP邮箱和免费邮箱哪个好?个人电子邮箱哪家好
  20. 2011年3月全国计算机等级考试四级网络工程师笔试真 233,2011年3月计算机四级网络工程师考试真题及答案解析...

热门文章

  1. Mybatis 实现关联表查询
  2. submit text 插件安装教程
  3. Java String.indexOf() 函数用法小结
  4. 牛腩新闻发布系统——触发器使用
  5. HDU-2553-N皇后问题
  6. 阿里有php的研发团队么,【阿里巴巴】阿里集团-MMC技术部-研发工程师JAVA
  7. CCF CSP202112-2 序列查询新解
  8. 每天固定往一个银行卡存入100元,5年之后会有多大变化?有人能坚持吗?
  9. 工厂打工10年,现在被工厂以能力不足为由辞退,可以去仲裁吗?
  10. ios加密机制是什么?为什么无法破解?