zoukankan      html  css  js  c++  java
  • goreplay~http输出工作线程

    http输出工作线程

    NewHTTPOutput 默认情况

        if config.WorkersMin <= 0 {
            config.WorkersMin = 1
        }
        if config.WorkersMin > 1000 {
            config.WorkersMin = 1000
        }
        if config.WorkersMax <= 0 {
            config.WorkersMax = math.MaxInt32 // idealy so large
        }
        if config.WorkersMax < config.WorkersMin {
            config.WorkersMax = config.WorkersMin
        }
        if config.WorkerTimeout <= 0 {
    config.WorkerTimeout = time.Second * 2
    }

    配置完成后创建 HTTPClient,然后按 WorkersMin 启动初始的工作协程:

        // Build the client from the output's configuration.
        o.client = NewHTTPClient(o.config)

        // Account for the initial workers up front, then launch one
        // goroutine per configured minimum worker.
        o.activeWorkers += int32(o.config.WorkersMin)
        for remaining := o.config.WorkersMin; remaining > 0; remaining-- {
            go o.startWorker()
        }

    启动多个发送协程(goroutine),每个协程执行 startWorker:

    // startWorker is the body of a single worker goroutine. It loops forever,
    // sending each message received from o.queue through o.client, and
    // returns as soon as a value is received from o.stopWorker.
    func (o *HTTPOutput) startWorker() {
        for {
            select {
            case job := <-o.queue:
                o.sendRequest(o.client, job)
            case <-o.stopWorker:
                return
            }
        }
    }

    执行发送

    // sendRequest replays a single message through client.
    //
    // Messages whose metadata is not a request payload are ignored. A send
    // error is reported through Debug and the message is dropped; a nil
    // response is dropped silently. Otherwise the response is published on
    // o.responses when response tracking is enabled, and handed to the
    // ElasticSearch analyzer when one is configured.
    func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
        if !isRequestPayload(msg.Meta) {
            return
        }

        id := payloadID(msg.Meta)
        sentAt := time.Now()
        body, sendErr := client.Send(msg.Data)
        doneAt := time.Now()

        switch {
        case sendErr != nil:
            Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", sendErr))
            return
        case body == nil:
            return
        }

        if o.config.TrackResponses {
            // Start timestamp and round-trip duration, both in nanoseconds.
            startNanos := sentAt.UnixNano()
            o.responses <- &response{body, id, startNanos, doneAt.UnixNano() - startNanos}
        }

        if o.elasticSearch != nil {
            o.elasticSearch.ResponseAnalyze(msg.Data, body, sentAt, doneAt)
        }
    }

    发送细节,各种配置生效点

    // Send parses data as a raw HTTP/1.x request, retargets it at the
    // configured URL and executes it with c.Client.
    //
    // It returns the dumped response (status line, headers and body) when
    // c.config.TrackResponses is set, and nil otherwise. CONNECT requests are
    // not sent at all and yield (nil, nil). Parse and transport errors are
    // returned unchanged.
    func (c *HTTPClient) Send(data []byte) ([]byte, error) {
        req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
        if err != nil {
            return nil, err
        }
        // CONNECT requests are never replayed.
        if req.Method == http.MethodConnect {
            return nil, nil
        }

        // Unless the original Host header must be preserved, address the
        // request to the configured host.
        if !c.config.OriginalHost {
            req.Host = c.config.url.Host
        }

        // fix #862
        // With no path or query configured, only scheme and host are
        // rewritten so the request keeps its own path; otherwise the whole
        // URL is replaced by the configured one.
        if c.config.url.Path == "" && c.config.url.RawQuery == "" {
            req.URL.Scheme = c.config.url.Scheme
            req.URL.Host = c.config.url.Host
        } else {
            req.URL = c.config.url
        }

        // force connection to not be closed, which can affect the global client
        req.Close = false
        // RequestURI must be empty for client requests; http.Client rejects
        // a request that has it set.
        req.RequestURI = ""

        resp, err := c.Client.Do(req)
        if err != nil {
            return nil, err
        }
        // Release the network body on every return path. The receiver is
        // bound here, so this closes the original body even though
        // DumpResponse swaps resp.Body for an in-memory copy; it also covers
        // the case where the dump fails before the body has been closed.
        defer resp.Body.Close()

        if c.config.TrackResponses {
            return httputil.DumpResponse(resp, true)
        }
        return nil, nil
    }

    master 协程:WorkerTimeout 超时生效点,队列空闲时回收多余的 worker

    // workerMaster supervises the worker pool. Every o.config.WorkerTimeout it
    // checks whether more than o.config.WorkersMin workers are active while
    // the queue is empty, and if so stops the surplus workers one at a time.
    // It returns as soon as o.stop becomes readable.
    func (o *HTTPOutput) workerMaster() {
        timer := time.NewTimer(o.config.WorkerTimeout)
        defer func() {
            // recover from panics caused by trying to send in
            // a closed chan(o.stopWorker)
            recover()
        }()
        defer timer.Stop()

        for {
            // Wait for whichever comes first: the stop signal or the idle
            // timeout. Selecting on both keeps shutdown from being delayed
            // until the timer fires.
            select {
            case <-o.stop:
                return
            case <-timer.C:
            }

            // rollback workers: while there are more active workers than the
            // configured minimum and nothing is queued, stop one worker and
            // re-check.
            for atomic.LoadInt32(&o.activeWorkers) > int32(o.config.WorkersMin) && len(o.queue) < 1 {
                o.stopWorker <- struct{}{}
                atomic.AddInt32(&o.activeWorkers, -1)
            }

            // The timer has fired and its channel was drained above, so it is
            // safe to re-arm it for the next period.
            timer.Reset(o.config.WorkerTimeout)
        }
    }
  • 相关阅读:
    Git本地库在哪
    Git&GitHub-添加提交以及查看状态
    linux命令——find
    正则表达式
    再访JavaScript对象(原型链和闭包)
    RabbitQM(消息队列)
    Java泛型(T)与通配符?
    Linux设置文件权限和归属
    英语单词
    RabbitQM使用笔记
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15113764.html
Copyright © 2011-2022 走看看