zoukankan      html  css  js  c++  java
  • goreplay~http输出队列

    output-http-queue-len

    output_http.go 各种初始化

    func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter {
        o := new(HTTPOutput)
        var err error
        config.url, err = url.Parse(address)
        if err != nil {
            log.Fatal(fmt.Sprintf("[OUTPUT-HTTP] parse HTTP output URL error[%q]", err))
        }
        if config.url.Scheme == "" {
            config.url.Scheme = "http"
        }
        config.rawURL = config.url.String()
        if config.Timeout < time.Millisecond*100 {
            config.Timeout = time.Second
        }
        if config.BufferSize <= 0 {
            config.BufferSize = 100 * 1024 // 100kb
        }
        if config.WorkersMin <= 0 {
            config.WorkersMin = 1
        }
        if config.WorkersMin > 1000 {
            config.WorkersMin = 1000
        }
        if config.WorkersMax <= 0 {
            config.WorkersMax = math.MaxInt32 // idealy so large
        }
        if config.WorkersMax < config.WorkersMin {
            config.WorkersMax = config.WorkersMin
        }
        if config.QueueLen <= 0 {
            config.QueueLen = 1000
        }
        if config.RedirectLimit < 0 {
            config.RedirectLimit = 0
        }
        if config.WorkerTimeout <= 0 {
            config.WorkerTimeout = time.Second * 2
        }
        o.config = config
        o.stop = make(chan bool)
        if o.config.Stats {
            o.queueStats = NewGorStat("output_http", o.config.StatsMs)
        }
    
        o.queue = make(chan *Message, o.config.QueueLen)
        if o.config.TrackResponses {
            o.responses = make(chan *response, o.config.QueueLen)
        }
        // it should not be buffered to avoid races
        o.stopWorker = make(chan struct{})
    
        if o.config.ElasticSearch != "" {
            o.elasticSearch = new(ESPlugin)
            o.elasticSearch.Init(o.config.ElasticSearch)
        }
        o.client = NewHTTPClient(o.config)
        o.activeWorkers += int32(o.config.WorkersMin)
        for i := 0; i < o.config.WorkersMin; i++ {
            go o.startWorker()
        }
        go o.workerMaster()
        return o
    }

    队列用在哪儿

    func (o *HTTPOutput) startWorker() {
        for {
            select {
            case <-o.stopWorker:
                return
            case msg := <-o.queue:
                o.sendRequest(o.client, msg)
            }
        }
    }
    
    // PluginWrite writes message to this plugin
    func (o *HTTPOutput) PluginWrite(msg *Message) (n int, err error) {
        if !isRequestPayload(msg.Meta) {
            return len(msg.Data), nil
        }
    
        select {
        case <-o.stop:
            return 0, ErrorStopped
        case o.queue <- msg:
        }
    
        if o.config.Stats {
            o.queueStats.Write(len(o.queue))
        }
        if len(o.queue) > 0 {
            // try to start a new worker to serve
            if atomic.LoadInt32(&o.activeWorkers) < int32(o.config.WorkersMax) {
                go o.startWorker()
                atomic.AddInt32(&o.activeWorkers, 1)
            }
        }
        return len(msg.Data) + len(msg.Meta), nil
    }
  • 相关阅读:
    OPPO R9sPlus MIFlash线刷TWRP Recovery ROOT详细教程
    OPPO R11 R11plus系列 解锁BootLoader ROOT Xposed 你的手机你做主
    努比亚(nubia) M2青春版 NX573J 解锁BootLoader 并进入临时recovery ROOT
    华为 荣耀 等手机解锁BootLoader
    青橙 M4 解锁BootLoader 并刷入recovery ROOT
    程序员修炼之道阅读笔03
    冲刺8
    典型用户模板分析
    学习进度八
    冲刺7
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15114622.html
Copyright © 2011-2022 走看看