ZooKeeper(zk)的持久化文件有两种:快照(snapshot)和事务日志(txn log)。快照保存的是某一时刻的全量数据,事务日志则按顺序记录了对数据的每一次修改(事务)。
快照的文件名是 snapshot.zxid,其中 zxid 是生成快照时 DataTree 已处理的最大事务 id(即 dataTree.lastProcessedZxid):
// org.apache.zookeeper.server.persistence.FileTxnSnapLog#save public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts) throws IOException { long lastZxid = dataTree.lastProcessedZxid; // 快照文件名为 snapshot.lastZxid File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid)); LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile); }
事务日志的文件名是 log.zxid,其中 zxid 是该文件中第一条事务的 zxid:当前没有打开的日志文件时,写入下一条事务前才会新建文件,并以这条事务的 zxid 命名。
// org.apache.zookeeper.server.persistence.FileTxnLog#append public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { if (hdr == null) { return false; } if (hdr.getZxid() <= lastZxidSeen) { LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } else { lastZxidSeen = hdr.getZxid(); } if (logStream==null) { if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid())); } // 创建一个新的事务日志文件 log.zxid,这里的 zxid 是第一条事务日志的 zxid logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. logStream.flush(); filePadding.setCurrentSize(fos.getChannel().position()); streamsToFlush.add(fos); } filePadding.padFile(fos.getChannel()); byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { throw new IOException("Faulty serialization for header " + "and txn"); } Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf); return true; }
zk 加载数据:先从 snap 文件恢复出某一时刻的全量数据,再回放 log 文件中在此之后的事务,最终得到最新的全量数据。
// org.apache.zookeeper.server.persistence.FileTxnSnapLog#restore public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { snapLog.deserialize(dt, sessions); return fastForwardFromEdits(dt, sessions, listener); }
从最新的快照开始逆序选取最多 100 个 snap 文件依次解析:只要某个文件反序列化成功且 CRC 校验通过就停止查找;如果所有候选文件都失败,则抛出 IOException。
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // we run through 100 snapshots (not all of them) // if we cannot get it running within 100 snapshots // we should give up List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; for (int i = 0; i < snapList.size(); i++) { snap = snapList.get(i); InputStream snapIS = null; CheckedInputStream crcIn = null; try { LOG.info("Reading snapshot " + snap); snapIS = new BufferedInputStream(new FileInputStream(snap)); crcIn = new CheckedInputStream(snapIS, new Adler32()); InputArchive ia = BinaryInputArchive.getArchive(crcIn); deserialize(dt, sessions, ia); long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } foundValid = true; break; } catch(IOException e) { LOG.warn("problem reading snap file " + snap, e); } finally { if (snapIS != null) snapIS.close(); if (crcIn != null) crcIn.close(); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); return dt.lastProcessedZxid; }
根据快照已恢复到的 zxid 选择需要回放的 log 文件:文件名中的 zxid 大于等于该值的日志文件全部保留,再额外保留紧邻的一个文件名 zxid 小于该值的文件(因为它可能仍包含该 zxid 及之后的事务);打开后再跳过 zxid 小于该值的事务。
// org.apache.zookeeper.server.persistence.FileTxnLog.FileTxnIterator#init void init() throws IOException { storedFiles = new ArrayList<File>(); List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false); for (File f: files) { if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) { storedFiles.add(f); } // add the last logfile that is less than the zxid else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) { storedFiles.add(f); break; } } goToNextLog(); if (!next()) return; while (hdr.getZxid() < zxid) { if (!next()) return; } }