As mentioned earlier, the code that receives the indexer request lives in index_worker.go:
func (engine *Engine) indexerAddDocumentWorker(shard int) {
    for {
        request := <-engine.indexerAddDocumentChannels[shard]

        // The key step: add one document to the inverted index (an array)
        addInvertedIndex := engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan)

        // Persist the newly added postings
        if engine.initOptions.UsePersistentStorage {
            for k, v := range addInvertedIndex {
                engine.persistentStorageIndexDocumentChannels[shard] <- persistentStorageIndexDocumentRequest{
                    typ:            "index",
                    keyword:        k,
                    keywordIndices: v,
                }
            }
        }

        atomic.AddUint64(&engine.numTokenIndexAdded, uint64(len(request.document.Keywords)))
        atomic.AddUint64(&engine.numDocumentsIndexed, 1)
    }
}
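The worker above follows a common Go fan-out pattern: one goroutine per shard, each draining its own request channel, so all updates to a shard's index are serialized without explicit locking. Below is a minimal, self-contained sketch of that pattern only; the addDocumentRequest type, the channel buffer size, and the shard count are made up for illustration and are not wukong's actual types.

package main

import (
    "fmt"
    "sync"
)

// hypothetical stand-in for wukong's indexerAddDocumentRequest
type addDocumentRequest struct {
    docId uint64
}

func main() {
    const numShards = 4
    channels := make([]chan addDocumentRequest, numShards)
    var wg sync.WaitGroup

    for shard := 0; shard < numShards; shard++ {
        channels[shard] = make(chan addDocumentRequest, 16)
        wg.Add(1)
        // one worker goroutine per shard, like indexerAddDocumentWorker
        go func(shard int) {
            defer wg.Done()
            for request := range channels[shard] {
                fmt.Printf("shard %d indexed doc %d\n", shard, request.docId)
            }
        }(shard)
    }

    // dispatch documents to shards by docId
    for docId := uint64(0); docId < 8; docId++ {
        channels[docId%numShards] <- addDocumentRequest{docId: docId}
    }

    for _, ch := range channels {
        close(ch)
    }
    wg.Wait()
}

In this pattern each shard has exactly one consumer, so the per-shard indexer never needs a mutex around its add operation.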
The persistence code lives in engine/persistent_storage_worker.go:
package engine

import (
    "bytes"
    "encoding/binary"
    "encoding/gob"
    "sync"
    "sync/atomic"

    "github.com/huichen/wukong/core"
    "github.com/huichen/wukong/types"
)

type persistentStorageIndexDocumentRequest struct {
    typ string // "info" or "index"

    // valid when typ == "info"
    docId   uint64
    docInfo *types.DocInfo

    // valid when typ == "index"
    keyword        string
    keywordIndices *types.KeywordIndices
}

func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
    for {
        request := <-engine.persistentStorageIndexDocumentChannels[shard]
        switch request.typ {
        case "info":
            // build the key: uvarint-encoded docId
            b := make([]byte, 10)
            length := binary.PutUvarint(b, request.docId)

            // build the value: gob-encoded DocInfo
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
            err := enc.Encode(request.docInfo)
            if err != nil {
                atomic.AddUint64(&engine.numDocumentsStored, 1)
                return
            }

            // write the key-value pair to the database
            engine.dbs[shard][getDB(request.typ)].Set(b[0:length], buf.Bytes())
            atomic.AddUint64(&engine.numDocumentsStored, 1)

        case "index":
            // build the key: the keyword itself
            b := []byte(request.keyword)

            // build the value: gob-encoded posting list
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
            err := enc.Encode(request.keywordIndices)
            if err != nil {
                return
            }

            // write the key-value pair to the database
            engine.dbs[shard][getDB(request.typ)].Set(b, buf.Bytes())
        }
    }
}

func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard int) {
    // build the key
    b := make([]byte, 10)
    length := binary.PutUvarint(b, docId)

    // delete the key from the database
    engine.dbs[shard][getDB("info")].Delete(b[0:length])
}

func (engine *Engine) persistentStorageInitWorker(shard int) {
    var finish sync.WaitGroup
    finish.Add(2)

    // restore docInfo
    go func() {
        defer finish.Add(-1)
        engine.dbs[shard][getDB("info")].ForEach(func(k, v []byte) error {
            key, value := k, v

            // decode the docId
            docId, _ := binary.Uvarint(key)

            // decode the data
            buf := bytes.NewReader(value)
            dec := gob.NewDecoder(buf)
            var data types.DocInfo
            err := dec.Decode(&data)
            if err == nil {
                // add to the in-memory index
                core.AddDocInfo(shard, docId, &data)
            }
            return nil
        })
    }()

    // restore the invertedIndex
    go func() {
        defer finish.Add(-1)
        engine.dbs[shard][getDB("index")].ForEach(func(k, v []byte) error {
            key, value := k, v

            // decode the keyword
            keyword := string(key)

            // decode the data
            buf := bytes.NewReader(value)
            dec := gob.NewDecoder(buf)
            var data types.KeywordIndices
            err := dec.Decode(&data)
            if err == nil {
                // add to the in-memory index
                core.AddKeywordIndices(shard, keyword, &data)
            }
            return nil
        })
    }()

    finish.Wait()
    engine.persistentStorageInitChannel <- true
}
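The on-disk layout quoted above is simple: docInfo rows use a uvarint-encoded docId as the key and a gob-encoded struct as the value, while inverted-index rows use the raw keyword bytes as the key and a gob-encoded posting list as the value. The following sketch round-trips one docInfo entry using nothing but the standard library; DocInfo here is a hypothetical stand-in for wukong's types.DocInfo, and no real database is involved.

package main

import (
    "bytes"
    "encoding/binary"
    "encoding/gob"
    "fmt"
)

// hypothetical stand-in for types.DocInfo
type DocInfo struct {
    Text string
}

func main() {
    // key: uvarint(docId), as in persistentStorageIndexDocumentWorker
    b := make([]byte, binary.MaxVarintLen64)
    n := binary.PutUvarint(b, 42)
    key := b[:n]

    // value: gob-encoded DocInfo
    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(DocInfo{Text: "hello"}); err != nil {
        panic(err)
    }
    value := buf.Bytes()

    // decoding, as persistentStorageInitWorker does for every stored key on startup
    docId, _ := binary.Uvarint(key)
    var restored DocInfo
    if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&restored); err != nil {
        panic(err)
    }
    fmt.Println(docId, restored.Text)
}

This is exactly the decode path persistentStorageInitWorker walks on startup, once for every stored key in every shard.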
As you can see, the way the inverted index is persisted in the DB is crude: a direct Set(key, value), where the key is the keyword of a posting list and the value is the doc id list, i.e. the whole array.
When the index is large, going to the DB with a Set on every update is very expensive, especially when doc ids keep being inserted under the same keyword: each insertion re-encodes and rewrites that keyword's entire posting list!
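To get a feel for the cost, here is a rough, self-contained sketch (not wukong code) that simulates appending doc ids one at a time under a single keyword: the full posting list is gob-encoded again on every insertion, so the cumulative number of bytes that would be handed to Set grows roughly quadratically with the number of documents.

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
)

func main() {
    var total int
    var ids []uint64
    for i := uint64(1); i <= 5000; i++ {
        ids = append(ids, i) // one new document mentions the keyword

        var buf bytes.Buffer
        _ = gob.NewEncoder(&buf).Encode(ids) // re-encode the full posting list
        total += buf.Len()                   // this is what would be written on each Set
    }
    fmt.Printf("final list: %d ids, cumulative bytes re-encoded: %d\n", len(ids), total)
}

A batched or append-only layout would avoid rewriting the whole list on every insertion, but that is not what the quoted code does.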
In short, wukong's approach to persistence is quite crude!