zoukankan      html  css  js  c++  java
  • lookup_peer.go

    package nsqd

    import (
        "encoding/binary"
        "fmt"
        "io"
        "net"
        "time"

        "github.com/nsqio/go-nsq"
    )

    // lookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
    //
    // A lookupPeer instance is designed to connect lazily to nsqlookupd and reconnect
    // gracefully (i.e. it is all handled by the library).  Clients can simply use the
    // Command interface to perform a round-trip.
    //
    // NOTE(review): nothing in this file synchronizes access to conn or state, so
    // the type is presumably meant to be driven from one goroutine — confirm
    // against callers.
    type lookupPeer struct {
        // l receives the log line emitted by Connect before each dial.
        l               Logger
        // addr is the address dialed by Connect and returned by String.
        addr            string
        // conn is the current connection; it is nil until Connect first succeeds.
        conn            net.Conn
        // state starts as stateDisconnected, is set to stateConnected by Command
        // after a successful Connect, and is reset to stateDisconnected by Close.
        state           int32
        // connectCallback is invoked by Command after connecting, when the state
        // before the connect was stateDisconnected.
        connectCallback func(*lookupPeer)
        // maxBodySize is the limit Command passes to readResponseBounded.
        maxBodySize     int64
        // Info holds peer metadata; it is not populated anywhere in this file.
        Info            peerInfo
    }

    // peerInfo contains metadata for a lookupPeer instance (and is JSON marshalable)
    //
    // Each field maps to the JSON key named in its struct tag.
    // NOTE(review): the values presumably describe the remote nsqlookupd and are
    // filled in from one of its responses — nothing in this file sets them; confirm.
    type peerInfo struct {
        TCPPort          int    `json:"tcp_port"`
        HTTPPort         int    `json:"http_port"`
        Version          string `json:"version"`
        BroadcastAddress string `json:"broadcast_address"`
    }

    // newLookupPeer creates a new lookupPeer instance connecting to the supplied address.
    //
    // The supplied connectCallback will be called *every* time the instance connects.
    func newLookupPeer(addr string, maxBodySize int64, l Logger, connectCallback func(*lookupPeer)) *lookupPeer {
        return &lookupPeer{
            l:               l,
            addr:            addr,
            state:           stateDisconnected,
            maxBodySize:     maxBodySize,
            connectCallback: connectCallback,
        }
    }

    // Connect will Dial the specified address, with timeouts
    func (lp *lookupPeer) Connect() error {
        lp.l.Output(2, fmt.Sprintf("LOOKUP connecting to %s", lp.addr))
        conn, err := net.DialTimeout("tcp", lp.addr, time.Second)
        if err != nil {
            return err
        }
        lp.conn = conn
        return nil
    }

    // String returns the specified address
    func (lp *lookupPeer) String() string {
        return lp.addr
    }

    // Read implements the io.Reader interface, adding deadlines
    func (lp *lookupPeer) Read(data []byte) (int, error) {
        lp.conn.SetReadDeadline(time.Now().Add(time.Second))
        return lp.conn.Read(data)
    }

    // Write implements the io.Writer interface, adding deadlines
    func (lp *lookupPeer) Write(data []byte) (int, error) {
        lp.conn.SetWriteDeadline(time.Now().Add(time.Second))
        return lp.conn.Write(data)
    }

    // Close implements the io.Closer interface
    func (lp *lookupPeer) Close() error {
        lp.state = stateDisconnected
        if lp.conn != nil {
            return lp.conn.Close()
        }
        return nil
    }

    // Command performs a round-trip for the specified Command.
    //
    // It will lazily connect to nsqlookupd and gracefully handle
    // reconnecting in the event of a failure.
    //
    // It returns the response from nsqlookupd as []byte
    func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
        initialState := lp.state
        if lp.state != stateConnected {
            err := lp.Connect()
            if err != nil {
                return nil, err
            }
            lp.state = stateConnected
            lp.Write(nsq.MagicV1)
            if initialState == stateDisconnected {
                lp.connectCallback(lp)
            }
        }
        if cmd == nil {
            return nil, nil
        }
        _, err := cmd.WriteTo(lp)
        if err != nil {
            lp.Close()
            return nil, err
        }
        resp, err := readResponseBounded(lp, lp.maxBodySize)
        if err != nil {
            lp.Close()
            return nil, err
        }
        return resp, nil
    }

    // readResponseBounded reads one length-prefixed frame from r: a 4-byte
    // big-endian signed size followed by that many bytes of body.
    //
    // It returns an error when the size prefix cannot be read, when the size is
    // negative, when it exceeds limit, or when r ends before the full body has
    // been read. On success it returns the body (empty for a zero size).
    func readResponseBounded(r io.Reader, limit int64) ([]byte, error) {
        var msgSize int32

        // message size
        if err := binary.Read(r, binary.BigEndian, &msgSize); err != nil {
            return nil, err
        }

        // The size comes off the wire as a signed value; a negative one would
        // slip past the limit check below and make the allocation panic.
        if msgSize < 0 {
            return nil, fmt.Errorf("response body size (%d) is negative", msgSize)
        }

        if int64(msgSize) > limit {
            return nil, fmt.Errorf("response body size (%d) is greater than limit (%d)",
                msgSize, limit)
        }

        // message binary data
        buf := make([]byte, msgSize)
        if _, err := io.ReadFull(r, buf); err != nil {
            return nil, err
        }

        return buf, nil
    }

  • 相关阅读:
    switch语句(上)(转载)
    MSIL实用指南-生成if...else...语句
    Docker 部署Jira8.1.0
    K8S从入门到放弃系列-(13)Kubernetes集群mertics-server部署
    K8S踩坑篇-master节点作为node节点加入集群
    K8S从入门到放弃系列-(12)Kubernetes集群Coredns部署
    K8S从入门到放弃系列-(11)kubernetes集群网络Calico部署
    K8S从入门到放弃系列-(10)kubernetes集群之kube-proxy部署
    K8S从入门到放弃系列-(9)kubernetes集群之kubelet部署
    K8S从入门到放弃系列-(8)kube-apiserver 高可用配置
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457338.html
Copyright © 2011-2022 走看看