zoukankan      html  css  js  c++  java
  • nsqlookup_protocol_v1.go

    package nsqlookupd

    import (
        "bufio"
        "encoding/binary"
        "encoding/json"
        "fmt"
        "io"
        "log"
        "net"
        "os"
        "strings"
        "sync/atomic"
        "time"

        "github.com/nsqio/nsq/internal/protocol"
        "github.com/nsqio/nsq/internal/version"
    )

    type LookupProtocolV1 struct {
        ctx *Context
    }

    func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
        var err error
        var line string

        client := NewClientV1(conn)
        reader := bufio.NewReader(client)
        for {
            line, err = reader.ReadString('
    ')
            if err != nil {
                break
            }

            line = strings.TrimSpace(line)
            params := strings.Split(line, " ")

            var response []byte
            response, err = p.Exec(client, reader, params)
            if err != nil {
                ctx := ""
                if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
                    ctx = " - " + parentErr.Error()
                }
                p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx)

                _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
                if sendErr != nil {
                    p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
                    break
                }

                // errors of type FatalClientErr should forceably close the connection
                if _, ok := err.(*protocol.FatalClientErr); ok {
                    break
                }
                continue
            }

            if response != nil {
                _, err = protocol.SendResponse(client, response)
                if err != nil {
                    break
                }
            }
        }

        conn.Close()
        p.ctx.nsqlookupd.logf("CLIENT(%s): closing", client)
        if client.peerInfo != nil {
            registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id)
            for _, r := range registrations {
                if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
                    p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
                        client, r.Category, r.Key, r.SubKey)
                }
            }
        }
        return err
    }

    func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
        switch params[0] {
        case "PING":
            return p.PING(client, params)
        case "IDENTIFY":
            return p.IDENTIFY(client, reader, params[1:])
        case "REGISTER":
            return p.REGISTER(client, reader, params[1:])
        case "UNREGISTER":
            return p.UNREGISTER(client, reader, params[1:])
        }
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
    }

    func getTopicChan(command string, params []string) (string, string, error) {
        if len(params) == 0 {
            return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command))
        }

        topicName := params[0]
        var channelName string
        if len(params) >= 2 {
            channelName = params[1]
        }

        if !protocol.IsValidTopicName(topicName) {
            return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName))
        }

        if channelName != "" && !protocol.IsValidChannelName(channelName) {
            return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName))
        }

        return topicName, channelName, nil
    }

    func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
        if client.peerInfo == nil {
            return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
        }

        topic, channel, err := getTopicChan("REGISTER", params)
        if err != nil {
            return nil, err
        }

        if channel != "" {
            key := Registration{"channel", topic, channel}
            if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) {
                p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s",
                    client, "channel", topic, channel)
            }
        }
        key := Registration{"topic", topic, ""}
        if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) {
            p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s",
                client, "topic", topic, "")
        }

        return []byte("OK"), nil
    }

    func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
        if client.peerInfo == nil {
            return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
        }

        topic, channel, err := getTopicChan("UNREGISTER", params)
        if err != nil {
            return nil, err
        }

        if channel != "" {
            key := Registration{"channel", topic, channel}
            removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id)
            if removed {
                p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
                    client, "channel", topic, channel)
            }
            // for ephemeral channels, remove the channel as well if it has no producers
            if left == 0 && strings.HasSuffix(channel, "#ephemeral") {
                p.ctx.nsqlookupd.DB.RemoveRegistration(key)
            }
        } else {
            // no channel was specified so this is a topic unregistration
            // remove all of the channel registrations...
            // normally this shouldn't happen which is why we print a warning message
            // if anything is actually removed
            registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*")
            for _, r := range registrations {
                if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
                    p.ctx.nsqlookupd.logf("WARNING: client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",
                        client, "channel", topic, r.SubKey)
                }
            }

            key := Registration{"topic", topic, ""}
            if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed {
                p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
                    client, "topic", topic, "")
            }
        }

        return []byte("OK"), nil
    }

    func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
        var err error

        if client.peerInfo != nil {
            return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
        }

        var bodyLen int32
        err = binary.Read(reader, binary.BigEndian, &bodyLen)
        if err != nil {
            return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")
        }

        body := make([]byte, bodyLen)
        _, err = io.ReadFull(reader, body)
        if err != nil {
            return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body")
        }

        // body is a json structure with producer information
        peerInfo := PeerInfo{id: client.RemoteAddr().String()}
        err = json.Unmarshal(body, &peerInfo)
        if err != nil {
            return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
        }

        peerInfo.RemoteAddress = client.RemoteAddr().String()

        // require all fields
        if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {
            return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")
        }

        atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())

        p.ctx.nsqlookupd.logf("CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",
            client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)

        client.peerInfo = &peerInfo
        if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
            p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
        }

        // build a response
        data := make(map[string]interface{})
        data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port
        data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port
        data["version"] = version.Binary
        hostname, err := os.Hostname()
        if err != nil {
            log.Fatalf("ERROR: unable to get hostname %s", err)
        }
        data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress
        data["hostname"] = hostname

        response, err := json.Marshal(data)
        if err != nil {
            p.ctx.nsqlookupd.logf("ERROR: marshaling %v", data)
            return []byte("OK"), nil
        }
        return response, nil
    }

    func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) {
        if client.peerInfo != nil {
            // we could get a PING before other commands on the same client connection
            cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate))
            now := time.Now()
            p.ctx.nsqlookupd.logf("CLIENT(%s): pinged (last ping %s)", client.peerInfo.id,
                now.Sub(cur))
            atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano())
        }
        return []byte("OK"), nil
    }

  • 相关阅读:
    第二个冲刺
    实验四主存空间的分配和回收
    Sprint总结
    Sprint回顾
    Scrum项目5.0
    hibernate jpa 注解 @Temporal(TemporalType.DATE) 格式化时间日期,页面直接得到格式化类型的值
    获得HttpServletResponse及其他对象
    UUID.randomUUID()方法介绍
    psp个人软件过程需求文档
    电子产品自动搜索比价系统设计与实现 项目愿景与范围
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457150.html
Copyright © 2011-2022 走看看