zoukankan      html  css  js  c++  java
  • goreplay~文件输出解析

    这个类本身没太多东西,直接看,

    package main
    
    import (
        "bufio"
        "compress/gzip"
        "errors"
        "fmt"
        "io"
        "log"
        "os"
        "path/filepath"
        "runtime/debug"
        "sort"
        "strconv"
        "strings"
        "sync"
        "time"
    
        "github.com/buger/goreplay/size"
    )
    
    var dateFileNameFuncs = map[string]func(*FileOutput) string{
        "%Y":  func(o *FileOutput) string { return time.Now().Format("2006") },
        "%m":  func(o *FileOutput) string { return time.Now().Format("01") },
        "%d":  func(o *FileOutput) string { return time.Now().Format("02") },
        "%H":  func(o *FileOutput) string { return time.Now().Format("15") },
        "%M":  func(o *FileOutput) string { return time.Now().Format("04") },
        "%S":  func(o *FileOutput) string { return time.Now().Format("05") },
        "%NS": func(o *FileOutput) string { return fmt.Sprint(time.Now().Nanosecond()) },
        "%r":  func(o *FileOutput) string { return string(o.currentID) },
        "%t":  func(o *FileOutput) string { return string(o.payloadType) },
    }
    
    // FileOutputConfig ...
    type FileOutputConfig struct {
        FlushInterval     time.Duration `json:"output-file-flush-interval"`
        SizeLimit         size.Size     `json:"output-file-size-limit"`
        OutputFileMaxSize size.Size     `json:"output-file-max-size-limit"`
        QueueLimit        int           `json:"output-file-queue-limit"`
        Append            bool          `json:"output-file-append"`
        BufferPath        string        `json:"output-file-buffer"`
        onClose           func(string)
    }
    
    // FileOutput output plugin
    type FileOutput struct {
        sync.RWMutex
        pathTemplate   string
        currentName    string
        file           *os.File
        QueueLength    int
        chunkSize      int
        writer         io.Writer
        requestPerFile bool
        currentID      []byte
        payloadType    []byte
        closed         bool
        totalFileSize  size.Size
    
        config *FileOutputConfig
    }
    
    // NewFileOutput constructor for FileOutput, accepts path
    func NewFileOutput(pathTemplate string, config *FileOutputConfig) *FileOutput {
        o := new(FileOutput)
        o.pathTemplate = pathTemplate
        o.config = config
        o.updateName()
    
        if strings.Contains(pathTemplate, "%r") {
            o.requestPerFile = true
        }
    
        if config.FlushInterval == 0 {
            config.FlushInterval = 100 * time.Millisecond
        }
    
        go func() {
            for {
                time.Sleep(config.FlushInterval)
                if o.IsClosed() {
                    break
                }
                o.updateName()
                o.flush()
            }
        }()
    
        return o
    }
    
    func getFileIndex(name string) int {
        ext := filepath.Ext(name)
        withoutExt := strings.TrimSuffix(name, ext)
    
        if idx := strings.LastIndex(withoutExt, "_"); idx != -1 {
            if i, err := strconv.Atoi(withoutExt[idx+1:]); err == nil {
                return i
            }
        }
    
        return -1
    }
    
    func setFileIndex(name string, idx int) string {
        idxS := strconv.Itoa(idx)
        ext := filepath.Ext(name)
        withoutExt := strings.TrimSuffix(name, ext)
    
        if i := strings.LastIndex(withoutExt, "_"); i != -1 {
            if _, err := strconv.Atoi(withoutExt[i+1:]); err == nil {
                withoutExt = withoutExt[:i]
            }
        }
    
        return withoutExt + "_" + idxS + ext
    }
    
    func withoutIndex(s string) string {
        if i := strings.LastIndex(s, "_"); i != -1 {
            return s[:i]
        }
    
        return s
    }
    
    type sortByFileIndex []string
    
    func (s sortByFileIndex) Len() int {
        return len(s)
    }
    
    func (s sortByFileIndex) Swap(i, j int) {
        s[i], s[j] = s[j], s[i]
    }
    
    func (s sortByFileIndex) Less(i, j int) bool {
        if withoutIndex(s[i]) == withoutIndex(s[j]) {
            return getFileIndex(s[i]) < getFileIndex(s[j])
        }
    
        return s[i] < s[j]
    }
    
    func (o *FileOutput) filename() string {
        o.RLock()
        defer o.RUnlock()
    
        path := o.pathTemplate
    
        for name, fn := range dateFileNameFuncs {
            path = strings.Replace(path, name, fn(o), -1)
        }
    
        if !o.config.Append {
            nextChunk := false
    
            if o.currentName == "" ||
                ((o.config.QueueLimit > 0 && o.QueueLength >= o.config.QueueLimit) ||
                    (o.config.SizeLimit > 0 && o.chunkSize >= int(o.config.SizeLimit))) {
                nextChunk = true
            }
    
            ext := filepath.Ext(path)
            withoutExt := strings.TrimSuffix(path, ext)
    
            if matches, err := filepath.Glob(withoutExt + "*" + ext); err == nil {
                if len(matches) == 0 {
                    return setFileIndex(path, 0)
                }
                sort.Sort(sortByFileIndex(matches))
    
                last := matches[len(matches)-1]
    
                fileIndex := 0
                if idx := getFileIndex(last); idx != -1 {
                    fileIndex = idx
    
                    if nextChunk {
                        fileIndex++
                    }
                }
    
                return setFileIndex(last, fileIndex)
            }
        }
    
        return path
    }
    
    func (o *FileOutput) updateName() {
        name := filepath.Clean(o.filename())
        o.Lock()
        o.currentName = name
        o.Unlock()
    }
    
    // PluginWrite writes message to this plugin
    func (o *FileOutput) PluginWrite(msg *Message) (n int, err error) {
        if o.requestPerFile {
            o.Lock()
            meta := payloadMeta(msg.Meta)
            o.currentID = meta[1]
            o.payloadType = meta[0]
            o.Unlock()
        }
    
        o.updateName()
        o.Lock()
        defer o.Unlock()
    
        if o.file == nil || o.currentName != o.file.Name() {
            o.closeLocked()
    
            o.file, err = os.OpenFile(o.currentName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
            o.file.Sync()
    
            if strings.HasSuffix(o.currentName, ".gz") {
                o.writer = gzip.NewWriter(o.file)
            } else {
                o.writer = bufio.NewWriter(o.file)
            }
    
            if err != nil {
                log.Fatal(o, "Cannot open file %q. Error: %s", o.currentName, err)
            }
    
            o.QueueLength = 0
        }
    
        var nn int
        n, err = o.writer.Write(msg.Meta)
        nn, err = o.writer.Write(msg.Data)
        n += nn
        nn, err = o.writer.Write(payloadSeparatorAsBytes)
        n += nn
    
        o.totalFileSize += size.Size(n)
        o.QueueLength++
    
        if Settings.OutputFileConfig.OutputFileMaxSize > 0 && o.totalFileSize >= Settings.OutputFileConfig.OutputFileMaxSize {
            return n, errors.New("File output reached size limit")
        }
    
        return n, err
    }
    
    func (o *FileOutput) flush() {
        // Don't exit on panic
        defer func() {
            if r := recover(); r != nil {
                Debug(0, "[OUTPUT-FILE] PANIC while file flush: ", r, o, string(debug.Stack()))
            }
        }()
    
        o.Lock()
        defer o.Unlock()
    
        if o.file != nil {
            if strings.HasSuffix(o.currentName, ".gz") {
                o.writer.(*gzip.Writer).Flush()
            } else {
                o.writer.(*bufio.Writer).Flush()
            }
    
            if stat, err := o.file.Stat(); err == nil {
                o.chunkSize = int(stat.Size())
            } else {
                Debug(0, "[OUTPUT-HTTP] error accessing file size", err)
            }
        }
    }
    
    func (o *FileOutput) String() string {
        return "File output: " + o.file.Name()
    }
    
    func (o *FileOutput) closeLocked() error {
        if o.file != nil {
            if strings.HasSuffix(o.currentName, ".gz") {
                o.writer.(*gzip.Writer).Close()
            } else {
                o.writer.(*bufio.Writer).Flush()
            }
            o.file.Close()
    
            if o.config.onClose != nil {
                o.config.onClose(o.file.Name())
            }
        }
    
        o.closed = true
        return nil
    }
    
    // Close closes the output file that is being written to.
    func (o *FileOutput) Close() error {
        o.Lock()
        defer o.Unlock()
        return o.closeLocked()
    }
    
    // IsClosed returns if the output file is closed or not.
    func (o *FileOutput) IsClosed() bool {
        o.Lock()
        defer o.Unlock()
        return o.closed
    }
  • 相关阅读:
    LeetCode 1275. 找出井字棋的获胜者 Find Winner on a Tic Tac Toe Game
    LeetCode 307. 区域和检索
    LeetCode 1271 十六进制魔术数字 Hexspeak
    秋实大哥与花 线段树模板
    AcWing 835. Trie字符串统计
    Leetcode 216. 组合总和 III
    Mybatis 示例之 复杂(complex)属性(property)
    Mybatis 示例之 复杂(complex)属性(property)
    Mybatis 高级结果映射 ResultMap Association Collection
    Mybatis 高级结果映射 ResultMap Association Collection
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15113819.html
Copyright © 2011-2022 走看看