zoukankan      html  css  js  c++  java
  • 以太坊 --- 交易池的特点 与 中断恢复

    作者:林冠宏 / 指尖下的幽灵。转载者,请: 务必标明出处。

    博客:http://www.cnblogs.com/linguanh/

    掘金:https://juejin.im/user/1785262612681997

    GitHub : https://github.com/af913337456/

    出版的书籍:


    目录

    • 前序
    • 以太坊交易池知识点总结
    • 源码探秘
      • 本地交易
        • 本地钱包地址的初始化
        • 加载本地交易
        • pool.journal.load
        • pool.AddLocals
        • 本地交易文件的更新
      • 远程交易
        • P2P 通讯模块的初始化
        • 接收 P2P 消息
        • 添加远程交易到交易池
    • “ 彩蛋 ”

    21年的第一篇文章,开源写作6年。

    最近比特币以太坊的价格也已然起飞,现在一个 BTC 已能全款辆某斯拉 model 3汽车。离谱。

    发布这篇文章:从区块链技术研发者的角度,说说我的区块链从业经历和对它的理解 的时候,是去年,现在回首去看最后那段话,一语成谶


    言归正传。

    一般做数据池之类的开发。比如:订单池,请求池...,传统的服务端思想会引导我们直接向消息中间件想去。使用各类消息组件去实现,比如 RocketMQ,Redis,Kafka...

    然而,在区块链公链应用中,现已知的多条公链,每一条,都有交易池这么一个功能模块,且,它们的代码实现都没有引入消息中间件去实现。

    早前在阅读以太坊公链源码的时候,我就对以太坊交易池这一块的实现思想感到新颖,今天总结下,分享给大家看看,区块链公链应用中不依赖消息中间件去实现交易池的做法及其特点。


    以太坊交易池知识点总结 _(BTW:面试的时候可死记)

    1. 交易的分类:
      • 从本地文件存与不存的角度去看:
        1. 本地交易,若交易的发送者地址是配置变量指定的地址,则认为是本地交易:
          • 节点启动的时候,可以在配置文件指定,不开启本地交易的操作
        2. 远程交易,不满足 1 条件的交易。
      • 从内存存储的角度去看:
        1. Queue,待进入 Pending 的交易,结构是 map[addr]TxList
        2. Pending,待进入打包队列的交易,结构和 Queue 一样,由 1 转化而来。
    2. 交易的输入(产生):
      • 程序启动之初:
        1. 本地交易,从本地文件加载到内存,本地若没,自然是 0 输入;
        2. 远程交易,由 P2P 通讯模块,接收到交易数据,存储到内存。
      • 程序运行中:
        1. 自己接收交易的 RPC请求,SendTransaction 或 SendRawTransaction;
        2. 通过 P2P 通讯模块,接收其它节点的信息,包含的动作有:
          1. 旧交易的移除;
          2. 新交易的增加。
    3. 交易的持久化策略:
      • 本地交易:
        1. 定时从 Pending 和 Queue 中选出本地交易存储到本地文件
        2. 存储方式,文件替换,先 new 一个,再 rename 一波;
        3. 注意第 2 点,文件的替换,意味着即是更新也是删除操作;
        4. 编码方式,rlp 编码,不是 json。
      • 远程交易:
        1. 不存,不进行持久化,总是依赖由其它节点 P2P 通讯同步过来。
    4. 中断恢复:
      1. 本地交易,同上面 程序启动之初 的操作;
      2. 远程交易,没有恢复,内存中的交易丢了就是丢了,不影响。即使当初正在打包,即使当前节点挂了,其它节点还在工作。

    上面第 4 点,中断恢复,对比于传统后端服务的消息中间件,对消息的不丢失保障性,区块链公链的做法,完全是靠分布式来维持的,单节点的数据丢失,可以从其它节点同步过来。所以,它们交易池的实现的实现,相对来说,更加灵活,编码难点在消息同步部分。


    下面进入枯燥的源码分析阶段,读有余力的读者可以继续

    要看注释。

    本地交易

    1. 本地钱包地址的初始化

    源码文件:tx_pool.go,config.Locals 由配置文件指定,是以太坊钱包地址数组。

    func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
    	...
    	for _, addr := range config.Locals { // 从配置文件添加 本地地址
    		log.Info("Setting new local account", "address", addr)
    		// 添加到 locals 变量里面,后面会用它来过滤出一个地址是否是本地地址
    		pool.locals.add(addr) 
    	}
    	...
    }
    
    

    2. 从本地文件,加载交易数据数据,即加载本地交易

    func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
    	...
    	pool.locals = newAccountSet(pool.signer)
    	for _, addr := range config.Locals {
    		log.Info("Setting new local account", "address", addr)
    		pool.locals.add(addr)
    	}
    	...	
     	// 上面添加完了
    	// If local transactions and journaling is enabled, load from disk
    	if !config.NoLocals && config.Journal != "" { // 如果配置开启了本地加载的需求
    		pool.journal = newTxJournal(config.Journal)
       		// load 是加载函数,pool.AddLocals 是实际添加函数
    		if err := pool.journal.load(pool.AddLocals); err != nil {
    			log.Warn("Failed to load transaction journal", "err", err)
    		}
    		if err := pool.journal.rotate(pool.local()); err != nil {
    			log.Warn("Failed to rotate transaction journal", "err", err)
    		}
    	}
    	...
        go pool.loop() // 循环处理事件
    }
    

    3. pool.journal.load

    源码文件:tx_journal.go

    func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
    	// Skip the parsing if the journal file doesn't exist at all
    	if _, err := os.Stat(journal.path); os.IsNotExist(err) {
    		return nil
    	}
    	// Open the journal for loading any past transactions
    	input, err := os.Open(journal.path) // 打开文件,读取流数据
    	if err != nil {
    		return err
    	}
    	...
    	stream := rlp.NewStream(input, 0) // 使用 rlp 编码算法解码数据
    	...
    	loadBatch := func(txs types.Transactions) {
    		for _, err := range add(txs) { // 调用 add 函数,进行添加
    			if err != nil {
    				log.Debug("Failed to add journaled transaction", "err", err)
    				dropped++
    			}
    		}
    	}
    	// loadBatch 在下面会被调用
    	...
    }
    

    4. pool.AddLocals

    pool.AddLocals 是实际的添加函数。内部的一系列调用后,最终到 tx_pool.add 函数。pool 的 queue 都是 map 结构,能根据相同 key 去重。

    func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
    	...
     	// 下面的 if,如果已在 pool.pending 里面,那么证明之前已经添加过在 queue 里
    	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
    		...
     		pool.journalTx(from, tx) // 内部调用 journal.insert
    		return old != nil, nil
    	}
    	replaced, err = pool.enqueueTx(hash, tx) // 这里,会添加到 pool.enqueue 里面
    	if err != nil {
    		return false, err
    	}
    	pool.journalTx(from, tx) // 内部调用 journal.insert
    	...
    }
    
    func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
    	// 本地钱包地址,没指定的话,就跳过
    	if pool.journal == nil || !pool.locals.contains(from) {
    		return
    	}
     	// insert 会在造成重复添加,但是 load 出来的时候会根据 addr 去重
    	if err := pool.journal.insert(tx); err != nil {
    		log.Warn("Failed to journal local transaction", "err", err)
    	}
    }
    

    截止到上面,本地交易已经被添加到 pool 的 queue 里面了。

    节点启动之初,除了会从本地 load 交易到 queue 外,还会不停地监听链的事件,比如接收交易,再 add 交易 到 queue 里。

    5. 本地交易文件的更新 ( 插入 / 删除 )

    loop 是触发的入口。除了主动的 journal.insert 达到了插入本地交易的目的之外。

    下面的更新操作,也达到了包含插入的目的:以替换的手段,从文件删除旧交易,存储新交易到文件

    func (pool *TxPool) loop() {
    	...
    	for {
    		select {
    		...
    		// Handle local transaction journal rotation
     		// journal 定时器,定时执行下面的本地交易数据文件的更新 journal.rotate
    		case <-journal.C:
    			if pool.journal != nil {
    				pool.mu.Lock()
    				if err := pool.journal.rotate(pool.local()); err != nil {
    					log.Warn("Failed to rotate local tx journal", "err", err)
    				}
    				pool.mu.Unlock()
    			}
    		}
    	}
    }
    
    

    journal.rotate 的做法,使用文件替换的方式,来从 pool 的交易 pending 和 queue 中存储 locals 钱包地址相关的交易到文件。注意,只存本地钱包地址的,其它的,不存。

    //输入
    func (pool *TxPool) local() map[common.Address]types.Transactions {
    	...
    	for addr := range pool.locals.accounts {
    		if pending := pool.pending[addr]; pending != nil {
     			// 添加 pending 的
    			txs[addr] = append(txs[addr], pending.Flatten()...)
    		}
    		if queued := pool.queue[addr]; queued != nil {
     			// 添加 queue 的
    			txs[addr] = append(txs[addr], queued.Flatten()...)
    		}
    	}
    	return txs
    }
    
    // all 参数,来源于上面 local()
    func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
    	...
    	// journal.path+".new" 后缀 .new
    	replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
    	if err != nil {
    		return err
    	}
    	journaled := 0
    	for _, txs := range all {
    		for _, tx := range txs {
    			if err = rlp.Encode(replacement, tx); err != nil {
    				replacement.Close()
    				return err
    			}
    		}
    		journaled += len(txs)
    	}
    	replacement.Close()
     	// rename,重命名文件到原始的 path,达到更新,替换目的
    	if err = os.Rename(journal.path+".new", journal.path); err != nil {
    		return err
    	}
    	sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
    	if err != nil {
    		return err
    	}
    	...
    	return nil
    }
    
    

    远程交易

    P2P 通讯模块的初始化

    源码文件:eth/backend.go

    func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    	...
    	if config.TxPool.Journal != "" {
    		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
    	}
     	// 初始化交易池
    	eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
    	...
     	// 使用 交易池指针对象 作为参数初始化 protocolManager
    	if eth.protocolManager, err = NewProtocolManager(
        		chainConfig, checkpoint, config.SyncMode, config.NetworkId, 
                	eth.eventMux, `eth.txPool`, eth.engine, 
                    eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
    		return nil, err
    	}
    	...
    	return eth, nil
    }
    
    func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
    	// 下面初始化 tx_fetcher,使用 txpool.AddRemotes 赋值给函数变量 addTxs
    	manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
    }
    
    

    接收 P2P 消息

    源码文件:eth/handler.go

    func (pm *ProtocolManager) handleMsg(p *peer) error {
    	...
        switch {
        ...
        // 接收到其它节点的交易数据
        case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
    		...
     		// Enqueue 将交易添加到交易池
    		pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg)
    
        }
        ...
    }
    // tx_fetcher.go 文件
    func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
     	...
    	errs := f.addTxs(txs) // 执行添加,这个函数其实就是 tx_pool.go 的 AddRemotes
    	...
    }
    

    添加远程交易到交易池

    // tx_pool.go
    // addTxs 内部就会把交易添加到 Pending 和 Queue 里面
    func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
    	return pool.addTxs(txs, false, false)
    }
    

    打完收工

    更多以太坊的开发知识,见我的书籍:

    《2.0-区块链DApp开发:基于以太坊和比特币公链》

  • 相关阅读:
    每日总结2021.9.14
    jar包下载mvn
    每日总结EL表达语言 JSTL标签
    每日学习总结之数据中台概述
    Server Tomcat v9.0 Server at localhost failed to start
    Server Tomcat v9.0 Server at localhost failed to start(2)
    链表 java
    MVC 中用JS跳转窗体Window.Location.href
    Oracle 关键字
    MVC 配置路由 反复走控制其中的action (int?)
  • 原文地址:https://www.cnblogs.com/linguanh/p/14260697.html
Copyright © 2011-2022 走看看