zoukankan      html  css  js  c++  java
  • http.go

    package nsqd

    import (
        "bufio"
        "bytes"
        "encoding/json"
        "fmt"
        "io"
        "io/ioutil"
        "net"
        "net/http"
        "net/http/pprof"
        "net/url"
        "os"
        "reflect"
        "runtime"
        "strconv"
        "strings"
        "time"

        "github.com/julienschmidt/httprouter"
        "github.com/nsqio/nsq/internal/http_api"
        "github.com/nsqio/nsq/internal/protocol"
        "github.com/nsqio/nsq/internal/version"
    )

    // httpServer is the http.Handler for nsqd's HTTP API. It pairs the shared
    // nsqd context with a request router and the two TLS flags that ServeHTTP
    // checks before dispatching a request.
    type httpServer struct {
        // ctx gives handlers access to the owning nsqd instance (ctx.nsqd).
        ctx         *context
        // tlsEnabled and tlsRequired are consulted together by ServeHTTP: a
        // request is refused when tlsRequired is set but tlsEnabled is not.
        tlsEnabled  bool
        tlsRequired bool
        // router dispatches requests to the handlers registered in newHTTPServer.
        router      http.Handler
    }

    func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer {
        log := http_api.Log(ctx.nsqd.getOpts().Logger)

        router := httprouter.New()
        router.HandleMethodNotAllowed = true
        router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.getOpts().Logger)
        router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.getOpts().Logger)
        router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.getOpts().Logger)
        s := &httpServer{
            ctx:         ctx,
            tlsEnabled:  tlsEnabled,
            tlsRequired: tlsRequired,
            router:      router,
        }

        router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

        // v1 negotiate
        router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.NegotiateVersion))
        router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.NegotiateVersion))
        router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.NegotiateVersion))

        // only v1
        router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
        router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
        router.Handle("POST", "/topic/empty", http_api.Decorate(s.doEmptyTopic, log, http_api.V1))
        router.Handle("POST", "/topic/pause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
        router.Handle("POST", "/topic/unpause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
        router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
        router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
        router.Handle("POST", "/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1))
        router.Handle("POST", "/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
        router.Handle("POST", "/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
        router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
        router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))

        // deprecated, v1 negotiate
        router.Handle("POST", "/put", http_api.Decorate(s.doPUB, http_api.NegotiateVersion))
        router.Handle("POST", "/mput", http_api.Decorate(s.doMPUB, http_api.NegotiateVersion))
        router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.NegotiateVersion))
        router.Handle("POST", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/empty_topic", http_api.Decorate(s.doEmptyTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/pause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/unpause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/empty_channel", http_api.Decorate(s.doEmptyChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/pause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/unpause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/empty_topic", http_api.Decorate(s.doEmptyTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/pause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/unpause_topic", http_api.Decorate(s.doPauseTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/empty_channel", http_api.Decorate(s.doEmptyChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/pause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/unpause_channel", http_api.Decorate(s.doPauseChannel, log, http_api.NegotiateVersion))

        // debug
        router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
        router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
        router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
        router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
        router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
        router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
        router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
        router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
        router.Handle("PUT", "/debug/setblockrate", http_api.Decorate(setBlockRateHandler, log, http_api.PlainText))
        router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))

        return s
    }

    func setBlockRateHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        rate, err := strconv.Atoi(req.FormValue("rate"))
        if err != nil {
            return nil, http_api.Err{http.StatusBadRequest, fmt.Sprintf("invalid block rate : %s", err.Error())}
        }
        runtime.SetBlockProfileRate(rate)
        return nil, nil
    }

    func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        if !s.tlsEnabled && s.tlsRequired {
            resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d}`,
                s.ctx.nsqd.RealHTTPSAddr().Port)
            http_api.Respond(w, 403, "", resp)
            return
        }
        s.router.ServeHTTP(w, req)
    }

    func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        health := s.ctx.nsqd.GetHealth()
        if !s.ctx.nsqd.IsHealthy() {
            return nil, http_api.Err{500, health}
        }
        return health, nil
    }

    func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        hostname, err := os.Hostname()
        if err != nil {
            return nil, http_api.Err{500, err.Error()}
        }
        return struct {
            Version          string `json:"version"`
            BroadcastAddress string `json:"broadcast_address"`
            Hostname         string `json:"hostname"`
            HTTPPort         int    `json:"http_port"`
            TCPPort          int    `json:"tcp_port"`
            StartTime        int64  `json:"start_time"`
        }{
            Version:          version.Binary,
            BroadcastAddress: s.ctx.nsqd.getOpts().BroadcastAddress,
            Hostname:         hostname,
            TCPPort:          s.ctx.nsqd.RealTCPAddr().Port,
            HTTPPort:         s.ctx.nsqd.RealHTTPAddr().Port,
            StartTime:        s.ctx.nsqd.GetStartTime().Unix(),
        }, nil
    }

    func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.ReqParams, *Topic, string, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
        if err != nil {
            return nil, nil, "", http_api.Err{400, err.Error()}
        }

        topic, err := s.ctx.nsqd.GetExistingTopic(topicName)
        if err != nil {
            return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        return reqParams, topic, channelName, err
    }

    func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
        reqParams, err := url.ParseQuery(req.URL.RawQuery)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicNames, ok := reqParams["topic"]
        if !ok {
            return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }
        topicName := topicNames[0]

        if !protocol.IsValidTopicName(topicName) {
            return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
        }

        return reqParams, s.ctx.nsqd.GetTopic(topicName), nil
    }

    func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        // TODO: one day I'd really like to just error on chunked requests
        // to be able to fail "too big" requests before we even read

        if req.ContentLength > s.ctx.nsqd.getOpts().MaxMsgSize {
            return nil, http_api.Err{413, "MSG_TOO_BIG"}
        }

        // add 1 so that it's greater than our max when we test for it
        // (LimitReader returns a "fake" EOF)
        readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1
        body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
        if err != nil {
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }
        if int64(len(body)) == readMax {
            return nil, http_api.Err{413, "MSG_TOO_BIG"}
        }
        if len(body) == 0 {
            return nil, http_api.Err{400, "MSG_EMPTY"}
        }

        reqParams, topic, err := s.getTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        var deferred time.Duration
        if ds, ok := reqParams["defer"]; ok {
            var di int64
            di, err = strconv.ParseInt(ds[0], 10, 64)
            if err != nil {
                return nil, http_api.Err{400, "INVALID_DEFER"}
            }
            deferred = time.Duration(di) * time.Millisecond
            if deferred < 0 || deferred > s.ctx.nsqd.getOpts().MaxReqTimeout {
                return nil, http_api.Err{400, "INVALID_DEFER"}
            }
        }

        msg := NewMessage(<-s.ctx.nsqd.idChan, body)
        msg.deferred = deferred
        err = topic.PutMessage(msg)
        if err != nil {
            return nil, http_api.Err{503, "EXITING"}
        }

        return "OK", nil
    }

    func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        var msgs []*Message
        var exit bool

        // TODO: one day I'd really like to just error on chunked requests
        // to be able to fail "too big" requests before we even read

        if req.ContentLength > s.ctx.nsqd.getOpts().MaxBodySize {
            return nil, http_api.Err{413, "BODY_TOO_BIG"}
        }

        reqParams, topic, err := s.getTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        _, ok := reqParams["binary"]
        if ok {
            tmp := make([]byte, 4)
            msgs, err = readMPUB(req.Body, tmp, s.ctx.nsqd.idChan,
                s.ctx.nsqd.getOpts().MaxMsgSize)
            if err != nil {
                return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]}
            }
        } else {
            // add 1 so that it's greater than our max when we test for it
            // (LimitReader returns a "fake" EOF)
            readMax := s.ctx.nsqd.getOpts().MaxBodySize + 1
            rdr := bufio.NewReader(io.LimitReader(req.Body, readMax))
            total := 0
            for !exit {
                var block []byte
                block, err = rdr.ReadBytes('
    ')
                if err != nil {
                    if err != io.EOF {
                        return nil, http_api.Err{500, "INTERNAL_ERROR"}
                    }
                    exit = true
                }
                total += len(block)
                if int64(total) == readMax {
                    return nil, http_api.Err{413, "BODY_TOO_BIG"}
                }

                if len(block) > 0 && block[len(block)-1] == '
    ' {
                    block = block[:len(block)-1]
                }

                // silently discard 0 length messages
                // this maintains the behavior pre 0.2.22
                if len(block) == 0 {
                    continue
                }

                if int64(len(block)) > s.ctx.nsqd.getOpts().MaxMsgSize {
                    return nil, http_api.Err{413, "MSG_TOO_BIG"}
                }

                msg := NewMessage(<-s.ctx.nsqd.idChan, block)
                msgs = append(msgs, msg)
            }
        }

        err = topic.PutMessages(msgs)
        if err != nil {
            return nil, http_api.Err{503, "EXITING"}
        }

        return "OK", nil
    }

    func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, _, err := s.getTopicFromQuery(req)
        return nil, err
    }

    func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        if !protocol.IsValidTopicName(topicName) {
            return nil, http_api.Err{400, "INVALID_TOPIC"}
        }

        topic, err := s.ctx.nsqd.GetExistingTopic(topicName)
        if err != nil {
            return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        err = topic.Empty()
        if err != nil {
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }

        return nil, nil
    }

    func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        err = s.ctx.nsqd.DeleteExistingTopic(topicName)
        if err != nil {
            return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        return nil, nil
    }

    func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        topic, err := s.ctx.nsqd.GetExistingTopic(topicName)
        if err != nil {
            return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        if strings.Contains(req.URL.Path, "unpause") {
            err = topic.UnPause()
        } else {
            err = topic.Pause()
        }
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failure in %s - %s", req.URL.Path, err)
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }

        // pro-actively persist metadata so in case of process failure
        // nsqd won't suddenly (un)pause a topic
        s.ctx.nsqd.Lock()
        s.ctx.nsqd.PersistMetadata()
        s.ctx.nsqd.Unlock()
        return nil, nil
    }

    func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, topic, channelName, err := s.getExistingTopicFromQuery(req)
        if err != nil {
            return nil, err
        }
        topic.GetChannel(channelName)
        return nil, nil
    }

    func (s *httpServer) doEmptyChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, topic, channelName, err := s.getExistingTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        channel, err := topic.GetExistingChannel(channelName)
        if err != nil {
            return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
        }

        err = channel.Empty()
        if err != nil {
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }

        return nil, nil
    }

    func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, topic, channelName, err := s.getExistingTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        err = topic.DeleteExistingChannel(channelName)
        if err != nil {
            return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
        }

        return nil, nil
    }

    func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        _, topic, channelName, err := s.getExistingTopicFromQuery(req)
        if err != nil {
            return nil, err
        }

        channel, err := topic.GetExistingChannel(channelName)
        if err != nil {
            return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
        }

        if strings.Contains(req.URL.Path, "unpause") {
            err = channel.UnPause()
        } else {
            err = channel.Pause()
        }
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failure in %s - %s", req.URL.Path, err)
            return nil, http_api.Err{500, "INTERNAL_ERROR"}
        }

        // pro-actively persist metadata so in case of process failure
        // nsqd won't suddenly (un)pause a channel
        s.ctx.nsqd.Lock()
        s.ctx.nsqd.PersistMetadata()
        s.ctx.nsqd.Unlock()
        return nil, nil
    }

    func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            s.ctx.nsqd.logf("ERROR: failed to parse request params - %s", err)
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }
        formatString, _ := reqParams.Get("format")
        topicName, _ := reqParams.Get("topic")
        channelName, _ := reqParams.Get("channel")
        jsonFormat := formatString == "json"

        stats := s.ctx.nsqd.GetStats()
        health := s.ctx.nsqd.GetHealth()
        startTime := s.ctx.nsqd.GetStartTime()
        uptime := time.Since(startTime)

        // If we WERE given a topic-name, remove stats for all the other topics:
        if len(topicName) > 0 {
            // Find the desired-topic-index:
            for _, topicStats := range stats {
                if topicStats.TopicName == topicName {
                    // If we WERE given a channel-name, remove stats for all the other channels:
                    if len(channelName) > 0 {
                        // Find the desired-channel:
                        for _, channelStats := range topicStats.Channels {
                            if channelStats.ChannelName == channelName {
                                topicStats.Channels = []ChannelStats{channelStats}
                                // We've got the channel we were looking for:
                                break
                            }
                        }
                    }

                    // We've got the topic we were looking for:
                    stats = []TopicStats{topicStats}
                    break
                }
            }
        }

        if !jsonFormat {
            return s.printStats(stats, health, startTime, uptime), nil
        }

        return struct {
            Version   string       `json:"version"`
            Health    string       `json:"health"`
            StartTime int64        `json:"start_time"`
            Topics    []TopicStats `json:"topics"`
        }{version.Binary, health, startTime.Unix(), stats}, nil
    }

    func (s *httpServer) printStats(stats []TopicStats, health string, startTime time.Time, uptime time.Duration) []byte {
        var buf bytes.Buffer
        w := &buf
        now := time.Now()
        io.WriteString(w, fmt.Sprintf("%s
    ", version.String("nsqd")))
        io.WriteString(w, fmt.Sprintf("start_time %v
    ", startTime.Format(time.RFC3339)))
        io.WriteString(w, fmt.Sprintf("uptime %s
    ", uptime))
        if len(stats) == 0 {
            io.WriteString(w, "
    NO_TOPICS
    ")
            return buf.Bytes()
        }
        io.WriteString(w, fmt.Sprintf("
    Health: %s
    ", health))
        for _, t := range stats {
            var pausedPrefix string
            if t.Paused {
                pausedPrefix = "*P "
            } else {
                pausedPrefix = "   "
            }
            io.WriteString(w, fmt.Sprintf("
    %s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s
    ",
                pausedPrefix,
                t.TopicName,
                t.Depth,
                t.BackendDepth,
                t.MessageCount,
                t.E2eProcessingLatency))
            for _, c := range t.Channels {
                if c.Paused {
                    pausedPrefix = "   *P "
                } else {
                    pausedPrefix = "      "
                }
                io.WriteString(w,
                    fmt.Sprintf("%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s
    ",
                        pausedPrefix,
                        c.ChannelName,
                        c.Depth,
                        c.BackendDepth,
                        c.InFlightCount,
                        c.DeferredCount,
                        c.RequeueCount,
                        c.TimeoutCount,
                        c.MessageCount,
                        c.E2eProcessingLatency))
                for _, client := range c.Clients {
                    connectTime := time.Unix(client.ConnectTime, 0)
                    // truncate to the second
                    duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
                    _, port, _ := net.SplitHostPort(client.RemoteAddress)
                    io.WriteString(w, fmt.Sprintf("        [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s
    ",
                        client.Version,
                        fmt.Sprintf("%s:%s", client.Name, port),
                        client.State,
                        client.InFlightCount,
                        client.ReadyCount,
                        client.FinishCount,
                        client.RequeueCount,
                        client.MessageCount,
                        duration,
                    ))
                }
            }
        }
        return buf.Bytes()
    }

    func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        opt := ps.ByName("opt")

        if req.Method == "PUT" {
            // add 1 so that it's greater than our max when we test for it
            // (LimitReader returns a "fake" EOF)
            readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1
            body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
            if err != nil {
                return nil, http_api.Err{500, "INTERNAL_ERROR"}
            }
            if int64(len(body)) == readMax || len(body) == 0 {
                return nil, http_api.Err{413, "INVALID_VALUE"}
            }

            opts := *s.ctx.nsqd.getOpts()
            switch opt {
            case "nsqlookupd_tcp_addresses":
                err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses)
                if err != nil {
                    return nil, http_api.Err{400, "INVALID_VALUE"}
                }
            case "verbose":
                err := json.Unmarshal(body, &opts.Verbose)
                if err != nil {
                    return nil, http_api.Err{400, "INVALID_VALUE"}
                }
            default:
                return nil, http_api.Err{400, "INVALID_OPTION"}
            }
            s.ctx.nsqd.swapOpts(&opts)
            s.ctx.nsqd.triggerOptsNotification()
        }

        v, ok := getOptByCfgName(s.ctx.nsqd.getOpts(), opt)
        if !ok {
            return nil, http_api.Err{400, "INVALID_OPTION"}
        }

        return v, nil
    }

    // getOptByCfgName looks up a field of the struct that opts points to by its
    // configuration name and returns the field's current value.
    //
    // Only fields carrying a non-empty `flag` tag are considered. A field's
    // configuration name is its `cfg` tag when present, otherwise its `flag`
    // tag with every "-" replaced by "_". The boolean result is false when no
    // field matches. opts must be a pointer to a struct.
    func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
        structVal := reflect.ValueOf(opts).Elem()
        structType := structVal.Type()

        for idx, count := 0, structType.NumField(); idx < count; idx++ {
            tag := structType.Field(idx).Tag

            flagName := tag.Get("flag")
            if flagName == "" {
                continue
            }

            key := tag.Get("cfg")
            if key == "" {
                key = strings.Replace(flagName, "-", "_", -1)
            }

            if key == name {
                return structVal.Field(idx).Interface(), true
            }
        }
        return nil, false
    }

  • 相关阅读:
    Navicat for MySQL远程连接的时候报错mysql 1130的解决方法
    阿里云主机 CentOS6.5 安装Mysql php Apache
    MAC下使用feddler进行抓包
    javascript钩子之Backbone里的实现
    SASS编译
    动态代理模式和AOP探究
    二分查找算法
    MyBatis在非Spring环境下第三方DataSource设置-Druid篇
    写字节流转换String 代码示例
    SpringAOP代理报错问题
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457320.html
Copyright © 2011-2022 走看看