zoukankan      html  css  js  c++  java
  • es lucene写入流程,segment产生机制源码分析

    本文主要分析es lucene写入流程,lucene segment的产生,flush, commit与es的refresh,flush。

    1 segment的产生

    当索引一个文档时,如果存在空闲的segment(未被其他线程锁定),则取出空闲segment list中的最后一个segment(LIFO),并锁定,将文档索引至该segment,

    找达到flush条件的segment,然后解锁,归还至空闲segment list,如果有达到flush条件的segment,flush该segment(同步执行)。

    如果不存在,则创建新的segment,重复上述步骤。

    总结1:如果并行的执行向一个索引,索引文档,则需要不同的segment。

    相关代码:

    //索引一个文档。
    IndexWriter.updateDocument
    //索引一个文档。
    DocumentsWriter.updateDocument
    //一个线程索引时锁定一个ThreadState对象,索引后归还至free list。
    ThreadState
    //ThreadState的属性,一个DocumentsWriterPerThread对应一个segment,flush后,该ThreadState的dwpt为null,
    //下次使用该ThreadState,创建新的dwpt,新的segment。
    DocumentsWriterPerThread

    2 flush条件

    索引一个文档后,找出是否有达到flush条件的segment。

    1:如果maxBufferedDocs(默认-1,es未设置)不等于-1,且当前segment在内存中的doc数量大于等于maxBufferedDocs,则标记该segment的flushPending。

    2:如果不满足1,且ramBufferSizeMB(默认16.0,es设置为es.index.memory.max_index_buffer_size)不等于-1,当内存中当前IndexWriter所有segment之和(包括deleted docs)大于ramBufferSizeMB时,找出内存中最大的且未标记flushPending的segment,标记该segment的flushPending。

    3:如果当前1,2之后,当前segment还未标记flushPending,则当前segment大于perThreadHardLimitMB(默认1945,es未设置),标记该segment的flushPending。

    123之后,如果当前segment被标记,则flush当前segment。否则从flushQueue中poll一个segment,如果flushQueue(调用flush时,将所有segment加入queue)为空,则遍历segment取第一个标记flushPending的segment进行flush。

    相关代码:

    //查找符合flush的segment。
    DocumentsWriterFlushControl.doAfterDocument
    //flush当前segment前,reset当前dwpt,下次使用当前ThreadState需要新的dwpt,新的segment。
    DocumentsWriterFlushControl.internalTryCheckOutForFlush
    //flush当前segment,或者其他segment。
    DocumentsWriter.postUpdate

    注意:除了达到flush条件的自动flush,还可以通过调用api flush,如:

    1:es refresh

    2:es flush

    3:es syncedFlush

    3 flush

    flush将内存中的segment写到文件(在调用线程中同步执行),但不执行fileChannel.force(nio,bio则fileOutputStream.flush),一部分数据可能在buffer中。

    相关代码:

    //flush一个segment。
    DocumentsWriter.doFlush
    DocumentsWriterPerThread.flush
    DefaultIndexingChain.flush
    //写nvd, nvm文件。
    writeNorms
    //写dvd, dvm文件。
    writeDocValues
    //针对numberic, 写dii, dim文件。
    writePoints
    //写fdt, fdx文件(该文件在首次indexing时创建,flush时写入值)。
    storedFieldsConsumer.flush
    //写doc, pos, tim, tip文件。
    termsHash.flush
    //写fnm文件。
    docWriter.codec.fieldInfosFormat().write
    //写cfs, cfe, si, liv(如果有删除)文件。
    DocumentsWriterPerThread.sealFlushedSegment
    //删除cfs, cfe, si, liv(如果有删除)之外的文件。
    IndexWriter.doAfterFlush

    4 commit

    commit执行fileChannel.force,将buffer中的数据写到磁盘。具体步骤为:

    1:flush all segments 将内存中所有的segments写到文件。

    2:依次sync pending_segments_n,segment files(fileChannel.force)将这写文件同步到磁盘。

    3:将pending_segments_n重命名为segments_n,删除旧的segments_n-1。

    4:如果步骤1 flush了segment,执行maybeMerge,如果达到merge条件,将会merge。

    相关代码:

    //commit。
    IndexWriter.commit
    IndexWriter.commitInternal
    IndexWriter.prepareCommitInternal
    //flush segments。
    DocumentsWriter.flushAllThreads
    //sync file。
    IndexWriter.startCommit
    Directory.sync
    IOUtils.fsync
    FileChannel.force
    FileChannelImpl.force
    //更新commit信息segments_n,删除旧的segments_n-1。
    IndexWriter.finishCommit
    //如果达到merge条件,将会merge。
    IndexWriter.maybeMerge

    5 maybeMerge

    flush或者commit后,如果flush了segment,执行maybeMerge,如果达到merge条件,将执行merge(异步执行)。具体步骤为:

    1:将segments按size降序排列。

    2:计算total segments size 和 minimum segment size。

    3:total segments size过滤掉tooBigSegment(大于max_merged_segment/2.0)的segment,并记录tooBigCount;minSegmentBytes如果小于floor_segment(默认2mb),取2mb。

    4:计算allowedSegCountInt,当segments(不包含tooBigSegment)数量大于此数,将触发merge。

    5:从大到小(之前的降序排列),以每个segment为起点,贪心找出不大于maxMergeAtOnce个, 且size总和不大于maxMergedSegmentBytes个segments作为candidate。

    使用mergeScore(totalAfterMergeBytes,totAfterMergeBytes/totBeforeMergeBytes)最低的candidate进行merge。

    相关代码:

    //maybeMerge。
    IndexWriter.maybeMerge
    IndexWriter.updatePendingMerges
    //查找可merge的segments。
    TieredMergePolicy.findMerges
    //执行merge。
    ConcurrentMergeScheduler.merge
    //控制merge线程数量
    ConcurrentMergeScheduler.maybeStall

    //用来异步执行merge的线程。

    MergeThread

    6 es refresh

    主要执行lucene的flushAllThreads和maybeMerge。refresh的两个条件:

    1:达到refresh_interval设置的时间间隔。

    2:节点所有shard的segments占用内存(调用lucene api获取)之和达到indices.memory.index_buffer_size,找出占用最大的shard执行refresh。

    相关代码:

     //refresh_interval refresh。
     IndexService.AsyncRefreshTask
     //indices.memory.index_buffer_size refresh。
     IndexingMemoryController.runUnlocked
     IndexingMemoryController.writeIndexingBufferAsync

    //es refresh。
    InternalEngine.refresh
    //lucene refresh。
    ReferenceManager.maybeRefreshBlocking
    DirectoryReader.openIfChanged
    StandardDirectoryReader.doOpenIfChanged
    IndexWriter.getReader
    //flush segments。
    DocumentsWriter.flushAllThreads
    //如果flush了segment,则执行maybeMerge。
    IndexWriter.maybeMerge

    7 es flush

    主要执行步骤为:

    1:prepareCommit translog:

    1.1 备份 translog.ckp到translog-1.ckp。

    1.2 fsync translog-1.ckp以及translog 文件夹。

    1.3 创建新的translog数据文件translog-n.tlog,更新translog.ckp(写入checkPoint)。

    2:commit indexWriter(见4 commit)。

    3:refresh(见6 es refresh)。

    4:commit translog:删除备份的translog-1.ckp以及旧的translog数据文件translog-n-1.tlog。

    相关代码:

    //es flush。
    InternalEngine.flush
    //prepareCommit translog。
    Translog.prepareCommit
    //es commit index writer。
    InternalEngine.commitIndexWriter
    //lucene commit。
    IndexWriter.commit
    //es refresh。
    InternalEngine.refresh
    //commit translog。
    Translog.commit

    总结2:lucene的flush是指将内存中的segment,写到磁盘但不执行fileChannel.force,一部分数据会在buffer中;commit会调用force,将buffer中的数据写到磁盘。

    es的refresh调用lucene的flush;flush调用lucene的commit。

    参考:

    elasticsearch5.6.12,lucene6.6.1 源码

    https://www.outcoldman.com/en/archive/2017/07/13/elasticsearch-explaining-merge-settings/

    http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html

  • 相关阅读:
    web测试方法总结
    APP测试点总结
    函数初识
    字符编码及文件操作
    简单购物车程序(Python)
    基本数据类型(列表,元祖,字典,集合)
    python基础
    基本数据类型(数字和字符串)
    Python入门
    操作系统
  • 原文地址:https://www.cnblogs.com/vsop/p/10162326.html
Copyright © 2011-2022 走看看