zoukankan      html  css  js  c++  java
  • 【Zookeeper】源码分析之持久化(一)之FileTxnLog

    一、前言

      前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。

    二、持久化总体框架

      持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。

      

      · TxnLog,接口类型,读取事务性日志的接口。

      · FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。

      · Snapshot,接口类型,持久层快照接口。

      · FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。

      · FileTxnSnapLog,封装了TxnLog和SnapShot。

      · Util,工具类,提供持久化所需的API。

      下面先来分析TxnLog和FileTxnLog的源码。

    三、TxnLog源码分析

      TxnLog是接口,规定了对日志的响应操作。  

    public interface TxnLog {
        
        /**
         * roll the current
         * log being appended to
         * @throws IOException 
         */
        // 回滚日志
        void rollLog() throws IOException;
        /**
         * Append a request to the transaction log
         * @param hdr the transaction header
         * @param r the transaction itself
         * returns true iff something appended, otw false 
         * @throws IOException
         */
        // 添加一个请求至事务性日志
        boolean append(TxnHeader hdr, Record r) throws IOException;
    
        /**
         * Start reading the transaction logs
         * from a given zxid
         * @param zxid
         * @return returns an iterator to read the 
         * next transaction in the logs.
         * @throws IOException
         */
        // 读取事务性日志
        TxnIterator read(long zxid) throws IOException;
        
        /**
         * the last zxid of the logged transactions.
         * @return the last zxid of the logged transactions.
         * @throws IOException
         */
        // 事务性操作的最新zxid
        long getLastLoggedZxid() throws IOException;
        
        /**
         * truncate the log to get in sync with the 
         * leader.
         * @param zxid the zxid to truncate at.
         * @throws IOException 
         */
        // 清空日志,与Leader保持同步
        boolean truncate(long zxid) throws IOException;
        
        /**
         * the dbid for this transaction log. 
         * @return the dbid for this transaction log.
         * @throws IOException
         */
        // 获取数据库的id
        long getDbId() throws IOException;
        
        /**
         * commmit the trasaction and make sure
         * they are persisted
         * @throws IOException
         */
        // 提交事务并进行确认
        void commit() throws IOException;
       
        /** 
         * close the transactions logs
         */
        // 关闭事务性日志
        void close() throws IOException;
        /**
         * an iterating interface for reading 
         * transaction logs. 
         */
        // 读取事务日志的迭代器接口
        public interface TxnIterator {
            /**
             * return the transaction header.
             * @return return the transaction header.
             */
            // 获取事务头部
            TxnHeader getHeader();
            
            /**
             * return the transaction record.
             * @return return the transaction record.
             */
            // 获取事务
            Record getTxn();
         
            /**
             * go to the next transaction record.
             * @throws IOException
             */
            // 下个事务
            boolean next() throws IOException;
            
            /**
             * close files and release the 
             * resources
             * @throws IOException
             */
            // 关闭文件释放资源
            void close() throws IOException;
        }
    }

      其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。

    四、FileTxnLog源码分析

      对于LogFile而言,其格式可分为如下三部分

      LogFile:

        FileHeader TxnList ZeroPad

      FileHeader格式如下  

      FileHeader: {
        magic 4bytes (ZKLG)
        version 4bytes
        dbid 8bytes
      }

      TxnList格式如下

      TxnList:

        Txn || Txn TxnList

      Txn格式如下

      Txn:

        checksum Txnlen TxnHeader Record 0x42

      Txnlen格式如下

      Txnlen:
        len 4bytes

      TxnHeader格式如下

      TxnHeader: {

        sessionid 8bytes
        cxid 4bytes
          zxid 8bytes
        time 8bytes
        type 4bytes
      }

      ZeroPad格式如下

      ZeroPad:
        0 padded to EOF (filled during preallocation stage)

      了解LogFile的格式对于理解源码会有很大的帮助。

      4.1 属性  

    public class FileTxnLog implements TxnLog {
        private static final Logger LOG;
        
        // 预分配大小 64M
        static long preAllocSize =  65536 * 1024;
        
        // 魔术数字,默认为1514884167
        public final static int TXNLOG_MAGIC =
            ByteBuffer.wrap("ZKLG".getBytes()).getInt();
    
        // 版本号
        public final static int VERSION = 2;
    
        /** Maximum time we allow for elapsed fsync before WARNing */
        // 进行同步时,发出warn之前所能等待的最长时间
        private final static long fsyncWarningThresholdMS;
    
        // 静态属性,确定Logger、预分配空间大小和最长时间
        static {
            LOG = LoggerFactory.getLogger(FileTxnLog.class);
    
            String size = System.getProperty("zookeeper.preAllocSize");
            if (size != null) {
                try {
                    preAllocSize = Long.parseLong(size) * 1024;
                } catch (NumberFormatException e) {
                    LOG.warn(size + " is not a valid value for preAllocSize");
                }
            }
            fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
        }
        
        // 最大(新)的zxid
        long lastZxidSeen;
        // 存储数据相关的流
        volatile BufferedOutputStream logStream = null;
        volatile OutputArchive oa;
        volatile FileOutputStream fos = null;
    
        // log目录文件
        File logDir;
        
        // 是否强制同步
        private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;
        
        // 数据库id
        long dbId;
        
        // 流列表
        private LinkedList<FileOutputStream> streamsToFlush =
            new LinkedList<FileOutputStream>();
        
        // 当前大小
        long currentSize;
        // 写日志文件
        File logFileWrite = null;
    }

      4.2. 核心函数 

      1. append函数

        public synchronized boolean append(TxnHeader hdr, Record txn)
            throws IOException
        {
            if (hdr != null) { // 事务头部不为空
                if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid
                    LOG.warn("Current zxid " + hdr.getZxid()
                            + " is <= " + lastZxidSeen + " for "
                            + hdr.getType());
                }
                if (logStream==null) { // 日志流为空
                   if(LOG.isInfoEnabled()){
                        LOG.info("Creating new log file: log." +  
                                Long.toHexString(hdr.getZxid()));
                   }
                   
                   // 
                   logFileWrite = new File(logDir, ("log." + 
                           Long.toHexString(hdr.getZxid())));
                   fos = new FileOutputStream(logFileWrite);
                   logStream=new BufferedOutputStream(fos);
                   oa = BinaryOutputArchive.getArchive(logStream);
                   // 
                   FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
                   // 序列化
                   fhdr.serialize(oa, "fileheader");
                   // Make sure that the magic number is written before padding.
                   // 刷新到磁盘
                   logStream.flush();
                   
                   // 当前通道的大小
                   currentSize = fos.getChannel().position();
                   // 添加fos
                   streamsToFlush.add(fos);
                }
                
                // 填充文件
                padFile(fos);
                
                // Serializes transaction header and transaction data into a byte buffer.
                // 将事务头和事务数据序列化成Byte Buffer
                byte[] buf = Util.marshallTxnEntry(hdr, txn);
                if (buf == null || buf.length == 0) { // 为空,抛出异常
                    throw new IOException("Faulty serialization for header " +
                            "and txn");
                }
                // 生成一个验证算法
                Checksum crc = makeChecksumAlgorithm();
                // Updates the current checksum with the specified array of bytes
                // 使用Byte数组来更新当前的Checksum
                crc.update(buf, 0, buf.length);
                // 写long类型数据
                oa.writeLong(crc.getValue(), "txnEntryCRC");
                // Write the serialized transaction record to the output archive.
                // 将序列化的事务记录写入OutputArchive
                Util.writeTxnBytes(oa, buf);
                
                return true;
            }
            return false;
        }
    View Code

      说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下

      ① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false

      ② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤

      ③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④

      ④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤

      ⑤ 填充数据,填充0,进入⑥

      ⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦

      ⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧

      ⑧ 将更新的ByteBuffer写入磁盘文件,返回true

      append间接调用了padLog函数,其源码如下 

        public static long padLogFile(FileOutputStream f,long currentSize,
                long preAllocSize) throws IOException{
            // 获取位置
            long position = f.getChannel().position();
            if (position + 4096 >= currentSize) { // 计算后是否大于当前大小
                // 重新设置当前大小,剩余部分填充0
                currentSize = currentSize + preAllocSize;
                fill.position(0);
                f.getChannel().write(fill, currentSize-fill.remaining());
            }
            return currentSize;
        }
    View Code

      说明:其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小。

      2. getLogFiles函数  

        public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
            // 按照zxid对文件进行排序
            List<File> files = Util.sortDataDir(logDirList, "log", true);
            long logZxid = 0;
            // Find the log file that starts before or at the same time as the
            // zxid of the snapshot
            for (File f : files) { // 遍历文件
                // 从文件中获取zxid
                long fzxid = Util.getZxidFromName(f.getName(), "log");
                if (fzxid > snapshotZxid) { // 跳过大于snapshotZxid的文件
                    continue;
                }
                // the files
                // are sorted with zxid's
                if (fzxid > logZxid) { // 找出文件中最大的zxid(同时还需要小于等于snapshotZxid)
                    logZxid = fzxid;
                }
            }
            // 文件列表
            List<File> v=new ArrayList<File>(5);
            for (File f : files) { // 再次遍历文件
                // 从文件中获取zxid
                long fzxid = Util.getZxidFromName(f.getName(), "log");
                if (fzxid < logZxid) { // 跳过小于logZxid的文件
                    continue;
                }
                // 添加
                v.add(f);
            }
            // 转化成File[] 类型后返回
            return v.toArray(new File[0]);
    
        }
    View Code

      说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。

      ① 对所有log文件按照zxid进行升序排序,进入②

      ② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③

        ③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④

      ④ 转化后返回

      getLogFiles函数调用了sortDataDir,其源码如下: 

    public static List<File> sortDataDir(File[] files, String prefix, boolean ascending)
        {
            if(files==null) 
                return new ArrayList<File>(0);
            // 转化为列表
            List<File> filelist = Arrays.asList(files);
            // 进行排序,Comparator是关键,根据zxid进行排序
            Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
            return filelist;
        }
    View Code

      说明:其用于排序log文件,可以选择根据zxid进行升序或降序。

      getLogFiles函数间接调用了getZxidFromName,其源码如下: 

        // 从文件名中解析出zxid
        public static long getZxidFromName(String name, String prefix) {
            long zxid = -1;
            // 对文件名进行分割
            String nameParts[] = name.split("\.");
            if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前缀相同
                try {
                    // 转化成长整形
                    zxid = Long.parseLong(nameParts[1], 16);
                } catch (NumberFormatException e) {
                }
            }
            return zxid;
        }
    View Code

      说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始。

      3. getLastLoggedZxid函数 

        public long getLastLoggedZxid() {
            // 获取已排好序的所有的log文件
            File[] files = getLogFiles(logDir.listFiles(), 0);
            // 获取最大的zxid(最后一个log文件对应的zxid)
            long maxLog=files.length>0?
                    Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;
    
            // if a log file is more recent we must scan it to find
            // the highest zxid
            // 
            long zxid = maxLog;
            // 迭代器
            TxnIterator itr = null;
            try {
                // 新生FileTxnLog
                FileTxnLog txn = new FileTxnLog(logDir);
                // 开始读取从给定zxid之后的所有事务
                itr = txn.read(maxLog);
                while (true) { // 遍历
                    if(!itr.next()) // 是否存在下一项
                        break;
                    // 获取事务头
                    TxnHeader hdr = itr.getHeader();
                    // 获取zxid
                    zxid = hdr.getZxid();
                }
            } catch (IOException e) {
                LOG.warn("Unexpected exception", e);
            } finally {
                // 关闭迭代器
                close(itr);
            }
            return zxid;
        }
    View Code

      说明:该函数主要用于获取记录在log中的最后一个zxid。其步骤大致如下

      ① 获取已排好序的所有log文件,并从最后一个文件中取出zxid作为候选的最大zxid,进入②

      ② 新生成FileTxnLog并读取步骤①中zxid之后的所有事务,进入③

      ③ 遍历所有事务并提取出相应的zxid,最后返回。

      其中getLastLoggedZxid调用了read函数,其源码如下 

    public TxnIterator read(long zxid) throws IOException {
            // 返回事务文件访问迭代器
            return new FileTxnIterator(logDir, zxid);
        }
    View Code

      说明:read函数会生成一个FileTxnIterator,其是TxnLog.TxnIterator的子类,之后在FileTxnIterator构造函数中会调用init函数,其源码如下 

        void init() throws IOException {
            // 新生成文件列表
            storedFiles = new ArrayList<File>();
            // 进行排序
            List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
            for (File f: files) { // 遍历文件
                if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 添加zxid大于等于指定zxid的文件
                    storedFiles.add(f);
                }
                // add the last logfile that is less than the zxid
                else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只添加一个zxid小于指定zxid的文件,然后退出
                    storedFiles.add(f);
                    break;
                }
            }
            // go to the next logfile
            // 进入下一个log文件
            goToNextLog();
            if (!next()) // 不存在下一项,返回
                return;
            while (hdr.getZxid() < zxid) { // 从事务头中获取zxid小于给定zxid,直到不存在下一项或者大于给定zxid时退出
                if (!next())
                    return;
            }
        }
    View Code

      说明:init函数用于进行初始化操作,会根据zxid的不同进行不同的初始化操作,在init函数中会调用goToNextLog函数,其源码如下  

        private boolean goToNextLog() throws IOException {
            if (storedFiles.size() > 0) { // 存储的文件列表大于0
                // 取最后一个log文件
                this.logFile = storedFiles.remove(storedFiles.size()-1);
                // 针对该文件,创建InputArchive
                ia = createInputArchive(this.logFile);
                // 返回true
                return true;
            }
            return false;
        }
    View Code

      说明:goToNextLog表示选取下一个log文件,在init函数中还调用了next函数,其源码如下  

        public boolean next() throws IOException {
            if (ia == null) { // 为空,返回false
                return false;
            }
            try {
                // 读取长整形crcValue
                long crcValue = ia.readLong("crcvalue");
                // 通过input archive读取一个事务条目
                byte[] bytes = Util.readTxnBytes(ia);
                // Since we preallocate, we define EOF to be an
                if (bytes == null || bytes.length==0) { // 对bytes进行判断
                    throw new EOFException("Failed to read " + logFile);
                }
                // EOF or corrupted record
                // validate CRC
                // 验证CRC
                Checksum crc = makeChecksumAlgorithm();
                // 更新
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue()) // 验证不相等,抛出异常
                    throw new IOException(CRC_ERROR);
                if (bytes == null || bytes.length == 0) // bytes为空,返回false
                    return false;
                // 新生成TxnHeader
                hdr = new TxnHeader();
                // 将Txn反序列化,并且将对应的TxnHeader反序列化至hdr,整个Record反序列化至record
                record = SerializeUtils.deserializeTxn(bytes, hdr);
            } catch (EOFException e) { // 抛出异常
                LOG.debug("EOF excepton " + e);
                // 关闭输入流
                inputStream.close();
                // 赋值为null
                inputStream = null;
                ia = null;
                hdr = null;
                // this means that the file has ended
                // we should go to the next file
                if (!goToNextLog()) { // 没有log文件,则返回false
                    return false;
                }
                // if we went to the next log file, we should call next() again
                // 继续调用next
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            // 返回true
            return true;
        }
    View Code

      说明:next表示将迭代器移动至下一个事务,方便读取,next函数的步骤如下。

      ① 读取事务的crcValue值,用于后续的验证,进入②

      ② 读取事务,使用CRC32进行更新并与①中的结果进行比对,若不相同,则抛出异常,否则,进入③

      ③ 将事务进行反序列化并保存至相应的属性中(如事务头和事务体),会确定具体的事务操作类型。

      ④ 在读取过程抛出异常时,会首先关闭流,然后再尝试调用next函数(即进入下一个事务进行读取)。

      4. commit函数  

        public synchronized void commit() throws IOException {
            if (logStream != null) {
                // 强制刷到磁盘
                logStream.flush();
            }
            for (FileOutputStream log : streamsToFlush) { // 遍历流
                // 强制刷到磁盘
                log.flush();
                if (forceSync) { // 是否强制同步
                    long startSyncNS = System.nanoTime();
                    
                    log.getChannel().force(false);
                    // 计算流式的时间
                    long syncElapsedMS =
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                    if (syncElapsedMS > fsyncWarningThresholdMS) { // 大于阈值时则会警告
                        LOG.warn("fsync-ing the write ahead log in "
                                + Thread.currentThread().getName()
                                + " took " + syncElapsedMS
                                + "ms which will adversely effect operation latency. "
                                + "See the ZooKeeper troubleshooting guide");
                    }
                }
            }
            while (streamsToFlush.size() > 1) { // 移除流并关闭
                streamsToFlush.removeFirst().close();
            }
        }
    View Code

      说明:该函数主要用于提交事务日志至磁盘,其大致步骤如下

      ① 若日志流logStream不为空,则强制刷新至磁盘,进入②

      ② 遍历需要刷新至磁盘的所有流streamsToFlush并进行刷新,进入③

      ③ 判断是否需要强制性同步,如是,则计算每个流的流式时间并在控制台给出警告,进入④

      ④ 移除所有流并关闭。

      5. truncate函数 

        public boolean truncate(long zxid) throws IOException {
            FileTxnIterator itr = null;
            try {
                // 获取迭代器
                itr = new FileTxnIterator(this.logDir, zxid);
                PositionInputStream input = itr.inputStream;
                long pos = input.getPosition();
                // now, truncate at the current position
                // 从当前位置开始清空
                RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
                raf.setLength(pos);
                raf.close();
                while (itr.goToNextLog()) { // 存在下一个log文件
                    if (!itr.logFile.delete()) { // 删除
                        LOG.warn("Unable to truncate {}", itr.logFile);
                    }
                }
            } finally {
                // 关闭迭代器
                close(itr);
            }
            return true;
        }
    View Code

      说明:该函数用于清空大于给定zxid的所有事务日志。

    五、总结

      对于持久化中的TxnLog和FileTxnLog的源码分析就已经完成了,其源码还是相对简单,也谢谢各位园友的观看~ 

  • 相关阅读:
    Azure HPC Pack Cluster添加辅助节点
    Azure HPC Pack 辅助节点模板配置
    Azure HPC Pack配置管理系列(PART6)
    Windows HPC Pack 2012 R2配置
    Azure HPC Pack 节点提升成域控制器
    Azure HPC Pack VM 节点创建和配置
    Azure HPC Pack 部署必要条件准备
    Azure HPC Pack 基础拓扑概述
    Azure VM 性能计数器配置
    Maven私仓配置
  • 原文地址:https://www.cnblogs.com/leesf456/p/6279956.html
Copyright © 2011-2022 走看看