zoukankan      html  css  js  c++  java
  • Derek解读Bytom源码-P2P网络 地址簿

    作者:Derek

    简介

    Github地址:https://github.com/Bytom/bytom

    Gitee地址:https://gitee.com/BytomBlockchain/bytom

    本章介绍bytom代码P2P网络中addrbook地址簿

    作者使用MacOS操作系统,其他平台也大同小异

    Golang Version: 1.8

    addrbook介绍

    addrbook用于存储P2P网络中保留最近的对端节点地址
    在MacOS下,默认的地址簿路径存储在~/Library/Bytom/addrbook.json

    地址簿格式

    ** ~/Library/Bytom/addrbook.json **

    {
        "Key": "359be6d08bc0c6e21c84bbb2",
        "Addrs": [
            {
                "Addr": {
                    "IP": "122.224.11.144",
                    "Port": 46657
                },
                "Src": {
                    "IP": "198.74.61.131",
                    "Port": 46657
                },
                "Attempts": 0,
                "LastAttempt": "2018-05-04T12:58:23.894057702+08:00",
                "LastSuccess": "0001-01-01T00:00:00Z",
                "BucketType": 1,
                "Buckets": [
                    181,
                    10
                ]
            }
        ]
    }
    

    地址类型

    在addrbook中存储的地址有两种:
    ** p2p/addrbook.go **

    const (
    	bucketTypeNew = 0x01  // 标识新地址,不可靠地址(未成功连接过)。只存储在一个bucket中
    	bucketTypeOld = 0x02  // 标识旧地址,可靠地址(已成功连接过)。可以存储在多个bucket中,最多为maxNewBucketsPerAddress个
    )
    

    注意: 一个地址的类型变更不在此文章中做介绍,后期的文章会讨论该问题

    地址簿相关结构体

    地址簿

    type AddrBook struct {
    	cmn.BaseService
    
    	mtx               sync.Mutex
    	filePath          string  // 地址簿路径
    	routabilityStrict bool  // 是否可路由,默认为true
    	rand              *rand.Rand 
    	key               string  // 地址簿标识,用于计算addrNew和addrOld的索引
    	ourAddrs          map[string]*NetAddress  // 存储本地网络地址,用于添加p2p地址时做排除使用
    	addrLookup        map[string]*knownAddress // 存储新、旧地址集,用于查询
    	addrNew           []map[string]*knownAddress // 存储新地址
    	addrOld           []map[string]*knownAddress // 存储旧地址
    	wg                sync.WaitGroup
    	nOld              int // 旧地址数量
    	nNew              int // 新地址数量
    }
    

    已知地址

    type knownAddress struct {
    	Addr        *NetAddress // 已知peer的addr
    	Src         *NetAddress // 已知peer的addr的来源addr
    	Attempts    int32 // 连接peer的重试次数
    	LastAttempt time.Time // 最近一次尝试连接的时间
    	LastSuccess time.Time // 最近一次尝试成功连接的时间
    	BucketType  byte // 地址的类型(表示可靠地址或不可靠地址)
    	Buckets     []int // 当前addr所属的buckets
    }
    

    routabilityStrict参数表示地址簿是否存储的ip是否可路由。可路由是根据RFC划分,具体参考资料:RFC标准

    初始化地址簿

    // NewAddrBook creates a new address book.
    // Use Start to begin processing asynchronous address updates.
    func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
    	am := &AddrBook{
    		rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
    		ourAddrs:          make(map[string]*NetAddress),
    		addrLookup:        make(map[string]*knownAddress),
    		filePath:          filePath,
    		routabilityStrict: routabilityStrict,
    	}
    	am.init()
    	am.BaseService = *cmn.NewBaseService(nil, "AddrBook", am)
    	return am
    }
    
    // When modifying this, don't forget to update loadFromFile()
    func (a *AddrBook) init() {
      // 地址簿唯一标识
    	a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
    	// New addr buckets, 默认为256个大小
    	a.addrNew = make([]map[string]*knownAddress, newBucketCount)
    	for i := range a.addrNew {
    		a.addrNew[i] = make(map[string]*knownAddress)
    	}
    	// Old addr buckets,默认为64个大小
    	a.addrOld = make([]map[string]*knownAddress, oldBucketCount)
    	for i := range a.addrOld {
    		a.addrOld[i] = make(map[string]*knownAddress)
    	}
    }
    

    bytomd启动时加载本地地址簿

    loadFromFile在bytomd启动时,首先会加载本地的地址簿

    // OnStart implements Service.
    func (a *AddrBook) OnStart() error {
    	a.BaseService.OnStart()
    	a.loadFromFile(a.filePath)
    	a.wg.Add(1)
    	go a.saveRoutine()
    	return nil
    }
    
    // Returns false if file does not exist.
    // cmn.Panics if file is corrupt.
    func (a *AddrBook) loadFromFile(filePath string) bool {
    	// If doesn't exist, do nothing.
    	// 如果本地地址簿不存在则直接返回
    	_, err := os.Stat(filePath)
    	if os.IsNotExist(err) {
    		return false
    	}
    
      // 加载地址簿json内容
    	// Load addrBookJSON{}
    	r, err := os.Open(filePath)
    	if err != nil {
    		cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err))
    	}
    	defer r.Close()
    	aJSON := &addrBookJSON{}
    	dec := json.NewDecoder(r)
    	err = dec.Decode(aJSON)
    	if err != nil {
    		cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err))
    	}
    
      // 填充addrNew、addrOld等
    	// Restore all the fields...
    	// Restore the key
    	a.key = aJSON.Key
    	// Restore .addrNew & .addrOld
    	for _, ka := range aJSON.Addrs {
    		for _, bucketIndex := range ka.Buckets {
    			bucket := a.getBucket(ka.BucketType, bucketIndex)
    			bucket[ka.Addr.String()] = ka
    		}
    		a.addrLookup[ka.Addr.String()] = ka
    		if ka.BucketType == bucketTypeNew {
    			a.nNew++
    		} else {
    			a.nOld++
    		}
    	}
    	return true
    }
    

    定时更新地址簿

    bytomd会定时更新本地地址簿,默认2分钟一次

    func (a *AddrBook) saveRoutine() {
    	dumpAddressTicker := time.NewTicker(dumpAddressInterval)
    out:
    	for {
    		select {
    		case <-dumpAddressTicker.C:
    			a.saveToFile(a.filePath)
    		case <-a.Quit:
    			break out
    		}
    	}
    	dumpAddressTicker.Stop()
    	a.saveToFile(a.filePath)
    	a.wg.Done()
    	log.Info("Address handler done")
    }
    
    func (a *AddrBook) saveToFile(filePath string) {
    	log.WithField("size", a.Size()).Info("Saving AddrBook to file")
    
    	a.mtx.Lock()
    	defer a.mtx.Unlock()
    	// Compile Addrs
    	addrs := []*knownAddress{}
    	for _, ka := range a.addrLookup {
    		addrs = append(addrs, ka)
    	}
    
    	aJSON := &addrBookJSON{
    		Key:   a.key,
    		Addrs: addrs,
    	}
    
    	jsonBytes, err := json.MarshalIndent(aJSON, "", "	")
    	if err != nil {
    		log.WithField("err", err).Error("Failed to save AddrBook to file")
    		return
    	}
    	err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644)
    	if err != nil {
    		log.WithFields(log.Fields{
    			"file": filePath,
    			"err":  err,
    		}).Error("Failed to save AddrBook to file")
    	}
    }
    

    添加新地址

    当peer之间交换addr时,节点会收到对端节点已知的地址信息,这些信息会被当前节点添加到地址簿中

    func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {
    	a.mtx.Lock()
    	defer a.mtx.Unlock()
    	log.WithFields(log.Fields{
    		"addr": addr,
    		"src":  src,
    	}).Debug("Add address to book")
    	a.addAddress(addr, src)
    }
    
    
    func (a *AddrBook) addAddress(addr, src *NetAddress) {
    	// 验证地址是否为可路由地址
    	if a.routabilityStrict && !addr.Routable() {
    		log.Error(cmn.Fmt("Cannot add non-routable address %v", addr))
    		return
    	}
    	// 验证地址是否为本地节点地址
    	if _, ok := a.ourAddrs[addr.String()]; ok {
    		// Ignore our own listener address.
    		return
    	}
    
    	// 验证地址是否存在地址集中
    	// 如果存在:则判断该地址是否为old可靠地址、是否超过了最大buckets中。否则根据该地址已经被ka.Buckets引用的个数来随机决定是否添加到地址集中
    	// 如果不存在:则添加到地址集中。并标识为bucketTypeNew地址类型
    	ka := a.addrLookup[addr.String()]
    
    	if ka != nil {
    		// Already old.
    		if ka.isOld() {
    			return
    		}
    		// Already in max new buckets.
    		if len(ka.Buckets) == maxNewBucketsPerAddress {
    			return
    		}
    		// The more entries we have, the less likely we are to add more.
    		factor := int32(2 * len(ka.Buckets))
    		if a.rand.Int31n(factor) != 0 {
    			return
    		}
    	} else {
    		ka = newKnownAddress(addr, src)
    	}
    
    	// 找到该地址在地址集的索引位置并添加
    	bucket := a.calcNewBucket(addr, src)
    	a.addToNewBucket(ka, bucket)
    
    	log.Info("Added new address ", "address:", addr, " total:", a.size())
    }
    

    选择最优节点

    地址簿中存储众多地址,在p2p网络中需选择最优的地址去连接
    PickAddress(newBias int)函数中newBias是由pex_reactor产生的地址评分。如何计算地址分数在其他章节中再讲
    根据地址评分随机选择地址可增加区块链安全性

    // Pick an address to connect to with new/old bias.
    func (a *AddrBook) PickAddress(newBias int) *NetAddress {
    	a.mtx.Lock()
    	defer a.mtx.Unlock()
    
    	if a.size() == 0 {
    		return nil
    	}
    	// newBias地址分数限制在0-100分数之间
    	if newBias > 100 {
    		newBias = 100
    	}
    	if newBias < 0 {
    		newBias = 0
    	}
    
    	// Bias between new and old addresses.
    	oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias))
    	newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias)
    
      // 根据地址分数计算是否从addrOld或addrNew中随机选择一个地址
    	if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation {
    		// pick random Old bucket.
    		var bucket map[string]*knownAddress = nil
    		num := 0
    		for len(bucket) == 0 && num < oldBucketCount {
    			bucket = a.addrOld[a.rand.Intn(len(a.addrOld))]
    			num++
    		}
    		if num == oldBucketCount {
    			return nil
    		}
    		// pick a random ka from bucket.
    		randIndex := a.rand.Intn(len(bucket))
    		for _, ka := range bucket {
    			if randIndex == 0 {
    				return ka.Addr
    			}
    			randIndex--
    		}
    		cmn.PanicSanity("Should not happen")
    	} else {
    		// pick random New bucket.
    		var bucket map[string]*knownAddress = nil
    		num := 0
    		for len(bucket) == 0 && num < newBucketCount {
    			bucket = a.addrNew[a.rand.Intn(len(a.addrNew))]
    			num++
    		}
    		if num == newBucketCount {
    			return nil
    		}
    		// pick a random ka from bucket.
    		randIndex := a.rand.Intn(len(bucket))
    		for _, ka := range bucket {
    			if randIndex == 0 {
    				return ka.Addr
    			}
    			randIndex--
    		}
    		cmn.PanicSanity("Should not happen")
    	}
    	return nil
    }
    

    移除一个地址

    当一个地址被标记为Bad时则从地址集中移除。目前bytomd的代码版本并未调用过

    func (a *AddrBook) MarkBad(addr *NetAddress) {
    	a.RemoveAddress(addr)
    }
    
    // RemoveAddress removes the address from the book.
    func (a *AddrBook) RemoveAddress(addr *NetAddress) {
    	a.mtx.Lock()
    	defer a.mtx.Unlock()
    	ka := a.addrLookup[addr.String()]
    	if ka == nil {
    		return
    	}
    	log.WithField("addr", addr).Info("Remove address from book")
    	a.removeFromAllBuckets(ka)
    }
    
    func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
    	for _, bucketIdx := range ka.Buckets {
    		bucket := a.getBucket(ka.BucketType, bucketIdx)
    		delete(bucket, ka.Addr.String())
    	}
    	ka.Buckets = nil
    	if ka.BucketType == bucketTypeNew {
    		a.nNew--
    	} else {
    		a.nOld--
    	}
    	delete(a.addrLookup, ka.Addr.String())
    }
    
  • 相关阅读:
    Java 测试代码模板
    git 保存用户名和密码
    git 高级命令
    git 最常用命令
    git 冲突解决
    git diff命令
    nginx静态服务器的配置
    使用SFTP工具下载文件
    git log 格式化输出
    9-angular.fromJson
  • 原文地址:https://www.cnblogs.com/bytom/p/9528006.html
Copyright © 2011-2022 走看看