  • Lucene Study Notes, Part 4: Analysis of the Lucene Indexing Process (4)

    6. Closing the IndexWriter object

    Code:

    writer.close();

    --> IndexWriter.closeInternal(boolean)

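          --> (1) Write the index information from memory to disk: flush(waitForMerges, true, true);
          --> (2) Merge segments: mergeScheduler.merge(this);

    Segment merging is discussed in a later chapter; this section covers only how the index information is written from memory to disk.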

    Code:

    IndexWriter.flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes)

    --> IndexWriter.doFlush(boolean flushDocStores, boolean flushDeletes)

          --> IndexWriter.doFlushInternal(boolean flushDocStores, boolean flushDeletes)

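    Writing the index to disk involves the following steps:

    • Get the name of the segment to write: String segment = docWriter.getSegment();
    • DocumentsWriter writes its buffered information to the segment: docWriter.flush(flushDocStores);
    • Create the new segment info object: newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx());
    • Prepare document deletions: docWriter.pushDeletes();
    • Build the compound (cfs) segment file: docWriter.createCompoundFile(segment);
    • Apply document deletions: applyDeletes();

    6.1. Getting the name of the segment to write

    Code: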

    SegmentInfo newSegment = null;

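    final int numDocs = docWriter.getNumDocsInRAM(); // total number of buffered documents

    String docStoreSegment = docWriter.getDocStoreSegment(); // name of the segment that stored fields and term vectors are written to, "_0"

    int docStoreOffset = docWriter.getDocStoreOffset(); // offset within that segment at which stored fields and term vectors are written

    String segment = docWriter.getSegment(); // segment name, "_0"

    As described in detail in the chapter on Lucene's index file structure, stored fields and term vectors can live in a different segment from the indexed fields.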

    6.2. Writing the buffered content to the segment

    Code:

    flushedDocCount = docWriter.flush(flushDocStores);

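    This process consists of two stages:

    • Close the stored fields and term vectors, following the basic indexing chain
    • Write the index results to the segment, following the structure of the basic indexing chain

    6.2.1. Closing the stored fields and term vectors along the basic indexing chain

    The code is: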

    closeDocStore();

    flushState.numDocsInStore = 0;

    This mainly walks the basic indexing chain and closes the stored-field and term-vector writers:

    • consumer(DocFieldProcessor).closeDocStore(flushState);
      • consumer(DocInverter).closeDocStore(state);
        • consumer(TermsHash).closeDocStore(state);
          • consumer(FreqProxTermsWriter).closeDocStore(state);
          • if (nextTermsHash != null) nextTermsHash.closeDocStore(state);
            • consumer(TermVectorsTermsWriter).closeDocStore(state);
        • endConsumer(NormsWriter).closeDocStore(state);
      • fieldsWriter(StoredFieldsWriter).closeDocStore(state);

    Only the following two closeDocStore calls do real work:

    • Closing the term vectors: TermVectorsTermsWriter.closeDocStore(SegmentWriteState)

    void closeDocStore(final SegmentWriteState state) throws IOException {

        if (tvx != null) {
            // Write zeros into the tvd file for documents that store no term vectors; even those documents keep a slot in tvx and tvd
            fill(state.numDocsInStore - docWriter.getDocStoreOffset());
            // Close the output streams of the tvx, tvf and tvd files
            tvx.close();
            tvf.close();
            tvd.close();
            tvx = null;
            // Record the names of the files written, so that they can later be combined into a single cfs file
            state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
            state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
            state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
            // Remove them from DocumentsWriter's openFiles member; from now on IndexFileDeleter may delete them
            docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
            docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
            docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
            lastDocID = 0;
        }
    }
    • Closing the stored fields: StoredFieldsWriter.closeDocStore(SegmentWriteState)

    public void closeDocStore(SegmentWriteState state) throws IOException {

        // Close the fdx and fdt output streams

        fieldsWriter.close();
        --> fieldsStream.close();
        --> indexStream.close();
        fieldsWriter = null;
        lastDocID = 0;

        // Record the names of the files written
        state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_EXTENSION);
        state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
        state.docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_EXTENSION);
        state.docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
    }


    6.2.2. Writing the index results to the segment along the basic indexing chain

    The code is:

    consumer(DocFieldProcessor).flush(threads, flushState);

        // Recycle fieldHash for the next round of indexing; the objects in the indexing chain are reused for efficiency.

        Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
        for ( DocConsumerPerThread thread : threads) {
            DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
            childThreadsAndFields.put(perThread.consumer, perThread.fields());
            perThread.trimFields(state);
        }

        // Write the stored fields

        --> fieldsWriter(StoredFieldsWriter).flush(state);

        // Write the indexed fields

        --> consumer(DocInverter).flush(childThreadsAndFields, state);

        // Write the field metadata and record the file name, so that the cfs file can be built later

        --> final String fileName = state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION);

        --> fieldInfos.write(state.directory, fileName);

        --> state.flushedFiles.add(fileName);

    This process also follows the basic indexing chain:

    • consumer(DocFieldProcessor).flush(…);
      • consumer(DocInverter).flush(…);
        • consumer(TermsHash).flush(…);
          • consumer(FreqProxTermsWriter).flush(…);
          • if (nextTermsHash != null) nextTermsHash.flush(…);
            • consumer(TermVectorsTermsWriter).flush(…);
        • endConsumer(NormsWriter).flush(…);
      • fieldsWriter(StoredFieldsWriter).flush(…);
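    The nesting above determines the order in which the writers are flushed. The following is a toy model, not Lucene code: the stage names come from the list above, while the Stage interface, the helper methods and the logging are hypothetical and exist only to make the call order visible.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the flush order along the indexing chain.
// Stage names mirror the article; the bodies are illustrative, not Lucene code.
final class IndexChainSketch {

    interface Stage {
        void flush(List<String> log);
    }

    // A leaf stage only records that it was flushed.
    static Stage leaf(String name) {
        return log -> log.add(name);
    }

    // TermsHash flushes its own consumer first, then the chained nextTermsHash (if any).
    static Stage termsHash(Stage consumer, Stage nextTermsHash) {
        return log -> {
            consumer.flush(log);
            if (nextTermsHash != null) nextTermsHash.flush(log);
        };
    }

    // DocInverter flushes its consumer (TermsHash), then its endConsumer (NormsWriter).
    static Stage docInverter(Stage consumer, Stage endConsumer) {
        return log -> {
            consumer.flush(log);
            endConsumer.flush(log);
        };
    }

    // DocFieldProcessor flushes the stored fields writer, then its consumer (DocInverter).
    static Stage docFieldProcessor(Stage fieldsWriter, Stage consumer) {
        return log -> {
            fieldsWriter.flush(log);
            consumer.flush(log);
        };
    }

    // Builds the chain from the list above and returns the order of leaf flushes.
    static List<String> flushOrder() {
        Stage termVectors = termsHash(leaf("TermVectorsTermsWriter"), null);
        Stage postings = termsHash(leaf("FreqProxTermsWriter"), termVectors);
        Stage inverter = docInverter(postings, leaf("NormsWriter"));
        Stage chain = docFieldProcessor(leaf("StoredFieldsWriter"), inverter);
        List<String> log = new ArrayList<>();
        chain.flush(log);
        return log;
    }
}
```

    Calling IndexChainSketch.flushOrder() lists the leaf writers in the order the article walks through them: stored fields, postings, term vectors, norms.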

    6.2.2.1. Writing the stored fields

    The code is:

    StoredFieldsWriter.flush(SegmentWriteState state) {
        if (state.numDocsInStore > 0) {
          initFieldsWriter();
          fill(state.numDocsInStore - docWriter.getDocStoreOffset());
        }
        if (fieldsWriter != null)
          fieldsWriter.flush();
      }

    The code appears to write the fdx and fdt files, but closeDocStore above has already written them, reset state.numDocsInStore to zero and set fieldsWriter to null, so nothing actually happens here.

    6.2.2.2. Writing the indexed fields

    The code is:

    DocInverter.flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>, SegmentWriteState)

        // Write the postings lists and the term vector information

        --> consumer(TermsHash).flush(childThreadsAndFields, state);

        // Write the normalization factors (norms)

        --> endConsumer(NormsWriter).flush(endChildThreadsAndFields, state);

    6.2.2.2.1. Writing the postings lists and the term vector information

    The code is:

    TermsHash.flush(Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>, SegmentWriteState)

        // Write the postings lists

        --> consumer(FreqProxTermsWriter).flush(childThreadsAndFields, state);

        // Recycle the RawPostingList objects

        --> shrinkFreePostings(threadsAndFields, state);

        // Write the term vector information

        --> if (nextTermsHash != null) nextTermsHash.flush(nextThreadsAndFields, state);

              --> consumer(TermVectorsTermsWriter).flush(childThreadsAndFields, state);

    6.2.2.2.1.1. Writing the postings lists

    The code is:

    FreqProxTermsWriter.flush(Map<TermsHashConsumerPerThread,
                                           Collection<TermsHashConsumerPerField>>, SegmentWriteState)

        (a) Sort all fields by name so that fields with the same name are processed together

        Collections.sort(allFields);

        final int numAllFields = allFields.size();

        (b) Create the postings writer object

        final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);

        int start = 0;

        (c) For each field

        while(start < numAllFields) {

            (c-1) Find all fields with the same name

            final FieldInfo fieldInfo = allFields.get(start).fieldInfo;

            final String fieldName = fieldInfo.name;

            int end = start+1;

            while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))

                end++;

            FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];

            for(int i=start;i<end;i++) {

                fields[i-start] = allFields.get(i);

                fieldInfo.storePayloads |= fields[i-start].hasPayloads;

            }

            (c-2) Append the postings of the same-name fields to the files

            appendPostings(fields, consumer);

            (c-3) Free the memory

            for(int i=0;i<fields.length;i++) {

                TermsHashPerField perField = fields[i].termsHashPerField;

                int numPostings = perField.numPostings;

                perField.reset();

                perField.shrinkHash(numPostings);

                fields[i].reset();

            }

            start = end;

        }

        (d) Close the postings writer object

        consumer.finish();

    (b) Creating the postings writer object

    The code is:

    public FormatPostingsFieldsWriter(SegmentWriteState state, FieldInfos fieldInfos) throws IOException {
        dir = state.directory;
        segment = state.segmentName;
        totalNumDocs = state.numDocs;
        this.fieldInfos = fieldInfos;
        // Used to write the tii and tis files
        termsOut = new TermInfosWriter(dir, segment, fieldInfos, state.termIndexInterval);
        // Used to write the skip lists of the freq and prox files
        skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval, termsOut.maxSkipLevels, totalNumDocs, null, null);
        // Record the names of the files written
        state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_EXTENSION));
        state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));
        // Using the two writers above, write to the segment in the required format
        termsWriter = new FormatPostingsTermsWriter(state, this);
    }

    The resulting object structure is:

    consumer    FormatPostingsFieldsWriter  (id=119)  // handles one field
        dir    SimpleFSDirectory  (id=126)   // target index directory
        totalNumDocs    8   // total number of documents
        fieldInfos    FieldInfos  (id=70)  // field metadata
        segment    "_0"   // segment name
        skipListWriter    DefaultSkipListWriter  (id=133)  // writer for the skip lists in freq and prox
        termsOut    TermInfosWriter  (id=125)  // writer for the tii and tis files
        termsWriter    FormatPostingsTermsWriter  (id=135)  // used to add terms
            currentTerm    null   
            currentTermStart    0   
            fieldInfo    null   
            freqStart    0   
            proxStart    0   
            termBuffer    null   
            termsOut    TermInfosWriter  (id=125)   
            docsWriter    FormatPostingsDocsWriter  (id=139)  // writes the doc ID and freq information for this term
                df    0   
                fieldInfo    null   
                freqStart    0   
                lastDocID    0   
                omitTermFreqAndPositions    false   
                out    SimpleFSDirectory$SimpleFSIndexOutput  (id=144)   
                skipInterval    16   
                skipListWriter    DefaultSkipListWriter  (id=133)   
                storePayloads    false   
                termInfo    TermInfo  (id=151)   
                totalNumDocs    8    
                posWriter    FormatPostingsPositionsWriter  (id=146)  // writes the positions of this term within this document
                    lastPayloadLength    -1   
                    lastPosition    0   
                    omitTermFreqAndPositions    false   
                    out    SimpleFSDirectory$SimpleFSIndexOutput  (id=157)   
                    parent    FormatPostingsDocsWriter  (id=139)   
                    storePayloads    false   
    • FormatPostingsFieldsWriter.addField(FieldInfo field) adds an indexed field and returns a FormatPostingsTermsConsumer for adding terms
    • FormatPostingsTermsConsumer.addTerm(char[] text, int start) adds a term and returns a FormatPostingsDocsConsumer for adding freq information
    • FormatPostingsDocsConsumer.addDoc(int docID, int termDocFreq) adds freq information and returns a FormatPostingsPositionsConsumer for adding prox information
    • FormatPostingsPositionsConsumer.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength) adds prox information

    (c-2) Appending the postings of the same-name fields to the files

    The code is:

    FreqProxTermsWriter.appendPostings(FreqProxTermsWriterPerField[], FormatPostingsFieldsConsumer) {

        int numFields = fields.length;

        final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];

        for(int i=0;i<numFields;i++) {

          FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]);

          boolean result = fms.nextTerm(); // for every field, fetch its first term

        }

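        (1) Add this field. There are several field objects, but since they all share one name, the information of the first one is enough. The returned object is used to add the terms of this field.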

        final FormatPostingsTermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo);

        FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];

        final boolean currentFieldOmitTermFreqAndPositions = fields[0].fieldInfo.omitTermFreqAndPositions;

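        (2) This while loop visits every field that still has unprocessed terms and handles their terms in dictionary order. Once all terms of a field have been processed, numFields is decremented and the field is removed from the mergeStates array. The loop exits only when all terms of all fields have been processed.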

        while(numFields > 0) {

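           (2-1) Find the next term in dictionary order across all fields. Several same-name fields may contain the same term, so all numFields fields are scanned for their next term; numToMerge is the number of fields that contain this term.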

          termStates[0] = mergeStates[0];

          int numToMerge = 1;

          for(int i=1;i<numFields;i++) {

            final char[] text = mergeStates[i].text;

            final int textOffset = mergeStates[i].textOffset;

            final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);

            if (cmp < 0) {

              termStates[0] = mergeStates[i];

              numToMerge = 1;

            } else if (cmp == 0)

              termStates[numToMerge++] = mergeStates[i];

          }

          (2-2) Add this term; the returned FormatPostingsDocsConsumer is used to add doc IDs and term frequencies (freq)

          final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset);

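          (2-3) numToMerge fields contain this term, and in each of them the term has a list of the doc IDs of the documents containing it. This loop walks all fields containing the term and adds doc IDs and term frequencies in ascending doc ID order. Once all doc IDs of a field for this term have been processed, numToMerge is decremented and the field is removed from the termStates array. The loop ends when all doc IDs of all fields containing the term have been processed.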

          while(numToMerge > 0) {

            (2-3-1) Find the smallest doc ID

            FreqProxFieldMergeState minState = termStates[0];

            for(int i=1;i<numToMerge;i++)

              if (termStates[i].docID < minState.docID)

                minState = termStates[i];

            final int termDocFreq = minState.termFreq;

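            (2-3-2) Add the doc ID and term frequency, building skip list entries along the way; the returned FormatPostingsPositionsConsumer is used to add position (prox) information

            final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq);

            // ByteSliceReader reads the prox information from the byte pool.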

            final ByteSliceReader prox = minState.prox;

            if (!currentFieldOmitTermFreqAndPositions) {

              int position = 0;

              (2-3-3) This loop adds the position information for a document containing the term

              for(int j=0;j<termDocFreq;j++) {

                final int code = prox.readVInt();

                position += code >> 1;

                final int payloadLength;

                // If this position has a payload, read it from the byte pool; otherwise set its length to zero.

                if ((code & 1) != 0) {

                  payloadLength = prox.readVInt();

                  if (payloadBuffer == null || payloadBuffer.length < payloadLength)

                    payloadBuffer = new byte[payloadLength];

                  prox.readBytes(payloadBuffer, 0, payloadLength);

                } else

                  payloadLength = 0;

                // Add the position (prox) information

                posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);

              }

              posConsumer.finish();

            }

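            (2-3-4) Check the exit condition. The field just selected advances to its next doc ID. If there is none, all documents of this field containing the term have been processed, so the field is removed from termStates and numToMerge is decremented. The field then advances to its next term, so that on the next pass through loop (2) it takes part with that term. If there is no next term, all terms of the field have been processed, so the field is removed from mergeStates and numFields is decremented; when numFields reaches 0, loop (2) ends.

            if (!minState.nextDoc()) { // advance to the next doc ID

              // This field has no further doc ID for this term: remove it from termStates and decrement numToMerge.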

              int upto = 0;

              for(int i=0;i<numToMerge;i++)

                if (termStates[i] != minState)

                  termStates[upto++] = termStates[i];

              numToMerge--;

              // The field moves on to its next term and takes part in merging that term in loop (2)

              if (!minState.nextTerm()) {

                // The field has no next term: remove it from mergeStates and decrement numFields.

                upto = 0;

                for(int i=0;i<numFields;i++)

                  if (mergeStates[i] != minState)

                    mergeStates[upto++] = mergeStates[i];

                numFields--;

              }

            }

          }

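          (2-4) After the steps above, the doc ID and freq information has been written to the segment file, but the skip list data has only gone into the skip buffer; it is written to the file here. The term dictionary entry (tii, tis) is written here as well.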

          docConsumer(FormatPostingsDocsWriter).finish();

        }

        termsConsumer.finish();

      }
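    The two nested merges in appendPostings (smallest term first, then smallest doc ID first) can be tried out on plain collections. The sketch below is a simplification under stated assumptions: each same-name field is modelled as a hypothetical TreeMap from term to an ascending doc ID array, frequencies and positions are left out, and String.compareTo stands in for compareText. It is not the Lucene implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified two-level merge: terms in dictionary order, doc IDs in ascending order.
final class PostingsMergeSketch {

    static Map<String, List<Integer>> merge(List<TreeMap<String, int[]>> sources) {
        int n = sources.size();
        List<Iterator<Map.Entry<String, int[]>>> iterators = new ArrayList<>();
        List<Map.Entry<String, int[]>> current = new ArrayList<>();
        for (TreeMap<String, int[]> source : sources) {
            Iterator<Map.Entry<String, int[]>> it = source.entrySet().iterator();
            iterators.add(it);
            current.add(it.hasNext() ? it.next() : null); // first term of each source
        }
        Map<String, List<Integer>> merged = new LinkedHashMap<>();
        while (true) {
            // (2-1) pick the smallest current term over all sources that still have terms
            String minTerm = null;
            for (Map.Entry<String, int[]> e : current) {
                if (e != null && (minTerm == null || e.getKey().compareTo(minTerm) < 0)) {
                    minTerm = e.getKey();
                }
            }
            if (minTerm == null) break; // every source is exhausted
            // collect the doc ID lists of all sources positioned on that term, then advance them
            List<int[]> docLists = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                Map.Entry<String, int[]> e = current.get(i);
                if (e != null && e.getKey().equals(minTerm)) {
                    docLists.add(e.getValue());
                    Iterator<Map.Entry<String, int[]>> it = iterators.get(i);
                    current.set(i, it.hasNext() ? it.next() : null);
                }
            }
            // (2-3) repeatedly emit the smallest remaining doc ID among those lists
            int[] pos = new int[docLists.size()];
            List<Integer> docs = new ArrayList<>();
            while (true) {
                int min = -1;
                for (int i = 0; i < docLists.size(); i++) {
                    if (pos[i] < docLists.get(i).length
                            && (min < 0 || docLists.get(i)[pos[i]] < docLists.get(min)[pos[min]])) {
                        min = i;
                    }
                }
                if (min < 0) break;
                docs.add(docLists.get(min)[pos[min]++]);
            }
            merged.put(minTerm, docs);
        }
        return merged;
    }
}
```

    Unlike the original, which removes exhausted fields from the termStates and mergeStates arrays, this sketch marks exhaustion with null entries and cursor positions; the visiting order is the same.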

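    (2-3-4) The code for advancing to the next doc ID is:

    public boolean nextDoc() { // how the next doc ID is obtained

      if (freq.eof()) { // the freq data in the byte pool has been read completely

        if (p.lastDocCode != -1) { // per the buffering scheme described earlier, the PostingList still holds the doc ID and term frequency of the last document, so that document is returned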

          docID = p.lastDocID;

          if (!field.omitTermFreqAndPositions)

            termFreq = p.docFreq;

          p.lastDocCode = -1;

          return true;

        } else

          return false; // no more documents

      }

      final int code = freq.readVInt(); // the freq data in the byte pool has not been read completely yet

      if (field.omitTermFreqAndPositions)

        docID += code;

      else {

        // Decode the doc ID and term frequency.

        docID += code >>> 1;

        if ((code & 1) != 0)

          termFreq = 1;

        else

          termFreq = freq.readVInt();

      }

      return true;

    }

    (2-3-2) The code for adding the doc ID and term frequency is:

    FormatPostingsPositionsConsumer FormatPostingsDocsWriter.addDoc(int docID, int termDocFreq) {

        final int delta = docID - lastDocID;

        // Add a skip list entry whenever the number of documents reaches a multiple of skipInterval.

        if ((++df % skipInterval) == 0) {

          skipListWriter.setSkipData(lastDocID, storePayloads, posWriter.lastPayloadLength);

          skipListWriter.bufferSkip(df);

       }

       lastDocID = docID;

       if (omitTermFreqAndPositions)

         out.writeVInt(delta);

       else if (1 == termDocFreq)

         out.writeVInt((delta<<1) | 1);

       else {

         // Write the doc ID and term frequency.

         out.writeVInt(delta<<1);

         out.writeVInt(termDocFreq);

       }

       return posWriter;

    }
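    The encoding used by addDoc, and undone by nextDoc, packs the doc ID delta and a "frequency is 1" flag into a single VInt. Below is a minimal round-trip sketch of that scheme for the case where frequencies are stored (omitTermFreqAndPositions is false). The class and method names are hypothetical, and an in-memory byte array stands in for the IndexOutput; the VInt helpers follow the usual 7-bits-per-byte layout.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Round trip of the delta + "freq == 1" flag encoding shown in addDoc and nextDoc.
final class DocFreqCodecSketch {

    // VInt: 7 data bits per byte, low-order group first, high bit set when more bytes follow.
    static void writeVInt(ByteArrayOutputStream out, int i) {
        while ((i & ~0x7F) != 0) {
            out.write((i & 0x7F) | 0x80);
            i >>>= 7;
        }
        out.write(i);
    }

    static int readVInt(byte[] data, int[] pos) {
        byte b = data[pos[0]++];
        int i = b & 0x7F;
        for (int shift = 7; (b & 0x80) != 0; shift += 7) {
            b = data[pos[0]++];
            i |= (b & 0x7F) << shift;
        }
        return i;
    }

    // docIDs must be ascending; freqs[k] is the term frequency in docIDs[k].
    static byte[] encode(int[] docIDs, int[] freqs) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int lastDocID = 0;
        for (int k = 0; k < docIDs.length; k++) {
            int delta = docIDs[k] - lastDocID;
            lastDocID = docIDs[k];
            if (freqs[k] == 1) {
                writeVInt(out, (delta << 1) | 1); // low bit set: frequency is 1, nothing else follows
            } else {
                writeVInt(out, delta << 1);       // low bit clear: the frequency follows as its own VInt
                writeVInt(out, freqs[k]);
            }
        }
        return out.toByteArray();
    }

    // Returns one {docID, freq} pair per encoded document.
    static List<int[]> decode(byte[] data) {
        List<int[]> result = new ArrayList<>();
        int[] pos = {0};
        int docID = 0;
        while (pos[0] < data.length) {
            int code = readVInt(data, pos);
            docID += code >>> 1;
            int freq = (code & 1) != 0 ? 1 : readVInt(data, pos);
            result.add(new int[] {docID, freq});
        }
        return result;
    }
}
```

    For doc IDs 3, 7, 20 with frequencies 1, 4, 1 the encoder emits the four bytes 7, 8, 4, 27: documents that contain the term once cost a single byte as long as the delta is small.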

    (2-3-3) Adding the position information:

    FormatPostingsPositionsWriter.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength) {

        final int delta = position - lastPosition;

        lastPosition = position;

        if (storePayloads) {

            // Store the position and payload information

            if (payloadLength != lastPayloadLength) {

                lastPayloadLength = payloadLength;

                out.writeVInt((delta<<1)|1);

                out.writeVInt(payloadLength);

            } else

                out.writeVInt(delta << 1);

            if (payloadLength > 0)

                out.writeBytes(payload, payloadLength);

        } else

            out.writeVInt(delta);

    }
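    In addPosition with storePayloads enabled, the low bit of the position code signals that the payload length differs from the previous one, so a run of equal-length payloads stores the length only once. The sketch below reproduces that write path on a byte array and pairs it with a reader. The reader is my own inference from the write path, not code shown in the article, and all names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Position deltas with a "payload length changed" flag, as in addPosition (storePayloads == true).
final class PositionCodecSketch {

    static void writeVInt(ByteArrayOutputStream out, int i) {
        while ((i & ~0x7F) != 0) {
            out.write((i & 0x7F) | 0x80);
            i >>>= 7;
        }
        out.write(i);
    }

    static int readVInt(byte[] data, int[] pos) {
        byte b = data[pos[0]++];
        int i = b & 0x7F;
        for (int shift = 7; (b & 0x80) != 0; shift += 7) {
            b = data[pos[0]++];
            i |= (b & 0x7F) << shift;
        }
        return i;
    }

    // positions must be ascending; payloads[k] may be an empty array.
    static byte[] encode(int[] positions, byte[][] payloads) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int lastPosition = 0;
        int lastPayloadLength = -1; // forces the first entry to carry its length
        for (int k = 0; k < positions.length; k++) {
            int delta = positions[k] - lastPosition;
            lastPosition = positions[k];
            int payloadLength = payloads[k].length;
            if (payloadLength != lastPayloadLength) {
                lastPayloadLength = payloadLength;
                writeVInt(out, (delta << 1) | 1); // flag set: a new payload length follows
                writeVInt(out, payloadLength);
            } else {
                writeVInt(out, delta << 1);       // flag clear: same length as the previous entry
            }
            if (payloadLength > 0) out.write(payloads[k], 0, payloadLength);
        }
        return out.toByteArray();
    }

    // Returns the decoded positions and appends each entry's payload bytes to payloadsOut.
    static List<Integer> decode(byte[] data, List<byte[]> payloadsOut) {
        List<Integer> positions = new ArrayList<>();
        int[] pos = {0};
        int position = 0;
        int payloadLength = 0;
        while (pos[0] < data.length) {
            int code = readVInt(data, pos);
            position += code >>> 1;
            if ((code & 1) != 0) payloadLength = readVInt(data, pos);
            payloadsOut.add(Arrays.copyOfRange(data, pos[0], pos[0] + payloadLength));
            pos[0] += payloadLength;
            positions.add(position);
        }
        return positions;
    }
}
```

    Note the contrast with the in-memory format read in step (2-3-3), where the low bit means "this position has a payload" and the length is read every time it is set.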

    (2-4) Writing the skip list and the term dictionary (tii, tis) to the files

    FormatPostingsDocsWriter.finish() {

        // Write the buffered skip list to the file

        long skipPointer = skipListWriter.writeSkip(out);

        if (df > 0) {

          // Write the dictionary entry (TermInfo) to the tii and tis files

          parent.termsOut(TermInfosWriter).add(fieldInfo.number, utf8.result, utf8.length, termInfo);

        }

      }

    Writing the buffered skip list to the file:

    DefaultSkipListWriter(MultiLevelSkipListWriter).writeSkip(IndexOutput)  {

        long skipPointer = output.getFilePointer();

        if (skipBuffer == null || skipBuffer.length == 0) return skipPointer;

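        // As analyzed in the index file format chapter, higher levels come first and lower levels last; every level except the lowest is prefixed with its length.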

        for (int level = numberOfSkipLevels - 1; level > 0; level--) {

          long length = skipBuffer[level].getFilePointer();

          if (length > 0) {

            output.writeVLong(length);

            skipBuffer[level].writeTo(output);

          }

        }

        // Write the lowest level

        skipBuffer[0].writeTo(output);

        return skipPointer;

      }
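    The layout produced by writeSkip can be reproduced with plain byte arrays: levels are emitted from highest to lowest, each non-empty upper level is preceded by its length, and level 0 is written last without a length. The sketch below also includes a levelsFor helper showing how many levels receive an entry after df documents; that rule is how I recall bufferSkip working and is an assumption here, since the article does not show it. All names are hypothetical.

```java
import java.io.ByteArrayOutputStream;

// Layout of a multi-level skip list as written by writeSkip, on in-memory buffers.
final class SkipLayoutSketch {

    // Assumed rule: level 0 gets an entry every skipInterval docs,
    // level 1 every skipInterval^2 docs, and so on, up to maxLevels.
    static int levelsFor(int df, int skipInterval, int maxLevels) {
        int levels = 0;
        while (df % skipInterval == 0 && levels < maxLevels) {
            levels++;
            df /= skipInterval;
        }
        return levels;
    }

    static void writeVLong(ByteArrayOutputStream out, long i) {
        while ((i & ~0x7FL) != 0) {
            out.write((int) ((i & 0x7F) | 0x80));
            i >>>= 7;
        }
        out.write((int) i);
    }

    // levelBuffers[0] is the lowest level; higher indices are higher levels.
    static byte[] writeSkip(byte[][] levelBuffers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (levelBuffers.length == 0) return out.toByteArray();
        for (int level = levelBuffers.length - 1; level > 0; level--) {
            int length = levelBuffers[level].length;
            if (length > 0) {
                writeVLong(out, length);                   // upper levels carry a length prefix
                out.write(levelBuffers[level], 0, length);
            }
        }
        out.write(levelBuffers[0], 0, levelBuffers[0].length); // lowest level: no length prefix
        return out.toByteArray();
    }
}
```

    With skipInterval 16, levelsFor yields 0 for df = 15, 1 for df = 16 and 2 for df = 256. The lowest level needs no length because it ends where the term's skip data ends.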

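    Writing the dictionary entry (TermInfo) to the tii and tis files:

    • The tii file acts like a skip list over the tis file: one term out of every indexInterval terms in the tis file is copied into the tii file so that terms can be located quickly.
    • TermInfosWriter therefore has a member variable other, itself a TermInfosWriter, and a member variable isIndex that tells whether the object writes the tii file or the tis file.
    • If isIndex is false, the object writes the tis file and its other points to the TermInfosWriter that writes the tii file.
    • If isIndex is true, the object writes the tii file and its other points to the TermInfosWriter that writes the tis file.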

    TermInfosWriter.add (int fieldNumber, byte[] termBytes, int termBytesLength, TermInfo ti) {

        // If the number of terms written so far is a multiple of indexInterval, add an entry to the tii file

        if (!isIndex && size % indexInterval == 0)

          other.add(lastFieldNumber, lastTermBytes, lastTermBytesLength, lastTi);

        // Write the term to the tis file

        writeTerm(fieldNumber, termBytes, termBytesLength);

        output.writeVInt(ti.docFreq);                       // write doc freq

        output.writeVLong(ti.freqPointer - lastTi.freqPointer); // write pointers

        output.writeVLong(ti.proxPointer - lastTi.proxPointer);

        if (ti.docFreq >= skipInterval) {

          output.writeVInt(ti.skipOffset);

        }

        if (isIndex) {

          output.writeVLong(other.output.getFilePointer() - lastIndexPointer);

          lastIndexPointer = other.output.getFilePointer(); // write pointer

        }

        lastFieldNumber = fieldNumber;

        lastTi.set(ti);

        size++;

      }
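    One detail of add is easy to miss: when size is a multiple of indexInterval, it is the previously written term (lastTermBytes, lastTi) that goes into the tii file, not the term being added. The sketch below mimics only that sampling rule on a list of strings. The empty string used as the initial "last term" is an assumption about the writer's initial state, which the excerpt does not show, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Which terms end up in the index (tii) file, following the check at the top of add().
final class TermIndexSamplingSketch {

    static List<String> sampleIndex(List<String> sortedTerms, int indexInterval) {
        List<String> index = new ArrayList<>();
        String lastTerm = ""; // assumed initial state: no term has been written yet
        int size = 0;         // number of terms written to the tis file so far
        for (String term : sortedTerms) {
            if (size % indexInterval == 0) {
                index.add(lastTerm); // the previous term is indexed, not the current one
            }
            lastTerm = term;         // corresponds to writing the term to the tis file
            size++;
        }
        return index;
    }
}
```

    With ten terms "a" through "j" and an interval of 4, the index receives the initial empty term, then "d", then "h", that is, the terms at positions 4k - 1 of the tis file.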

    6.2.2.2.1.2. Writing the term vector information

    The code is:

    TermVectorsTermsWriter.flush (Map<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>>
                                                  threadsAndFields, final SegmentWriteState state) {

        if (tvx != null) {

          if (state.numDocsInStore > 0)

            fill(state.numDocsInStore - docWriter.getDocStoreOffset());

          tvx.flush();

          tvd.flush();

          tvf.flush();

        }

        for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry :
                 threadsAndFields.entrySet()) {

          for (final TermsHashConsumerPerField field : entry.getValue() ) {

            TermVectorsTermsWriterPerField perField = (TermVectorsTermsWriterPerField) field;

            perField.termsHashPerField.reset();

            perField.shrinkHash();

          }

          TermVectorsTermsWriterPerThread perThread = (TermVectorsTermsWriterPerThread) entry.getKey();

          perThread.termsHashPerThread.reset(true);

        }

      }

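    The code appears to write the tvx, tvd and tvf files, but closeDocStore above has already written them and set tvx to null, so nothing is written here; the method only clears the postingsHash so that the object can be reused in the next round of indexing.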

    6.2.2.2.2. Writing the normalization factors (norms)

    The code is:

    NormsWriter.flush(Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> threadsAndFields,

                               SegmentWriteState state) {

        final Map<FieldInfo,List<NormsWriterPerField>> byField = new HashMap<FieldInfo,List<NormsWriterPerField>>();

        for (final Map.Entry<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> entry :
                 threadsAndFields.entrySet()) {

          // Walk all fields and put the NormsWriterPerField objects of same-name fields into the same list.

          final Collection<InvertedDocEndConsumerPerField> fields = entry.getValue();

          final Iterator<InvertedDocEndConsumerPerField> fieldsIt = fields.iterator();

          while (fieldsIt.hasNext()) {

            final NormsWriterPerField perField = (NormsWriterPerField) fieldsIt.next();

            List<NormsWriterPerField> l = byField.get(perField.fieldInfo);

            if (l == null) {

                l = new ArrayList<NormsWriterPerField>();

                byField.put(perField.fieldInfo, l);

            }

            l.add(perField);

          }

        }

        // Record the name of the file written, so that the cfs file can be built later.

        final String normsFileName = state.segmentName + "." + IndexFileNames.NORMS_EXTENSION;

        state.flushedFiles.add(normsFileName);

        IndexOutput normsOut = state.directory.createOutput(normsFileName);

        try {

          // Write the nrm file header

          normsOut.writeBytes(SegmentMerger.NORMS_HEADER, 0, SegmentMerger.NORMS_HEADER.length);

          final int numField = fieldInfos.size();

          int normCount = 0;

          // Process each field

          for(int fieldNumber=0;fieldNumber<numField;fieldNumber++) {

            final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);

            // Get the list of same-name fields

            List<NormsWriterPerField> toMerge = byField.get(fieldInfo);

            int upto = 0;

            if (toMerge != null) {

              final int numFields = toMerge.size();

              normCount++;

              final NormsWriterPerField[] fields = new NormsWriterPerField[numFields];

              int[] uptos = new int[numFields];

              for(int j=0;j<numFields;j++)

                fields[j] = toMerge.get(j);

              int numLeft = numFields;

              // Process the multiple fields sharing this name

              while(numLeft > 0) {

                // Find the smallest doc ID among all the same-name fields

                int minLoc = 0;

                int minDocID = fields[0].docIDs[uptos[0]];

                for(int j=1;j<numLeft;j++) {

                  final int docID = fields[j].docIDs[uptos[j]];

                  if (docID < minDocID) {

                    minDocID = docID;

                    minLoc = j;

                  }

                }

                // Every document has a slot in the nrm file; documents with no norm set get the default value

                for(;upto<minDocID;upto++)

                  normsOut.writeByte(defaultNorm);

                // Write the current norm value

                normsOut.writeByte(fields[minLoc].norms[uptos[minLoc]]);

                (uptos[minLoc])++;

                upto++;

                // Once all documents of the current field have been processed, decrement numLeft; the loop exits when it reaches zero.

                if (uptos[minLoc] == fields[minLoc].upto) {

                  fields[minLoc].reset();

                  if (minLoc != numLeft-1) {

                    fields[minLoc] = fields[numLeft-1];

                    uptos[minLoc] = uptos[numLeft-1];

                  }

                  numLeft--;

                }

              }

              // Write the default value for all remaining documents that have no norm set.

              for(;upto<state.numDocs;upto++)

                normsOut.writeByte(defaultNorm);

            } else if (fieldInfo.isIndexed && !fieldInfo.omitNorms) {

              normCount++;

              // Fill entire field with default norm:

              for(;upto<state.numDocs;upto++)

                normsOut.writeByte(defaultNorm);

            }

          }

        } finally {

          normsOut.close();

        }

      }
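    For a single field, the merge inside NormsWriter.flush reduces to this: take the smallest pending doc ID across the per-thread arrays, pad with the default norm up to it, write its norm, and pad again at the end up to numDocs. A minimal sketch follows, assuming the per-thread data is given as parallel docIDs/norms arrays with disjoint, ascending doc IDs. The names are hypothetical, and exhausted sources are skipped by a bounds check instead of the original's swap-with-last removal.

```java
import java.io.ByteArrayOutputStream;

// One norm byte per document for a single field, merged from several per-thread buffers.
final class NormsFillSketch {

    // docIDs[j] is ascending and parallel to norms[j]; doc IDs are disjoint across sources.
    static byte[] mergeNorms(int[][] docIDs, byte[][] norms, int numDocs, byte defaultNorm) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int n = docIDs.length;
        int[] uptos = new int[n]; // read cursor per source
        int upto = 0;             // next document slot to be written
        while (true) {
            // find the source whose next doc ID is smallest
            int minLoc = -1;
            for (int j = 0; j < n; j++) {
                if (uptos[j] < docIDs[j].length
                        && (minLoc < 0 || docIDs[j][uptos[j]] < docIDs[minLoc][uptos[minLoc]])) {
                    minLoc = j;
                }
            }
            if (minLoc < 0) break; // all sources consumed
            int minDocID = docIDs[minLoc][uptos[minLoc]];
            // documents without a norm for this field get the default value
            for (; upto < minDocID; upto++) out.write(defaultNorm);
            out.write(norms[minLoc][uptos[minLoc]++]);
            upto++;
        }
        // trailing documents without a norm
        for (; upto < numDocs; upto++) out.write(defaultNorm);
        return out.toByteArray();
    }
}
```

    The result always has exactly numDocs bytes, which is what lets a reader find a document's norm by using its doc ID as an offset into the field's block.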

    6.2.2.3. Writing the field metadata

    The code is:

    FieldInfos.write(IndexOutput) {

        output.writeVInt(CURRENT_FORMAT);

        output.writeVInt(size());

        for (int i = 0; i < size(); i++) {

          FieldInfo fi = fieldInfo(i);

          byte bits = 0x0;

          if (fi.isIndexed) bits |= IS_INDEXED;

          if (fi.storeTermVector) bits |= STORE_TERMVECTOR;

          if (fi.storePositionWithTermVector) bits |= STORE_POSITIONS_WITH_TERMVECTOR;

          if (fi.storeOffsetWithTermVector) bits |= STORE_OFFSET_WITH_TERMVECTOR;

          if (fi.omitNorms) bits |= OMIT_NORMS;

          if (fi.storePayloads) bits |= STORE_PAYLOADS;

          if (fi.omitTermFreqAndPositions) bits |= OMIT_TERM_FREQ_AND_POSITIONS;

          output.writeString(fi.name);

          output.writeByte(bits);

        }

    }

    This essentially writes the data in the format of the fnm file.
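    Each field's options are packed into a single byte, one bit per option. The sketch below shows the packing and the matching test on the read side. The numeric values of the constants are given as I recall them from Lucene 2.9/3.0 and should be treated as assumptions; the excerpt above only shows the constant names. The class and method names are hypothetical.

```java
// Packing per-field options into the flag byte written by FieldInfos.write.
final class FieldBitsSketch {

    // Assumed values; only the names appear in the article.
    static final byte IS_INDEXED = 0x1;
    static final byte STORE_TERMVECTOR = 0x2;
    static final byte STORE_POSITIONS_WITH_TERMVECTOR = 0x4;
    static final byte STORE_OFFSET_WITH_TERMVECTOR = 0x8;
    static final byte OMIT_NORMS = 0x10;
    static final byte STORE_PAYLOADS = 0x20;
    static final byte OMIT_TERM_FREQ_AND_POSITIONS = 0x40;

    // Packs a subset of the options; the remaining flags work the same way.
    static byte pack(boolean isIndexed, boolean storeTermVector, boolean omitNorms, boolean storePayloads) {
        byte bits = 0x0;
        if (isIndexed) bits |= IS_INDEXED;
        if (storeTermVector) bits |= STORE_TERMVECTOR;
        if (omitNorms) bits |= OMIT_NORMS;
        if (storePayloads) bits |= STORE_PAYLOADS;
        return bits;
    }

    // Read side: an option is on when its bit is set.
    static boolean has(byte bits, byte flag) {
        return (bits & flag) != 0;
    }
}
```

    An indexed field with term vectors and payloads but with norms kept would, under these assumed values, be written as 0x23.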

    6.3. Creating the new segment info object

    Code:

    newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx());

    segmentInfos.add(newSegment);

    6.4. Preparing document deletions

    Code:

    docWriter.pushDeletes();

        --> deletesFlushed.update(deletesInRAM);

    This adds everything in deletesInRAM to deletesFlushed and then clears deletesInRAM, for the reason explained earlier.

    6.5. Building the cfs segment

    Code:

    docWriter.createCompoundFile(segment);

    newSegment.setUseCompoundFile(true);

    The code is:

    DocumentsWriter.createCompoundFile(String segment) {

        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION);

        // Add all the file names recorded above to the cfs writer.

        for (final String flushedFile : flushState.flushedFiles)

          cfsWriter.addFile(flushedFile);

        cfsWriter.close();

      }

    6.6. Applying document deletions

    Code:

    applyDeletes();

    The code is:

    boolean applyDeletes(SegmentInfos infos) {

      if (!hasDeletes())

        return false;

      final int infosEnd = infos.size();

      int docStart = 0;

      boolean any = false;

      for (int i = 0; i < infosEnd; i++) {

        assert infos.info(i).dir == directory;

        SegmentReader reader = writer.readerPool.get(infos.info(i), false);

        try {

          any |= applyDeletes(reader, docStart);

          docStart += reader.maxDoc();

        } finally {

          writer.readerPool.release(reader);

        }

      }

      deletesFlushed.clear();

      return any;

    }

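    • Documents can be deleted through either a reader or a writer, but ultimately the deletion is always carried out by a reader.
    • A reader can delete in three ways:
      • By term: delete all documents containing the term.
      • By doc ID.
      • By query: delete all documents matching the query.
    • All three ultimately delete by doc ID, which amounts to writing the .del file.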

    private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart)

      throws CorruptIndexException, IOException {

      final int docEnd = docIDStart + reader.maxDoc();

      boolean any = false;

      // Delete by term: delete all documents containing the term.

      TermDocs docs = reader.termDocs();

      try {

        for (Entry<Term, BufferedDeletes.Num> entry: deletesFlushed.terms.entrySet()) {

          Term term = entry.getKey();

          docs.seek(term);

          int limit = entry.getValue().getNum();

          while (docs.next()) {

            int docID = docs.doc();

            if (docIDStart+docID >= limit)

              break;

            reader.deleteDocument(docID);

            any = true;

          }

        }

      } finally {

        docs.close();

      }

      // Delete by doc ID.

      for (Integer docIdInt : deletesFlushed.docIDs) {

        int docID = docIdInt.intValue();

        if (docID >= docIDStart && docID < docEnd) {

          reader.deleteDocument(docID-docIDStart);

          any = true;

        }

      }

      // Delete by query: delete all documents matching the query.

      IndexSearcher searcher = new IndexSearcher(reader);

      for (Entry<Query, Integer> entry : deletesFlushed.queries.entrySet()) {

        Query query = entry.getKey();

        int limit = entry.getValue().intValue();

        Weight weight = query.weight(searcher);

        Scorer scorer = weight.scorer(reader, true, false);

        if (scorer != null) {

          while(true)  {

            int doc = scorer.nextDoc();

            if (((long) docIDStart) + doc >= limit)

              break;

            reader.deleteDocument(doc);

            any = true;

          }

        }

      }

      searcher.close();

      return any;

    }
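    Two pieces of arithmetic carry the whole method: buffered doc IDs are global, so each segment subtracts its docStart before deleting, and term or query deletes stop at a limit expressed as a global doc ID. The sketch below models both with a BitSet per segment standing in for the .del data. Reading the limit as "documents buffered before the delete was requested" is my interpretation of the code above, and all names are hypothetical.

```java
import java.util.BitSet;

// Global-to-local doc ID mapping and the delete limit, as used by applyDeletes.
final class ApplyDeletesSketch {

    // Marks buffered global doc IDs as deleted in the segment that contains them.
    static BitSet[] applyDocIDDeletes(int[] segmentMaxDocs, int[] globalDocIDs) {
        BitSet[] deleted = new BitSet[segmentMaxDocs.length];
        int docStart = 0;
        for (int i = 0; i < segmentMaxDocs.length; i++) {
            deleted[i] = new BitSet(segmentMaxDocs[i]);
            int docEnd = docStart + segmentMaxDocs[i];
            for (int docID : globalDocIDs) {
                if (docID >= docStart && docID < docEnd) {
                    deleted[i].set(docID - docStart); // convert to a segment-local doc ID
                }
            }
            docStart = docEnd;
        }
        return deleted;
    }

    // Delete by term within one segment: matchingLocalDocIDs are ascending local doc IDs
    // of documents containing the term; only those with a global ID below limit are deleted.
    static void applyTermDelete(BitSet deleted, int docStart, int[] matchingLocalDocIDs, int limit) {
        for (int docID : matchingLocalDocIDs) {
            if (docStart + docID >= limit) break; // this document and all later ones are kept
            deleted.set(docID);
        }
    }
}
```

    With segments of 5 and 3 documents, global doc IDs 2 and 6 map to local ID 2 in the first segment and local ID 1 in the second.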

  • Original article: https://www.cnblogs.com/forfuture1978/p/1661442.html