zoukankan      html  css  js  c++  java
  • goreplay~文件输出解析

    这个文件本身没太多内容，直接看代码：

    package main
    
    import (
        "bufio"
        "compress/gzip"
        "errors"
        "fmt"
        "io"
        "log"
        "os"
        "path/filepath"
        "runtime/debug"
        "sort"
        "strconv"
        "strings"
        "sync"
        "time"
    
        "github.com/buger/goreplay/size"
    )
    
    // placeholderFromLayout builds a placeholder resolver that renders the
    // current time (time.Now) using the given time layout string.
    func placeholderFromLayout(layout string) func(*FileOutput) string {
        return func(*FileOutput) string { return time.Now().Format(layout) }
    }

    // dateFileNameFuncs maps each placeholder accepted in an output path
    // template to the function producing its replacement text. The date/time
    // placeholders are evaluated at call time; "%r" and "%t" are taken from the
    // currentID and payloadType fields of the output they are called with.
    var dateFileNameFuncs = map[string]func(*FileOutput) string{
        "%Y":  placeholderFromLayout("2006"),
        "%m":  placeholderFromLayout("01"),
        "%d":  placeholderFromLayout("02"),
        "%H":  placeholderFromLayout("15"),
        "%M":  placeholderFromLayout("04"),
        "%S":  placeholderFromLayout("05"),
        "%NS": func(*FileOutput) string { return fmt.Sprint(time.Now().Nanosecond()) },
        "%r":  func(out *FileOutput) string { return string(out.currentID) },
        "%t":  func(out *FileOutput) string { return string(out.payloadType) },
    }
    
    // FileOutputConfig holds the settings of a FileOutput. The json tags name
    // the corresponding "output-file-*" options.
    type FileOutputConfig struct {
        // FlushInterval is the pause between two background flushes.
        // NewFileOutput replaces a zero value with 100ms.
        FlushInterval     time.Duration `json:"output-file-flush-interval"`
        // SizeLimit, when > 0, makes filename() move on to the next chunk index
        // once the size recorded by the last flush reaches it (non-append mode).
        SizeLimit         size.Size     `json:"output-file-size-limit"`
        // OutputFileMaxSize, when > 0, is a cap on the total bytes written.
        // PluginWrite returns an error once it is reached; note that it reads
        // the value through the global Settings.OutputFileConfig.
        OutputFileMaxSize size.Size     `json:"output-file-max-size-limit"`
        // QueueLimit, when > 0, makes filename() move on to the next chunk index
        // once that many messages were written to the current file
        // (non-append mode).
        QueueLimit        int           `json:"output-file-queue-limit"`
        // Append disables chunk indexing: filename() returns the expanded
        // template path unchanged when it is true.
        Append            bool          `json:"output-file-append"`
        // BufferPath is not read anywhere in this file.
        // NOTE(review): presumably consumed by another component - confirm.
        BufferPath        string        `json:"output-file-buffer"`
        // onClose, when non-nil, is called with the file name each time
        // closeLocked closes a file.
        onClose           func(string)
    }
    
    // FileOutput is an output plugin that writes messages to files whose names
    // are derived from a path template. The embedded RWMutex guards the mutable
    // fields below.
    type FileOutput struct {
        sync.RWMutex
        // pathTemplate is the path passed to NewFileOutput; it may contain the
        // placeholders listed in dateFileNameFuncs.
        pathTemplate   string
        // currentName is the cleaned file name most recently computed by
        // updateName; PluginWrite (re)opens the file when it differs from the
        // name of the open file.
        currentName    string
        // file is the currently open output file (nil before the first write).
        file           *os.File
        // QueueLength counts the messages written to the current file; it is
        // reset to 0 whenever a file is opened.
        QueueLength    int
        // chunkSize is the size in bytes of the open file as reported by Stat
        // during the last flush.
        chunkSize      int
        // writer wraps file: a gzip.Writer for names ending in ".gz", a
        // bufio.Writer otherwise.
        writer         io.Writer
        // requestPerFile is true when pathTemplate contains "%r".
        requestPerFile bool
        // currentID and payloadType are taken from payloadMeta(msg.Meta)
        // (elements 1 and 0) and feed the "%r" and "%t" placeholders.
        currentID      []byte
        payloadType    []byte
        // closed is set by closeLocked; the background flush loop stops once
        // IsClosed reports true.
        closed         bool
        // totalFileSize accumulates the bytes written by PluginWrite across
        // all files.
        totalFileSize  size.Size

        // config holds the settings supplied to NewFileOutput.
        config *FileOutputConfig
    }
    
    // NewFileOutput builds a FileOutput for the given path template and config.
    //
    // It computes the initial file name, enables request-per-file mode when the
    // template contains "%r", defaults config.FlushInterval to 100ms when it is
    // zero (the passed config is modified), and starts a goroutine that
    // refreshes the file name and flushes the writer every FlushInterval until
    // the output reports itself closed.
    func NewFileOutput(pathTemplate string, config *FileOutputConfig) *FileOutput {
        out := &FileOutput{
            pathTemplate: pathTemplate,
            config:       config,
        }
        out.updateName()
        out.requestPerFile = strings.Contains(pathTemplate, "%r")

        if config.FlushInterval == 0 {
            config.FlushInterval = 100 * time.Millisecond
        }

        // Background flusher: wakes up once per interval and exits as soon as
        // the output is closed.
        go func() {
            for {
                time.Sleep(config.FlushInterval)
                if out.IsClosed() {
                    return
                }
                out.updateName()
                out.flush()
            }
        }()

        return out
    }
    
    // getFileIndex returns the integer that follows the last "_" in name once
    // the file extension has been removed, e.g. 3 for "out_3.gz". It returns -1
    // when there is no "_" or the text after it is not an integer.
    func getFileIndex(name string) int {
        stem := strings.TrimSuffix(name, filepath.Ext(name))

        sep := strings.LastIndex(stem, "_")
        if sep == -1 {
            return -1
        }

        index, err := strconv.Atoi(stem[sep+1:])
        if err != nil {
            return -1
        }

        return index
    }
    
    // setFileIndex returns name with "_<idx>" placed right before the file
    // extension. If the extension-less name already ends in "_<integer>", that
    // old index is replaced rather than stacked.
    func setFileIndex(name string, idx int) string {
        ext := filepath.Ext(name)
        stem := strings.TrimSuffix(name, ext)

        cut := strings.LastIndex(stem, "_")
        hasNumericSuffix := false
        if cut != -1 {
            _, err := strconv.Atoi(stem[cut+1:])
            hasNumericSuffix = err == nil
        }
        if hasNumericSuffix {
            stem = stem[:cut]
        }

        return stem + "_" + strconv.Itoa(idx) + ext
    }
    
    // withoutIndex returns the part of s that precedes its last "_", or s
    // itself when it contains no "_".
    func withoutIndex(s string) string {
        cut := strings.LastIndex(s, "_")
        if cut == -1 {
            return s
        }

        return s[:cut]
    }
    
    // sortByFileIndex implements sort.Interface for file names that may carry
    // a numeric "_<index>" suffix.
    type sortByFileIndex []string

    // Len returns the number of file names.
    func (s sortByFileIndex) Len() int { return len(s) }

    // Swap exchanges the names at positions i and j.
    func (s sortByFileIndex) Swap(i, j int) {
        s[j], s[i] = s[i], s[j]
    }

    // Less orders names that share the same text before their last "_" by
    // their numeric file index; all other pairs are compared as plain strings.
    func (s sortByFileIndex) Less(i, j int) bool {
        left, right := s[i], s[j]

        if withoutIndex(left) != withoutIndex(right) {
            return left < right
        }

        return getFileIndex(left) < getFileIndex(right)
    }
    
    // filename computes the name of the file the next write should go to.
    //
    // Every placeholder from dateFileNameFuncs found in the path template is
    // expanded first. In append mode the expanded path is returned as is.
    // Otherwise the existing files matching "<path without ext>*<ext>" are
    // listed and the name is derived from the one that sorts last: its index is
    // kept, or incremented when a new chunk is due (no name chosen yet, queue
    // limit reached, or size limit reached). With no matching file the expanded
    // path gets index 0. If globbing fails the expanded path is returned.
    //
    // The read lock is held for the duration of the call.
    func (o *FileOutput) filename() string {
        o.RLock()
        defer o.RUnlock()

        expanded := o.pathTemplate
        for placeholder, render := range dateFileNameFuncs {
            expanded = strings.ReplaceAll(expanded, placeholder, render(o))
        }

        if o.config.Append {
            return expanded
        }

        queueFull := o.config.QueueLimit > 0 && o.QueueLength >= o.config.QueueLimit
        sizeReached := o.config.SizeLimit > 0 && o.chunkSize >= int(o.config.SizeLimit)
        startNewChunk := o.currentName == "" || queueFull || sizeReached

        ext := filepath.Ext(expanded)
        pattern := strings.TrimSuffix(expanded, ext) + "*" + ext

        existing, err := filepath.Glob(pattern)
        if err != nil {
            return expanded
        }
        if len(existing) == 0 {
            return setFileIndex(expanded, 0)
        }

        sort.Sort(sortByFileIndex(existing))
        newest := existing[len(existing)-1]

        index := getFileIndex(newest)
        switch {
        case index == -1:
            // The newest file carries no index yet: start numbering at 0.
            index = 0
        case startNewChunk:
            index++
        }

        return setFileIndex(newest, index)
    }
    
    // updateName recomputes the target file name and stores its cleaned form in
    // currentName. The name is resolved before the write lock is taken because
    // filename() acquires the read lock itself.
    func (o *FileOutput) updateName() {
        next := filepath.Clean(o.filename())

        o.Lock()
        defer o.Unlock()
        o.currentName = next
    }
    
    // PluginWrite writes msg (meta, data, then the payload separator) to the
    // current output file, opening or rotating the file first when the computed
    // file name differs from the one that is open.
    //
    // It returns the number of bytes handed to the writer and the first write
    // error, if any. Once the total written size reaches the configured
    // OutputFileMaxSize it returns a "size limit" error alongside the byte
    // count. Failure to open the target file terminates the process via
    // log.Fatalf.
    func (o *FileOutput) PluginWrite(msg *Message) (n int, err error) {
        if o.requestPerFile {
            o.Lock()
            meta := payloadMeta(msg.Meta)
            o.currentID = meta[1]
            o.payloadType = meta[0]
            o.Unlock()
        }

        o.updateName()
        o.Lock()
        defer o.Unlock()

        if o.file == nil || o.currentName != o.file.Name() {
            // closeLocked marks the output as closed. Here we are only rotating
            // files, so restore the previous state; otherwise the background
            // flush loop would stop after the very first write.
            wasClosed := o.closed
            if cerr := o.closeLocked(); cerr != nil {
                Debug(0, "[OUTPUT-FILE] error closing previous file", cerr)
            }
            o.closed = wasClosed

            // O_APPEND rather than O_TRUNC: reopening an existing name must not
            // wipe what was captured there earlier.
            o.file, err = os.OpenFile(o.currentName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0660)
            if err != nil {
                log.Fatalf("Cannot open file %q. Error: %s", o.currentName, err)
            }
            o.file.Sync()

            if strings.HasSuffix(o.currentName, ".gz") {
                o.writer = gzip.NewWriter(o.file)
            } else {
                o.writer = bufio.NewWriter(o.file)
            }

            o.QueueLength = 0
        }

        // Stop at the first failing write so its error is not overwritten by a
        // later, successful one.
        for _, part := range [][]byte{msg.Meta, msg.Data, payloadSeparatorAsBytes} {
            var nn int
            nn, err = o.writer.Write(part)
            n += nn
            if err != nil {
                break
            }
        }

        o.totalFileSize += size.Size(n)
        if err != nil {
            return n, err
        }
        o.QueueLength++

        if Settings.OutputFileConfig.OutputFileMaxSize > 0 && o.totalFileSize >= Settings.OutputFileConfig.OutputFileMaxSize {
            return n, errors.New("File output reached size limit")
        }

        return n, nil
    }
    
    // flush pushes buffered data of the current writer to the open file and
    // records the file's size (as reported by Stat) in chunkSize. It does
    // nothing when no file is open. Errors are reported through Debug; a panic
    // is recovered and logged so the caller keeps running.
    func (o *FileOutput) flush() {
        // Don't exit on panic
        defer func() {
            if r := recover(); r != nil {
                Debug(0, "[OUTPUT-FILE] PANIC while file flush: ", r, o, string(debug.Stack()))
            }
        }()

        o.Lock()
        defer o.Unlock()

        if o.file == nil {
            return
        }

        // Decide by the writer's actual capabilities instead of the suffix of
        // currentName: the name may already point at the next file while the
        // writer still belongs to the one that is open.
        if w, ok := o.writer.(interface{ Flush() error }); ok {
            if err := w.Flush(); err != nil {
                Debug(0, "[OUTPUT-FILE] error flushing writer", err)
            }
        }

        stat, err := o.file.Stat()
        if err != nil {
            Debug(0, "[OUTPUT-FILE] error accessing file size", err)
            return
        }
        o.chunkSize = int(stat.Size())
    }
    
    // String describes the output for logs. It reports the name of the open
    // file, or the currently computed target name when no file has been opened
    // yet (calling Name on a nil *os.File would panic).
    //
    // No locking is done here so the method stays usable from code paths that
    // already hold the mutex.
    func (o *FileOutput) String() string {
        if o.file == nil {
            return "File output: " + o.currentName
        }
        return "File output: " + o.file.Name()
    }
    
    // closeLocked finalizes the open file, if any: it closes a gzip writer or
    // flushes a buffered writer, closes the file and then invokes the onClose
    // callback with the file name. The output is marked closed in every case.
    //
    // It returns the first error met while finalizing the writer or closing the
    // file; the remaining steps are still attempted. The caller must hold the
    // write lock.
    func (o *FileOutput) closeLocked() error {
        var firstErr error
        remember := func(err error) {
            if err != nil && firstErr == nil {
                firstErr = err
            }
        }

        if o.file != nil {
            // Switch on the writer's real type rather than the suffix of
            // currentName, which may already name the next file during rotation.
            switch w := o.writer.(type) {
            case *gzip.Writer:
                remember(w.Close())
            case *bufio.Writer:
                remember(w.Flush())
            }
            remember(o.file.Close())

            if o.config.onClose != nil {
                o.config.onClose(o.file.Name())
            }
        }

        o.closed = true
        return firstErr
    }
    
    // Close takes the write lock and finalizes the output through closeLocked,
    // returning whatever closeLocked returns.
    func (o *FileOutput) Close() error {
        o.Lock()
        defer o.Unlock()

        err := o.closeLocked()
        return err
    }
    
    // IsClosed reports whether the output has been marked closed. It only reads
    // state, so a shared (read) lock is sufficient.
    func (o *FileOutput) IsClosed() bool {
        o.RLock()
        defer o.RUnlock()
        return o.closed
    }
  • 相关阅读:
    【2020-04-03】多注意一下自己闲下来的思绪
    vue 去哪网项目 学习笔记(一)
    数据分析相关的内容
    vue 自学项目笔记
    vue 所有的指令
    vue 自学笔记(5) 列表渲染
    vue 自学笔记(4): 样式绑定与条件渲染
    vue 自学笔记(三) 计算属性与侦听器
    自学vue笔记 (二) : 实例与生命周期
    杜教BM模板
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15113819.html
Copyright © 2011-2022 走看看