zookeeper源码分析之恢复事务日志
zookeeper源码分析之恢复事务日志
- 前言
- 源码分析
- 查看事务日志命令
- 总结
前言
本文是基于zookeeper集群启动过程分析(https://blog.csdn.net/weixin_42442768/article/details/109247622),对zk从磁盘中读取文件并恢复为内存中的zk数据结构这一过程进行源码分析,snapshot的恢复过程见上一篇(https://blog.csdn.net/weixin_42442768/article/details/110134663),本文主要分析事务日志的恢复过程。
源码分析
首先定位到FileTxnSnapLog
类的restore
方法,该方法主要功能是将磁盘中的snapshots文件和事务日志文件恢复到内存中的ZKDatabase结构中,从而进行正常的工作。
public long restore(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {long deserializeResult = snapLog.deserialize(dt, sessions);FileTxnLog txnLog = new FileTxnLog(dataDir);if (-1L == deserializeResult) {/* this means that we couldn't find any snapshot, so we need to* initialize an empty database (reported in ZOOKEEPER-2325) */if (txnLog.getLastLoggedZxid() != -1) {throw new IOException("No snapshot found, but there are log entries. " +"Something is broken!");}/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()* or use Map on save() */save(dt, (ConcurrentHashMap<Long, Integer>)sessions);/* return a zxid of zero, since we the database is empty */return 0;}return fastForwardFromEdits(dt, sessions, listener);}
上一篇内容已经分析了快照文件的恢复过程,我们直接从fastForwardFromEdits
方法开始分析事务日志的恢复过程。
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {//1. 快速读取事务日志,并创建日志文件迭代器`TxnIterator`TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);long highestZxid = dt.lastProcessedZxid;TxnHeader hdr;try {//2. 按事务日志的zxid顺序解析所有文件while (true) {// iterator points to// the first valid txn when initializedhdr = itr.getHeader();if (hdr == null) {//empty logsreturn dt.lastProcessedZxid;}//3. 更新zxid并处理事务if (hdr.getZxid() < highestZxid && highestZxid != 0) {LOG.error("{}(highestZxid) > {}(next log) for type {}",highestZxid, hdr.getZxid(), hdr.getType());} else {highestZxid = hdr.getZxid();}try {processTransaction(hdr,dt,sessions, itr.getTxn());} catch(KeeperException.NoNodeException e) {throw new IOException("Failed to process transaction type: " +hdr.getType() + " error: " + e.getMessage(), e);}//4. 监听器监听事务日志恢复信息listener.onTxnLoaded(hdr, itr.getTxn());if (!itr.next())break;}} finally {if (itr != null) {itr.close();}}//5. 返回最新zxidreturn highestZxid;}
整个事务日志的恢复流程如下:
- 快速读取事务日志,并创建日志文件迭代器
TxnIterator
- 按事务日志的zxid顺序解析所有文件
- 更新zxid并处理事务
- 监听器监听事务日志恢复信息
- 返回最新zxid
下面对过程1(TxnIterator
的创建)、3(processTransaction
处理事务)、4(PlayBackListener
监听器)进行详细说明。
TxnIterator的创建
txnLog
是事务文件的存储目录,这里的参数是DataTree结构中的lastProcessedZxid
+1,而DataTree是从snapshot文件恢复的内存中的数据结构,直接进入read
方法:
public TxnIterator read(long zxid) throws IOException {return read(zxid, true);}public TxnIterator read(long zxid, boolean fastForward) throws IOException {return new FileTxnIterator(logDir, zxid, fastForward);}
这里的fastForward
置为true
体现在从指定的zxid文件开始恢复事务文件,接着创建一个FileTxnIterator
对象,先来看类定义和成员变量:
public static class FileTxnIterator implements TxnLog.TxnIterator {File logDir; //事务日志文件目录long zxid; //指定要恢复的事务日志的起始zxidTxnHeader hdr; //文件头解析类Record record; //解析的文件信息File logFile; //事务日志文件InputArchive ia; //反序列化接口static final String CRC_ERROR="CRC check failed";PositionInputStream inputStream=null; //input流//stored files is the list of files greater than//the zxid we are looking for.private ArrayList<File> storedFiles; //存储要恢复的事务文件集合
构造方法如下:
public FileTxnIterator(File logDir, long zxid, boolean fastForward)throws IOException {this.logDir = logDir;this.zxid = zxid;init();if (fastForward && hdr != null) {while (hdr.getZxid() < zxid) {if (!next())break;}}}
这里首先进行初始化init
:
void init() throws IOException {storedFiles = new ArrayList<File>();List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);for (File f: files) {if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {storedFiles.add(f);}// add the last logfile that is less than the zxidelse if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {storedFiles.add(f);break;}}goToNextLog();next();}
这里首先获取所有符合事务日志前缀的文件,并按zxid降序排列,和snapshot文件的获取方式类似,不详细展开。
storedFiles
是存储要恢复的事务日志文件的集合,这里会把所有大于等于zxid的事务日志文件加入,同时加入小于的zxid的下一个文件(files已按降序排列)。
private boolean goToNextLog() throws IOException {if (storedFiles.size() > 0) {this.logFile = storedFiles.remove(storedFiles.size()-1);ia = createInputArchive(this.logFile);return true;}return false;}
goToNextLog
是建立storedFiles
集合中末尾文件(zxid最小的)的反序列化流,事务日志文件的魔数是ZKLG
,在createInputArchive
方法中会对魔数进行校验,不深入分析。
public boolean next() throws IOException {if (ia == null) {return false;}try {long crcValue = ia.readLong("crcvalue");byte[] bytes = Util.readTxnBytes(ia);// Since we preallocate, we define EOF to be anif (bytes == null || bytes.length==0) {throw new EOFException("Failed to read " + logFile);}// EOF or corrupted record// validate CRCChecksum crc = makeChecksumAlgorithm();crc.update(bytes, 0, bytes.length);if (crcValue != crc.getValue())throw new IOException(CRC_ERROR);hdr = new TxnHeader();record = SerializeUtils.deserializeTxn(bytes, hdr);} catch (EOFException e) {LOG.debug("EOF exception " + e);inputStream.close();inputStream = null;ia = null;hdr = null;// this means that the file has ended// we should go to the next fileif (!goToNextLog()) {return false;}// if we went to the next log file, we should call next() againreturn next();} catch (IOException e) {inputStream.close();throw e;}return true;}
next
方法是FileTxnIterator
迭代器的迭代方法,这里主要流程如下:
- crc校验,这个在snapshot的序列化和反序列化讲过,用户文件的验证
readTxnBytes
是读取事务文件的内容,进行校验- 创建一个TxnHeader对象,用于反序列化文件头
record
记录反序列化事务日志内容(下面展开deserializeTxn
方法)EOFException
异常的catch部分,这里的功能包括
a) 表示文件读到末尾,关闭流
b) 建立下一个事务文件的流,调用goToNextLog
方法
c) 调用next
方法来反序列化下一个流的相关内容,保证迭代过程的连续性
这里看一下deserializeTxn
方法:
public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr)throws IOException {final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes);InputArchive ia = BinaryInputArchive.getArchive(bais);hdr.deserialize(ia, "hdr");bais.mark(bais.available());Record txn = null;switch (hdr.getType()) {case OpCode.createSession:// This isn't really an error txn; it just has the same// format. The error represents the timeouttxn = new CreateSessionTxn();break;case OpCode.closeSession:return null;case OpCode.create:case OpCode.create2:txn = new CreateTxn();break;case OpCode.createTTL:txn = new CreateTTLTxn();break;case OpCode.createContainer:txn = new CreateContainerTxn();break;case OpCode.delete:case OpCode.deleteContainer:txn = new DeleteTxn();break;case OpCode.reconfig:case OpCode.setData:txn = new SetDataTxn();break;case OpCode.setACL:txn = new SetACLTxn();break;case OpCode.error:txn = new ErrorTxn();break;case OpCode.multi:txn = new MultiTxn();break;default:throw new IOException("Unsupported Txn with type=%d" + hdr.getType());}if (txn != null) {try {txn.deserialize(ia, "txn");} catch(EOFException e) {// perhaps this is a V0 Createif (hdr.getType() == OpCode.create) {CreateTxn create = (CreateTxn)txn;bais.reset();CreateTxnV0 createv0 = new CreateTxnV0();createv0.deserialize(ia, "txn");// cool now make it V1. a -1 parentCVersion will// trigger fixup processing in processTxncreate.setPath(createv0.getPath());create.setData(createv0.getData());create.setAcl(createv0.getAcl());create.setEphemeral(createv0.getEphemeral());create.setParentCVersion(-1);} else {throw e;}}}return txn;}
这块内容比较好理解,解析每个事务的具体类型,来创建相应的事务类(都是实现Record接口),可以根据平时使用的zk命令对照来看。这些反序列化后的事务类,最终会在processTransaction
中去执行。
processTransaction
处理事务
回到fastForwardFromEdits
方法:
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,PlayBackListener listener) throws IOException {TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);long highestZxid = dt.lastProcessedZxid;TxnHeader hdr;try {while (true) {// iterator points to// the first valid txn when initializedhdr = itr.getHeader();if (hdr == null) {//empty logsreturn dt.lastProcessedZxid;}if (hdr.getZxid() < highestZxid && highestZxid != 0) {LOG.error("{}(highestZxid) > {}(next log) for type {}",highestZxid, hdr.getZxid(), hdr.getType());} else {highestZxid = hdr.getZxid();}try {processTransaction(hdr,dt,sessions, itr.getTxn());} catch(KeeperException.NoNodeException e) {throw new IOException("Failed to process transaction type: " +hdr.getType() + " error: " + e.getMessage(), e);}listener.onTxnLoaded(hdr, itr.getTxn());if (!itr.next())break;}} finally {if (itr != null) {itr.close();}}return highestZxid;}
TxnIterator
迭代器创建完成,在while循环中更新highestZxid
为迭代器当前事务日志文件的zxid,进入processTransaction
方法:
public void processTransaction(TxnHeader hdr,DataTree dt,Map<Long, Integer> sessions, Record txn)throws KeeperException.NoNodeException {ProcessTxnResult rc;switch (hdr.getType()) {case OpCode.createSession:sessions.put(hdr.getClientId(),((CreateSessionTxn) txn).getTimeOut());if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"playLog --- create session in log: 0x"+ Long.toHexString(hdr.getClientId())+ " with timeout: "+ ((CreateSessionTxn) txn).getTimeOut());}// give dataTree a chance to sync its lastProcessedZxidrc = dt.processTxn(hdr, txn);break;case OpCode.closeSession:sessions.remove(hdr.getClientId());if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"playLog --- close session in log: 0x"+ Long.toHexString(hdr.getClientId()));}rc = dt.processTxn(hdr, txn);break;default:rc = dt.processTxn(hdr, txn);}/*** Snapshots are lazily created. So when a snapshot is in progress,* there is a chance for later transactions to make into the* snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS* errors could occur. It should be safe to ignore these.*/if (rc.err != Code.OK.intValue()) {LOG.debug("Ignoring processTxn failure hdr: {}, error: {}, path: {}",hdr.getType(), rc.err, rc.path);}}
这里针对createSession
命令和closeSession
命令对sessionWithTimeouts
这个map进行添加和删除,最终都会调用DataTree
类的processTxn
方法,将header中的类型匹配到对应的命令,将解析后的Record类实例转换成对应的子类(例如CreateTxn),调用对应的命令方法完成事务日志的执行操作。
processTxn
方法除了在事务日志恢复时会被调用,在客户端向leader执行写操作时同样会执行,以后单独分析客户端和zk集群的交互过程。
- 客户端和zk集群交互过程
最终会返回一个ProcessTxnResult
对象,包含一下信息:
成员变量 | 类型 | 含义 |
---|---|---|
clientId | long | 客户端id |
cxid | int | 客户端操作id |
zxid | long | 事务对应的zxid |
err | int | 错误码 |
type | int | 客户端操作类型 |
path | String | 事务执行结果的路径 |
stat | Stat | 事务执行对应ZNode的状态信息 |
multiResult | List | 组合命令执行结果结合 |
PlayBackListener监听器
listener.onTxnLoaded(hdr,itr.getTxn())
这里主要任务是监听每条事务日志,将事务信息存储起来,用于向集群中其他节点同步事务。
private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {public void onTxnLoaded(TxnHeader hdr, Record txn){addCommittedProposal(hdr, txn);}};
进入addCommittedProposal
方法:
private void addCommittedProposal(TxnHeader hdr, Record txn) {Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());addCommittedProposal(r);}
创建一个Request
类用户封装事务日志的相关信息。
public void addCommittedProposal(Request request) {WriteLock wl = logLock.writeLock();try {wl.lock();if (committedLog.size() > commitLogCount) {committedLog.removeFirst();minCommittedLog = committedLog.getFirst().packet.getZxid();}if (committedLog.isEmpty()) {minCommittedLog = request.zxid;maxCommittedLog = request.zxid;}byte[] data = SerializeUtils.serializeRequest(request);QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);Proposal p = new Proposal();p.packet = pp;p.request = request;committedLog.add(p);maxCommittedLog = p.packet.getZxid();} finally {wl.unlock();}}
这里最主要的是将当前成功执行的事务日志信息添加到committedLog
中,用于快速向Follower节点同步。
至此,事务日志恢复过程结束,返回执行成功的最高事务日志的zxid。
查看事务日志命令
和snapshot查看命令类似,zk提供了LogFormatter
类来查看事务日志文件,可以看一下main
方法:
public static void main(String[] args) throws Exception {if (args.length != 1) {System.err.println("USAGE: LogFormatter log_file");System.exit(2);}FileInputStream fis = new FileInputStream(args[0]);BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);FileHeader fhdr = new FileHeader();fhdr.deserialize(logStream, "fileheader");if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {System.err.println("Invalid magic number for " + args[0]);System.exit(2);}System.out.println("ZooKeeper Transactional Log File with dbid "+ fhdr.getDbid() + " txnlog format version "+ fhdr.getVersion());int count = 0;while (true) {long crcValue;byte[] bytes;try {crcValue = logStream.readLong("crcvalue");bytes = logStream.readBuffer("txnEntry");} catch (EOFException e) {System.out.println("EOF reached after " + count + " txns.");return;}if (bytes.length == 0) {// Since we preallocate, we define EOF to be an// empty transactionSystem.out.println("EOF reached after " + count + " txns.");return;}Checksum crc = new Adler32();crc.update(bytes, 0, bytes.length);if (crcValue != crc.getValue()) {throw new IOException("CRC doesn't match " + crcValue +" vs " + crc.getValue());}TxnHeader hdr = new TxnHeader();Record txn = SerializeUtils.deserializeTxn(bytes, hdr);System.out.println(DateFormat.getDateTimeInstance(DateFormat.SHORT,DateFormat.LONG).format(new Date(hdr.getTime()))+ " session 0x"+ Long.toHexString(hdr.getClientId())+ " cxid 0x"+ Long.toHexString(hdr.getCxid())+ " zxid 0x"+ Long.toHexString(hdr.getZxid())+ " " + TraceFormatter.op2String(hdr.getType()) + " " + txn);if (logStream.readByte("EOR") != 'B') {LOG.error("Last transaction was partial.");throw new EOFException("Last transaction was partial.");}count++;}}
其实就是将事务日志文件进行反序列化,包括文件头校验、Adler32算法校验等操作,最终输出到控制台。
命令如下:
java -classpath .:/opt/module/apache-zookeeper-3.5.8-bin/lib/zookeeper-3.5.8.jar:/opt/module/apache-zookeeper-3.5.8-bin/lib/zookeeper-jute-3.5.8.jar:/opt/module/apache-zookeeper-3.5.8-bin/lib/slf4j-api-1.7.25.jar:/opt/module/apache-zookeeper-3.5.8-bin/lib/slf4j-log4j12-1.7.25.jar:/opt/module/apache-zookeeper-3.5.8-bin/lib/log4j-1.2.17.jar org.apache.zookeeper.server.LogFormatter log.XXX
这个命令需要按照不同的安装版本进行调整,到lib目录下观察对应jar包是否存在以及版本信息。
这是zookeeper提供了专门查看日志&快照的api工具,查看信息如下:
总结
整个事务日志恢复的过程和snapshot恢复过程类似,区别主要在于:snapshot反序列化到内存数据结构ZKDatabase,而事务日志除了反序列化到内存,还会依次执行事务并将事务信息保存到ZKDatabase的committedLog
中用于同步事务。
zookeeper源码分析之恢复事务日志相关推荐
- Zookeeper源码分析(二) ----- zookeeper日志
zookeeper源码分析系列文章: Zookeeper源码分析(一) ----- 源码运行环境搭建 原创博客,纯手敲,转载请注明出处,谢谢! 既然我们是要学习源码,那么如何高效地学习源代码呢?答案就 ...
- zookeeper源码分析之四服务端(单机)处理请求流程
上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...
- 数据库中间件 MyCAT源码分析 —— XA分布式事务
title: MyCAT 源码分析 -- XA分布式事务 date: 2017-07-15 tags: categories: MyCAT permalink: MyCAT/xa-distribute ...
- Spring事务源码分析责任链事务链事务不生效
文章目录 前言 带着问题分析源码 事务源码分析 寻找Spring事务源码类 TransactionInterceptor调用栈 分析Spring AOP责任链 分析TransactionInterce ...
- zookeeper源码分析之五服务端(集群leader)处理请求流程
leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...
- zookeeper源码分析之三客户端发送请求流程
znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...
- RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)
在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...
- velocity源码分析:初始化之日志系统
之前在"velocity源码分析:velocity初始化"文章中粗略地介绍了velocity整体的初始化过程,包括各个系统的初始化,本文主要介绍日志系统初始化. 日志系统类图: 概 ...
- Zookeeper源码分析:Leader角色初始化
参考资料 <<从PAXOS到ZOOKEEPER分布式一致性原理与实践>> zookeeper-3.0.0 Leader角色初始化 在上文的选举完成之后,每个zk实例都会根据选举 ...
最新文章
- python中的元类_python中的元类
- c语言各种编程风格 微软 gnu,编程规范-c语言的编程风格
- 虚拟网卡与物理网卡TCP协议数据传输对比
- java对密码进行加密的方法_如何在JAVA中使用MD5加密对密码进行加密
- 使用ajax将数据显示在指定位置_AJAX学习主题之一
- Linux工作笔记035---设置连接Linux Centos 超时连接时间_空闲的等待时间 -bash: TMOUT: readonly variable
- AsnycTask的内部的实现机制
- 以mysql为例有几种隔离级别_mysql隔离级别有几种
- DWM1000 定位操作流程--[蓝点无限]
- ffmpeg推流到流媒体服务器
- Java垃圾回收的时间点
- 美化版缤纷彩色文字广告代码文字+网站添加AD教程
- 【kali】安装ibus中文输入法
- 电工与电子技术实验——叠加定理与戴维南定理
- ORACLE 库缓存
- Vue 点击按钮跳转其他链接
- Shopify 前端开发 占位符(占位图片)的使用
- MATLAB船舶开尔文尾迹三维仿真建模
- docker中安装Mkdocs
- 听说 520 你还没对象,来这里看看