zoukankan      html  css  js  c++  java
  • go语言nsq源码解读五 nsqlookupd源码registration_db.go

    本篇将讲解registration_db.go文件。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    package nsqlookupd

    import (
        "fmt"
        "sync"
        "time"
    )

    //定义了类型RegistrationDB,按字面意思:注册数据库,可理解为保存注册信息的数据库。
    type RegistrationDB struct {
        //嵌入了RWMutex,所以在后面在方法中才可以调用Lock和Unlock方法
        sync.RWMutex
        //一个MAP,以Registration为键,Producers为值
        //看下面的代码中Producers的定义,它是Producer的slice,从中可看出Registration和Producer是一对多的关系
        registrationMap map[Registration]Producers
    }

    //定义类型Registration,
    type Registration struct {
        Category string
        Key      string
        SubKey   string
    }

    //定义类型Registrations为Registration的slice
    type Registrations []Registration

    //定义类型PeerInfo
    type PeerInfo struct {
        id               string
        RemoteAddress    string `json:"remote_address"`
        Hostname         string `json:"hostname"`
        BroadcastAddress string `json:"broadcast_address"`
        TcpPort          int    `json:"tcp_port"`
        HttpPort         int    `json:"http_port"`
        Version          string `json:"version"`
        lastUpdate       time.Time
    }

    //定义类型Producer
    type Producer struct {
        //PeerInfo的指针
        peerInfo *PeerInfo
        //是否要被移除(tombstoned)
        tombstoned bool
        //移除时间
        tombstonedAt time.Time
    }

    //定义类型Producers为Producer的slice
    type Producers []*Producer

    //Producer的String方法
    func (*Producer) String() string {
        return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TcpPort, p.peerInfo.HttpPort)
    }

    //本方法将Producer标记为墓碑状态
    func (*Producer) Tombstone() {
        p.tombstoned = true
        p.tombstonedAt = time.Now()
    }

    //被标记为墓碑状态,同时距标记时间小于lifetime值。
    //比如:在0分0秒时调用了上一个函数Tombstone(),在0分1秒时调用函数IsTombstoned(5),返回结果为true
    func (*Producer) IsTombstoned(lifetime time.Duration) bool {
        return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
    }

    //新建RegistrationDB类型的变量
    func NewRegistrationDB() *RegistrationDB {
        return &RegistrationDB{
            //make一个map
            registrationMap: make(map[Registration]Producers),
        }
    }

    //添加一个registration的key,只是把map的key设置了,value为一个空的Producers slice
    func (*RegistrationDB) AddRegistration(k Registration) {
        r.Lock()
        defer r.Unlock()
        _, ok := r.registrationMap[k]
        if !ok {
            r.registrationMap[k] = make(Producers, 0)
        }
    }

    //将一个Producer添加到指定的Registration里
    func (*RegistrationDB) AddProducer(k Registration, p *Producer) bool {
        r.Lock()
        defer r.Unlock()
        //producers是一个slice
        producers := r.registrationMap[k]

        //遍历producers,看这个要添加的Producer是否已经存在了
        found := false
        for _, producer := range producers {
            //通过producer.peerInfo.id来判断是否为同一个Producer
            if producer.peerInfo.id == p.peerInfo.id {
                found = true
            }
        }

        //只有要添加的Producer不存在时,才添加到Registration里
        if found == false {
            r.registrationMap[k] = append(producers, p)
        }
        return !found
    }

    // 根据producer.peerInfo.id从registration里删除一个Producer
    func (*RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
        r.Lock()
        defer r.Unlock()

        //map中不存在key为k的Registration记录,所以也就无需删除了。
        producers, ok := r.registrationMap[k]
        if !ok {
            return false, 0
        }
        removed := false
        //创建空的Producers slice,所有不需要删除的Producer都移动这个slice里,并在移除完成后重新赋值给Registration
        //注意学习这种从slice中移除一个元素的方式
        cleaned := make(Producers, 0)
        for _, producer := range producers {
            //id不相同,不是要删除的producer,移到cleaned slice里
            if producer.peerInfo.id != id {
                cleaned = append(cleaned, producer)
            } else {
                removed = true
            }
        }
        // Note: this leaves keys in the DB even if they have empty lists
        //重新赋值,这确保了即使cleaned是一个空slice,键值k仍然会在RegistrationDB中存在
        r.registrationMap[k] = cleaned

        //返回两个值,是否删除,新slice的长度
        return removed, len(cleaned)
    }

    // 删除Registration和它对应的Producers
    func (*RegistrationDB) RemoveRegistration(k Registration) {
        r.Lock()
        defer r.Unlock()
        delete(r.registrationMap, k)
    }

    //查找Registrations,可以看到,传入参数的三个变量与Registration类型里的三个值是对应的
    func (*RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
        r.RLock()
        defer r.RUnlock()
        results := make(Registrations, 0)
        for k := range r.registrationMap {
            //找出registrationMap中所有category,key,subkey与入参相同的Registration
            //IsMatch方法在后面的代码中定义
            if !k.IsMatch(category, key, subkey) {
                continue
            }
            results = append(results, k)
        }
        return results
    }

    //根据category key subkey查找所有的Producer
    func (*RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
        r.RLock()
        defer r.RUnlock()
        results := make(Producers, 0)
        //遍历map
        for k, producers := range r.registrationMap {
            if !k.IsMatch(category, key, subkey) {
                continue
            }
            //遍历每个registration下的producers
            for _, producer := range producers {
                found := false
                //判断producer是否已经存在了,如果存在的话,就不添加了
                for _, p := range results {
                    if producer.peerInfo.id == p.peerInfo.id {
                        found = true
                    }
                }
                if found == false {
                    results = append(results, producer)
                }
            }
        }
        return results
    }

    //根据producer.peerInfo.id查找所属的registration key
    func (*RegistrationDB) LookupRegistrations(id string) Registrations {
        r.RLock()
        defer r.RUnlock()
        results := make(Registrations, 0)
        //遍历map
        for k, producers := range r.registrationMap {
            //遍历每个registration下的producers
            for _, p := range producers {
                if p.peerInfo.id == id {
                    results = append(results, k)
                    break
                }
            }
        }
        return results
    }

    //依据Registration类型里的三个变量,判断是否与Registration匹配
    func (k Registration) IsMatch(category string, key string, subkey string) bool {
        if category != k.Category {
            return false
        }
        if key != "*" && k.Key != key {
            return false
        }
        if subkey != "*" && k.SubKey != subkey {
            return false
        }
        return true
    }

    //过滤获取所有与输入参数匹配的Registration
    func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
        output := make(Registrations, 0)
        for _, k := range rr {
            if k.IsMatch(category, key, subkey) {
                output = append(output, k)
            }
        }
        return output
    }

    //获取MAP中所有Registration的key
    func (rr Registrations) Keys() []string {
        keys := make([]string, len(rr))
        for i, k := range rr {
            keys[i] = k.Key
        }
        return keys
    }

    //获取MAP中所有Registration的subkey
    func (rr Registrations) SubKeys() []string {
        subkeys := make([]string, len(rr))
        for i, k := range rr {
            subkeys[i] = k.SubKey
        }
        return subkeys
    }

    //获取所有可用的Producer
    func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
        now := time.Now()
        results := make(Producers, 0)
        for _, p := range pp {
            //满足以下两个判断条件的producer被忽略
            //1 超过了活跃时间,在inactivityTimeout时间内没有与nsqlookupd交互
            //2 被标记为墓碑状态,在tombstoneLifetime时间内标记的producer将被过滤掉
            if now.Sub(p.peerInfo.lastUpdate) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
                continue
            }
            results = append(results, p)
        }
        return results
    }

    //获取Producers中所有的PeerInfo
    func (pp Producers) PeerInfo() []*PeerInfo {
        results := make([]*PeerInfo, 0)
        for _, p := range pp {
            results = append(results, p.peerInfo)
        }
        return results
    }

    读过上述代码,可总结出,registration_db.go文件用MAP以一对多的形式保存Producer,并提供一系列增、删、改、查的操作封装。同时使用RWMutex做并发控制。

  • 相关阅读:
    51 数据中重复的数字
    64 数据流中的中位数
    79. Word Search
    93. Restore IP Addresses
    547. Friend Circles
    Epplus Excel 导入 MSSQL 数据库
    用来更新服务的bat 脚本
    ASP.Net MVC 引用动态 js 脚本
    8、结构的构造器应该显式调用!!!(坑)
    Task 线程重用导致等待!
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7456962.html
Copyright © 2011-2022 走看看