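ZooKeeper Source Code Analysis: Overview of the Leader/Follower Role Flow
References
"From Paxos to ZooKeeper: Distributed Consistency Principles and Practice" (<<从PAXOS到ZOOKEEPER分布式一致性原理与实践>>)
zookeeper-3.0.0
Overview of the leader/follower relationship
Having outlined how the leader and follower roles are initialized, this article walks through the server-side logic of ZooKeeper: how elections proceed, and how client data is exchanged with the servers.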
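Runtime flow of the ZooKeeper leader and followers
The main leader/follower relationships at runtime are shown in the figure; in summary:
- Whether a server is the leader or a follower, once it has started it runs a thread that listens on the election port for election messages.
- Any client request that updates or modifies data is forwarded to the leader for processing.
- Once a transactional request has completed, the leader synchronizes the change to the followers.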
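Overview of elections
An odd number of servers is recommended for a cluster; the discussion here assumes five.
Election during server startup
In this case all servers start together, and the flow is roughly:
- Each server casts a vote. In the initial state every server nominates itself as leader, so its first vote is for itself. Each vote carries the server's id (myid) and its transaction id (zxid).
- Each server receives the votes of the others. Every server starts listening for its peers' votes when it boots; it checks each incoming vote for validity and records it.
- Each server tallies the votes it has received. Transaction ids are compared first: the candidate with the largest zxid wins. If the zxids are equal, the server ids are compared and the largest myid wins. After tallying, the server checks whether the winning candidate holds more than half of the votes; if so, it updates its state.
- Each server updates its state. Every node tallies in the same way, so when all zxids are 0 the server with the largest myid necessarily wins, and every server reaches the same conclusion. Each server then checks whether it is the winner: if it is, it becomes the leader; otherwise it becomes a follower.
That is the flow of the initial election.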
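The comparison and majority rules in the list above can be sketched as follows. This is a minimal illustration under stated assumptions, not ZooKeeper's election code: `ElectionSketch`, `Vote`, `beats`, `best` and `hasQuorum` are hypothetical names introduced here, and networking, epochs and vote rounds are left out.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the vote comparison described above; not ZooKeeper's own classes. */
final class ElectionSketch {

    /** A vote names a candidate by server id (myid) and last transaction id (zxid). */
    static final class Vote {
        final long myid;
        final long zxid;

        Vote(long myid, long zxid) {
            this.myid = myid;
            this.zxid = zxid;
        }
    }

    /** True when candidate a wins over candidate b: larger zxid first, then larger myid. */
    static boolean beats(Vote a, Vote b) {
        return a.zxid > b.zxid || (a.zxid == b.zxid && a.myid > b.myid);
    }

    /** Picks the winning candidate among the votes received (the list must not be empty). */
    static Vote best(List<Vote> votes) {
        Vote winner = votes.get(0);
        for (Vote v : votes) {
            if (beats(v, winner)) {
                winner = v;
            }
        }
        return winner;
    }

    /** A candidate is elected only with strictly more than half of the cluster behind it. */
    static boolean hasQuorum(int supporters, int clusterSize) {
        return supporters > clusterSize / 2;
    }

    public static void main(String[] args) {
        // Startup: every zxid is 0, so the largest myid wins.
        List<Vote> startup = Arrays.asList(
                new Vote(1, 0), new Vote(2, 0), new Vote(3, 0), new Vote(4, 0), new Vote(5, 0));
        System.out.println("startup winner myid=" + best(startup).myid);

        // Running cluster: the most up-to-date server wins regardless of its myid.
        List<Vote> running = Arrays.asList(new Vote(1, 7), new Vote(2, 9), new Vote(4, 8));
        System.out.println("runtime winner myid=" + best(running).myid);
        System.out.println("3 of 5 is a quorum: " + hasQuorum(3, 5));
    }
}
```

Because every server applies the same deterministic rule to the same set of votes, they all arrive at the same winner without further coordination.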
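Election while the servers are running
Suppose the leader crashes at runtime. A follower whose connection to the leader fails exits its receive loop and changes its state to LOOKING.
- State change. After the leader has gone down, the state is reset to LOOKING and the election flow begins.
- Each server sends its vote. At runtime the zxids may differ between servers, but voting works the same way: each server first votes for itself and then sends that vote to the other machines.
- Votes are received and processed. The logic is the same as at startup, except that the zxids now actually differ, so the candidate with the largest zxid is chosen.
- The state is updated.
That is the basic flow. If a new machine joins while the cluster is running, it also works out the current leader's myid and zxid when it sends its election request.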
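Data exchange between clients and servers
Once the roles have been determined, the handling of a client connection falls into two cases: the client is connected to the leader, or it is connected to a follower.
An earlier article briefly described how the server listens for client connections, namely the run method inside NIOServerCnxn (its nested Factory thread).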
public void run() {
    while (!ss.socket().isClosed()) { // loop until the server socket is closed
        try {
            selector.select(1000); // I/O multiplexing with a 1 s timeout
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys(); // under the lock, fetch the keys that fired
            }
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            Collections.shuffle(selectedList);
            for (SelectionKey k : selectedList) { // walk the ready keys
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { // a new connection is arriving
                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); // accept it
                    sc.configureBlocking(false); // switch to non-blocking mode
                    SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // register for read events
                    NIOServerCnxn cnxn = createConnection(sc, sk); // create the NIOServerCnxn
                    sk.attach(cnxn); // attach the connection to its key
                    addCnxn(cnxn); // add it to the connection list
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // read or write event
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k); // let the connection handle the event
                }
            }
            selected.clear(); // clear the selected set
        } catch (Exception e) {
            LOG.error("FIXMSG", e); // log the error and keep looping
        }
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "NIOServerCnxn factory exitedloop.");
    clear();
    LOG.error("=====> Goodbye cruel world <======");
    // System.exit(0);
}
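Each incoming client connection gets its own NIOServerCnxn, which is stored for later use. When an event the server is watching for fires, doIO is called to handle it.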
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (sock == null) { // no socket: nothing to do
            return;
        }
        if (k.isReadable()) { // readable
            int rc = sock.read(incomingBuffer); // read data
            if (rc < 0) {
                throw new IOException("Read error"); // a negative count is treated as an error
            }
            if (incomingBuffer.remaining() == 0) { // the buffer has been filled completely
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    readLength(k); // read the length prefix
                } else if (!initialized) { // session not initialized yet
                    stats.packetsReceived++;
                    ServerStats.getInstance().incrementPacketsReceived();
                    readConnectRequest(); // handle the connect request
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                } else {
                    stats.packetsReceived++;
                    ServerStats.getInstance().incrementPacketsReceived();
                    readRequest(); // initialized: handle the request
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                }
            }
        }
        if (k.isWritable()) { // writable
            // ZooLog.logTraceMessage(LOG,
            // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
            // "outgoingBuffers.size() = " +
            // outgoingBuffers.size());
            if (outgoingBuffers.size() > 0) { // there is data to send
                // ZooLog.logTraceMessage(LOG,
                // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
                // "sk " + k + " is valid: " +
                // k.isValid());
                /*
                 * This is going to reset the buffer position to 0 and the
                 * limit to the size of the buffer, so that we can fill it
                 * with data from the non-direct buffers that we need to
                 * send.
                 */
                ByteBuffer directBuffer = factory.directBuffer;
                directBuffer.clear();
                for (ByteBuffer b : outgoingBuffers) {
                    if (directBuffer.remaining() < b.remaining()) {
                        /*
                         * When we call put later, if the directBuffer is too
                         * small to hold everything, nothing will be copied,
                         * so we've got to slice the buffer if it's too big.
                         */
                        b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
                    }
                    /*
                     * put() is going to modify the positions of both
                     * buffers, but we don't want to change the position of
                     * the source buffers (we'll do that after the send, if
                     * needed), so we save and reset the position after the
                     * copy
                     */
                    int p = b.position();
                    directBuffer.put(b);
                    b.position(p);
                    if (directBuffer.remaining() == 0) {
                        break;
                    }
                }
                /*
                 * Do the flip: limit becomes position, position gets set to
                 * 0. This sets us up for the write.
                 */
                directBuffer.flip();
                int sent = sock.write(directBuffer); // write to the socket
                ByteBuffer bb;
                // Remove the buffers that we have sent
                while (outgoingBuffers.size() > 0) {
                    bb = outgoingBuffers.peek();
                    if (bb == closeConn) {
                        throw new IOException("closing");
                    }
                    int left = bb.remaining() - sent;
                    if (left > 0) {
                        /*
                         * We only partially sent this buffer, so we update
                         * the position and exit the loop.
                         */
                        bb.position(bb.position() + sent);
                        break;
                    }
                    stats.packetsSent++;
                    /* We've sent the whole buffer, so drop the buffer */
                    sent -= bb.remaining();
                    ServerStats.getInstance().incrementPacketsSent();
                    outgoingBuffers.remove();
                }
                // ZooLog.logTraceMessage(LOG,
                // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
                // outgoingBuffers.size() = " + outgoingBuffers.size());
            }
            synchronized (this) {
                if (outgoingBuffers.size() == 0) {
                    if (!initialized
                            && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                        throw new IOException("Responded to info probe");
                    }
                    sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE)); // nothing left: drop write interest
                } else {
                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); // data remains: add write interest
                }
            }
        }
    } catch (CancelledKeyException e) {
        close();
    } catch (IOException e) {
        // LOG.error("FIXMSG",e);
        close();
    }
}
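The core of this function is readRequest, which handles an incoming request, together with sending outgoing data. After a write, the key's interest set is updated: write interest is dropped once nothing is left to send and added while data remains.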
private void readRequest() throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer); // wrap the buffer as a stream
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header"); // parse the request header
    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) { // authentication packet: handle it here
        AuthPacket authPacket = new AuthPacket();
        ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
        if (ap == null
                || ap.handleAuthentication(this, authPacket.getAuth()) != KeeperException.Code.Ok) {
            if (ap == null)
                LOG.error("No authentication provider for scheme: "
                        + scheme + " has " + ProviderRegistry.listProviders());
            else
                LOG.debug("Authentication failed for scheme: " + scheme);
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                    KeeperException.Code.AuthFailed);
            sendResponse(rh, null, null);
            // ... and close connection
            sendBuffer(NIOServerCnxn.closeConn);
            disableRecv();
        } else {
            LOG.debug("Authentication succeeded for scheme: " + scheme);
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                    KeeperException.Code.Ok);
            sendResponse(rh, null, null);
        }
        return;
    } else {
        zk.submitRequest(this, sessionId, h.getType(), h.getXid(),
                incomingBuffer, authInfo); // anything else is submitted to the server
    }
    if (h.getXid() >= 0) { // only requests with a non-negative xid are counted
        synchronized (this) {
            outstandingRequests++;
            // check throttling
            if (zk.getInProcess() > factory.outstandingLimit) { // over the limit: stop receiving
                LOG.warn("Throttling recv " + zk.getInProcess());
                disableRecv();
                // following lines should not be needed since we are already
                // reading
                // } else {
                // enableRecv();
            }
        }
    }
}
The key call is zk.submitRequest(this, sessionId, h.getType(), h.getXid(), incomingBuffer, authInfo). Here zk is either a LeaderZooKeeperServer (client connected to the leader) or a FollowerZooKeeperServer (client connected to a follower). Both classes extend ZooKeeperServer, so both go through its submitRequest method.
public void submitRequest(ServerCnxn cnxn, long sessionId, int type,
        int xid, ByteBuffer bb, List<Id> authInfo) {
    Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo); // build a Request instance
    submitRequest(si); // process it
}

public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                while (!running) { // not running yet
                    wait(1000); // wait and check again
                }
            } catch (InterruptedException e) {
                LOG.error("FIXMSG", e);
            }
            if (firstProcessor == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn); // check whether the session has expired; subclasses override this
        boolean validpacket = Request.isValid(si.type); // check that the request type is valid
        if (validpacket) {
            firstProcessor.processRequest(si); // hand the request to the processor chain
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Dropping packet at server of type " + si.type);
            // if unvalid packet drop the packet.
        }
    } catch (IOException e) {
        LOG.error("FIXMSG", e);
    }
}
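So the request ends up in the processing flow that starts at zk's firstProcessor. The two cases are analyzed separately below.
Client connected directly to the leader
For LeaderZooKeeperServer, setupRequestProcessors builds the chain as follows.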
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this); // applies the request and replies to the client
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
            finalProcessor, getLeader().toBeApplied); // tracks proposals waiting to be applied
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Integer.toString(getClientPort()), false); // waits for the transaction to be committed
    RequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); // proposes the transaction
    firstProcessor = new PrepRequestProcessor(this, proposalProcessor); // entry point of the chain
}
PrepRequestProcessor class
First, the constructor of PrepRequestProcessor.
public PrepRequestProcessor(ZooKeeperServer zks,
        RequestProcessor nextProcessor) {
    super("ProcessThread:" + zks.getClientPort());
    this.nextProcessor = nextProcessor; // next processor in the chain
    this.zks = zks; // owning server
    start(); // start the thread, which executes run()
}
Its processRequest method:
public void processRequest(Request request) {
    // request.addRQRec(">prep="+zks.outstandingChanges.size());
    submittedRequests.add(request); // enqueue for the processor thread
}
processRequest only adds the request to the submittedRequests queue. The actual work happens on this class's own thread, in its run method.
@Override
public void run() {
    try {
        while (true) {
            Request request = submittedRequests.take(); // take the next request from the queue
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); // trace log
            if (Request.requestOfDeath == request) { // shutdown marker: leave the loop
                break;
            }
            pRequest(request); // handle the request
        }
    } catch (InterruptedException e) {
        LOG.error("FIXMSG", e);
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "PrepRequestProcessor exited loop!");
}

@SuppressWarnings("unchecked")
protected void pRequest(Request request) {
    // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = 0x" + Long.toHexString(request.sessionId));
    TxnHeader txnHeader = null;
    Record txn = null;
    try {
        switch (request.type) {
        case OpCode.create: // create a node
            txnHeader = new TxnHeader(request.sessionId, request.cxid,
                    zks.getNextZxid(), zks.getTime(), OpCode.create);
            zks.sessionTracker.checkSession(request.sessionId);
            CreateRequest createRequest = new CreateRequest();
            ZooKeeperServer.byteBuffer2Record(request.request, createRequest);
            String path = createRequest.getPath();
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1 || path.indexOf('\0') != -1) {
                throw new KeeperException.BadArgumentsException();
            }
            if (!fixupACL(request.authInfo, createRequest.getAcl())) {
                throw new KeeperException.InvalidACLException();
            }
            String parentPath = path.substring(0, lastSlash);
            ChangeRecord parentRecord = getRecordForPath(parentPath);
            checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
            int parentCVersion = parentRecord.stat.getCversion();
            CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
            if (createMode.isSequential()) {
                path = path + String.format("%010d", parentCVersion);
            }
            try {
                if (getRecordForPath(path) != null) {
                    throw new KeeperException.NodeExistsException();
                }
            } catch (KeeperException.NoNodeException e) {
                // ignore this one
            }
            boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
            if (ephemeralParent) {
                throw new KeeperException.NoChildrenForEphemeralsException();
            }
            txn = new CreateTxn(path, createRequest.getData(),
                    createRequest.getAcl(), createMode.isEphemeral());
            StatPersisted s = new StatPersisted();
            if (createMode.isEphemeral()) {
                s.setEphemeralOwner(request.sessionId);
            }
            parentRecord = parentRecord.duplicate(txnHeader.getZxid());
            parentRecord.childCount++;
            parentRecord.stat.setCversion(parentRecord.stat.getCversion() + 1);
            addChangeRecord(parentRecord);
            addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path, s,
                    0, createRequest.getAcl()));
            break;
        case OpCode.delete: // delete a node
            txnHeader = new TxnHeader(request.sessionId, request.cxid,
                    zks.getNextZxid(), zks.getTime(), OpCode.delete);
            zks.sessionTracker.checkSession(request.sessionId);
            DeleteRequest deleteRequest = new DeleteRequest();
            ZooKeeperServer.byteBuffer2Record(request.request, deleteRequest);
            path = deleteRequest.getPath();
            lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1 || path.indexOf('\0') != -1
                    || path.equals("/")) {
                throw new KeeperException.BadArgumentsException();
            }
            parentPath = path.substring(0, lastSlash);
            parentRecord = getRecordForPath(parentPath);
            ChangeRecord nodeRecord = getRecordForPath(path);
            checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo);
            int version = deleteRequest.getVersion();
            if (version != -1 && nodeRecord.stat.getVersion() != version) {
                throw new KeeperException.BadVersionException();
            }
            if (nodeRecord.childCount > 0) {
                throw new KeeperException.NotEmptyException();
            }
            txn = new DeleteTxn(path);
            parentRecord = parentRecord.duplicate(txnHeader.getZxid());
            parentRecord.childCount--;
            parentRecord.stat.setCversion(parentRecord.stat.getCversion() + 1);
            addChangeRecord(parentRecord);
            addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path,
                    null, -1, null));
            break;
        case OpCode.setData: // set node data
            txnHeader = new TxnHeader(request.sessionId, request.cxid,
                    zks.getNextZxid(), zks.getTime(), OpCode.setData);
            zks.sessionTracker.checkSession(request.sessionId);
            SetDataRequest setDataRequest = new SetDataRequest();
            ZooKeeperServer.byteBuffer2Record(request.request, setDataRequest);
            path = setDataRequest.getPath();
            nodeRecord = getRecordForPath(path);
            checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
            version = setDataRequest.getVersion();
            int currentVersion = nodeRecord.stat.getVersion();
            if (version != -1 && version != currentVersion) {
                throw new KeeperException.BadVersionException();
            }
            version = currentVersion + 1;
            txn = new SetDataTxn(path, setDataRequest.getData(), version);
            nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
            nodeRecord.stat.setVersion(version);
            addChangeRecord(nodeRecord);
            break;
        case OpCode.setACL: // set the node ACL
            txnHeader = new TxnHeader(request.sessionId, request.cxid,
                    zks.getNextZxid(), zks.getTime(), OpCode.setACL);
            zks.sessionTracker.checkSession(request.sessionId);
            SetACLRequest setAclRequest = new SetACLRequest();
            ZooKeeperServer.byteBuffer2Record(request.request, setAclRequest);
            if (!fixupACL(request.authInfo, setAclRequest.getAcl())) {
                throw new KeeperException.InvalidACLException();
            }
            path = setAclRequest.getPath();
            nodeRecord = getRecordForPath(path);
            checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo);
            version = setAclRequest.getVersion();
            currentVersion = nodeRecord.stat.getAversion();
            if (version != -1 && version != currentVersion) {
                throw new KeeperException.BadVersionException();
            }
            version = currentVersion + 1;
            txn = new SetACLTxn(path, setAclRequest.getAcl(), version);
            nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
            nodeRecord.stat.setAversion(version);
            addChangeRecord(nodeRecord);
            break;
        case OpCode.createSession: // create a session
            txnHeader = new TxnHeader(request.sessionId, request.cxid,
                    zks.getNextZxid(), zks.getTime(), OpCode.createSession);
            request.request.rewind();
            int to = request.request.getInt();
            txn = new CreateSessionTxn(to);
            request.request.rewind();
            zks.sessionTracker.addSession(request.sessionId, to);
            break;
        case OpCode.closeSession: // close a session
            txnHeader = new TxnHeader(request.sessionId, request.cxid,
                    zks.getNextZxid(), zks.getTime(), OpCode.closeSession);
            HashSet<String> es = zks.dataTree.getEphemerals(request.sessionId);
            synchronized (zks.outstandingChanges) {
                for (ChangeRecord c : zks.outstandingChanges) {
                    if (c.stat == null) {
                        // Doing a delete
                        es.remove(c.path);
                    } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                        es.add(c.path);
                    }
                }
                for (String path2Delete : es) {
                    addChangeRecord(new ChangeRecord(txnHeader.getZxid(),
                            path2Delete, null, 0, null));
                }
            }
            LOG.info("Processed session termination request for id: 0x"
                    + Long.toHexString(request.sessionId));
            break;
        case OpCode.sync:
        case OpCode.exists:
        case OpCode.getData:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.ping:
        case OpCode.setWatches:
            break; // no transaction is needed for these
        }
    } catch (KeeperException e) {
        if (txnHeader != null) {
            txnHeader.setType(OpCode.error);
            txn = new ErrorTxn(e.getCode());
        }
    } catch (Exception e) {
        LOG.error("*********************************" + request);
        StringBuffer sb = new StringBuffer();
        ByteBuffer bb = request.request;
        if (bb != null) {
            bb.rewind();
            while (bb.hasRemaining()) {
                sb.append(Integer.toHexString(bb.get() & 0xff));
            }
        } else
            sb.append("request buffer is null");
        LOG.error(sb.toString());
        LOG.error("Unexpected exception", e);
        if (txnHeader != null) {
            txnHeader.setType(OpCode.error);
            txn = new ErrorTxn(Code.MarshallingError);
        }
    }
    request.hdr = txnHeader; // attach the transaction header and body
    request.txn = txn;
    request.zxid = zks.getZxid(); // record the current zxid
    nextProcessor.processRequest(request); // pass the request to the next processor
}
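As the flow shows, the validation and transaction construction for create and the other write operations are concentrated in this class. Once that is done, the request is passed to the next processor, ProposalRequestProcessor.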
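The hand-off used by PrepRequestProcessor (processRequest enqueues, the thread's run loop dequeues until it sees Request.requestOfDeath) is a blocking queue with a sentinel value. Here is a minimal sketch of that pattern; `QueueWorkerSketch`, its nested `Request` class and the `demo` helper are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of a queue-fed processor thread with a shutdown sentinel. */
final class QueueWorkerSketch extends Thread {

    /** Stand-in for a request; only carries a label. */
    static final class Request {
        final String name;

        Request(String name) {
            this.name = name;
        }
    }

    /** Sentinel compared by identity, in the spirit of Request.requestOfDeath. */
    static final Request REQUEST_OF_DEATH = new Request("requestOfDeath");

    private final LinkedBlockingQueue<Request> submitted = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();

    /** Called by the producer: only enqueues, never does the work itself. */
    void processRequest(Request request) {
        submitted.add(request);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Request request = submitted.take(); // blocks until a request is available
                if (request == REQUEST_OF_DEATH) {
                    break; // sentinel seen: leave the loop
                }
                handled.add(request.name); // stand-in for the real per-request work
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs a small scenario and returns the names handled, in order. */
    static List<String> demo() {
        QueueWorkerSketch worker = new QueueWorkerSketch();
        worker.start();
        worker.processRequest(new Request("create"));
        worker.processRequest(new Request("setData"));
        worker.processRequest(REQUEST_OF_DEATH);
        try {
            worker.join(); // after join, the worker's writes to handled are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.handled;
    }
}
```

Decoupling the caller from the worker this way keeps the network thread from blocking on request processing, and a single consumer preserves submission order.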
ProposalRequestProcessor class
public void processRequest(Request request) {
    // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = " + request.sessionId);
    // request.addRQRec(">prop");

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */
    if (request instanceof FollowerSyncRequest) { // sync request coming from a follower
        zks.getLeader().processSync((FollowerSyncRequest) request); // handled directly by the leader
    } else {
        nextProcessor.processRequest(request); // otherwise pass it to the next processor first
        if (request.hdr != null) {
            // We need to sync and get consensus on any transactions
            zks.getLeader().propose(request); // a transaction: propose it to the followers
            syncProcessor.processRequest(request); // and hand it to the sync processor
        }
    }
}
For a transactional request, propose sends the data to the followers.
public Proposal propose(Request request) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    try {
        request.hdr.serialize(boa, "hdr"); // serialize the transaction header
        if (request.txn != null) {
            request.txn.serialize(boa, "txn");
        }
        baos.close();
    } catch (IOException e) {
        LOG.warn("This really should be impossible", e);
    }
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null); // build the proposal packet
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request; // keep the original request
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }
        outstandingProposals.add(p); // queue the proposal; it is confirmed in a later step
        lastProposed = p.packet.getZxid();
        sendPacket(pp); // send the proposal out
    }
    return p;
}
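The request is then handed to syncProcessor, which also checks whether a snapshot needs to be taken to save the data. Next comes the following processor in the chain, CommitProcessor.
CommitProcessor class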
synchronized public void processRequest(Request request) {
    // request.addRQRec(">commit");
    if (LOG.isDebugEnabled()) { // debug logging
        LOG.debug("Processing request:: " + request);
    }
    if (!finished) { // still running: queue the request and wake the thread
        queuedRequests.add(request);
        notifyAll();
    }
}

@Override
public void run() {
    try {
        Request nextPending = null;
        ArrayList<Request> toProcess = new ArrayList<Request>();
        while (!finished) { // loop until shut down
            int len = toProcess.size();
            for (int i = 0; i < len; i++) {
                nextProcessor.processRequest(toProcess.get(i)); // pass ready requests down the chain
            }
            toProcess.clear();
            synchronized (this) {
                if ((queuedRequests.size() == 0 || nextPending != null) // nothing new, or still waiting on a commit
                        && committedRequests.size() == 0) {
                    wait();
                    continue; // wait, then re-check
                }
                // First check and see if the commit came in for the pending
                // request
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() > 0) { // a commit has arrived
                    Request r = committedRequests.remove(); // take the committed request
                    /*
                     * We match with nextPending so that we can move to the
                     * next request when it is committed. We also want to
                     * use nextPending because it has the cnxn member set
                     * properly.
                     */
                    if (nextPending != null
                            && nextPending.sessionId == r.sessionId
                            && nextPending.cxid == r.cxid) {
                        // we want to send our version of the request.
                        // the pointer to the connection in the request
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid; // copy the committed data into nextPending
                        toProcess.add(nextPending); // ready to be processed
                        nextPending = null;
                    } else {
                        // this request came from someone else so just
                        // send the commit packet
                        toProcess.add(r); // ready to be processed
                    }
                }
            }
            // We haven't matched the pending requests, so go back to
            // waiting
            if (nextPending != null) {
                continue;
            }
            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) { // drain queuedRequests
                    Request request = queuedRequests.remove(); // take one
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        nextPending = request; // a write: wait for its commit
                        break;
                    case OpCode.sync:
                        if (matchSyncs) {
                            nextPending = request; // wait for the matching commit
                        } else {
                            toProcess.add(request);
                        }
                        break;
                    default:
                        toProcess.add(request); // reads go straight through
                    }
                }
            }
        }
    } catch (Exception e) {
        LOG.error("FIXMSG", e);
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "CommitProcessor exited loop!");
}
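Its main job is to hold a write until the matching commit arrives and then pass requests on to the next processor, which is ToBeAppliedRequestProcessor.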
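The matching rule in CommitProcessor (reads pass straight through, a write blocks the queue until a commit with the same sessionId and cxid arrives, and commits for other servers' requests are passed along as they come) can be modelled without threads. The sketch below is a deliberately simplified, single-threaded illustration; `CommitMatchSketch`, `Req`, `submit`, `commit` and `released` are hypothetical names, not part of ZooKeeper.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical single-threaded model of how writes wait for their commit. */
final class CommitMatchSketch {

    /** Minimal request: session id, client xid, and whether it is a write. */
    static final class Req {
        final long sessionId;
        final int cxid;
        final boolean write;

        Req(long sessionId, int cxid, boolean write) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.write = write;
        }

        @Override
        public String toString() {
            return (write ? "W" : "R") + sessionId + "/" + cxid;
        }
    }

    private final Queue<Req> queued = new ArrayDeque<>();
    private final List<String> released = new ArrayList<>();
    private Req pending; // the write currently waiting for its commit, if any

    /** A request arrives from the previous processor. */
    void submit(Req r) {
        queued.add(r);
        drain();
    }

    /** A commit arrives; it releases the pending write only if both ids match. */
    void commit(long sessionId, int cxid) {
        if (pending != null && pending.sessionId == sessionId && pending.cxid == cxid) {
            released.add(pending.toString());
            pending = null;
        } else {
            // Commit for a request submitted elsewhere: pass it along as-is.
            released.add("remote:" + sessionId + "/" + cxid);
        }
        drain();
    }

    /** Releases queued reads until the next write becomes the pending request. */
    private void drain() {
        while (pending == null && !queued.isEmpty()) {
            Req r = queued.remove();
            if (r.write) {
                pending = r;
            } else {
                released.add(r.toString());
            }
        }
    }

    /** Requests handed to the next processor so far, in order. */
    List<String> released() {
        return released;
    }
}
```

Holding back later requests behind a pending write is what keeps one client's operations in order: a read issued after a write is only answered once that write has been committed.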
ToBeAppliedRequestProcessor class
public void processRequest(Request request) {
    // request.addRQRec(">tobe");
    next.processRequest(request); // process the request first
    Proposal p = toBeApplied.peek(); // look at the head of the to-be-applied queue
    if (p != null && p.request != null
            && p.request.zxid == request.zxid) { // same zxid: the proposal has now been applied, so remove it
        toBeApplied.remove();
    }
}
That is the main flow. The last processor to be called is FinalRequestProcessor.
FinalRequestProcessor class
public void processRequest(Request request) {
    if (LOG.isDebugEnabled()) { // debug logging
        LOG.debug("Processing request:: " + request);
    }
    // request.addRQRec(">final");
    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
    if (request.type == OpCode.ping) {
        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
    }
    ZooTrace.logRequest(LOG, traceMask, 'E', request, ""); // trace log
    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {
        while (!zks.outstandingChanges.isEmpty()
                && zks.outstandingChanges.get(0).zxid <= request.zxid) { // changes up to this zxid are no longer outstanding
            if (zks.outstandingChanges.get(0).zxid < request.zxid) {
                LOG.warn("Zxid outstanding "
                        + zks.outstandingChanges.get(0).zxid
                        + " is less than current " + request.zxid);
            }
            zks.outstandingChanges.remove(0); // remove the change record
        }
        if (request.hdr != null) {
            rc = zks.dataTree.processTxn(request.hdr, request.txn); // apply the transaction to the data tree
            if (request.type == OpCode.createSession) { // session creation
                if (request.txn instanceof CreateSessionTxn) {
                    CreateSessionTxn cst = (CreateSessionTxn) request.txn;
                    zks.sessionTracker.addSession(request.sessionId,
                            cst.getTimeOut()); // register the session
                } else {
                    LOG.warn("*****>>>>> Got "
                            + request.txn.getClass() + " "
                            + request.txn.toString());
                }
            } else if (request.type == OpCode.closeSession) {
                zks.sessionTracker.removeSession(request.sessionId); // session close: remove the session
            }
        }
        // do not add non quorum packets to the queue.
        if (Request.isQuorum(request.type)) {
            zks.addCommittedProposal(request);
        }
    }
    if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
        Factory scxn = zks.getServerCnxnFactory();
        // this might be possible since
        // we might just be playing diffs from the leader
        if (scxn != null) {
            scxn.closeSession(request.sessionId);
        }
    }
    if (request.cnxn == null) {
        return;
    }
    zks.decInProcess();
    int err = 0;
    Record rsp = null;
    try {
        if (request.hdr != null && request.hdr.getType() == OpCode.error) { // an earlier stage recorded an error: rethrow it
            throw KeeperException.create(((ErrorTxn) request.txn).getErr());
        }
        if (LOG.isDebugEnabled()) { // debug logging
            LOG.debug(request);
        }
        switch (request.type) {
        case OpCode.ping:
            request.cnxn.sendResponse(new ReplyHeader(-2,
                    zks.dataTree.lastProcessedZxid, 0), null, "response"); // ping: reply with the last processed zxid
            return;
        case OpCode.createSession:
            request.cnxn.finishSessionInit(true); // session creation: finish initializing the session
            return;
        case OpCode.create:
            rsp = new CreateResponse(rc.path); // create: reply with the created path
            err = rc.err;
            break;
        case OpCode.delete:
            err = rc.err;
            break;
        case OpCode.setData:
            rsp = new SetDataResponse(rc.stat); // setData: reply with the new stat
            err = rc.err;
            break;
        case OpCode.setACL:
            rsp = new SetACLResponse(rc.stat); // setACL: reply with the new stat
            err = rc.err;
            break;
        case OpCode.closeSession:
            err = rc.err;
            break;
        case OpCode.sync:
            SyncRequest syncRequest = new SyncRequest(); // sync request
            ZooKeeperServer.byteBuffer2Record(request.request, syncRequest);
            rsp = new SyncResponse(syncRequest.getPath()); // reply with the path
            break;
        case OpCode.exists:
            // TODO we need to figure out the security requirement for this!
            ExistsRequest existsRequest = new ExistsRequest(); // existence check
            ZooKeeperServer.byteBuffer2Record(request.request, existsRequest);
            String path = existsRequest.getPath();
            if (path.indexOf('\0') != -1) {
                throw new KeeperException.BadArgumentsException();
            }
            Stat stat = zks.dataTree.statNode(path,
                    existsRequest.getWatch() ? request.cnxn : null);
            rsp = new ExistsResponse(stat);
            break;
        case OpCode.getData:
            GetDataRequest getDataRequest = new GetDataRequest(); // read node data
            ZooKeeperServer.byteBuffer2Record(request.request, getDataRequest);
            DataNode n = zks.dataTree.getNode(getDataRequest.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(n.acl),
                    ZooDefs.Perms.READ, request.authInfo);
            stat = new Stat();
            byte b[] = zks.dataTree.getData(getDataRequest.getPath(), stat,
                    getDataRequest.getWatch() ? request.cnxn : null);
            rsp = new GetDataResponse(b, stat);
            break;
        case OpCode.setWatches:
            SetWatches setWatches = new SetWatches();
            // XXX We really should NOT need this!!!!
            request.request.rewind();
            ZooKeeperServer.byteBuffer2Record(request.request, setWatches);
            long relativeZxid = setWatches.getRelativeZxid();
            zks.dataTree.setWatches(relativeZxid,
                    setWatches.getDataWatches(),
                    setWatches.getExistWatches(),
                    setWatches.getChildWatches(), request.cnxn);
            break;
        case OpCode.getACL:
            GetACLRequest getACLRequest = new GetACLRequest(); // read the ACL
            ZooKeeperServer.byteBuffer2Record(request.request, getACLRequest);
            stat = new Stat();
            List<ACL> acl = zks.dataTree.getACL(getACLRequest.getPath(), stat);
            rsp = new GetACLResponse(acl, stat);
            break;
        case OpCode.getChildren: // list the children of a node
            GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
            ZooKeeperServer.byteBuffer2Record(request.request, getChildrenRequest);
            stat = new Stat();
            n = zks.dataTree.getNode(getChildrenRequest.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(n.acl),
                    ZooDefs.Perms.READ, request.authInfo);
            List<String> children = zks.dataTree.getChildren(
                    getChildrenRequest.getPath(), stat,
                    getChildrenRequest.getWatch() ? request.cnxn : null);
            rsp = new GetChildrenResponse(children);
            break;
        }
    } catch (KeeperException e) {
        err = e.getCode();
    } catch (Exception e) {
        LOG.warn("****************************** " + request);
        StringBuffer sb = new StringBuffer();
        ByteBuffer bb = request.request;
        bb.rewind();
        while (bb.hasRemaining()) {
            sb.append(Integer.toHexString(bb.get() & 0xff));
        }
        LOG.warn(sb.toString());
        LOG.error("FIXMSG", e);
        err = Code.MarshallingError;
    }
    ReplyHeader hdr = new ReplyHeader(request.cxid, request.zxid, err); // build the reply header
    ServerStats.getInstance().updateLatency(request.createTime);
    try {
        request.cnxn.sendResponse(hdr, rsp, "response"); // send the response
    } catch (IOException e) {
        LOG.error("FIXMSG", e);
    }
}
As the flow shows, this class applies the request and then sends the result back to the client through request.cnxn.sendResponse.
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
    if (closed) {
        return;
    }
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // Make space for length
    BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
    try {
        baos.write(fourBytes); // reserve four bytes for the length prefix
        bos.writeRecord(h, "header");
        if (r != null) {
            bos.writeRecord(r, tag);
        }
        baos.close();
    } catch (IOException e) {
        LOG.error("Error serializing response");
    }
    byte b[] = baos.toByteArray();
    ByteBuffer bb = ByteBuffer.wrap(b);
    bb.putInt(b.length - 4).rewind(); // fill in the payload length, then rewind
    sendBuffer(bb); // queue the buffer for sending
    if (h.getXid() > 0) {
        synchronized (this.factory) {
            outstandingRequests--;
            // check throttling
            if (zk.getInProcess() < factory.outstandingLimit
                    || outstandingRequests < 1) {
                sk.selector().wakeup();
                enableRecv();
            }
        }
    }
}
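sendResponse reserves four bytes, serializes the records after them, and then patches the payload length into the reserved slot; doIO reads the same layout back (length buffer first, then the payload). A minimal sketch of that length-prefixed framing follows. `FrameSketch`, `frame` and `unframe` are hypothetical helpers introduced for illustration, and the payload here is a plain byte array rather than serialized jute records.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of 4-byte length-prefixed framing with ByteBuffer. */
final class FrameSketch {

    /** Builds [length][payload], writing the length into the reserved first four bytes. */
    static ByteBuffer frame(byte[] payload) {
        byte[] b = new byte[4 + payload.length]; // four reserved bytes, then the payload
        System.arraycopy(payload, 0, b, 4, payload.length);
        ByteBuffer bb = ByteBuffer.wrap(b);
        bb.putInt(b.length - 4); // big-endian payload length
        bb.rewind();             // position back to 0 so the whole frame is sent
        return bb;
    }

    /** Reads one frame: the length first, then exactly that many payload bytes. */
    static byte[] unframe(ByteBuffer bb) {
        int len = bb.getInt();
        byte[] payload = new byte[len];
        bb.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer framed = frame("ping".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame size = " + framed.remaining());
        System.out.println("payload = " + new String(unframe(framed), StandardCharsets.UTF_8));
    }
}
```

Knowing the length up front lets the receiver allocate a buffer of exactly the right size and wait until it is full before parsing.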
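That completes the outline of server-side request handling. The proposing, committing and distribution of transactions are all handled through a chain-of-responsibility design.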
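The chain built in the leader's setupRequestProcessors can be reduced to the following sketch: each stage holds a reference to the next and forwards the request after doing its own part. This is an assumption-laden simplification: the real processors run on their own threads and do real work, whereas the hypothetical `Stage` class here only records its name synchronously.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, synchronous sketch of a chain of request processors. */
final class ChainSketch {

    /** Same shape as a one-method request processor interface. */
    interface RequestProcessor {
        void processRequest(String request);
    }

    /** A stage that records its visit and then forwards to the next stage, if any. */
    static final class Stage implements RequestProcessor {
        private final String name;
        private final RequestProcessor next;
        private final List<String> trace;

        Stage(String name, RequestProcessor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void processRequest(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.processRequest(request);
            }
        }
    }

    /** Wires the stages back to front, as the leader does, and runs one request through. */
    static List<String> leaderChain(String request) {
        List<String> trace = new ArrayList<>();
        RequestProcessor finalProcessor = new Stage("final", null, trace);
        RequestProcessor toBeApplied = new Stage("toBeApplied", finalProcessor, trace);
        RequestProcessor commit = new Stage("commit", toBeApplied, trace);
        RequestProcessor proposal = new Stage("proposal", commit, trace);
        RequestProcessor first = new Stage("prep", proposal, trace);
        first.processRequest(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(leaderChain("create"));
    }
}
```

Building the chain back to front means every stage can receive its successor through the constructor, so the leader and the follower can assemble different pipelines from the same building blocks.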
Client connected to a follower
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this); // applies the request and sends the reply
    commitProcessor = new CommitProcessor(finalProcessor,
            Integer.toString(getClientPort()), true); // waits for committed transactions
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor); // entry point of the chain
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor(getFollower())); // syncs proposals and acks them to the leader
}
The first processor a request enters here is FollowerRequestProcessor.
FollowerRequestProcessor class
@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take(); // take the next request from the queue
            ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                    'F', request, "");
            if (request == Request.requestOfDeath) { // shutdown marker: leave the loop
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request); // pass it to the next processor

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this follower has pending, so we
            // add it to pendingSyncs.
            switch (request.type) { // dispatch on the request type
            case OpCode.sync:
                zks.pendingSyncs.add(request); // sync: remember it in pendingSyncs, then fall through
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.createSession:
            case OpCode.closeSession:
                zks.getFollower().request(request); // forward to the leader
                break;
            }
        }
    } catch (Exception e) {
        LOG.error("FIXMSG", e);
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "FollowerRequestProcessor exited loop!");
}

public void processRequest(Request request) {
    if (!finished) {
        queuedRequests.add(request); // enqueue for the processor thread
    }
}
Requests such as create or delete are forwarded to the leader through zks.getFollower().request(request):
void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId); // write the session id
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) { // copy the request body, if any
        request.request.rewind();
        int len = request.request.remaining();
        byte b[] = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1,
            baos.toByteArray(), request.authInfo); // wrap it in a quorum packet
    // QuorumPacket qp;
    // if(request.type == OpCode.sync){
    // qp = new QuorumPacket(Leader.SYNC, -1, baos
    // .toByteArray(), request.authInfo);
    // }
    // else{
    // qp = new QuorumPacket(Leader.REQUEST, -1, baos
    // .toByteArray(), request.authInfo);
    // }
    writePacket(qp); // send the packet to the leader
}
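The data has now been sent to the leader, where the run method of FollowerHandler processes it. In other words, client requests such as modifications and deletions are handed over to the leader, while queries and similar operations are answered directly by the follower. The remaining two processors work as already outlined above and are not repeated here.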
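The routing decision made in FollowerRequestProcessor's switch can be summarized as a simple predicate: operations that change state (plus sync) go to the leader, everything else is served locally. In the sketch below the `Op` enum is a hypothetical stand-in for the integer OpCode constants, and `forwardedToLeader` is a name introduced here for illustration.

```java
/** Hypothetical sketch of the follower's forward-or-serve-locally decision. */
final class FollowerRoutingSketch {

    /** Stand-in for the integer OpCode constants used by the server. */
    enum Op {
        CREATE, DELETE, SET_DATA, SET_ACL, CREATE_SESSION, CLOSE_SESSION, SYNC,
        EXISTS, GET_DATA, GET_ACL, GET_CHILDREN, PING, SET_WATCHES
    }

    /** True for the request types the follower ships to the leader. */
    static boolean forwardedToLeader(Op op) {
        switch (op) {
        case SYNC:
        case CREATE:
        case DELETE:
        case SET_DATA:
        case SET_ACL:
        case CREATE_SESSION:
        case CLOSE_SESSION:
            return true;
        default:
            return false; // reads and pings are answered by the follower itself
        }
    }

    public static void main(String[] args) {
        for (Op op : Op.values()) {
            System.out.println(op + " -> " + (forwardedToLeader(op) ? "leader" : "local"));
        }
    }
}
```

Serving reads locally is what lets read throughput grow with the number of followers, while funnelling writes through the leader gives them a single global order.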
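Summary
This article described the relationship between the leader and follower roles and how roles change through elections at runtime. It then outlined how data flows between the leader and the followers: transactions and other data changes are always handled by the leader, whereas reads can be processed and answered directly by whichever server the client is connected to. The flow by which a follower receives data from the leader is not listed step by step here for reasons of space, but it was described in an earlier article. My knowledge is limited, so corrections are welcome.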