zoukankan      html  css  js  c++  java
  • goreplay~http输出队列

    output-http-queue-len

    output_http.go 中的各种初始化（output-http-queue-len 对应 config.QueueLen，未设置或 <= 0 时默认为 1000）

    // NewHTTPOutput builds an HTTPOutput that targets address.
    //
    // The supplied config is modified in place: the parsed URL and its string
    // form are stored on it, and every unset or out-of-range field is replaced
    // with a default before the output is wired up. The process is terminated
    // via the log package if address cannot be parsed as a URL.
    //
    // Before returning, WorkersMin worker goroutines and the workerMaster
    // goroutine are started.
    func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter {
        target, err := url.Parse(address)
        config.url = target
        if err != nil {
            log.Fatalf("[OUTPUT-HTTP] parse HTTP output URL error[%q]", err)
        }
        if config.url.Scheme == "" {
            // No scheme in the address: fall back to plain HTTP.
            config.url.Scheme = "http"
        }
        config.rawURL = config.url.String()

        // Timeouts below 100ms are replaced with one second.
        if config.Timeout < 100*time.Millisecond {
            config.Timeout = time.Second
        }
        if config.BufferSize <= 0 {
            config.BufferSize = 100 * 1024 // 100kb
        }

        // Keep WorkersMin within [1, 1000].
        switch {
        case config.WorkersMin <= 0:
            config.WorkersMin = 1
        case config.WorkersMin > 1000:
            config.WorkersMin = 1000
        }

        // WorkersMax is effectively unbounded when unset, and never below WorkersMin.
        switch {
        case config.WorkersMax <= 0:
            config.WorkersMax = math.MaxInt32 // ideally so large
        case config.WorkersMax < config.WorkersMin:
            config.WorkersMax = config.WorkersMin
        }

        if config.QueueLen <= 0 {
            config.QueueLen = 1000
        }
        if config.RedirectLimit < 0 {
            config.RedirectLimit = 0
        }
        if config.WorkerTimeout <= 0 {
            config.WorkerTimeout = 2 * time.Second
        }

        o := &HTTPOutput{
            config: config,
            stop:   make(chan bool),
        }
        if config.Stats {
            o.queueStats = NewGorStat("output_http", config.StatsMs)
        }

        // The request queue (and the response channel, when responses are
        // tracked) are buffered with QueueLen slots.
        o.queue = make(chan *Message, config.QueueLen)
        if config.TrackResponses {
            o.responses = make(chan *response, config.QueueLen)
        }
        // it should not be buffered to avoid races
        o.stopWorker = make(chan struct{})

        if config.ElasticSearch != "" {
            o.elasticSearch = new(ESPlugin)
            o.elasticSearch.Init(config.ElasticSearch)
        }
        o.client = NewHTTPClient(config)

        // Account for the initial workers before launching them.
        o.activeWorkers += int32(config.WorkersMin)
        for started := 0; started < config.WorkersMin; started++ {
            go o.startWorker()
        }
        go o.workerMaster()

        return o
    }

    队列用在哪儿：PluginWrite 把请求消息写入 o.queue，startWorker 从 o.queue 取出消息并调用 sendRequest 发送

    // startWorker runs a worker loop: each message received from o.queue is
    // passed to sendRequest together with the shared client. The loop ends as
    // soon as a receive on o.stopWorker succeeds.
    func (o *HTTPOutput) startWorker() {
        for {
            select {
            case queued := <-o.queue:
                o.sendRequest(o.client, queued)
            case <-o.stopWorker:
                return
            }
        }
    }
    
    // PluginWrite writes message to this plugin.
    //
    // Messages whose Meta is not a request payload (per isRequestPayload) are
    // not queued; len(msg.Data) is returned for them. Otherwise the call blocks
    // until the message is placed on o.queue or a receive on o.stop succeeds,
    // in which case it returns 0 and ErrorStopped. On a successful enqueue it
    // returns len(msg.Data) + len(msg.Meta).
    func (o *HTTPOutput) PluginWrite(msg *Message) (n int, err error) {
        if !isRequestPayload(msg.Meta) {
            return len(msg.Data), nil
        }

        select {
        case o.queue <- msg:
            // enqueued; continue below
        case <-o.stop:
            return 0, ErrorStopped
        }

        if o.config.Stats {
            o.queueStats.Write(len(o.queue))
        }

        // Messages are still waiting in the queue: try to start a new worker
        // to serve them, as long as the WorkersMax limit has not been reached.
        if len(o.queue) > 0 && atomic.LoadInt32(&o.activeWorkers) < int32(o.config.WorkersMax) {
            go o.startWorker()
            atomic.AddInt32(&o.activeWorkers, 1)
        }

        written := len(msg.Data) + len(msg.Meta)
        return written, nil
    }
  • 相关阅读:
    python的编码判断_unicode_gbk/gb2312_utf8(附函数)
    stat文件状态信息结构体
    内核配置中 ramdisk 大小修改
    mount命令详解
    dirent和DIR 结构体 表示文件夹中目录内容信息
    nandwrite 参数
    mke2fs 制作ext2文件系统image
    ext2文件系统错误
    照度/感光度(Lux)
    摄像机的几个重要的技术指标
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15114622.html
Copyright © 2011-2022 走看看