zoukankan      html  css  js  c++  java
  • diskqueue.go

    package nsqd

    import (
        "bufio"
        "bytes"
        "encoding/binary"
        "errors"
        "fmt"
        "io"
        "math/rand"
        "os"
        "path"
        "sync"
        "sync/atomic"
        "time"
    )

    // diskQueue implements the BackendQueue interface
    // providing a filesystem backed FIFO queue
    //
    // Messages are stored across a series of numbered data files; a small
    // metadata file persists the read/write positions and depth so the
    // queue survives restarts. All mutable queue state is owned by ioLoop;
    // public methods communicate with it over the internal channels.
    type diskQueue struct {
        // 64bit atomic vars need to be first for proper alignment on 32bit platforms

        // run-time state (also persisted to disk)
        readPos      int64 // byte offset of the next committed read in the current read file
        writePos     int64 // byte offset where the next message will be written
        readFileNum  int64 // index of the data file currently being read
        writeFileNum int64 // index of the data file currently being written
        depth        int64 // number of messages queued; accessed atomically

        sync.RWMutex

        // instantiation time metadata
        name            string
        dataPath        string
        maxBytesPerFile int64 // currently this cannot change once created
        minMsgSize      int32
        maxMsgSize      int32
        syncEvery       int64         // number of writes per fsync
        syncTimeout     time.Duration // duration of time per fsync
        exitFlag        int32
        needSync        bool // set when a significant state change requires a metadata sync

        // keeps track of the position where we have read
        // (but not yet sent over readChan)
        nextReadPos     int64
        nextReadFileNum int64

        readFile  *os.File
        writeFile *os.File
        reader    *bufio.Reader
        writeBuf  bytes.Buffer

        // exposed via ReadChan()
        readChan chan []byte

        // internal channels
        writeChan         chan []byte
        writeResponseChan chan error
        emptyChan         chan int
        emptyResponseChan chan error
        exitChan          chan int
        exitSyncChan      chan int

        logger Logger
    }

    // newDiskQueue instantiates a new instance of diskQueue, retrieving metadata
    // from the filesystem and starting the read ahead goroutine
    func newDiskQueue(name string, dataPath string, maxBytesPerFile int64,
        minMsgSize int32, maxMsgSize int32,
        syncEvery int64, syncTimeout time.Duration,
        logger Logger) BackendQueue {
        d := diskQueue{
            name:              name,
            dataPath:          dataPath,
            maxBytesPerFile:   maxBytesPerFile,
            minMsgSize:        minMsgSize,
            maxMsgSize:        maxMsgSize,
            readChan:          make(chan []byte),
            writeChan:         make(chan []byte),
            writeResponseChan: make(chan error),
            emptyChan:         make(chan int),
            emptyResponseChan: make(chan error),
            exitChan:          make(chan int),
            exitSyncChan:      make(chan int),
            syncEvery:         syncEvery,
            syncTimeout:       syncTimeout,
            logger:            logger,
        }

        // no need to lock here, nothing else could possibly be touching this instance
        err := d.retrieveMetaData()
        if err != nil && !os.IsNotExist(err) {
            d.logf("ERROR: diskqueue(%s) failed to retrieveMetaData - %s", d.name, err)
        }

        go d.ioLoop()

        return &d
    }

    func (d *diskQueue) logf(f string, args ...interface{}) {
        if d.logger == nil {
            return
        }
        d.logger.Output(2, fmt.Sprintf(f, args...))
    }

    // Depth returns the depth of the queue
    func (d *diskQueue) Depth() int64 {
        return atomic.LoadInt64(&d.depth)
    }

    // ReadChan returns the []byte channel for reading data
    func (d *diskQueue) ReadChan() chan []byte {
        return d.readChan
    }

    // Put writes a []byte to the queue
    func (d *diskQueue) Put(data []byte) error {
        d.RLock()
        defer d.RUnlock()

        if d.exitFlag == 1 {
            return errors.New("exiting")
        }

        d.writeChan <- data
        return <-d.writeResponseChan
    }

    // Close cleans up the queue and persists metadata
    func (d *diskQueue) Close() error {
        err := d.exit(false)
        if err != nil {
            return err
        }
        return d.sync()
    }

    func (d *diskQueue) Delete() error {
        return d.exit(true)
    }

    func (d *diskQueue) exit(deleted bool) error {
        d.Lock()
        defer d.Unlock()

        d.exitFlag = 1

        if deleted {
            d.logf("DISKQUEUE(%s): deleting", d.name)
        } else {
            d.logf("DISKQUEUE(%s): closing", d.name)
        }

        close(d.exitChan)
        // ensure that ioLoop has exited
        <-d.exitSyncChan

        if d.readFile != nil {
            d.readFile.Close()
            d.readFile = nil
        }

        if d.writeFile != nil {
            d.writeFile.Close()
            d.writeFile = nil
        }

        return nil
    }

    // Empty destructively clears out any pending data in the queue
    // by fast forwarding read positions and removing intermediate files
    func (d *diskQueue) Empty() error {
        d.RLock()
        defer d.RUnlock()

        if d.exitFlag == 1 {
            return errors.New("exiting")
        }

        d.logf("DISKQUEUE(%s): emptying", d.name)

        d.emptyChan <- 1
        return <-d.emptyResponseChan
    }

    func (d *diskQueue) deleteAllFiles() error {
        err := d.skipToNextRWFile()

        innerErr := os.Remove(d.metaDataFileName())
        if innerErr != nil && !os.IsNotExist(innerErr) {
            d.logf("ERROR: diskqueue(%s) failed to remove metadata file - %s", d.name, innerErr)
            return innerErr
        }

        return err
    }

    func (d *diskQueue) skipToNextRWFile() error {
        var err error

        if d.readFile != nil {
            d.readFile.Close()
            d.readFile = nil
        }

        if d.writeFile != nil {
            d.writeFile.Close()
            d.writeFile = nil
        }

        for i := d.readFileNum; i <= d.writeFileNum; i++ {
            fn := d.fileName(i)
            innerErr := os.Remove(fn)
            if innerErr != nil && !os.IsNotExist(innerErr) {
                d.logf("ERROR: diskqueue(%s) failed to remove data file - %s", d.name, innerErr)
                err = innerErr
            }
        }

        d.writeFileNum++
        d.writePos = 0
        d.readFileNum = d.writeFileNum
        d.readPos = 0
        d.nextReadFileNum = d.writeFileNum
        d.nextReadPos = 0
        atomic.StoreInt64(&d.depth, 0)

        return err
    }

    // readOne performs a low level filesystem read for a single []byte
    // while advancing read positions and rolling files, if necessary
    //
    // Each message is stored as a 4-byte big-endian length prefix followed
    // by the payload. On any error the current read file handle is closed
    // so the next attempt starts from a clean state. Only nextReadPos /
    // nextReadFileNum are advanced here; readPos / readFileNum are
    // committed later by moveForward, once the message has actually been
    // delivered over readChan.
    func (d *diskQueue) readOne() ([]byte, error) {
        var err error
        var msgSize int32

        // lazily (re)open the current read file and seek to the saved position
        if d.readFile == nil {
            curFileName := d.fileName(d.readFileNum)
            d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
            if err != nil {
                return nil, err
            }

            d.logf("DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)

            if d.readPos > 0 {
                _, err = d.readFile.Seek(d.readPos, 0)
                if err != nil {
                    d.readFile.Close()
                    d.readFile = nil
                    return nil, err
                }
            }

            d.reader = bufio.NewReader(d.readFile)
        }

        // read the 4-byte length prefix
        err = binary.Read(d.reader, binary.BigEndian, &msgSize)
        if err != nil {
            d.readFile.Close()
            d.readFile = nil
            return nil, err
        }

        if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
            // this file is corrupt and we have no reasonable guarantee on
            // where a new message should begin
            d.readFile.Close()
            d.readFile = nil
            return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
        }

        // read exactly msgSize payload bytes
        readBuf := make([]byte, msgSize)
        _, err = io.ReadFull(d.reader, readBuf)
        if err != nil {
            d.readFile.Close()
            d.readFile = nil
            return nil, err
        }

        // 4 bytes of length prefix + payload
        totalBytes := int64(4 + msgSize)

        // we only advance next* because we have not yet sent this to consumers
        // (where readFileNum, readPos will actually be advanced)
        d.nextReadPos = d.readPos + totalBytes
        d.nextReadFileNum = d.readFileNum

        // TODO: each data file should embed the maxBytesPerFile
        // as the first 8 bytes (at creation time) ensuring that
        // the value can change without affecting runtime
        if d.nextReadPos > d.maxBytesPerFile {
            // roll to the next data file; the handle is closed so the next
            // readOne reopens at the new file
            if d.readFile != nil {
                d.readFile.Close()
                d.readFile = nil
            }

            d.nextReadFileNum++
            d.nextReadPos = 0
        }

        return readBuf, nil
    }

    // writeOne performs a low level filesystem write for a single []byte
    // while advancing write positions and rolling files, if necessary
    //
    // The message is framed as a 4-byte big-endian length prefix followed
    // by the payload, buffered first in writeBuf so only a single file
    // Write is issued. On a write error the file handle is closed so the
    // next attempt reopens and seeks to the last known-good position.
    func (d *diskQueue) writeOne(data []byte) error {
        var err error

        // lazily (re)open the current write file and seek to the saved position
        if d.writeFile == nil {
            curFileName := d.fileName(d.writeFileNum)
            d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
            if err != nil {
                return err
            }

            d.logf("DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)

            if d.writePos > 0 {
                _, err = d.writeFile.Seek(d.writePos, 0)
                if err != nil {
                    d.writeFile.Close()
                    d.writeFile = nil
                    return err
                }
            }
        }

        dataLen := int32(len(data))

        // reject sizes that would be unreadable (readOne enforces the same bounds)
        if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
            return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
        }

        d.writeBuf.Reset()
        err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
        if err != nil {
            return err
        }

        _, err = d.writeBuf.Write(data)
        if err != nil {
            return err
        }

        // only write to the file once
        _, err = d.writeFile.Write(d.writeBuf.Bytes())
        if err != nil {
            d.writeFile.Close()
            d.writeFile = nil
            return err
        }

        // 4 bytes of length prefix + payload
        totalBytes := int64(4 + dataLen)
        d.writePos += totalBytes
        atomic.AddInt64(&d.depth, 1)

        // roll to a new file once the current one exceeds maxBytesPerFile
        if d.writePos > d.maxBytesPerFile {
            d.writeFileNum++
            d.writePos = 0

            // sync every time we start writing to a new file
            err = d.sync()
            if err != nil {
                d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
            }

            if d.writeFile != nil {
                d.writeFile.Close()
                d.writeFile = nil
            }
        }

        return err
    }

    // sync fsyncs the current writeFile and persists metadata
    func (d *diskQueue) sync() error {
        if d.writeFile != nil {
            err := d.writeFile.Sync()
            if err != nil {
                d.writeFile.Close()
                d.writeFile = nil
                return err
            }
        }

        err := d.persistMetaData()
        if err != nil {
            return err
        }

        d.needSync = false
        return nil
    }

    // retrieveMetaData initializes state from the filesystem
    func (d *diskQueue) retrieveMetaData() error {
        var f *os.File
        var err error

        fileName := d.metaDataFileName()
        f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
        if err != nil {
            return err
        }
        defer f.Close()

        var depth int64
        _, err = fmt.Fscanf(f, "%d
    %d,%d
    %d,%d
    ",
            &depth,
            &d.readFileNum, &d.readPos,
            &d.writeFileNum, &d.writePos)
        if err != nil {
            return err
        }
        atomic.StoreInt64(&d.depth, depth)
        d.nextReadFileNum = d.readFileNum
        d.nextReadPos = d.readPos

        return nil
    }

    // persistMetaData atomically writes state to the filesystem
    func (d *diskQueue) persistMetaData() error {
        var f *os.File
        var err error

        fileName := d.metaDataFileName()
        tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

        // write to tmp file
        f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
        if err != nil {
            return err
        }

        _, err = fmt.Fprintf(f, "%d
    %d,%d
    %d,%d
    ",
            atomic.LoadInt64(&d.depth),
            d.readFileNum, d.readPos,
            d.writeFileNum, d.writePos)
        if err != nil {
            f.Close()
            return err
        }
        f.Sync()
        f.Close()

        // atomically rename
        return atomicRename(tmpFileName, fileName)
    }

    func (d *diskQueue) metaDataFileName() string {
        return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.meta.dat"), d.name)
    }

    func (d *diskQueue) fileName(fileNum int64) string {
        return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum)
    }

    // checkTailCorruption validates internal invariants once the read
    // position has caught up with the write position (i.e. the queue should
    // be empty). If depth is non-zero, or the read position has somehow
    // passed the write position, state is forcibly reset and a metadata
    // sync is scheduled. depth is the value returned by the caller's
    // atomic decrement in moveForward.
    func (d *diskQueue) checkTailCorruption(depth int64) {
        // still data left to read; nothing to check
        if d.readFileNum < d.writeFileNum || d.readPos < d.writePos {
            return
        }

        // we've reached the end of the diskqueue
        // if depth isn't 0 something went wrong
        if depth != 0 {
            if depth < 0 {
                d.logf(
                    "ERROR: diskqueue(%s) negative depth at tail (%d), metadata corruption, resetting 0...",
                    d.name, depth)
            } else if depth > 0 {
                d.logf(
                    "ERROR: diskqueue(%s) positive depth at tail (%d), data loss, resetting 0...",
                    d.name, depth)
            }
            // force set depth 0
            atomic.StoreInt64(&d.depth, 0)
            d.needSync = true
        }

        // the read position should never be beyond the write position; if
        // it is, discard everything and restart from a fresh file
        if d.readFileNum != d.writeFileNum || d.readPos != d.writePos {
            if d.readFileNum > d.writeFileNum {
                d.logf(
                    "ERROR: diskqueue(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
                    d.name, d.readFileNum, d.writeFileNum)
            }

            if d.readPos > d.writePos {
                d.logf(
                    "ERROR: diskqueue(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
                    d.name, d.readPos, d.writePos)
            }

            d.skipToNextRWFile()
            d.needSync = true
        }
    }

    func (d *diskQueue) moveForward() {
        oldReadFileNum := d.readFileNum
        d.readFileNum = d.nextReadFileNum
        d.readPos = d.nextReadPos
        depth := atomic.AddInt64(&d.depth, -1)

        // see if we need to clean up the old file
        if oldReadFileNum != d.nextReadFileNum {
            // sync every time we start reading from a new file
            d.needSync = true

            fn := d.fileName(oldReadFileNum)
            err := os.Remove(fn)
            if err != nil {
                d.logf("ERROR: failed to Remove(%s) - %s", fn, err)
            }
        }

        d.checkTailCorruption(depth)
    }

    func (d *diskQueue) handleReadError() {
        // jump to the next read file and rename the current (bad) file
        if d.readFileNum == d.writeFileNum {
            // if you can't properly read from the current write file it's safe to
            // assume that something is fucked and we should skip the current file too
            if d.writeFile != nil {
                d.writeFile.Close()
                d.writeFile = nil
            }
            d.writeFileNum++
            d.writePos = 0
        }

        badFn := d.fileName(d.readFileNum)
        badRenameFn := badFn + ".bad"

        d.logf(
            "NOTICE: diskqueue(%s) jump to next file and saving bad file as %s",
            d.name, badRenameFn)

        err := atomicRename(badFn, badRenameFn)
        if err != nil {
            d.logf(
                "ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s",
                d.name, badFn, badRenameFn)
        }

        d.readFileNum++
        d.readPos = 0
        d.nextReadFileNum = d.readFileNum
        d.nextReadPos = 0

        // significant state change, schedule a sync on the next iteration
        d.needSync = true
    }

    // ioLoop provides the backend for exposing a go channel (via ReadChan())
    // in support of multiple concurrent queue consumers
    //
    // it works by looping and branching based on whether or not the queue has data
    // to read and blocking until data is either read or written over the appropriate
    // go channels
    //
    // conveniently this also means that we're asynchronously reading from the filesystem
    func (d *diskQueue) ioLoop() {
        var dataRead []byte
        var err error
        var count int64 // writes/reads since the last sync
        var r chan []byte

        syncTicker := time.NewTicker(d.syncTimeout)

        for {
            // dont sync all the time :)
            if count == d.syncEvery {
                d.needSync = true
            }

            if d.needSync {
                err = d.sync()
                if err != nil {
                    d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
                }
                count = 0
            }

            // only attempt a read when there is unread data on disk
            if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
                // nextReadPos == readPos means the previously read message
                // was delivered and committed; read the next one. Otherwise
                // dataRead still holds an undelivered message.
                if d.nextReadPos == d.readPos {
                    dataRead, err = d.readOne()
                    if err != nil {
                        d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
                            d.name, d.readPos, d.fileName(d.readFileNum), err)
                        d.handleReadError()
                        continue
                    }
                }
                r = d.readChan
            } else {
                r = nil
            }

            select {
            // the Go channel spec dictates that nil channel operations (read or write)
            // in a select are skipped, we set r to d.readChan only when there is data to read
            case r <- dataRead:
                count++
                // moveForward sets needSync flag if a file is removed
                d.moveForward()
            case <-d.emptyChan:
                d.emptyResponseChan <- d.deleteAllFiles()
                count = 0
            case dataWrite := <-d.writeChan:
                count++
                d.writeResponseChan <- d.writeOne(dataWrite)
            case <-syncTicker.C:
                if count == 0 {
                    // avoid sync when there's no activity
                    continue
                }
                d.needSync = true
            case <-d.exitChan:
                goto exit
            }
        }

    exit:
        d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name)
        syncTicker.Stop()
        // unblock exit(), which is waiting for this acknowledgement
        d.exitSyncChan <- 1
    }

  • 相关阅读:
    request.getParameterMap 跟request.getParameter区别
    SQL语句中---删除表数据drop、truncate和delete的用法
    今日出现两个错误
    html和jsp的区别及优缺点
    怎么将 美国的日期格式改成中国的日期格式
    java web相关的面试题
    i++与++i的关系
    Oracle常见的语法,以及跟MySQL的区别
    DBA
    java基础之印象笔记
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457294.html
Copyright © 2011-2022 走看看