zoukankan      html  css  js  c++  java
  • http.go

    package nsqlookupd

    import (
        "fmt"
        "net/http"
        "net/http/pprof"
        "sync/atomic"

        "github.com/julienschmidt/httprouter"
        "github.com/nsqio/nsq/internal/http_api"
        "github.com/nsqio/nsq/internal/protocol"
        "github.com/nsqio/nsq/internal/version"
    )

    // httpServer bundles the nsqlookupd Context with the HTTP handler that
    // dispatches incoming requests to the API endpoints defined in this file.
    type httpServer struct {
        // ctx gives the handlers access to the nsqlookupd instance (DB, options, logging).
        ctx    *Context
        // router is the handler that ServeHTTP delegates every request to.
        router http.Handler
    }

    func newHTTPServer(ctx *Context) *httpServer {
        log := http_api.Log(ctx.nsqlookupd.opts.Logger)

        router := httprouter.New()
        router.HandleMethodNotAllowed = true
        router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.opts.Logger)
        router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.opts.Logger)
        router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.opts.Logger)
        s := &httpServer{
            ctx:    ctx,
            router: router,
        }

        router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

        // v1 negotiate
        router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.NegotiateVersion))
        router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.NegotiateVersion))
        router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.NegotiateVersion))
        router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.NegotiateVersion))
        router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.NegotiateVersion))

        // only v1
        router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
        router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
        router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
        router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
        router.Handle("POST", "/topic/tombstone", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.V1))

        // deprecated, v1 negotiate
        router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.NegotiateVersion))
        router.Handle("POST", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
        router.Handle("POST", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
        router.Handle("POST", "/tombstone_topic_producer", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.NegotiateVersion))
        router.Handle("GET", "/create_topic", http_api.Decorate(s.doCreateTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/delete_topic", http_api.Decorate(s.doDeleteTopic, log, http_api.NegotiateVersion))
        router.Handle("GET", "/create_channel", http_api.Decorate(s.doCreateChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/delete_channel", http_api.Decorate(s.doDeleteChannel, log, http_api.NegotiateVersion))
        router.Handle("GET", "/tombstone_topic_producer", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.NegotiateVersion))

        // debug
        router.HandlerFunc("GET", "/debug/pprof", pprof.Index)
        router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
        router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
        router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
        router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
        router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
        router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
        router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
        router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))

        return s
    }

    func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        s.router.ServeHTTP(w, req)
    }

    func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        return "OK", nil
    }

    func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        return struct {
            Version string `json:"version"`
        }{
            Version: version.Binary,
        }, nil
    }

    func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        topics := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys()
        return map[string]interface{}{
            "topics": topics,
        }, nil
    }

    func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
        return map[string]interface{}{
            "channels": channels,
        }, nil
    }

    func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        registration := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
        if len(registration) == 0 {
            return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
        }

        channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
        producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "")
        producers = producers.FilterByActive(s.ctx.nsqlookupd.opts.InactiveProducerTimeout,
            s.ctx.nsqlookupd.opts.TombstoneLifetime)
        return map[string]interface{}{
            "channels":  channels,
            "producers": producers.PeerInfo(),
        }, nil
    }

    func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        if !protocol.IsValidTopicName(topicName) {
            return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
        }

        s.ctx.nsqlookupd.logf("DB: adding topic(%s)", topicName)
        key := Registration{"topic", topicName, ""}
        s.ctx.nsqlookupd.DB.AddRegistration(key)

        return nil, nil
    }

    func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
        for _, registration := range registrations {
            s.ctx.nsqlookupd.logf("DB: removing channel(%s) from topic(%s)", registration.SubKey, topicName)
            s.ctx.nsqlookupd.DB.RemoveRegistration(registration)
        }

        registrations = s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
        for _, registration := range registrations {
            s.ctx.nsqlookupd.logf("DB: removing topic(%s)", topicName)
            s.ctx.nsqlookupd.DB.RemoveRegistration(registration)
        }

        return nil, nil
    }

    func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
        }

        node, err := reqParams.Get("node")
        if err != nil {
            return nil, http_api.Err{400, "MISSING_ARG_NODE"}
        }

        s.ctx.nsqlookupd.logf("DB: setting tombstone for producer@%s of topic(%s)", node, topicName)
        producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "")
        for _, p := range producers {
            thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.peerInfo.HTTPPort)
            if thisNode == node {
                p.Tombstone()
            }
        }

        return nil, nil
    }

    func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
        if err != nil {
            return nil, http_api.Err{400, err.Error()}
        }

        s.ctx.nsqlookupd.logf("DB: adding channel(%s) in topic(%s)", channelName, topicName)
        key := Registration{"channel", topicName, channelName}
        s.ctx.nsqlookupd.DB.AddRegistration(key)

        s.ctx.nsqlookupd.logf("DB: adding topic(%s)", topicName)
        key = Registration{"topic", topicName, ""}
        s.ctx.nsqlookupd.DB.AddRegistration(key)

        return nil, nil
    }

    func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        reqParams, err := http_api.NewReqParams(req)
        if err != nil {
            return nil, http_api.Err{400, "INVALID_REQUEST"}
        }

        topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
        if err != nil {
            return nil, http_api.Err{400, err.Error()}
        }

        registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName)
        if len(registrations) == 0 {
            return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
        }

        s.ctx.nsqlookupd.logf("DB: removing channel(%s) from topic(%s)", channelName, topicName)
        for _, registration := range registrations {
            s.ctx.nsqlookupd.DB.RemoveRegistration(registration)
        }

        return nil, nil
    }

    // node is the JSON shape of one producer in the response built by doNodes.
    // The scalar fields are copied from the producer's peer info.
    type node struct {
        RemoteAddress    string   `json:"remote_address"`
        Hostname         string   `json:"hostname"`
        BroadcastAddress string   `json:"broadcast_address"`
        TCPPort          int      `json:"tcp_port"`
        HTTPPort         int      `json:"http_port"`
        Version          string   `json:"version"`
        // Tombstones is parallel to Topics: Tombstones[i] is the tombstone
        // state of this node for Topics[i].
        Tombstones       []bool   `json:"tombstones"`
        // Topics holds the keys of the "topic" registrations found for this node.
        Topics           []string `json:"topics"`
    }

    func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        // dont filter out tombstoned nodes
        producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive(
            s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0)
        nodes := make([]*node, len(producers))
        for i, p := range producers {
            topics := s.ctx.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filter("topic", "*", "").Keys()

            // for each topic find the producer that matches this peer
            // to add tombstone information
            tombstones := make([]bool, len(topics))
            for j, t := range topics {
                topicProducers := s.ctx.nsqlookupd.DB.FindProducers("topic", t, "")
                for _, tp := range topicProducers {
                    if tp.peerInfo == p.peerInfo {
                        tombstones[j] = tp.IsTombstoned(s.ctx.nsqlookupd.opts.TombstoneLifetime)
                    }
                }
            }

            nodes[i] = &node{
                RemoteAddress:    p.peerInfo.RemoteAddress,
                Hostname:         p.peerInfo.Hostname,
                BroadcastAddress: p.peerInfo.BroadcastAddress,
                TCPPort:          p.peerInfo.TCPPort,
                HTTPPort:         p.peerInfo.HTTPPort,
                Version:          p.peerInfo.Version,
                Tombstones:       tombstones,
                Topics:           topics,
            }
        }

        return map[string]interface{}{
            "producers": nodes,
        }, nil
    }

    func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
        s.ctx.nsqlookupd.DB.RLock()
        defer s.ctx.nsqlookupd.DB.RUnlock()

        data := make(map[string][]map[string]interface{})
        for r, producers := range s.ctx.nsqlookupd.DB.registrationMap {
            key := r.Category + ":" + r.Key + ":" + r.SubKey
            for _, p := range producers {
                m := map[string]interface{}{
                    "id":                p.peerInfo.id,
                    "hostname":          p.peerInfo.Hostname,
                    "broadcast_address": p.peerInfo.BroadcastAddress,
                    "tcp_port":          p.peerInfo.TCPPort,
                    "http_port":         p.peerInfo.HTTPPort,
                    "version":           p.peerInfo.Version,
                    "last_update":       atomic.LoadInt64(&p.peerInfo.lastUpdate),
                    "tombstoned":        p.tombstoned,
                    "tombstoned_at":     p.tombstonedAt.UnixNano(),
                }
                data[key] = append(data[key], m)
            }
        }

        return data, nil
    }

  • 相关阅读:
    澳洲中产收入水平[转]
    [转载]在澳洲做IT人士的收入差别
    取消excel 工作保护 密码的宏
    SAP ML 物料分类账详解(含取消激活物料帐方法)
    ABAP--如何创建自定义打印条码
    SAP 产品条码WMS结合 以及ABAP script的集成 BarCode
    SAP 以工序为基准进行发料 机加工行业 Goods Issue to Routing
    SAP HR模块的基础数据表和增强配置
    SAP财务供应链与金库管理的联系与区别
    评点SAP HR功能及人力资源管理软件
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7457153.html
Copyright © 2011-2022 走看看