zoukankan      html  css  js  c++  java
  • go语言nsq源码解读八 http.go、http_server.go

    这篇讲另两个文件http.go、http_server.go,这两个文件和第六讲go语言nsq源码解读六 tcp.go、tcp_server.go里的两个文件是相对应的。那两个文件用于处理tcp请求,而这两个是处理http请求的。

    http_sesrver.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    package util

    import (
        "log"
        "net"
        "net/http"
        "strings"
    )

    //创建httpServer,并注册了处理函数,处理函数在nsqlookupd里指定到了http.go里的httpServer类型,其有一个方法ServeHTTP
    func HTTPServer(listener net.Listener, handler http.Handler) {
        log.Printf("HTTP: listening on %s", listener.Addr().String())

        server := &http.Server{
            Handler: handler,
        }
        err := server.Serve(listener)
        // theres no direct way to detect this error because it is not exposed
        if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
            log.Printf("ERROR: http.Serve() - %s", err.Error())
        }

        log.Printf("HTTP: closing %s", listener.Addr().String())
    }

    http.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    334
    335
    336
    337
    338
    339
    340
    341
    342
    343
    344
    345
    346
    347
    348
    349
    350
    351
    352
    353
    354
    355
    package nsqlookupd

    import (
        "fmt"
        "io"
        "log"
        "net/http"

        "github.com/bitly/go-nsq"
        "github.com/bitly/nsq/util"
    )

    type httpServer struct {
        context *Context
    }

    //根据URL进行请求分发,不同的URL,调用不同的方法来处理,如果路径不合法,则返回404
    func (*httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        switch req.URL.Path {
        case "/ping":
            s.pingHandler(w, req)
        case "/info":
            s.infoHandler(w, req)
        case "/lookup":
            s.lookupHandler(w, req)
        case "/topics":
            s.topicsHandler(w, req)
        case "/channels":
            s.channelsHandler(w, req)
        case "/nodes":
            s.nodesHandler(w, req)
        case "/delete_topic":
            s.deleteTopicHandler(w, req)
        case "/delete_channel":
            s.deleteChannelHandler(w, req)
        case "/tombstone_topic_producer":
            s.tombstoneTopicProducerHandler(w, req)
        case "/create_topic":
            s.createTopicHandler(w, req)
        case "/create_channel":
            s.createChannelHandler(w, req)
        case "/debug":
            s.debugHandler(w, req)
        default:
            util.ApiResponse(w, 404, "NOT_FOUND", nil)
        }
    }

    //向ping请求返回ok
    func (*httpServer) pingHandler(w http.ResponseWriter, req *http.Request) {
        w.Header().Set("Content-Length", "2")
        io.WriteString(w, "OK")
    }

    //查询所有的topic
    func (*httpServer) topicsHandler(w http.ResponseWriter, req *http.Request) {
        topics := s.context.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys()
        data := make(map[string]interface{})
        data["topics"] = topics
        util.ApiResponse(w, 200, "OK", data)
    }

    //获取符合指定topic的channel,URL中需有名为topic的参数
    func (*httpServer) channelsHandler(w http.ResponseWriter, req *http.Request) {
        reqParams, err := util.NewReqParams(req)
        if err != nil {
            util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
            return
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            util.ApiResponse(w, 500, "MISSING_ARG_TOPIC", nil)
            return
        }

        channels := s.context.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
        data := make(map[string]interface{})
        data["channels"] = channels
        util.ApiResponse(w, 200, "OK", data)
    }

    //查询服务,
    func (*httpServer) lookupHandler(w http.ResponseWriter, req *http.Request) {
        reqParams, err := util.NewReqParams(req)
        if err != nil {
            util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
            return
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            util.ApiResponse(w, 500, "MISSING_ARG_TOPIC", nil)
            return
        }

        registration := s.context.nsqlookupd.DB.FindRegistrations("topic", topicName, "")

        if len(registration) == 0 {
            util.ApiResponse(w, 500, "INVALID_ARG_TOPIC", nil)
            return
        }

        //在DB里根据条件查询
        channels := s.context.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
        producers := s.context.nsqlookupd.DB.FindProducers("topic", topicName, "")
        producers = producers.FilterByActive(s.context.nsqlookupd.options.InactiveProducerTimeout,
            s.context.nsqlookupd.options.TombstoneLifetime)
        data := make(map[string]interface{})

        //查询到channels和producers,以JSON格式返回
        data["channels"] = channels
        data["producers"] = producers.PeerInfo()

        util.ApiResponse(w, 200, "OK", data)
    }

    //创建一个topic
    func (*httpServer) createTopicHandler(w http.ResponseWriter, req *http.Request) {
        reqParams, err := util.NewReqParams(req)
        if err != nil {
            util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
            return
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            util.ApiResponse(w, 500, "MISSING_ARG_TOPIC", nil)
            return
        }

        if !nsq.IsValidTopicName(topicName) {
            util.ApiResponse(w, 500, "INVALID_TOPIC", nil)
            return
        }

        //Registration使用的category为topic
        log.Printf("DB: adding topic(%s)", topicName)
        key := Registration{"topic", topicName, ""}
        s.context.nsqlookupd.DB.AddRegistration(key)

        util.ApiResponse(w, 200, "OK", nil)
    }

    //删除topic,逻辑比较简单,从URL中取到"topic"参数的值,然后查询registrations并删除
    func (*httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request) {
        reqParams, err := util.NewReqParams(req)
        if err != nil {
            util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
            return
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            util.ApiResponse(w, 500, "MISSING_ARG_TOPIC", nil)
            return
        }

        //category为channel的Registration
        registrations := s.context.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
        for _, registration := range registrations {
            log.Printf("DB: removing channel(%s) from topic(%s)", registration.SubKey, topicName)
            s.context.nsqlookupd.DB.RemoveRegistration(registration)
        }

        //category为topic的Registration
        registrations = s.context.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
        for _, registration := range registrations {
            log.Printf("DB: removing topic(%s)", topicName)
            s.context.nsqlookupd.DB.RemoveRegistration(registration)
        }

        util.ApiResponse(w, 200, "OK", nil)
    }

    //将指定的Producer标记为墓碑状态
    func (*httpServer) tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request) {
        reqParams, err := util.NewReqParams(req)
        if err != nil {
            util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
            return
        }

        topicName, err := reqParams.Get("topic")
        if err != nil {
            util.ApiResponse(w, 500, "MISSING_ARG_TOPIC", nil)
            return
        }
        //获取node参数的值
        node, err := reqParams.Get("node")
        if err != nil {
            util.ApiResponse(w, 500, "MISSING_ARG_NODE", nil)
            return
        }

        log.Printf("DB: setting tombstone for producer@%s of topic(%s)", node, topicName)
        producers := s.context.nsqlookupd.DB.FindProducers("topic", topicName, "")
        for _, p := range producers {
            //从这句可看出,URL中node参数的值为"BroadcastAddress:HttpPort"的格式
            thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.peerInfo.HttpPort)

            //将指定node标记为墓碑状态
            if thisNode == node {
                p.Tombstone()
            }
        }

        util.ApiResponse(w, 200, "OK", nil)
    }

    //创建channel
    func (*httpServer) createChannelHandler(w http.ResponseWriter, req *http.Request) {
        reqParams, err := util.NewReqParams(req)
        if err != nil {
            util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
            return
        }

        topicName, channelName, err := util.GetTopicChannelArgs(reqParams)
        if err != nil {
            util.ApiResponse(w, 500, err.Error(), nil)
            return
        }

        log.Printf("DB: adding channel(%s) in topic(%s)", channelName, topicName)

        //用"channel"作为category,定义Registration类型的变量并加入DB
        key := Registration{"channel", topicName, channelName}
        s.context.nsqlookupd.DB.AddRegistration(key)

        //用"topic"作为category,定义Registration类型的变量并加入DB
        log.Printf("DB: adding topic(%s)", topicName)
        key = Registration{"topic", topicName, ""}
        s.context.nsqlookupd.DB.AddRegistration(key)

        util.ApiResponse(w, 200, "OK", nil)
    }

    //根据参数中的topicName和channelName获取所有Registration并删除
    func (*httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request) {
        reqParams, err := util.NewReqParams(req)
        if err != nil {
            util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
            return
        }

        topicName, channelName, err := util.GetTopicChannelArgs(reqParams)
        if err != nil {
            util.ApiResponse(w, 500, err.Error(), nil)
            return
        }

        //根据参数查找Registration
        registrations := s.context.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName)
        if len(registrations) == 0 {
            util.ApiResponse(w, 404, "NOT_FOUND", nil)
            return
        }

        //删除查找到的所有Registration
        log.Printf("DB: removing channel(%s) from topic(%s)", channelName, topicName)
        for _, registration := range registrations {
            s.context.nsqlookupd.DB.RemoveRegistration(registration)
        }

        util.ApiResponse(w, 200, "OK", nil)
    }

    // note: we can't embed the *Producer here because embeded objects are ignored for json marshalling
    type node struct {
        RemoteAddress    string   `json:"remote_address"`
        Hostname         string   `json:"hostname"`
        BroadcastAddress string   `json:"broadcast_address"`
        TcpPort          int      `json:"tcp_port"`
        HttpPort         int      `json:"http_port"`
        Version          string   `json:"version"`
        Tombstones       []bool   `json:"tombstones"`
        Topics           []string `json:"topics"`
    }

    func (*httpServer) nodesHandler(w http.ResponseWriter, req *http.Request) {
        // dont filter out tombstoned nodes
        //FilterByActive第二个参数设为0,标记为tombstoned的节点并未被过滤
        producers := s.context.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive(
            s.context.nsqlookupd.options.InactiveProducerTimeout, 0)
        nodes := make([]*node, len(producers))
        for i, p := range producers {
            topics := s.context.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filter("topic", "*", "").Keys()

            // for each topic find the producer that matches this peer
            // to add tombstone information
            tombstones := make([]bool, len(topics))
            for j, t := range topics {
                topicProducers := s.context.nsqlookupd.DB.FindProducers("topic", t, "")
                for _, tp := range topicProducers {
                    if tp.peerInfo == p.peerInfo {
                        tombstones[j] = tp.IsTombstoned(s.context.nsqlookupd.options.TombstoneLifetime)
                    }
                }
            }

            nodes[i] = &node{
                RemoteAddress:    p.peerInfo.RemoteAddress,
                Hostname:         p.peerInfo.Hostname,
                BroadcastAddress: p.peerInfo.BroadcastAddress,
                TcpPort:          p.peerInfo.TcpPort,
                HttpPort:         p.peerInfo.HttpPort,
                Version:          p.peerInfo.Version,
                Tombstones:       tombstones,
                Topics:           topics,
            }
        }

        data := make(map[string]interface{})
        data["producers"] = nodes
        util.ApiResponse(w, 200, "OK", data)
    }

    //返回版本号
    func (*httpServer) infoHandler(w http.ResponseWriter, req *http.Request) {
        util.ApiResponse(w, 200, "OK", struct {
            Version string `json:"version"`
        }{
            Version: util.BINARY_VERSION,
        })
    }

    //用于调试,会返回RegistrationDB中所有的producer
    func (*httpServer) debugHandler(w http.ResponseWriter, req *http.Request) {
        s.context.nsqlookupd.DB.RLock()
        defer s.context.nsqlookupd.DB.RUnlock()

        data := make(map[string][]map[string]interface{})
        //遍历Map
        for r, producers := range s.context.nsqlookupd.DB.registrationMap {
            key := r.Category + ":" + r.Key + ":" + r.SubKey
            data[key] = make([]map[string]interface{}, 0)
            //遍历registration下所有的producers
            for _, p := range producers {
                m := make(map[string]interface{})
                m["id"] = p.peerInfo.id
                m["hostname"] = p.peerInfo.Hostname
                m["broadcast_address"] = p.peerInfo.BroadcastAddress
                m["tcp_port"] = p.peerInfo.TcpPort
                m["http_port"] = p.peerInfo.HttpPort
                m["version"] = p.peerInfo.Version
                m["last_update"] = p.peerInfo.lastUpdate.UnixNano()
                m["tombstoned"] = p.tombstoned
                m["tombstoned_at"] = p.tombstonedAt.UnixNano()
                data[key] = append(data[key], m)
            }
        }

        util.ApiResponse(w, 200, "OK", data)
    }

    由于http.go里多处用到util.ApiResponse方法,这个方法定义在util/api_response.go文件里,所以这里简单解释一下这个文件:
    api_response.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package util

    import (
        "encoding/json"
        "fmt"
        "net/http"
        "strconv"
    )

    //将数据以JSON格式返回给客户端,在nsqlookupd里,主要是在给nsqadmin提供的接口里使用。
    //nsqadmin通过HTTP的方式获取nsqlookupd用ApiResponse函数返回的JSON数据,解析,然后显示在页面上。
    func ApiResponse(w http.ResponseWriter, statusCode int, statusTxt string, data interface{}) {

        //将数据转化为JSON格式输出
        response, err := json.Marshal(struct {
            StatusCode int         `json:"status_code"`
            StatusTxt  string      `json:"status_txt"`
            Data       interface{} `json:"data"`
        }{
            statusCode,
            statusTxt,
            data,
        })

        //转json字符串出错时返回500错误
        if err != nil {
            response = []byte(fmt.Sprintf(`{"status_code":500, "status_txt":"%s", "data":null}`, err.Error()))
        }

        w.Header().Set("Content-Type", "application/json; charset=utf-8")
        w.Header().Set("Content-Length", strconv.Itoa(len(response)))
        w.WriteHeader(statusCode)
        w.Write(response)
    }

  • 相关阅读:
    重新安装AD RMS
    Exp00009错误解決
    AD RMS 问题解决 事件ID:139
    如何破解微软的正版增值烦恼
    如何批量更改帐号的属性
    ORA31694,原来是文件有问题
    Standby异常解决
    ASP.NET 常用网站
    .NET C#读取Excel内容
    AD Usre lastLogon Time
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7456986.html
Copyright © 2011-2022 走看看