zoukankan      html  css  js  c++  java
  • wukong引擎源码分析之索引——part 1 倒排列表本质是有序数组存储

    searcher.IndexDocument(0, types.DocumentIndexData{Content: "此次百度收购将成中国互联网最大并购"})

    engine.go中的源码实现:

    // IndexDocument adds a document to the index.
    //
    // Parameters:
    //  docId   document identifier; must be unique
    //  data    see the DocumentIndexData comments
    //
    // Notes:
    //  1. This function is thread-safe; call it concurrently to speed up indexing.
    //  2. The call is asynchronous: when it returns, the document may not yet be
    //     in the index, so an immediate Search may miss it. Call FlushIndex to
    //     force a flush.
    func (engine *Engine) IndexDocument(docId uint64, data types.DocumentIndexData) {
        if !engine.initialized {
            log.Fatal("必须先初始化引擎")
        }
        atomic.AddUint64(&engine.numIndexingRequests, 1)
        // Fix: the original used fmt.Sprint("%d", docId), which does not apply
        // format verbs and hashed the literal string "%d<id>". Sprintf applies
        // the verb. NOTE(review): this changes the shard a given docId hashes
        // to — confirm no persisted shard layout depends on the old mapping.
        shard := int(murmur.Murmur3([]byte(fmt.Sprintf("%d", docId))) % uint32(engine.initOptions.NumShards))
        engine.segmenterChannel <- segmenterRequest{
            docId: docId, shard: shard, data: data}
    }

    而其中:

    engine.segmenterChannel <- segmenterRequest{
            docId: docId, shard: shard, data: data}

    将请求发送给segmenterChannel,其定义:

        // 建立分词器使用的通信通道
        segmenterChannel chan segmenterRequest

    而接收请求并处理的代码在segmenter_worker.go里:

    // segmenterWorker loops forever, consuming requests from segmenterChannel,
    // building a token -> positions map for each document, and forwarding the
    // result to the per-shard indexer and ranker channels.
    func (engine *Engine) segmenterWorker() {
        for {
            request := <-engine.segmenterChannel // blocks until a request arrives

            // tokensMap maps a token's text to the byte offsets where it occurs.
            tokensMap := make(map[string][]int)
            numTokens := 0
            if !engine.initOptions.NotUsingSegmenter && request.data.Content != "" {
                // When the document body is non-empty, prefer keywords obtained
                // by segmenting the content.
                segments := engine.segmenter.Segment([]byte(request.data.Content))
                for _, segment := range segments {
                    token := segment.Token().Text()
                    if !engine.stopTokens.IsStopToken(token) {
                        tokensMap[token] = append(tokensMap[token], segment.Start())
                    }
                }
                numTokens = len(segments)
            } else {
                // Otherwise load the user-supplied tokens.
                for _, t := range request.data.Tokens {
                    if !engine.stopTokens.IsStopToken(t.Text) {
                        tokensMap[t.Text] = t.Locations
                    }
                }
                numTokens = len(request.data.Tokens)
            }

            // Add the document's non-segmented labels. Note: labels bypass the
            // stop-token filter when the segmenter is disabled.
            for _, label := range request.data.Labels {
                if !engine.initOptions.NotUsingSegmenter {
                    if !engine.stopTokens.IsStopToken(label) {
                        tokensMap[label] = []int{}
                    }
                } else {
                    tokensMap[label] = []int{}
                }
            }

            indexerRequest := indexerAddDocumentRequest{
                document: &types.DocumentIndex{
                    DocId:       request.docId,
                    TokenLength: float32(numTokens),
                    Keywords:    make([]types.KeywordIndex, len(tokensMap)),
                },
            }
            iTokens := 0
            for k, v := range tokensMap {
                indexerRequest.document.Keywords[iTokens] = types.KeywordIndex{
                    Text: k,
                    // Labels have no positions, so their frequency is 0 and
                    // they do not participate in tf-idf scoring.
                    Frequency: float32(len(v)),
                    Starts:    v}
                iTokens++
            }

            // dealDocInfoChan lets the indexer signal (by closing it) that the
            // document's DocInfo has been recorded before the ranker proceeds.
            var dealDocInfoChan = make(chan bool, 1)

            indexerRequest.dealDocInfoChan = dealDocInfoChan
            engine.indexerAddDocumentChannels[request.shard] <- indexerRequest

            rankerRequest := rankerAddDocRequest{
                docId:           request.docId,
                fields:          request.data.Fields,
                dealDocInfoChan: dealDocInfoChan,
            }
            engine.rankerAddDocChannels[request.shard] <- rankerRequest
        }
    }

    上面代码的作用是统计词频和单词位置(注意:tag 也作为搜索的单词参与索引,只是其词频为 0,因而不参与 tf-idf 计算),并封装为 indexerRequest,发送给 engine.indexerAddDocumentChannels[request.shard]

    ------------------------------------------------

    补充一点,上述代码之所以得以执行是因为在:

    searcher = engine.Engine{}
    // 初始化
    searcher.Init(types.EngineInitOptions{SegmenterDictionaries: "../data/dictionary.txt"})

    searcher的初始化代码里有这么一段:

        // 启动分词器
        for iThread := 0; iThread < options.NumSegmenterThreads; iThread++ {
            go engine.segmenterWorker()
        }

    ------------------------------------------------

    接收indexerRequest的代码在index_worker.go里:

    // indexerAddDocumentWorker loops forever, consuming add-document requests
    // for one shard, applying them to that shard's inverted index, optionally
    // persisting the updated postings, and bumping the indexing counters.
    func (engine *Engine) indexerAddDocumentWorker(shard int) {
        for {
            request := <-engine.indexerAddDocumentChannels[shard] // blocks until a request arrives
            addInvertedIndex := engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan)
            // Persist each updated keyword's postings when persistent storage
            // is enabled.
            if engine.initOptions.UsePersistentStorage {
                for k, v := range addInvertedIndex {
                    engine.persistentStorageIndexDocumentChannels[shard] <- persistentStorageIndexDocumentRequest{
                        typ:            "index",
                        keyword:        k,
                        keywordIndices: v,
                    }
                }
            }

            // Counters are shared across worker goroutines, hence atomic.
            atomic.AddUint64(&engine.numTokenIndexAdded,
                uint64(len(request.document.Keywords)))
            atomic.AddUint64(&engine.numDocumentsIndexed, 1)
        }
    }

    -----------------------------------------------

    而上述函数之所以得以执行,还是因为在searcher的初始化函数里有这么一句:

            // 启动索引器和排序器
        for shard := 0; shard < options.NumShards; shard++ {
            go engine.indexerAddDocumentWorker(shard) //关键
            go engine.indexerRemoveDocWorker(shard)
            go engine.rankerAddDocWorker(shard)
            go engine.rankerRemoveDocWorker(shard)
    
            for i := 0; i < options.NumIndexerThreadsPerShard; i++ {
                go engine.indexerLookupWorker(shard)
            }
            for i := 0; i < options.NumRankerThreadsPerShard; i++ {
                go engine.rankerRankWorker(shard)
            }
        }

    ------------------------------------------------

    其中,engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan)的核心代码在indexer.go里:

    // AddDocument adds one document to the inverted index.
    //
    // It first updates the shard's DocInfo bookkeeping (document count and total
    // token length), closes dealDocInfoChan to signal that DocInfo is ready
    // (presumably unblocking the ranker — confirm against rankerAddDocWorker),
    // then merges every keyword of the document into the sorted postings lists.
    // It returns the map of keyword text to the postings that were touched, so
    // the caller can persist them.
    func (indexer *Indexer) AddDocument(document *types.DocumentIndex, dealDocInfoChan chan<- bool) (addInvertedIndex map[string]*types.KeywordIndices) {
        if !indexer.initialized {
            log.Fatal("索引器尚未初始化")
        }

        // Hold the inverted-index lock for the whole merge; the DocInfos lock
        // is taken only around the bookkeeping below (same order as elsewhere).
        indexer.InvertedIndexShard.Lock()
        defer indexer.InvertedIndexShard.Unlock()

        // Update the document count and the total keyword length.
        indexer.DocInfosShard.Lock()
        if _, found := indexer.DocInfosShard.DocInfos[document.DocId]; !found {
            indexer.DocInfosShard.DocInfos[document.DocId] = new(types.DocInfo)
            indexer.DocInfosShard.NumDocuments++
        }
        if document.TokenLength != 0 {
            originalLength := indexer.DocInfosShard.DocInfos[document.DocId].TokenLengths
            indexer.DocInfosShard.DocInfos[document.DocId].TokenLengths = float32(document.TokenLength)
            indexer.InvertedIndexShard.TotalTokenLength += document.TokenLength - originalLength
        }
        indexer.DocInfosShard.Unlock()
        close(dealDocInfoChan)

        foundKeyword := false
        addInvertedIndex = make(map[string]*types.KeywordIndices)
        for _, keyword := range document.Keywords {
            addInvertedIndex[keyword.Text], foundKeyword = indexer.InvertedIndexShard.InvertedIndex[keyword.Text]
            if !foundKeyword {
                addInvertedIndex[keyword.Text] = new(types.KeywordIndices)
            }
            indices := addInvertedIndex[keyword.Text]

            if !foundKeyword {
                // Keyword seen for the first time: start a new postings list.
                switch indexer.initOptions.IndexType {
                case types.LocationsIndex:
                    indices.Locations = [][]int{keyword.Starts}
                case types.FrequenciesIndex:
                    indices.Frequencies = []float32{keyword.Frequency}
                }
                indices.DocIds = []uint64{document.DocId}
                indexer.InvertedIndexShard.InvertedIndex[keyword.Text] = indices
                continue
            }

            // Binary-search the position where this docId belongs.
            position, found := indexer.searchIndex(
                indices, 0, indexer.getIndexLength(indices)-1, document.DocId)
            if found {
                // The document is already in the list: overwrite its entry.
                switch indexer.initOptions.IndexType {
                case types.LocationsIndex:
                    indices.Locations[position] = keyword.Starts
                case types.FrequenciesIndex:
                    indices.Frequencies[position] = keyword.Frequency
                }
                continue
            }

            // Not present: insert a new entry at position, keeping DocIds
            // sorted (append a placeholder, shift the tail right, then set).
            switch indexer.initOptions.IndexType {
            case types.LocationsIndex:
                indices.Locations = append(indices.Locations, []int{})
                copy(indices.Locations[position+1:], indices.Locations[position:])
                indices.Locations[position] = keyword.Starts
            case types.FrequenciesIndex:
                indices.Frequencies = append(indices.Frequencies, float32(0))
                copy(indices.Frequencies[position+1:], indices.Frequencies[position:])
                indices.Frequencies[position] = keyword.Frequency
            }
            indices.DocIds = append(indices.DocIds, 0)
            copy(indices.DocIds[position+1:], indices.DocIds[position:])
            indices.DocIds[position] = document.DocId
        }
        return
    }

    查找 docId 是否已存在于倒排列表时,使用的是二分查找:

    // searchIndex binary-searches indices (whose DocIds are kept sorted
    // ascending) for docId within [start, end].
    // The first return value is the position found, or the position where the
    // docId should be inserted; the second reports whether it was found.
    func (indexer *Indexer) searchIndex(
        indices *types.KeywordIndices, start int, end int, docId uint64) (int, bool) {
        // Boundary cases: empty range, or docId outside [start, end]'s values —
        // resolving these up front guarantees the loop's invariant
        // getDocId(start) < docId < getDocId(end).
        if indexer.getIndexLength(indices) == start {
            return start, false
        }
        if docId < indexer.getDocId(indices, start) {
            return start, false
        } else if docId == indexer.getDocId(indices, start) {
            return start, true
        }
        if docId > indexer.getDocId(indices, end) {
            return end + 1, false
        } else if docId == indexer.getDocId(indices, end) {
            return end, true
        }

        // Bisect until start and end are adjacent; end is then the insertion
        // point for a missing docId.
        var middle int
        for end-start > 1 {
            middle = (start + end) / 2
            if docId == indexer.getDocId(indices, middle) {
                return middle, true
            } else if docId > indexer.getDocId(indices, middle) {
                start = middle
            } else {
                end = middle
            }
        }
        return end, false
    }

    TODO,待分析:indexer里索引的细节,以及评分相关的逻辑:

    
    
            rankerRequest := rankerAddDocRequest{
                docId:           request.docId,
                fields:          request.data.Fields,
                dealDocInfoChan: dealDocInfoChan,
            }
            engine.rankerAddDocChannels[request.shard] <- rankerRequest
  • 相关阅读:
    part11-1 Python图形界面编程(Python GUI库介绍、Tkinter 组件介绍、布局管理器、事件处理)
    part10-3 Python常见模块(正则表达式)
    Cyclic Nacklace HDU
    模拟题 Right turn SCU
    状态DP Doing Homework HDU
    Dp Milking Time POJ
    区间DP Treats for the Cows POJ
    DP Help Jimmy POJ
    Dales and Hills Gym
    Kids and Prizes Gym
  • 原文地址:https://www.cnblogs.com/bonelee/p/6528556.html
Copyright © 2011-2022 走看看