zoukankan      html  css  js  c++  java
  • zk 文件存储

    zk 有 2 种文件,快照和事务日志,快照是某一时刻的全量数据,事务日志中记录了数据的修改事件。

    快照的文件名是 snapshot.zxid,其中 zxid 是生成快照时 DataTree 已处理的最后一个事务 id(即 dataTree.lastProcessedZxid)

    // org.apache.zookeeper.server.persistence.FileTxnSnapLog#save
    /**
     * Serializes the given data tree and session map into a new snapshot
     * file under {@code snapDir}.
     *
     * <p>The file name is derived from the tree's {@code lastProcessedZxid}
     * via {@code Util.makeSnapshotName}.
     *
     * @param dataTree the data tree to snapshot
     * @param sessionsWithTimeouts the session map passed through to
     *        {@code snapLog.serialize}
     * @throws IOException declared for failures while writing the snapshot
     */
    public void save(DataTree dataTree,
            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
        throws IOException {
        final long snapshotZxid = dataTree.lastProcessedZxid;
        // The snapshot file is named after the last processed zxid.
        final String fileName = Util.makeSnapshotName(snapshotZxid);
        final File target = new File(snapDir, fileName);
        final String hexZxid = Long.toHexString(snapshotZxid);
        LOG.info("Snapshotting: 0x{} to {}", hexZxid, target);
        snapLog.serialize(dataTree, sessionsWithTimeouts, target);
    }

     事务日志的文件名是 log.zxid,zxid 是当前文件第一条日志的事务 id

    // org.apache.zookeeper.server.persistence.FileTxnLog#append
    /**
     * Appends one transaction entry to the current transaction log stream,
     * creating a new log file first if no stream is open.
     *
     * <p>Each entry is written as a checksum (a long tagged
     * {@code "txnEntryCRC"}) followed by the marshalled header and record
     * bytes. This method does not flush the entry itself; the only flush
     * here is the one that follows the file header of a newly created file.
     *
     * @param hdr the transaction header; when {@code null} nothing is written
     * @param txn the transaction record marshalled together with {@code hdr}
     * @return {@code false} if {@code hdr} is {@code null}, {@code true}
     *         once the entry has been written to the stream
     * @throws IOException if marshalling yields no bytes, or declared for
     *         failures of the underlying file operations
     */
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        // Nothing to log without a header.
        if (hdr == null) {
            return false;
        }
    
        // A zxid that does not advance is only warned about; the entry is
        // still appended below. lastZxidSeen moves forward only on increase.
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
    
        // No open stream: start a new log file.
        if (logStream==null) {
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }
           // Create a new transaction log file log.zxid, where zxid is the
           // zxid of the first transaction written to this file.
           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");
           // Make sure that the magic number is written before padding.
           logStream.flush();
           // Record the channel position after the header as the current size
           // used by the padding logic.
           filePadding.setCurrentSize(fos.getChannel().position());
           // Remember the stream in streamsToFlush.
           streamsToFlush.add(fos);
        }
        // Invoked on every append; presumably extends the file ahead of the
        // write position when needed (padFile is not visible here).
        filePadding.padFile(fos.getChannel());
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        // Write the checksum of the marshalled bytes first, then the bytes.
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        Util.writeTxnBytes(oa, buf);
    
        return true;
    }

     zk 加载数据:从 snap 文件和 log 文件解析出全量数据

    // org.apache.zookeeper.server.persistence.FileTxnSnapLog#restore
    /**
     * Rebuilds the in-memory state from disk in two steps: load a snapshot
     * into {@code dt} and {@code sessions}, then apply the transaction log
     * on top via {@code fastForwardFromEdits}.
     *
     * @param dt the data tree to populate
     * @param sessions the session map to populate
     * @param listener passed through to {@code fastForwardFromEdits}
     * @return the value returned by {@code fastForwardFromEdits}
     *         (presumably the highest zxid restored; confirm against that
     *         method)
     * @throws IOException declared for failures while reading the snapshot
     *         or the transaction log
     */
    public long restore(DataTree dt, Map<Long, Integer> sessions,
            PlayBackListener listener) throws IOException {
        // Step 1: load the snapshot; its return value is not used here.
        snapLog.deserialize(dt, sessions);

        // Step 2: replay the transaction log on top of the loaded snapshot.
        final long replayResult = fastForwardFromEdits(dt, sessions, listener);
        return replayResult;
    }

    按从新到旧的顺序最多选取 100 个 snap 文件,从最新的文件开始解析;只要有一个文件通过校验和检查,就停止尝试其余文件;如果全部校验失败,则抛出 IOException

    /**
     * Loads the first snapshot that passes its checksum test into
     * {@code dt} and {@code sessions}.
     *
     * <p>Up to 100 candidate files from {@code findNValidSnapshots(100)} are
     * tried in list order (presumably newest first; confirm against that
     * method). For each candidate the content is deserialized through an
     * Adler32-checked stream, and the computed checksum is compared with the
     * long stored after the payload. A candidate that fails to read or fails
     * the checksum test is logged and skipped.
     *
     * <p>A failure while closing a snapshot stream is logged but neither
     * aborts the scan nor invalidates a snapshot whose checksum was already
     * verified.
     *
     * <p>NOTE(review): a rejected candidate may already have written into
     * {@code dt} and {@code sessions} before the next one is tried; confirm
     * that the three-argument {@code deserialize} overload tolerates this.
     *
     * @param dt the data tree to populate
     * @param sessions the session map to populate
     * @return {@code -1L} if there are no candidate snapshots, otherwise the
     *         zxid parsed from the accepted snapshot's file name (also
     *         stored in {@code dt.lastProcessedZxid})
     * @throws IOException if candidates exist but none of them is valid
     */
    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.isEmpty()) {
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0; i < snapList.size(); i++) {
            snap = snapList.get(i);
            InputStream snapIS = null;
            CheckedInputStream crcIn = null;
            try {
                LOG.info("Reading snapshot " + snap);
                snapIS = new BufferedInputStream(new FileInputStream(snap));
                crcIn = new CheckedInputStream(snapIS, new Adler32());
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                deserialize(dt, sessions, ia);
                // The checksum must be sampled before the stored value is
                // read, because reading it also passes through crcIn.
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch(IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            } finally {
                // Closing the outermost wrapper also closes the streams it
                // wraps, so a single close is enough.
                InputStream toClose = (crcIn != null) ? crcIn : snapIS;
                if (toClose != null) {
                    try {
                        toClose.close();
                    } catch (IOException e) {
                        // Do not let a close failure escape: it would abort
                        // the scan of the remaining snapshots or reject a
                        // snapshot that has already been verified.
                        LOG.warn("problem closing snap file " + snap, e);
                    }
                }
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }

    以已经解析出的最大 zxid 为基准选择 log 文件:保留文件名中 zxid 不小于它的所有 log 文件,再加上文件名中 zxid 小于它的最新一个 log 文件(该文件中可能包含基准 zxid 之后的事务),然后跳过其中 zxid 小于基准值的事务

    // org.apache.zookeeper.server.persistence.FileTxnLog.FileTxnIterator#init
    /**
     * Selects the log files to iterate over and positions the iterator on
     * the first transaction whose zxid is not below {@code zxid}.
     *
     * <p>The sorted file list is walked in order. Every file is kept until
     * the first one whose name-derived zxid is below {@code zxid}; that file
     * is kept as well and the walk stops there. Transactions are then read
     * until one has a zxid of at least {@code zxid}, or until {@code next()}
     * reports that there are no more.
     *
     * @throws IOException declared for failures while listing or reading
     *         the log files
     */
    void init() throws IOException {
        storedFiles = new ArrayList<File>();
        List<File> candidates = Util.sortDataDir(
                FileTxnLog.getLogFiles(logDir.listFiles(), 0),
                LOG_FILE_PREFIX,
                false);
        for (File candidate : candidates) {
            long fileZxid = Util.getZxidFromName(candidate.getName(), LOG_FILE_PREFIX);
            storedFiles.add(candidate);
            // The first file that starts below the target zxid is the last
            // one needed; stop after keeping it.
            if (fileZxid < zxid) {
                break;
            }
        }
        goToNextLog();
        // Advance at least once, then keep skipping entries that precede
        // the target zxid; stop as soon as next() has nothing more.
        do {
            if (!next()) {
                return;
            }
        } while (hdr.getZxid() < zxid);
    }
  • 相关阅读:
    C# XML 文档注释
    大数据知识学习
    现在的人,买个钢铁做的车,每天擦,每周打蜡。可对自已的身体最应该保养的“车”,却从不养护
    Asp.net项目因Session阻塞导致页面打开速度变慢
    AvoidRepeatSubmit通过Javascript避免客户端重复提交请求
    Linux下Attansic L2 网卡驱动安装
    如果知道dll文件是面向32位系统还是面向64位系统的?
    整理C# 二进制,十进制,十六进制 互转
    连接Oracle时出现“System.AccessViolationException: 尝试读取或写入受保护的内存。这通常指示其他内存已损坏。”错误的问题
    [转]删除hbase表region块脚本
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11840780.html
Copyright © 2011-2022 走看看