zoukankan      html  css  js  c++  java
  • http.go

    package nsqd

    import (
        "bufio"
        "bytes"
        "encoding/json"
        "fmt"
        "io"
        "io/ioutil"
        "net"
        "net/http"
        "net/http/pprof"
        "net/url"
        "os"
        "reflect"
        "runtime"
        "strconv"
        "strings"
        "time"

        "github.com/julienschmidt/httprouter"
        "github.com/nsqio/nsq/internal/http_api"
        "github.com/nsqio/nsq/internal/protocol"
        "github.com/nsqio/nsq/internal/version"
    )

    type httpServer struct {
        ctx         *context
        tlsEnabled  bool
        tlsRequired bool
        router      http.Handler
    }

    func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer {
        log := http_api.Log(ctx.nsqd.getOpts().Logger)

        router := httprouter.New()
        router.HandleMethodNotAllowed = true
        router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.getOpts().Logger)
        router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.getOpts().Logger)
        router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.getOpts().Logger)
        s := &httpServer{
            ctx:         ctx,
            tlsEnabled:  tlsEnabled,
            tlsRequired: tlsRequired,
            router:      router,
        }

        router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

        // v1 negotiate
        router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.NegotiateVersion))
        router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.NegotiateVersion))
        router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.NegotiateVersion))

        // only v1
        router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
        router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
        router.Handle("POST", "/topic/empty", http_api.Decorate(s.doEmptyTopic, log, http_api.V1))
        router.Handle("POST", "/topic/pause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
        router.Handle("POST", "/topic/unpause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
        router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
        router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
        router.Handle("POST", "/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1))
        router.Handle("POST", "/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
        router.Handle("POST", "/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
        router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
        router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))

        // deprecated, v1 negotiate
        router.Handle("POST", "/put", http_api.Decorate(s.doPUB, http_api.NegotiateVersion))
        router.Handle("POST", "/mput", http_api.Decorate(s.doMPUB, http_api.NegotiateVersion))
        router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.NegotiateVersion))
        router.Handle("POST", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/empty_topic", http_api.Decorate(s.doEmptyTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/pause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/unpause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/empty_channel", http_api.Decorate(s.doEmptyChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/pause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/unpause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/empty_topic", http_api.Decorate(s.doEmptyTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/pause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/unpause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/empty_channel", http_api.Decorate(s.doEmptyChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/pause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/unpause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))

        // debug
        router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
        router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
        router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
        router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
        router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
        router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
        router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
        router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
        router.Handle("PUT", "/debug/setblockrate", http_api.Decorate(setBlockRateHandler, log, http_api.PlainText))
        router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))

        return s
    }

    func setBlockRateHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        rate, err := strconv.Atoi(req.FormValue("rate"))
        if err != nil {
            return nil, http_api.Err{http.StatusBadRequest, fmt.Sprintf("invalid block rate : %s", err.Error())}
        }
        runtime.SetBlockProfileRate(rate)
        return nil, nil
    }

    func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        if !s.tlsEnabled && s.tlsRequired {
            resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d}`,
                s.ctx.nsqd.RealHTTPSAddr().Port)
            http_api.Respond(w, 403, "", resp)
            return
        }
        s.router.ServeHTTP(w, req)
    }

    func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        health := s.ctx.nsqd.GetHealth()
        if !s.ctx.nsqd.IsHealthy() {
            return nil, http_api.Err{500, health}
        }
        return health, nil
    }

    func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        hostname, err := os.Hostname()
        if err != nil {
            return nil, http_api.Err{500, err.Error()}
        }
        return struct {
            Version          string `json:"version"`
            BroadcastAddress string `json:"broadcast_address"`
            Hostname         string `json:"hostname"`
            HTTPPort         int    `json:"http_port"`
            TCPPort          int    `json:"tcp_port"`
            StartTime        int64  `json:"start_time"`
        }{
            Version:          version.Binary,
            BroadcastAddress: s.ctx.nsqd.getOpts().BroadcastAddress,
            Hostname:         hostname,
            TCPPort:          s.ctx.nsqd.RealTCPAddr().Port,
            HTTPPort:         s.ctx.nsqd.RealHTTPAddr().Port,
            StartTime:        s.ctx.nsqd.GetStartTime().Unix(),
        }, nil
    }

    func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.ReqParams, *Topic, string, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
        if err != nil {
            return nil, nil, "", http_api.Err{400, err.Error()}
        }

        topic, err := s.ctx.nsqd.GetExistingTopic(topicName)
        if err != nil {
            return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        return reqParams, topic, channelName, err
    }

    func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
        reqParams, err := url.ParseQuery(req.URL.RawQuery)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicNames, ok := reqParams["topic"]
        if !ok {
            return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }
        topicName := topicNames[0]

        if !protocol.IsValidTopicName(topicName) {
            return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
        }

        return reqParams, s.ctx.nsqd.GetTopic(topicName), nil
    }

    func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        // TODO: one day I'd really like to just error on chunked requests
        // to be able to fail "too big" requests before we even read

        if req.ContentLength > s.ctx.nsqd.getOpts().MaxMsgSize {
            return nil, http_api.Err{413, "MSG_TOO_BIG"}
        }

        // add 1 so that it's greater than our max when we test for it
        // (LimitReader returns a "fake" EOF)
        readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1
        body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
        if err != nil {
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }
        if int64(len(body)) == readMax {
            return nil, http_api.Err{413, "MSG_TOO_BIG"}
        }
        if len(body) == 0 {
            return nil, http_api.Err{400, "MSG_EMPTY"}
        }

        reqParams, topic, err := s.getTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        var deferred time.Duration
        if ds, ok := reqParams["defer"]; ok {
            var di int64
            di, err = strconv.ParseInt(ds[0], 10, 64)
            if err != nil {
                return nil, http_api.Err{400, "INVALID_DEFER"}
            }
            deferred = time.Duration(di) * time.Millisecond
            if deferred < 0 || deferred > s.ctx.nsqd.getOpts().MaxReqTimeout {
                return nil, http_api.Err{400, "INVALID_DEFER"}
            }
        }

        msg := NewMessage(<-s.ctx.nsqd.idChan, body)
        msg.deferred = deferred
        err = topic.PutMessage(msg)
        if err != nil {
            return nil, http_api.Err{503, "EXITING"}
        }

        return "OK", nil
    }

    func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        var msgs []*Message
        var exit bool

        // TODO: one day I'd really like to just error on chunked requests
        // to be able to fail "too big" requests before we even read

        if req.ContentLength > s.ctx.nsqd.getOpts().MaxBodySize {
            return nil, http_api.Err{413, "BODY_TOO_BIG"}
        }

        reqParams, topic, err := s.getTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        _, ok := reqParams["binary"]
        if ok {
            tmp := make([]byte, 4)
            msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan,
                s.ctx.nsqd.getOpts().MaxMsgSize)
            if err != nil {
                return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]}
            }
        } else {
            // add 1 so that it's greater than our max when we test for it
            // (LimitReader returns a "fake" EOF)
            readMax := s.ctx.nsqd.getOpts().MaxBodySize + 1
            rdr := bufio.NewReader(io.LimitReader(req.Body, readMax))
            total := 0
            for !exit {
                var block []byte
                block, err = rdr.ReadBytes('
    ')
                if err != nil {
                    if err != io.EOF {
                        return nil, http_api.Err{500, "INTERNAL_ERROR"}
                    }
                    exit = true
                }
                total += len(block)
                if int64(total) == readMax {
                    return nil, http_api.Err{413, "BODY_TOO_BIG"}
                }

                if len(block) > 0 && block[len(block)-1] == '
    ' {
                    block = block[:len(block)-1]
                }

                // silently discard 0 length messages
                // this maintains the behavior pre 0.2.22
                if len(block) == 0 {
                    continue
                }

                if int64(len(block)) > s.ctx.nsqd.getOpts().MaxMsgSize {
                    return nil, http_api.Err{413, "MSG_TOO_BIG"}
                }

                msg := NewMessage(<-s.ctx.nsqd.idChan, block)
                msgs = append(msgs, msg)
            }
        }

        err = topic.PutMessages(msgs)
        if err != nil {
            return nil, http_api.Err{503, "EXITING"}
        }

        return "OK", nil
    }

    func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, _, err := s.getTopicFromQuery(req)
        return nil, err
    }

    func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        if !protocol.IsValidTopicName(topicName) {
            return nil, http_api.Err{400, "INVALID_TOPIC"}
        }

        topic, err := s.ctx.nsqd.GetExistingTopic(topicName)
        if err != nil {
            return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        err = topic.Empty()
        if err != nil {
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }

        return nil, nil
    }

    func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        err = s.ctx.nsqd.DeleteExistingTopic(topicName)
        if err != nil {
            return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        return nil, nil
    }

    func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        topic, err := s.ctx.nsqd.GetExistingTopic(topicName)
        if err != nil {
            return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        if strings.Contains(req.URL.Path, "unpause") {
            err = topic.UnPause()
        } else {
            err = topic.Pause()
        }
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failure in %s - %s", req.URL.Path, err)
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }

        // pro-actively persist metadata so in case of process failure
        // nsqd won't suddenly (un)pause a topic
        s.ctx.nsqd.Lock()
        s.ctx.nsqd.PersistMetadata()
        s.ctx.nsqd.Unlock()
        return nil, nil
    }

    func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, topic, channelName, err := s.getExistingTopicFromQuery(req)
        if err != nil {
            return nil, err
        }
        topic.GetChannel(channelName)
        return nil, nil
    }

    func (s *httpServer) doEmptyChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, topic, channelName, err := s.getExistingTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        channel, err := topic.GetExistingChannel(channelName)
        if err != nil {
            return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
        }

        err = channel.Empty()
        if err != nil {
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }

        return nil, nil
    }

    func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, topic, channelName, err := s.getExistingTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        err = topic.DeleteExistingChannel(channelName)
        if err != nil {
            return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
        }

        return nil, nil
    }

    func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, topic, channelName, err := s.getExistingTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        channel, err := topic.GetExistingChannel(channelName)
        if err != nil {
            return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
        }

        if strings.Contains(req.URL.Path, "unpause") {
            err = channel.UnPause()
        } else {
            err = channel.Pause()
        }
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failure in %s - %s", req.URL.Path, err)
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }

        // pro-actively persist metadata so in case of process failure
        // nsqd won't suddenly (un)pause a channel
        s.ctx.nsqd.Lock()
        s.ctx.nsqd.PersistMetadata()
        s.ctx.nsqd.Unlock()
        return nil, nil
    }

    func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }
        formatString, _ := reqParams.Get("format")
        topicName, _ := reqParams.Get("topic")
        channelName, _ := reqParams.Get("channel")
        jsonFormat := formatString == "json"

        stats := s.ctx.nsqd.GetStats()
        health := s.ctx.nsqd.GetHealth()
        startTime := s.ctx.nsqd.GetStartTime()
        uptime := time.Since(startTime)

        // If we WERE given a topic-name, remove stats for all the other topics:
        if len(topicName) > 0 {
            // Find the desired-topic-index:
            for _, topicStats := range stats {
                if topicStats.TopicName == topicName {
                    // If we WERE given a channel-name, remove stats for all the other channels:
                    if len(channelName) > 0 {
                        // Find the desired-channel:
                        for _, channelStats := range topicStats.Channels {
                            if channelStats.ChannelName == channelName {
                                topicStats.Channels = []ChannelStats{channelStats}
                                // We've got the channel we were looking for:
                                break
                            }
                        }
                    }

                    // We've got the topic we were looking for:
                    stats = []TopicStats{topicStats}
                    break
                }
            }
        }

        if !jsonFormat {
            return s.printStats(stats, health, startTime, uptime), nil
        }

        return struct {
            Version   string       `json:"version"`
            Health    string       `json:"health"`
            StartTime int64        `json:"start_time"`
            Topics    []TopicStats `json:"topics"`
        }{version.Binary, health, startTime.Unix(), stats}, nil
    }

    func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration) []byte {
        var buf bytes.Buffer
        w := &buf
        now := time.Now()
        io.WriteString(w, fmt.Sprintf("%s
    ", version.String("nsqd")))
        io.WriteString(w, fmt.Sprintf("start_time %v
    ", startTime.Format(time.RFC3339)))
        io.WriteString(w, fmt.Sprintf("uptime %s
    ", uptime))
        if len(stats) == 0 {
            io.WriteString(w, "
    NO_TOPICS
    ")
            return buf.Bytes()
        }
        io.WriteString(w, fmt.Sprintf("
    Health: %s
    ", health))
        for _, t := range stats {
            var pausedPrefix string
            if t.Paused {
                pausedPrefix = "*P "
            } else {
                pausedPrefix = "   "
            }
            io.WriteString(w, fmt.Sprintf("
    %s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s
    ",
                pausedPrefix,
                t.TopicName,
                t.Depth,
                t.BackendDepth,
                t.MessageCount,
                t.E2eProcessingLatency))
            for _, c := range t.Channels {
                if c.Paused {
                    pausedPrefix = "   *P "
                } else {
                    pausedPrefix = "      "
                }
                io.WriteString(w,
                    fmt.Sprintf("%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s
    ",
                        pausedPrefix,
                        c.ChannelName,
                        c.Depth,
                        c.BackendDepth,
                        c.InFlightCount,
                        c.DeferredCount,
                        c.RequeueCount,
                        c.TimeoutCount,
                        c.MessageCount,
                        c.E2eProcessingLatency))
                for _, client := range c.Clients {
                    connectTime := time.Unix(client.ConnectTime, 0)
                    // truncate to the second
                    duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
                    _, port, _ := net.SplitHostPort(client.RemoteAddress)
                    io.WriteString(w, fmt.Sprintf("        [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s
    ",
                        client.Version,
                        fmt.Sprintf("%s:%s", client.Name, port),
                        client.State,
                        client.InFlightCount,
                        client.ReadyCount,
                        client.FinishCount,
                        client.RequeueCount,
                        client.MessageCount,
                        duration,
                    ))
                }
            }
        }
        return buf.Bytes()
    }

    func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        opt := ps.ByName("opt")

        if req.Method == "PUT" {
            // add 1 so that it's greater than our max when we test for it
            // (LimitReader returns a "fake" EOF)
            readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1
            body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
            if err != nil {
                return nil, http_api.Err{500, "INTERNAL_ERROR"}
            }
            if int64(len(body)) == readMax || len(body) == 0 {
                return nil, http_api.Err{413, "INVALID_VALUE"}
            }

            opts := *s.ctx.nsqd.getOpts()
            switch opt {
            case "nsqlookupd_tcp_addresses":
                err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses)
                if err != nil {
                    return nil, http_api.Err{400, "INVALID_VALUE"}
                }
            case "verbose":
                err := json.Unmarshal(body, &opts.Verbose)
                if err != nil {
                    return nil, http_api.Err{400, "INVALID_VALUE"}
                }
            default:
                return nil, http_api.Err{400, "INVALID_OPTION"}
            }
            s.ctx.nsqd.swapOpts(&opts)
            s.ctx.nsqd.triggerOptsNotification()
        }

        v, ok := getOptByCfgName(s.ctx.nsqd.getOpts(), opt)
        if !ok {
            return nil, http_api.Err{400, "INVALID_OPTION"}
        }

        return v, nil
    }

    func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
        val := reflect.ValueOf(opts).Elem()
        typ := val.Type()
        for i := 0; i < typ.NumField(); i++ {
            field := typ.Field(i)
            flagName := field.Tag.Get("flag")
            cfgName := field.Tag.Get("cfg")
            if flagName == "" {
                continue
            }
            if cfgName == "" {
                cfgName = strings.Replace(flagName, "-", "_", -1)
            }
            if name != cfgName {
                continue
            }
            return val.FieldByName(field.Name).Interface(), true
        }
        return nil, false
    }

  • 相关阅读:
    怎么查看京东店铺的品牌ID
    PPT编辑的时候很卡,放映的时候不卡,咋回事?
    codevs 1702素数判定2
    codevs 2530大质数
    codevs 1488GangGang的烦恼
    codevs 2851 菜菜买气球
    hdu 5653 Bomber Man wants to bomb an Array
    poj 3661 Running
    poj 1651 Multiplication Puzzle
    hdu 2476 String Painter
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457320.html
Copyright © 2011-2022 走看看