zoukankan      html  css  js  c++  java
  • 【Zookeeper】源码分析之持久化(一)之FileTxnLog

    一、前言

      前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。

    二、持久化总体框架

      持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。

      

      · TxnLog,接口类型,读取事务性日志的接口。

      · FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。

      · Snapshot,接口类型,持久层快照接口。

      · FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。

      · FileTxnSnapLog,封装了TxnLog和SnapShot。

      · Util,工具类,提供持久化所需的API。

      下面先来分析TxnLog和FileTxnLog的源码。

    三、TxnLog源码分析

      TxnLog是接口,规定了对日志的响应操作。  

    public interface TxnLog {
        
        /**
         * roll the current
         * log being appended to
         * @throws IOException 
         */
        // 回滚日志
        void rollLog() throws IOException;
        /**
         * Append a request to the transaction log
         * @param hdr the transaction header
         * @param r the transaction itself
         * returns true iff something appended, otw false 
         * @throws IOException
         */
        // 添加一个请求至事务性日志
        boolean append(TxnHeader hdr, Record r) throws IOException;
    
        /**
         * Start reading the transaction logs
         * from a given zxid
         * @param zxid
         * @return returns an iterator to read the 
         * next transaction in the logs.
         * @throws IOException
         */
        // 读取事务性日志
        TxnIterator read(long zxid) throws IOException;
        
        /**
         * the last zxid of the logged transactions.
         * @return the last zxid of the logged transactions.
         * @throws IOException
         */
        // 事务性操作的最新zxid
        long getLastLoggedZxid() throws IOException;
        
        /**
         * truncate the log to get in sync with the 
         * leader.
         * @param zxid the zxid to truncate at.
         * @throws IOException 
         */
        // 清空日志,与Leader保持同步
        boolean truncate(long zxid) throws IOException;
        
        /**
         * the dbid for this transaction log. 
         * @return the dbid for this transaction log.
         * @throws IOException
         */
        // 获取数据库的id
        long getDbId() throws IOException;
        
        /**
         * commmit the trasaction and make sure
         * they are persisted
         * @throws IOException
         */
        // 提交事务并进行确认
        void commit() throws IOException;
       
        /** 
         * close the transactions logs
         */
        // 关闭事务性日志
        void close() throws IOException;
        /**
         * an iterating interface for reading 
         * transaction logs. 
         */
        // 读取事务日志的迭代器接口
        public interface TxnIterator {
            /**
             * return the transaction header.
             * @return return the transaction header.
             */
            // 获取事务头部
            TxnHeader getHeader();
            
            /**
             * return the transaction record.
             * @return return the transaction record.
             */
            // 获取事务
            Record getTxn();
         
            /**
             * go to the next transaction record.
             * @throws IOException
             */
            // 下个事务
            boolean next() throws IOException;
            
            /**
             * close files and release the 
             * resources
             * @throws IOException
             */
            // 关闭文件释放资源
            void close() throws IOException;
        }
    }

      其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。

    四、FileTxnLog源码分析

      对于LogFile而言,其格式可分为如下三部分

      LogFile:

        FileHeader TxnList ZeroPad

      FileHeader格式如下  

      FileHeader: {
        magic 4bytes (ZKLG)
        version 4bytes
        dbid 8bytes
      }

      TxnList格式如下

      TxnList:

        Txn || Txn TxnList

      Txn格式如下

      Txn:

        checksum Txnlen TxnHeader Record 0x42

      Txnlen格式如下

      Txnlen:
        len 4bytes

      TxnHeader格式如下

      TxnHeader: {

        sessionid 8bytes
        cxid 4bytes
          zxid 8bytes
        time 8bytes
        type 4bytes
      }

      ZeroPad格式如下

      ZeroPad:
        0 padded to EOF (filled during preallocation stage)

      了解LogFile的格式对于理解源码会有很大的帮助。

      4.1 属性  

    public class FileTxnLog implements TxnLog {
        private static final Logger LOG;
        
        // 预分配大小 64M
        static long preAllocSize =  65536 * 1024;
        
        // 魔术数字,默认为1514884167
        public final static int TXNLOG_MAGIC =
            ByteBuffer.wrap("ZKLG".getBytes()).getInt();
    
        // 版本号
        public final static int VERSION = 2;
    
        /** Maximum time we allow for elapsed fsync before WARNing */
        // 进行同步时,发出warn之前所能等待的最长时间
        private final static long fsyncWarningThresholdMS;
    
        // 静态属性,确定Logger、预分配空间大小和最长时间
        static {
            LOG = LoggerFactory.getLogger(FileTxnLog.class);
    
            String size = System.getProperty("zookeeper.preAllocSize");
            if (size != null) {
                try {
                    preAllocSize = Long.parseLong(size) * 1024;
                } catch (NumberFormatException e) {
                    LOG.warn(size + " is not a valid value for preAllocSize");
                }
            }
            fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
        }
        
        // 最大(新)的zxid
        long lastZxidSeen;
        // 存储数据相关的流
        volatile BufferedOutputStream logStream = null;
        volatile OutputArchive oa;
        volatile FileOutputStream fos = null;
    
        // log目录文件
        File logDir;
        
        // 是否强制同步
        private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;
        
        // 数据库id
        long dbId;
        
        // 流列表
        private LinkedList<FileOutputStream> streamsToFlush =
            new LinkedList<FileOutputStream>();
        
        // 当前大小
        long currentSize;
        // 写日志文件
        File logFileWrite = null;
    }

      4.2. 核心函数 

      1. append函数

        public synchronized boolean append(TxnHeader hdr, Record txn)
            throws IOException
        {
            if (hdr != null) { // 事务头部不为空
                if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid
                    LOG.warn("Current zxid " + hdr.getZxid()
                            + " is <= " + lastZxidSeen + " for "
                            + hdr.getType());
                }
                if (logStream==null) { // 日志流为空
                   if(LOG.isInfoEnabled()){
                        LOG.info("Creating new log file: log." +  
                                Long.toHexString(hdr.getZxid()));
                   }
                   
                   // 
                   logFileWrite = new File(logDir, ("log." + 
                           Long.toHexString(hdr.getZxid())));
                   fos = new FileOutputStream(logFileWrite);
                   logStream=new BufferedOutputStream(fos);
                   oa = BinaryOutputArchive.getArchive(logStream);
                   // 
                   FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
                   // 序列化
                   fhdr.serialize(oa, "fileheader");
                   // Make sure that the magic number is written before padding.
                   // 刷新到磁盘
                   logStream.flush();
                   
                   // 当前通道的大小
                   currentSize = fos.getChannel().position();
                   // 添加fos
                   streamsToFlush.add(fos);
                }
                
                // 填充文件
                padFile(fos);
                
                // Serializes transaction header and transaction data into a byte buffer.
                // 将事务头和事务数据序列化成Byte Buffer
                byte[] buf = Util.marshallTxnEntry(hdr, txn);
                if (buf == null || buf.length == 0) { // 为空,抛出异常
                    throw new IOException("Faulty serialization for header " +
                            "and txn");
                }
                // 生成一个验证算法
                Checksum crc = makeChecksumAlgorithm();
                // Updates the current checksum with the specified array of bytes
                // 使用Byte数组来更新当前的Checksum
                crc.update(buf, 0, buf.length);
                // 写long类型数据
                oa.writeLong(crc.getValue(), "txnEntryCRC");
                // Write the serialized transaction record to the output archive.
                // 将序列化的事务记录写入OutputArchive
                Util.writeTxnBytes(oa, buf);
                
                return true;
            }
            return false;
        }
    View Code

      说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下

      ① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false

      ② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤

      ③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④

      ④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤

      ⑤ 填充数据,填充0,进入⑥

      ⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦

      ⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧

      ⑧ 将更新的ByteBuffer写入磁盘文件,返回true

      append间接调用了padLog函数,其源码如下 

        public static long padLogFile(FileOutputStream f,long currentSize,
                long preAllocSize) throws IOException{
            // 获取位置
            long position = f.getChannel().position();
            if (position + 4096 >= currentSize) { // 计算后是否大于当前大小
                // 重新设置当前大小,剩余部分填充0
                currentSize = currentSize + preAllocSize;
                fill.position(0);
                f.getChannel().write(fill, currentSize-fill.remaining());
            }
            return currentSize;
        }
    View Code

      说明:其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小。

      2. getLogFiles函数  

        public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
            // 按照zxid对文件进行排序
            List<File> files = Util.sortDataDir(logDirList, "log", true);
            long logZxid = 0;
            // Find the log file that starts before or at the same time as the
            // zxid of the snapshot
            for (File f : files) { // 遍历文件
                // 从文件中获取zxid
                long fzxid = Util.getZxidFromName(f.getName(), "log");
                if (fzxid > snapshotZxid) { // 跳过大于snapshotZxid的文件
                    continue;
                }
                // the files
                // are sorted with zxid's
                if (fzxid > logZxid) { // 找出文件中最大的zxid(同时还需要小于等于snapshotZxid)
                    logZxid = fzxid;
                }
            }
            // 文件列表
            List<File> v=new ArrayList<File>(5);
            for (File f : files) { // 再次遍历文件
                // 从文件中获取zxid
                long fzxid = Util.getZxidFromName(f.getName(), "log");
                if (fzxid < logZxid) { // 跳过小于logZxid的文件
                    continue;
                }
                // 添加
                v.add(f);
            }
            // 转化成File[] 类型后返回
            return v.toArray(new File[0]);
    
        }
    View Code

      说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。

      ① 对所有log文件按照zxid进行升序排序,进入②

      ② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③

        ③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④

      ④ 转化后返回

      getLogFiles函数调用了sortDataDir,其源码如下: 

    public static List<File> sortDataDir(File[] files, String prefix, boolean ascending)
        {
            if(files==null) 
                return new ArrayList<File>(0);
            // 转化为列表
            List<File> filelist = Arrays.asList(files);
            // 进行排序,Comparator是关键,根据zxid进行排序
            Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
            return filelist;
        }
    View Code

      说明:其用于排序log文件,可以选择根据zxid进行升序或降序。

      getLogFiles函数间接调用了getZxidFromName,其源码如下: 

        // 从文件名中解析出zxid
        public static long getZxidFromName(String name, String prefix) {
            long zxid = -1;
            // 对文件名进行分割
            String nameParts[] = name.split("\.");
            if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前缀相同
                try {
                    // 转化成长整形
                    zxid = Long.parseLong(nameParts[1], 16);
                } catch (NumberFormatException e) {
                }
            }
            return zxid;
        }
    View Code

      说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始。

      3. getLastLoggedZxid函数 

        public long getLastLoggedZxid() {
            // 获取已排好序的所有的log文件
            File[] files = getLogFiles(logDir.listFiles(), 0);
            // 获取最大的zxid(最后一个log文件对应的zxid)
            long maxLog=files.length>0?
                    Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;
    
            // if a log file is more recent we must scan it to find
            // the highest zxid
            // 
            long zxid = maxLog;
            // 迭代器
            TxnIterator itr = null;
            try {
                // 新生FileTxnLog
                FileTxnLog txn = new FileTxnLog(logDir);
                // 开始读取从给定zxid之后的所有事务
                itr = txn.read(maxLog);
                while (true) { // 遍历
                    if(!itr.next()) // 是否存在下一项
                        break;
                    // 获取事务头
                    TxnHeader hdr = itr.getHeader();
                    // 获取zxid
                    zxid = hdr.getZxid();
                }
            } catch (IOException e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                // 关闭迭代器
                close(itr);
            }
            return zxid;
        }
    View Code

      说明:该函数主要用于获取记录在log中的最后一个zxid。其步骤大致如下

      ① 获取已排好序的所有log文件,并从最后一个文件中取出zxid作为候选的最大zxid,进入②

      ② 新生成FileTxnLog并读取步骤①中zxid之后的所有事务,进入③

      ③ 遍历所有事务并提取出相应的zxid,最后返回。

      其中getLastLoggedZxid调用了read函数,其源码如下 

    public TxnIterator read(long zxid) throws IOException {
            // 返回事务文件访问迭代器
            return new FileTxnIterator(logDir, zxid);
        }
    View Code

      说明:read函数会生成一个FileTxnIterator,其是TxnLog.TxnIterator的子类,之后在FileTxnIterator构造函数中会调用init函数,其源码如下 

        void init() throws IOException {
            // 新生成文件列表
            storedFiles = new ArrayList<File>();
            // 进行排序
            List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
            for (File f: files) { // 遍历文件
                if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 添加zxid大于等于指定zxid的文件
                    storedFiles.add(f);
                }
                // add the last logfile that is less than the zxid
                else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只添加一个zxid小于指定zxid的文件,然后退出
                    storedFiles.add(f);
                    break;
                }
            }
            // go to the next logfile
            // 进入下一个log文件
            goToNextLog();
            if (!next()) // 不存在下一项,返回
                return;
            while (hdr.getZxid() < zxid) { // 从事务头中获取zxid小于给定zxid,直到不存在下一项或者大于给定zxid时退出
                if (!next())
                    return;
            }
        }
    View Code

      说明:init函数用于进行初始化操作,会根据zxid的不同进行不同的初始化操作,在init函数中会调用goToNextLog函数,其源码如下  

        private boolean goToNextLog() throws IOException {
            if (storedFiles.size() > 0) { // 存储的文件列表大于0
                // 取最后一个log文件
                this.logFile = storedFiles.remove(storedFiles.size()-1);
                // 针对该文件,创建InputArchive
                ia = createInputArchive(this.logFile);
                // 返回true
                return true;
            }
            return false;
        }
    View Code

      说明:goToNextLog表示选取下一个log文件,在init函数中还调用了next函数,其源码如下  

        public boolean next() throws IOException {
            if (ia == null) { // 为空,返回false
                return false;
            }
            try {
                // 读取长整形crcValue
                long crcValue = ia.readLong("crcvalue");
                // 通过input archive读取一个事务条目
                byte[] bytes = Util.readTxnBytes(ia);
                // Since we preallocate, we define EOF to be an
                if (bytes == null || bytes.length==0) { // 对bytes进行判断
                    throw new EOFException("Failed to read " + logFile);
                }
                // EOF or corrupted record
                // validate CRC
                // 验证CRC
                Checksum crc = makeChecksumAlgorithm();
                // 更新
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue()) // 验证不相等,抛出异常
                    throw new IOException(CRC_ERROR);
                if (bytes == null || bytes.length == 0) // bytes为空,返回false
                    return false;
                // 新生成TxnHeader
                hdr = new TxnHeader();
                // 将Txn反序列化,并且将对应的TxnHeader反序列化至hdr,整个Record反序列化至record
                record = SerializeUtils.deserializeTxn(bytes, hdr);
            } catch (EOFException e) { // 抛出异常
                LOG.debug("EOF excepton " + e);
                // 关闭输入流
                inputStream.close();
                // 赋值为null
                inputStream = null;
                ia = null;
                hdr = null;
                // this means that the file has ended
                // we should go to the next file
                if (!goToNextLog()) { // 没有log文件,则返回false
                    return false;
                }
                // if we went to the next log file, we should call next() again
                // 继续调用next
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            // 返回true
            return true;
        }
    View Code

      说明:next表示将迭代器移动至下一个事务,方便读取,next函数的步骤如下。

      ① 读取事务的crcValue值,用于后续的验证,进入②

      ② 读取事务,使用CRC32进行更新并与①中的结果进行比对,若不相同,则抛出异常,否则,进入③

      ③ 将事务进行反序列化并保存至相应的属性中(如事务头和事务体),会确定具体的事务操作类型。

      ④ 在读取过程抛出异常时,会首先关闭流,然后再尝试调用next函数(即进入下一个事务进行读取)。

      4. commit函数  

        public synchronized void commit() throws IOException {
            if (logStream != null) {
                // 强制刷到磁盘
                logStream.flush();
            }
            for (FileOutputStream log : streamsToFlush) { // 遍历流
                // 强制刷到磁盘
                log.flush();
                if (forceSync) { // 是否强制同步
                    long startSyncNS = System.nanoTime();
                    
                    log.getChannel().force(false);
                    // 计算流式的时间
                    long syncElapsedMS =
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                    if (syncElapsedMS > fsyncWarningThresholdMS) { // 大于阈值时则会警告
                        LOG.warn("fsync-ing the write ahead log in "
                                + Thread.currentThread().getName()
                                + " took " + syncElapsedMS
                                + "ms which will adversely effect operation latency. "
                                + "See the ZooKeeper troubleshooting guide");
                    }
                }
            }
            while (streamsToFlush.size() > 1) { // 移除流并关闭
                streamsToFlush.removeFirst().close();
            }
        }
    View Code

      说明:该函数主要用于提交事务日志至磁盘,其大致步骤如下

      ① 若日志流logStream不为空,则强制刷新至磁盘,进入②

      ② 遍历需要刷新至磁盘的所有流streamsToFlush并进行刷新,进入③

      ③ 判断是否需要强制性同步,如是,则计算每个流的流式时间并在控制台给出警告,进入④

      ④ 移除所有流并关闭。

      5. truncate函数 

        public boolean truncate(long zxid) throws IOException {
            FileTxnIterator itr = null;
            try {
                // 获取迭代器
                itr = new FileTxnIterator(this.logDir, zxid);
                PositionInputStream input = itr.inputStream;
                long pos = input.getPosition();
                // now, truncate at the current position
                // 从当前位置开始清空
                RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
                raf.setLength(pos);
                raf.close();
                while (itr.goToNextLog()) { // 存在下一个log文件
                    if (!itr.logFile.delete()) { // 删除
                        LOG.warn("Unable to truncate {}", itr.logFile);
                    }
                }
            } finally {
                // 关闭迭代器
                close(itr);
            }
            return true;
        }
    View Code

      说明:该函数用于清空大于给定zxid的所有事务日志。

    五、总结

      对于持久化中的TxnLog和FileTxnLog的源码分析就已经完成了,其源码还是相对简单,也谢谢各位园友的观看~ 

  • 相关阅读:
    暑假日报-11
    暑假日报-10
    暑假日报-9
    暑假日报-8
    暑假日报-7
    暑假日报-6
    暑假日报-5
    暑假日报-4
    暑假日报-3
    第二次集训的每日感想
  • 原文地址:https://www.cnblogs.com/leesf456/p/6279956.html
Copyright © 2011-2022 走看看