zoukankan      html  css  js  c++  java
  • goreplay~拦截器设置

    官文

    https://github.com/buger/goreplay/wiki/Dealing-with-missing-requests-and-responses

    实际操作中,发现录制流量并发达到一定量会丢失很多请求,经过阅读文档和测试,发现最相关的一个参数是 --input-raw-buffer-size

    由于goreplay本身需要对数据包进行读取,协议解析等,借助于pcap及os缓冲区,当缓冲区不足,到达的数据包不足以组装Http请求则出现丢失或失效请求,无法正确处理

    具体看代码

    该参数是作用在底层录制上 capture.go

    func (l *Listener) PcapHandle(ifi pcap.Interface) (handle *pcap.Handle, err error) {
        var inactive *pcap.InactiveHandle
        inactive, err = pcap.NewInactiveHandle(ifi.Name)
        if err != nil {
            return nil, fmt.Errorf("inactive handle error: %q, interface: %q", err, ifi.Name)
        }
        defer inactive.CleanUp()
        if l.TimestampType != "" {
            var ts pcap.TimestampSource
            ts, err = pcap.TimestampSourceFromString(l.TimestampType)
            err = inactive.SetTimestampSource(ts)
            if err != nil {
                return nil, fmt.Errorf("%q: supported timestamps: %q, interface: %q", err, inactive.SupportedTimestamps(), ifi.Name)
            }
        }
        if l.Promiscuous {
            if err = inactive.SetPromisc(l.Promiscuous); err != nil {
                return nil, fmt.Errorf("promiscuous mode error: %q, interface: %q", err, ifi.Name)
            }
        }
        if l.Monitor {
            if err = inactive.SetRFMon(l.Monitor); err != nil && !errors.Is(err, pcap.CannotSetRFMon) {
                return nil, fmt.Errorf("monitor mode error: %q, interface: %q", err, ifi.Name)
            }
        }
    
        var snap int
    
        if !l.Snaplen {
            infs, _ := net.Interfaces()
            for _, i := range infs {
                if i.Name == ifi.Name {
                    snap = i.MTU + 200
                }
            }
        }
    
        if snap == 0 {
            snap = 64<<10 + 200
        }
    
        err = inactive.SetSnapLen(snap)
        if err != nil {
            return nil, fmt.Errorf("snapshot length error: %q, interface: %q", err, ifi.Name)
        }
        if l.BufferSize > 0 {
            err = inactive.SetBufferSize(int(l.BufferSize))
            if err != nil {
                return nil, fmt.Errorf("handle buffer size error: %q, interface: %q", err, ifi.Name)
            }
        }
        if l.BufferTimeout == 0 {
            l.BufferTimeout = pcap.BlockForever
        }
        err = inactive.SetTimeout(l.BufferTimeout)
        if err != nil {
            return nil, fmt.Errorf("handle buffer timeout error: %q, interface: %q", err, ifi.Name)
        }
        handle, err = inactive.Activate()
        if err != nil {
            return nil, fmt.Errorf("PCAP Activate device error: %q, interface: %q", err, ifi.Name)
        }
        l.BPFFilter = l.Filter(ifi)
        fmt.Println("Interface:", ifi.Name, ". BPF Filter:", l.BPFFilter)
        err = handle.SetBPFFilter(l.BPFFilter)
        if err != nil {
            handle.Close()
            return nil, fmt.Errorf("BPF filter error: %q%s, interface: %q", err, l.BPFFilter, ifi.Name)
        }
        return
    }

    另一个点 copy-buffer-size

    emitter.go

    从输入到输出数据传送的事件循环

    func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
        if Settings.CopyBufferSize < 1 {
            Settings.CopyBufferSize = 5 << 20
        }
        e.plugins = plugins
    
        if middlewareCmd != "" {
            middleware := NewMiddleware(middlewareCmd)
    
            for _, in := range plugins.Inputs {
                middleware.ReadFrom(in)
            }
    
            e.plugins.Inputs = append(e.plugins.Inputs, middleware)
            e.plugins.All = append(e.plugins.All, middleware)
            e.Add(1)
            go func() {
                defer e.Done()
                if err := CopyMulty(middleware, plugins.Outputs...); err != nil {
                    Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
                }
            }()
        } else {
            for _, in := range plugins.Inputs {
                e.Add(1)
                go func(in PluginReader) {
                    defer e.Done()
                    if err := CopyMulty(in, plugins.Outputs...); err != nil {
                        Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
                    }
                }(in)
            }
        }
    }

    具体复制动作

    func CopyMulty(src PluginReader, writers ...PluginWriter) error {
        wIndex := 0
        modifier := NewHTTPModifier(&Settings.ModifierConfig)
        filteredRequests := make(map[string]int64)
        filteredRequestsLastCleanTime := time.Now().UnixNano()
        filteredCount := 0
    
        for {
            log.Println("src readplgunin %s", src)
            msg, err := src.PluginRead()
            log.Println("src msg %s", msg)
            if err != nil {
                if err == ErrorStopped || err == io.EOF {
                    return nil
                }
                return err
            }
            if msg != nil && len(msg.Data) > 0 {
                if len(msg.Data) > int(Settings.CopyBufferSize) {
                    msg.Data = msg.Data[:Settings.CopyBufferSize]
                }
                meta := payloadMeta(msg.Meta)
                if len(meta) < 3 {
                    Debug(2, fmt.Sprintf("[EMITTER] Found malformed record %q from %q", msg.Meta, src))
                    continue
                }
                requestID := byteutils.SliceToString(meta[1])
                // start a subroutine only when necessary
                if Settings.Verbose >= 3 {
                    Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src)
                }
                if modifier != nil {
                    Debug(3, "[EMITTER] modifier:", requestID, "from:", src)
                    if isRequestPayload(msg.Meta) {
                        msg.Data = modifier.Rewrite(msg.Data)
                        // If modifier tells to skip request
                        if len(msg.Data) == 0 {
                            filteredRequests[requestID] = time.Now().UnixNano()
                            filteredCount++
                            continue
                        }
                        Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src)
    
                    } else {
                        if _, ok := filteredRequests[requestID]; ok {
                            delete(filteredRequests, requestID)
                            filteredCount--
                            continue
                        }
                    }
                }
    
                if Settings.PrettifyHTTP {
                    msg.Data = prettifyHTTP(msg.Data)
                    if len(msg.Data) == 0 {
                        continue
                    }
                }
    
                if Settings.SplitOutput {
                    if Settings.RecognizeTCPSessions {
                        if !PRO {
                            log.Fatal("Detailed TCP sessions work only with PRO license")
                        }
                        hasher := fnv.New32a()
                        hasher.Write(meta[1])
    
                        wIndex = int(hasher.Sum32()) % len(writers)
                        if _, err := writers[wIndex].PluginWrite(msg); err != nil {
                            return err
                        }
                    } else {
                        // Simple round robin
                        if _, err := writers[wIndex].PluginWrite(msg); err != nil {
                            return err
                        }
    
                        wIndex = (wIndex + 1) % len(writers)
                    }
                } else {
                    for _, dst := range writers {
                        if _, err := dst.PluginWrite(msg); err != nil {
                            return err
                        }
                    }
                }
            }
    
            // Run GC on each 1000 request
            if filteredCount > 0 && filteredCount%1000 == 0 {
                // Clean up filtered requests for which we didn't get a response to filter
                now := time.Now().UnixNano()
                if now-filteredRequestsLastCleanTime > int64(60*time.Second) {
                    for k, v := range filteredRequests {
                        if now-v > int64(60*time.Second) {
                            delete(filteredRequests, k)
                            filteredCount--
                        }
                    }
                    filteredRequestsLastCleanTime = time.Now().UnixNano()
                }
            }
        }
    }
  • 相关阅读:
    如何在IntelJ下用Maven创建一个Web项目
    Sublime快捷键大全
    Oracle常用数据字典
    POI 操作Excel疑难点笔记
    Oracle常见错误集锦
    SQL注入测试平台 SQLol -6.CHALLENGES挑战
    SQL注入测试平台 SQLol -5.DELETE注入测试
    SQL注入测试平台 SQLol -4.UPDATE注入测试
    SQL注入测试平台 SQLol -3.INSERT注入测试
    SQL注入测试平台 SQLol -2.SELECT注入测试
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15114596.html
Copyright © 2011-2022 走看看