zoukankan      html  css  js  c++  java
  • wukong引擎源码分析之索引——part 2 持久化 直接set(key,docID数组)在kv存储里

    前面说过,接收indexerRequest的代码在index_worker.go里:

    func (engine *Engine) indexerAddDocumentWorker(shard int) {
        for {
            request := <-engine.indexerAddDocumentChannels[shard] //关键
            addInvertedIndex := engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan) // 向反向索引表(数组)中加入一个文档
            // save
            if engine.initOptions.UsePersistentStorage {
                for k, v := range addInvertedIndex {
                    engine.persistentStorageIndexDocumentChannels[shard] <- persistentStorageIndexDocumentRequest{
                        typ:            "index",
                        keyword:        k,
                        keywordIndices: v,
                    }
                }
            }
    
            atomic.AddUint64(&engine.numTokenIndexAdded,
                uint64(len(request.document.Keywords)))
            atomic.AddUint64(&engine.numDocumentsIndexed, 1)
        }
    } 

    持久化的代码:engine/persistent_storage_worker.go

    package engine
    
    import (
        "bytes"
        "encoding/binary"
        "encoding/gob"
        "github.com/huichen/wukong/core"
        "github.com/huichen/wukong/types"
        "sync"
        "sync/atomic"
    )
    
    type persistentStorageIndexDocumentRequest struct {
        typ string //"info"or"index"
    
        // typ=="info"时,以下两个字段有效
        docId   uint64
        docInfo *types.DocInfo
    
        // typ=="index"时,以下两个字段有效
        keyword        string
        keywordIndices *types.KeywordIndices
    }
    
    func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
        for {
            request := <-engine.persistentStorageIndexDocumentChannels[shard]
            switch request.typ {
            case "info":
                // 得到key
                b := make([]byte, 10)
                length := binary.PutUvarint(b, request.docId)
    
                // 得到value
                var buf bytes.Buffer
                enc := gob.NewEncoder(&buf)
                err := enc.Encode(request.docInfo)
                if err != nil {
                    atomic.AddUint64(&engine.numDocumentsStored, 1)
                    return
                }
    
                // 将key-value写入数据库
                engine.dbs[shard][getDB(request.typ)].Set(b[0:length], buf.Bytes())
                atomic.AddUint64(&engine.numDocumentsStored, 1)
    
            case "index":
                // 得到key
                b := []byte(request.keyword)
    
                // 得到value
                var buf bytes.Buffer
                enc := gob.NewEncoder(&buf)
                err := enc.Encode(request.keywordIndices)
                if err != nil {
                    return
                }
    
                // 将key-value写入数据库
                engine.dbs[shard][getDB(request.typ)].Set(b, buf.Bytes())
            }
        }
    }
    
    func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard int) {
        // 得到key
        b := make([]byte, 10)
        length := binary.PutUvarint(b, docId)
    
        // 从数据库删除该key
        engine.dbs[shard][getDB("info")].Delete(b[0:length])
    }
    
    func (engine *Engine) persistentStorageInitWorker(shard int) {
        var finish sync.WaitGroup
        finish.Add(2)
        // 恢复docInfo
        go func() {
            defer finish.Add(-1)
            engine.dbs[shard][getDB("info")].ForEach(func(k, v []byte) error {
                key, value := k, v
                // 得到docID
                docId, _ := binary.Uvarint(key)
    
                // 得到data
                buf := bytes.NewReader(value)
                dec := gob.NewDecoder(buf)
                var data types.DocInfo
                err := dec.Decode(&data)
                if err == nil {
                    // 添加索引
                    core.AddDocInfo(shard, docId, &data)
                }
                return nil
            })
        }()
    
        // 恢复invertedIndex
        go func() {
            defer finish.Add(-1)
            engine.dbs[shard][getDB("index")].ForEach(func(k, v []byte) error {
                key, value := k, v
                // 得到keyword
                keyword := string(key)
    
                // 得到data
                buf := bytes.NewReader(value)
                dec := gob.NewDecoder(buf)
                var data types.KeywordIndices
                err := dec.Decode(&data)
                if err == nil {
                    // 添加索引
                    core.AddKeywordIndices(shard, keyword, &data)
                }
                return nil
            })
        }()
        finish.Wait()
        engine.persistentStorageInitChannel <- true
    }

    可以看到,倒排索引存在DB里是丑陋的,直接set(key, value) 其中,key是倒排列表的关键字,而value是doc id list也就是数组。

    如果索引比较多,每次去DB set是非常耗时的,尤其针对同一个keyword有doc id插入时!

    总之,wukong对于持久化的做法很丑陋!

  • 相关阅读:
    P1219 N皇后(位运算&普通dfs)
    P1434 滑雪(记忆化搜索)
    P1118 数字三角形(技巧)
    P1162 填涂颜色
    P1141 01迷宫
    P2685抓牛(bfs)
    WordPress ‘crypt_private()’方法远程拒绝服务漏洞
    java获取网页源码
    tomcat报错: Error parsing HTTP request header
    空指针异常的原因
  • 原文地址:https://www.cnblogs.com/bonelee/p/6582163.html
Copyright © 2011-2022 走看看