zookeeper源码分析之恢复事务日志

  • 前言
  • 源码分析
  • 查看事务日志命令
  • 总结

前言

本文是基于zookeeper集群启动过程分析(https://blog.csdn.net/weixin_42442768/article/details/109247622),对zk从磁盘中读取文件并恢复为内存中的zk数据结构这一过程进行源码分析,snapshot的恢复过程见上一篇(https://blog.csdn.net/weixin_42442768/article/details/110134663),本文主要分析事务日志的恢复过程。

源码分析

首先定位到FileTxnSnapLog类的restore方法,该方法主要功能是将磁盘中的snapshots文件和事务日志文件恢复到内存中的ZKDatabase结构中,从而进行正常的工作。

    public long restore(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {long deserializeResult = snapLog.deserialize(dt, sessions);FileTxnLog txnLog = new FileTxnLog(dataDir);if (-1L == deserializeResult) {/* this means that we couldn't find any snapshot, so we need to* initialize an empty database (reported in ZOOKEEPER-2325) */if (txnLog.getLastLoggedZxid() != -1) {throw new IOException("No snapshot found, but there are log entries. " +"Something is broken!");}/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()*       or use Map on save() */save(dt, (ConcurrentHashMap<Long, Integer>)sessions);/* return a zxid of zero, since we the database is empty */return 0;}return fastForwardFromEdits(dt, sessions, listener);}

上一篇内容已经分析了快照文件的恢复过程,我们直接从fastForwardFromEdits方法开始分析事务日志的恢复过程。

    public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {//1. 快速读取事务日志,并创建日志文件迭代器`TxnIterator`TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);long highestZxid = dt.lastProcessedZxid;TxnHeader hdr;try {//2. 按事务日志的zxid顺序解析所有文件while (true) {// iterator points to// the first valid txn when initializedhdr = itr.getHeader();if (hdr == null) {//empty logsreturn dt.lastProcessedZxid;}//3. 更新zxid并处理事务if (hdr.getZxid() < highestZxid && highestZxid != 0) {LOG.error("{}(highestZxid) > {}(next log) for type {}",highestZxid, hdr.getZxid(), hdr.getType());} else {highestZxid = hdr.getZxid();}try {processTransaction(hdr,dt,sessions, itr.getTxn());} catch(KeeperException.NoNodeException e) {throw new IOException("Failed to process transaction type: " +hdr.getType() + " error: " + e.getMessage(), e);}//4. 监听器监听事务日志恢复信息listener.onTxnLoaded(hdr, itr.getTxn());if (!itr.next())break;}} finally {if (itr != null) {itr.close();}}//5. 返回最新zxidreturn highestZxid;}

整个事务日志的恢复流程如下:

  1. 快速读取事务日志,并创建日志文件迭代器TxnIterator
  2. 按事务日志的zxid顺序解析所有文件
  3. 更新zxid并处理事务
  4. 监听器监听事务日志恢复信息
  5. 返回最新zxid

下面对过程1(TxnIterator的创建)、3(processTransaction处理事务)、4(PlayBackListener监听器)进行详细说明。

TxnIterator的创建

txnLog是事务文件的存储目录,这里的参数是DataTree结构中的lastProcessedZxid+1,而DataTree是从snapshot文件恢复的内存中的数据结构,直接进入read方法:

    public TxnIterator read(long zxid) throws IOException {return read(zxid, true);}public TxnIterator read(long zxid, boolean fastForward) throws IOException {return new FileTxnIterator(logDir, zxid, fastForward);}

这里的fastForward置为true体现在从指定的zxid文件开始恢复事务文件,接着创建一个FileTxnIterator对象,先来看类定义和成员变量:

    public static class FileTxnIterator implements TxnLog.TxnIterator {File logDir;  //事务日志文件目录long zxid;        //指定要恢复的事务日志的起始zxidTxnHeader hdr;   //文件头解析类Record record;  //解析的文件信息File logFile;  //事务日志文件InputArchive ia;    //反序列化接口static final String CRC_ERROR="CRC check failed";PositionInputStream inputStream=null;  //input流//stored files is the list of files greater than//the zxid we are looking for.private ArrayList<File> storedFiles;    //存储要恢复的事务文件集合

构造方法如下:

        public FileTxnIterator(File logDir, long zxid, boolean fastForward)throws IOException {this.logDir = logDir;this.zxid = zxid;init();if (fastForward && hdr != null) {while (hdr.getZxid() < zxid) {if (!next())break;}}}

这里首先进行初始化init

        void init() throws IOException {storedFiles = new ArrayList<File>();List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);for (File f: files) {if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {storedFiles.add(f);}// add the last logfile that is less than the zxidelse if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {storedFiles.add(f);break;}}goToNextLog();next();}

这里首先获取所有符合事务日志前缀的文件,并按zxid降序排列,和snapshot文件的获取方式类似,不详细展开。
storedFiles是存储要恢复的事务日志文件的集合,这里会把所有大于等于zxid的事务日志文件加入,同时加入小于的zxid的下一个文件(files已按降序排列)。

        private boolean goToNextLog() throws IOException {if (storedFiles.size() > 0) {this.logFile = storedFiles.remove(storedFiles.size()-1);ia = createInputArchive(this.logFile);return true;}return false;}

goToNextLog是建立storedFiles集合中末尾文件(zxid最小的)的反序列化流,事务日志文件的魔数是ZKLG,在createInputArchive方法中会对魔数进行校验,不深入分析。

        public boolean next() throws IOException {if (ia == null) {return false;}try {long crcValue = ia.readLong("crcvalue");byte[] bytes = Util.readTxnBytes(ia);// Since we preallocate, we define EOF to be anif (bytes == null || bytes.length==0) {throw new EOFException("Failed to read " + logFile);}// EOF or corrupted record// validate CRCChecksum crc = makeChecksumAlgorithm();crc.update(bytes, 0, bytes.length);if (crcValue != crc.getValue())throw new IOException(CRC_ERROR);hdr = new TxnHeader();record = SerializeUtils.deserializeTxn(bytes, hdr);} catch (EOFException e) {LOG.debug("EOF exception " + e);inputStream.close();inputStream = null;ia = null;hdr = null;// this means that the file has ended// we should go to the next fileif (!goToNextLog()) {return false;}// if we went to the next log file, we should call next() againreturn next();} catch (IOException e) {inputStream.close();throw e;}return true;}

next方法是FileTxnIterator迭代器的迭代方法,这里主要流程如下:

  1. crc校验,这个在snapshot的序列化和反序列化讲过,用户文件的验证
  2. readTxnBytes是读取事务文件的内容,进行校验
  3. 创建一个TxnHeader对象,用于反序列化文件头
  4. record记录反序列化事务日志内容(下面展开deserializeTxn方法)
  5. EOFException异常的catch部分,这里的功能包括
    a) 表示文件读到末尾,关闭流
    b) 建立下一个事务文件的流,调用goToNextLog方法
    c) 调用next方法来反序列化下一个流的相关内容,保证迭代过程的连续性

这里看一下deserializeTxn方法:

    public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr)throws IOException {final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes);InputArchive ia = BinaryInputArchive.getArchive(bais);hdr.deserialize(ia, "hdr");bais.mark(bais.available());Record txn = null;switch (hdr.getType()) {case OpCode.createSession:// This isn't really an error txn; it just has the same// format. The error represents the timeouttxn = new CreateSessionTxn();break;case OpCode.closeSession:return null;case OpCode.create:case OpCode.create2:txn = new CreateTxn();break;case OpCode.createTTL:txn = new CreateTTLTxn();break;case OpCode.createContainer:txn = new CreateContainerTxn();break;case OpCode.delete:case OpCode.deleteContainer:txn = new DeleteTxn();break;case OpCode.reconfig:case OpCode.setData:txn = new SetDataTxn();break;case OpCode.setACL:txn = new SetACLTxn();break;case OpCode.error:txn = new ErrorTxn();break;case OpCode.multi:txn = new MultiTxn();break;default:throw new IOException("Unsupported Txn with type=%d" + hdr.getType());}if (txn != null) {try {txn.deserialize(ia, "txn");} catch(EOFException e) {// perhaps this is a V0 Createif (hdr.getType() == OpCode.create) {CreateTxn create = (CreateTxn)txn;bais.reset();CreateTxnV0 createv0 = new CreateTxnV0();createv0.deserialize(ia, "txn");// cool now make it V1. a -1 parentCVersion will// trigger fixup processing in processTxncreate.setPath(createv0.getPath());create.setData(createv0.getData());create.setAcl(createv0.getAcl());create.setEphemeral(createv0.getEphemeral());create.setParentCVersion(-1);} else {throw e;}}}return txn;}

这块内容比较好理解,解析每个事务的具体类型,来创建相应的事务类(都是实现Record接口),可以根据平时使用的zk命令对照来看。这些反序列化后的事务类,最终会在processTransaction中去执行。

processTransaction处理事务

回到fastForwardFromEdits方法:

    public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);long highestZxid = dt.lastProcessedZxid;TxnHeader hdr;try {while (true) {// iterator points to// the first valid txn when initializedhdr = itr.getHeader();if (hdr == null) {//empty logsreturn dt.lastProcessedZxid;}if (hdr.getZxid() < highestZxid && highestZxid != 0) {LOG.error("{}(highestZxid) > {}(next log) for type {}",highestZxid, hdr.getZxid(), hdr.getType());} else {highestZxid = hdr.getZxid();}try {processTransaction(hdr,dt,sessions, itr.getTxn());} catch(KeeperException.NoNodeException e) {throw new IOException("Failed to process transaction type: " +hdr.getType() + " error: " + e.getMessage(), e);}listener.onTxnLoaded(hdr, itr.getTxn());if (!itr.next())break;}} finally {if (itr != null) {itr.close();}}return highestZxid;}

TxnIterator迭代器创建完成,在while循环中更新highestZxid为迭代器当前事务日志文件的zxid,进入processTransaction方法:

    public void processTransaction(TxnHeader hdr,DataTree dt,Map<Long, Integer> sessions, Record txn)throws KeeperException.NoNodeException {ProcessTxnResult rc;switch (hdr.getType()) {case OpCode.createSession:sessions.put(hdr.getClientId(),((CreateSessionTxn) txn).getTimeOut());if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"playLog --- create session in log: 0x"+ Long.toHexString(hdr.getClientId())+ " with timeout: "+ ((CreateSessionTxn) txn).getTimeOut());}// give dataTree a chance to sync its lastProcessedZxidrc = dt.processTxn(hdr, txn);break;case OpCode.closeSession:sessions.remove(hdr.getClientId());if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"playLog --- close session in log: 0x"+ Long.toHexString(hdr.getClientId()));}rc = dt.processTxn(hdr, txn);break;default:rc = dt.processTxn(hdr, txn);}/*** Snapshots are lazily created. So when a snapshot is in progress,* there is a chance for later transactions to make into the* snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS* errors could occur. It should be safe to ignore these.*/if (rc.err != Code.OK.intValue()) {LOG.debug("Ignoring processTxn failure hdr: {}, error: {}, path: {}",hdr.getType(), rc.err, rc.path);}}

这里针对createSession命令和closeSession命令对sessionWithTimeouts这个map进行添加和删除,最终都会调用DataTree类的processTxn方法,将header中的类型匹配到对应的命令,将解析后的Record类实例转换成对应的子类(例如CreateTxn),调用对应的命令方法完成事务日志的执行操作。

processTxn方法除了在事务日志恢复时会被调用,在客户端向leader执行写操作时同样会执行,以后单独分析客户端和zk集群的交互过程。

  • 客户端和zk集群交互过程

最终会返回一个ProcessTxnResult对象,包含一下信息:

成员变量 类型 含义
clientId long 客户端id
cxid int 客户端操作id
zxid long 事务对应的zxid
err int 错误码
type int 客户端操作类型
path String 事务执行结果的路径
stat Stat 事务执行对应ZNode的状态信息
multiResult List 组合命令执行结果结合

PlayBackListener监听器

listener.onTxnLoaded(hdr,itr.getTxn())这里主要任务是监听每条事务日志,将事务信息存储起来,用于向集群中其他节点同步事务。

    private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {public void onTxnLoaded(TxnHeader hdr, Record txn){addCommittedProposal(hdr, txn);}};

进入addCommittedProposal方法:

    private void addCommittedProposal(TxnHeader hdr, Record txn) {Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());addCommittedProposal(r);}

创建一个Request类用户封装事务日志的相关信息。

    public void addCommittedProposal(Request request) {WriteLock wl = logLock.writeLock();try {wl.lock();if (committedLog.size() > commitLogCount) {committedLog.removeFirst();minCommittedLog = committedLog.getFirst().packet.getZxid();}if (committedLog.isEmpty()) {minCommittedLog = request.zxid;maxCommittedLog = request.zxid;}byte[] data = SerializeUtils.serializeRequest(request);QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);Proposal p = new Proposal();p.packet = pp;p.request = request;committedLog.add(p);maxCommittedLog = p.packet.getZxid();} finally {wl.unlock();}}

这里最主要的是将当前成功执行的事务日志信息添加到committedLog中,用于快速向Follower节点同步。

至此,事务日志恢复过程结束,返回执行成功的最高事务日志的zxid。

查看事务日志命令

和snapshot查看命令类似,zk提供了LogFormatter类来查看事务日志文件,可以看一下main方法:

    public static void main(String[] args) throws Exception {if (args.length != 1) {System.err.println("USAGE: LogFormatter log_file");System.exit(2);}FileInputStream fis = new FileInputStream(args[0]);BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);FileHeader fhdr = new FileHeader();fhdr.deserialize(logStream, "fileheader");if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {System.err.println("Invalid magic number for " + args[0]);System.exit(2);}System.out.println("ZooKeeper Transactional Log File with dbid "+ fhdr.getDbid() + " txnlog format version "+ fhdr.getVersion());int count = 0;while (true) {long crcValue;byte[] bytes;try {crcValue = logStream.readLong("crcvalue");bytes = logStream.readBuffer("txnEntry");} catch (EOFException e) {System.out.println("EOF reached after " + count + " txns.");return;}if (bytes.length == 0) {// Since we preallocate, we define EOF to be an// empty transactionSystem.out.println("EOF reached after " + count + " txns.");return;}Checksum crc = new Adler32();crc.update(bytes, 0, bytes.length);if (crcValue != crc.getValue()) {throw new IOException("CRC doesn't match " + crcValue +" vs " + crc.getValue());}TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(bytes, hdr);System.out.println(DateFormat.getDateTimeInstance(DateFormat.SHORT,DateFormat.LONG).format(new Date(hdr.getTime()))+ " session 0x"+ Long.toHexString(hdr.getClientId())+ " cxid 0x"+ Long.toHexString(hdr.getCxid())+ " zxid 0x"+ Long.toHexString(hdr.getZxid())+ " " + TraceFormatter.op2String(hdr.getType()) + " " + txn);if (logStream.readByte("EOR") != 'B') {LOG.error("Last transaction was partial.");throw new EOFException("Last transaction was partial.");}count++;}}

其实就是将事务日志文件进行反序列化,包括文件头校验、Adler32算法校验等操作,最终输出到控制台。

命令如下:

java -classpath .:/opt/module/apache-zookeeper-3.5.8-bin/lib/zookeeper-3.5.8.jar:/opt/module/apache-zookeeper-3.5.8-bin/lib/zookeeper-jute-3.5.8.jar:/opt/module/apache-zookeeper-3.5.8-bin/lib/slf4j-api-1.7.25.jar:/opt/module/apache-zookeeper-3.5.8-bin/lib/slf4j-log4j12-1.7.25.jar:/opt/module/apache-zookeeper-3.5.8-bin/lib/log4j-1.2.17.jar org.apache.zookeeper.server.LogFormatter log.XXX

这个命令需要按照不同的安装版本进行调整,到lib目录下观察对应jar包是否存在以及版本信息。
这是zookeeper提供了专门查看日志&快照的api工具,查看信息如下:

总结

整个事务日志恢复的过程和snapshot恢复过程类似,区别主要在于:snapshot反序列化到内存数据结构ZKDatabase,而事务日志除了反序列化到内存,还会依次执行事务并将事务信息保存到ZKDatabase的committedLog中用于同步事务。

zookeeper源码分析之恢复事务日志相关推荐

  1. Zookeeper源码分析(二) ----- zookeeper日志

    zookeeper源码分析系列文章: Zookeeper源码分析(一) ----- 源码运行环境搭建 原创博客,纯手敲,转载请注明出处,谢谢! 既然我们是要学习源码,那么如何高效地学习源代码呢?答案就 ...

  2. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  3. 数据库中间件 MyCAT源码分析 —— XA分布式事务

    title: MyCAT 源码分析 -- XA分布式事务 date: 2017-07-15 tags: categories: MyCAT permalink: MyCAT/xa-distribute ...

  4. Spring事务源码分析责任链事务链事务不生效

    文章目录 前言 带着问题分析源码 事务源码分析 寻找Spring事务源码类 TransactionInterceptor调用栈 分析Spring AOP责任链 分析TransactionInterce ...

  5. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  6. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

  7. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  8. velocity源码分析:初始化之日志系统

    之前在"velocity源码分析:velocity初始化"文章中粗略地介绍了velocity整体的初始化过程,包括各个系统的初始化,本文主要介绍日志系统初始化. 日志系统类图: 概 ...

  9. Zookeeper源码分析:Leader角色初始化

    参考资料 <<从PAXOS到ZOOKEEPER分布式一致性原理与实践>> zookeeper-3.0.0 Leader角色初始化 在上文的选举完成之后,每个zk实例都会根据选举 ...

最新文章

  1. python中的元类_python中的元类
  2. c语言各种编程风格 微软 gnu,编程规范-c语言的编程风格
  3. 虚拟网卡与物理网卡TCP协议数据传输对比
  4. java对密码进行加密的方法_如何在JAVA中使用MD5加密对密码进行加密
  5. 使用ajax将数据显示在指定位置_AJAX学习主题之一
  6. Linux工作笔记035---设置连接Linux Centos 超时连接时间_空闲的等待时间 -bash: TMOUT: readonly variable
  7. AsnycTask的内部的实现机制
  8. 以mysql为例有几种隔离级别_mysql隔离级别有几种
  9. DWM1000 定位操作流程--[蓝点无限]
  10. ffmpeg推流到流媒体服务器
  11. Java垃圾回收的时间点
  12. 美化版缤纷彩色文字广告代码文字+网站添加AD教程
  13. 【kali】安装ibus中文输入法
  14. 电工与电子技术实验——叠加定理与戴维南定理
  15. ORACLE 库缓存
  16. Vue 点击按钮跳转其他链接
  17. Shopify 前端开发 占位符(占位图片)的使用
  18. MATLAB船舶开尔文尾迹三维仿真建模
  19. docker中安装Mkdocs
  20. 听说 520 你还没对象,来这里看看

热门文章

  1. 学习各位前辈开个blog
  2. 金融互联网之网络征信技术接口
  3. oracle组合数据类型,oracle复合数据类型-ZT
  4. 14家泰国银行支持利用区块链平台将合同数字化
  5. 2020年书法落款_学书法的请注意,落款不要写“庚子年”
  6. php自动收录导航程序,最新自动收录自带查反链导航源码
  7. 完整elasticsearch安装及其插件安装
  8. 转:POI操作Excel:cell的背景颜色类型
  9. ESP32系列--第九篇 ADC的使用
  10. Python 带你高效剪辑创作短视频