  • NRT in Lucene

    NRT fundamentals

    When you ask for the IndexReader from the IndexWriter, the IndexWriter will be flushed (docs accumulated in RAM will be written to disk) but not committed (fsync files, write the new segments file, etc.). The returned IndexReader will search over previously committed segments as well as the new, flushed but not committed segment. Because flushing will likely be processor rather than IO bound, this should be a process that can be attacked with more processor power if found to be too slow.

    Also, deletes are carried in RAM rather than flushed to disk, which may help in eking out a bit more speed. The result is that you can add and remove documents from a Lucene index in 'near' real time by continuously asking for a new Reader from the IndexWriter every second or couple of seconds. I haven't seen a non-synthetic test yet, but it looks like it's been tested at around 50 document updates per second without heavy slowdown (e.g. the results are visible every second).

    The patch takes advantage of LUCENE-1483, which keys FieldCaches and Filters at the individual segment level rather than at the index level – this allows you to reload caches per segment rather than per index – essential for real-time search with filter/cache use.

    From this we can learn:

    1. Even if the IndexWriter has not committed, an IndexReader obtained from it can already see newly added documents;

    API:

    public static IndexReader open(IndexWriter writer,
                                   boolean applyAllDeletes)
                            throws CorruptIndexException,
                                   IOException
    Open a near real time IndexReader from the IndexWriter
    Parameters:
    writer - The IndexWriter to open from
    applyAllDeletes - If true, all buffered deletes will be applied (made visible) in the returned reader. If false, the deletes are not applied but remain buffered (in IndexWriter) so that they will be applied in the future. Applying deletes can be costly, so if your app can tolerate deleted documents being returned you might gain some performance by passing false.
    Returns:
    The new IndexReader

    2. Obtaining an IndexReader from the IndexWriter causes the IndexWriter to perform a flush;

    Source:

    IndexReader getReader(int termInfosIndexDivisor, boolean applyAllDeletes) throws IOException {
        ensureOpen();
        
        final long tStart = System.currentTimeMillis();
    
        if (infoStream != null) {
          message("flush at getReader");
        }
    
        // Do this up front before flushing so that the readers
        // obtained during this flush are pooled, the first time
        // this method is called:
        poolReaders = true;
    
        // Prevent segmentInfos from changing while opening the
        // reader; in theory we could do similar retry logic,
        // just like we do when loading segments_N
        IndexReader r;
        synchronized(this) {
          flush(false, applyAllDeletes);
          r = new ReadOnlyDirectoryReader(this, segmentInfos, termInfosIndexDivisor, applyAllDeletes);
          if (infoStream != null) {
            message("return reader version=" + r.getVersion() + " reader=" + r);
          }
        }
    
        maybeMerge();
    
        if (infoStream != null) {
          message("getReader took " + (System.currentTimeMillis() - tStart) + " msec");
        }
        return r;
      }
    

    3. Although the IndexReader can see the newly added documents, those documents have not yet been committed to disk, so if something goes wrong and the program exits abnormally, the uncommitted data will be lost.
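    The flush-versus-commit distinction in points 1–3 can be sketched as a toy model in plain Java. This is not Lucene code — `ToyIndex`, `openReader()` and the rest are invented names purely to illustrate the lifecycle: added docs buffer in RAM, opening a reader flushes them (making them visible), and only a commit makes them durable.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of flush vs. commit; names are invented, this is not the Lucene API.
class ToyIndex {
    private final List<String> ramBuffer = new ArrayList<>(); // docs buffered in RAM
    private final List<String> segments = new ArrayList<>();  // flushed, not yet durable
    private final List<String> durable = new ArrayList<>();   // what survives a crash

    void addDocument(String doc) { ramBuffer.add(doc); }

    // Like IndexReader.open(writer): flushes RAM docs so the reader can see them...
    List<String> openReader() {
        segments.addAll(ramBuffer);
        ramBuffer.clear();
        return new ArrayList<>(segments); // snapshot handed to this reader
    }

    // ...but only commit() makes them durable (fsync + new segments file in real Lucene).
    void commit() {
        segments.addAll(ramBuffer);
        ramBuffer.clear();
        durable.clear();
        durable.addAll(segments);
    }

    List<String> afterCrash() { return new ArrayList<>(durable); }
}

public class FlushVsCommit {
    public static void main(String[] args) {
        ToyIndex idx = new ToyIndex();
        idx.addDocument("doc1");
        System.out.println(idx.openReader().size()); // 1 -> visible without commit
        System.out.println(idx.afterCrash().size()); // 0 -> lost if we crash now
        idx.commit();
        System.out.println(idx.afterCrash().size()); // 1 -> durable after commit
    }
}
```

    Real Lucene of course adds per-segment files, buffered deletes and merging on top, but the visibility/durability split is the same.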

    The following is some experimental code whose main purpose is a rough look at NRT performance.

    package com.fox.nrt;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.Field.Index;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.index.CorruptIndexException;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.TermQuery;
    import org.apache.lucene.search.TopDocs;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.util.Version;
    
    /**
     * @author huangfox
     * @data 2012-8-21
     * @email huangfox009@126.com
     * @desc 
     */
    public class OpenIfChangedTest {
    
    	IndexWriter w;
    
    	public OpenIfChangedTest() {
    		Directory d;
    		try {
    			d = FSDirectory.open(new File("d:/test"));
    			Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_36);
    			IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_36,
    					analyzer);
    			w = new IndexWriter(d, conf);
    			r = IndexReader.open(w, true);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	IndexReader r;
    
    	public void commit() {
    		try {
    			w.commit();
    			w.forceMerge(2);
    		} catch (CorruptIndexException e) {
    			e.printStackTrace();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void search() {
    		try {
    			long bt = System.currentTimeMillis();
    			// NOTE: for this rough experiment the reopened reader is neither
    			// stored back into this.r nor closed, so readers leak over time;
    			// real code should swap in the new reader and close the old one.
    			IndexReader reader = IndexReader.openIfChanged(r);
    			if (reader == null)
    				reader = this.r;
    			IndexSearcher searcher = new IndexSearcher(reader);
    			Query query = new TermQuery(new Term("f", "a"));
    			TopDocs topdocs = searcher.search(query, 10);
    			// ScoreDoc[] docs = topdocs.scoreDocs;
    			// for (ScoreDoc doc : docs) {
    			// System.out.println(reader.document(doc.doc));
    			// }
    			long et = System.currentTimeMillis();
    			System.out.println(topdocs.totalHits + ":" + (et - bt) + "ms");
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void addDoc() {
    		Document doc = new Document();
    		doc.add(new Field("f", "a", Store.YES, Index.NOT_ANALYZED));
    		doc.add(new Field("f1", "a", Store.YES, Index.NOT_ANALYZED));
    		doc.add(new Field("f2", "a", Store.YES, Index.NOT_ANALYZED));
    		try {
    			w.addDocument(doc);
    		} catch (CorruptIndexException e) {
    			e.printStackTrace();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * @param args
    	 */
    	public static void main(String[] args) {
    		OpenIfChangedTest oic = new OpenIfChangedTest();
    		for (int i = 0; i < 1; i++) {
    			Thread wt = new Thread(new WThread(oic));
    			wt.start();
    		}
    		for (int i = 0; i < 10; i++) {
    			Thread rt = new Thread(new RThread(oic));
    			rt.start();
    		}
    		//
    		Thread ct = new Thread(new CommitThread(oic));
    		ct.setDaemon(true);
    		ct.start();
    
    	}
    
    }
    /**
     * 
     * @author huangfox
     * @data 2012-8-21
     * @email huangfox009@126.com
     * @desc 添加文档的线程
     */
    class WThread implements Runnable {
    
    	OpenIfChangedTest oic;
    
    	public WThread(OpenIfChangedTest oic) {
    		super();
    		this.oic = oic;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			oic.addDoc();
    			try {
    				TimeUnit.MILLISECONDS.sleep(100);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    }
    /**
     * 
     * @author huangfox
     * @data 2012-8-21
     * @email huangfox009@126.com
     * @desc 检索线程
     */
    class RThread implements Runnable {
    	OpenIfChangedTest oic;
    
    	public RThread(OpenIfChangedTest oic) {
    		super();
    		this.oic = oic;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			oic.search();
    			try {
    				TimeUnit.MILLISECONDS.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    }
    
    /**
     * 
     * @author huangfox
     * @data 2012-8-21
     * @email huangfox009@126.com
     * @desc 提交线程
     */
    class CommitThread implements Runnable {
    	OpenIfChangedTest oic;
    
    	public CommitThread(OpenIfChangedTest oic) {
    		super();
    		this.oic = oic;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			oic.commit();
    			System.out.println("commit");
    			try {
    				TimeUnit.SECONDS.sleep(3);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    }
    

    By adjusting the number of writer threads and reader threads, as well as how often each one writes or searches, you can simulate different read/write ratios;

    In the code above, when writes become frequent, search performance is severely affected. Let's set that problem aside for now!

    Next, let's look at the "standard" NRT implementation in Lucene. The code follows:

    package com.fox.nrt;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.Field.Index;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.index.CorruptIndexException;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.NRTManager;
    import org.apache.lucene.search.NRTManagerReopenThread;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.SearcherFactory;
    import org.apache.lucene.search.TermQuery;
    import org.apache.lucene.search.TopDocs;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.util.Version;
    
    /**
     * @author huangfox
     * @data 2012-8-21
     * @email huangfox009@126.com
     * @desc
     */
    public class NRTTest {
    
    	IndexWriter w;
    	NRTManager nrtM;
    	NRTManager.TrackingIndexWriter tw;
    	IndexReader r;
    	NRTManagerReopenThread nmrT = null;
    
    	public NRTTest() {
    		Directory d;
    		try {
    			d = FSDirectory.open(new File("d:/test1"));
    			Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_36);
    			IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_36,
    					analyzer);
    			w = new IndexWriter(d, conf);
    			SearcherFactory searcherFactory = new SearcherFactory();
    			tw = new NRTManager.TrackingIndexWriter(w);
    			nrtM = new NRTManager(tw, searcherFactory, true);
    			r = IndexReader.open(w, true);
    			//
    			nmrT = new NRTManagerReopenThread(nrtM, 0.50, 0.05);
    			nmrT.setName("nrt reopen thread");
    			nmrT.setDaemon(true);
    			nmrT.start();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void commit() {
    		try {
    			w.commit();
    			// w.forceMerge(2);
    		} catch (CorruptIndexException e) {
    			e.printStackTrace();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void search() {
    		IndexSearcher searcher = null;
    		try {
    			long bt = System.currentTimeMillis();
    			searcher = nrtM.acquire();
    			Query query = new TermQuery(new Term("f", "a"));
    			TopDocs topdocs = searcher.search(query, 10);
    			// ScoreDoc[] docs = topdocs.scoreDocs;
    			// for (ScoreDoc doc : docs) {
    			// System.out.println(reader.document(doc.doc));
    			// }
    			long et = System.currentTimeMillis();
    			System.out.println(topdocs.totalHits + ":" + (et - bt) + "ms");
    		} catch (IOException e) {
    			e.printStackTrace();
    		} finally {
    			try {
    				nrtM.release(searcher);
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    	public void addDoc() {
    		Document doc = new Document();
    		doc.add(new Field("f", "a", Store.YES, Index.NOT_ANALYZED));
    		doc.add(new Field("f1", "a", Store.YES, Index.NOT_ANALYZED));
    		doc.add(new Field("f2", "a", Store.YES, Index.NOT_ANALYZED));
    		try {
    			tw.addDocument(doc);
    		} catch (CorruptIndexException e) {
    			e.printStackTrace();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * @param args
    	 */
    	public static void main(String[] args) {
    		NRTTest nrt = new NRTTest();
    		for (int i = 0; i < 1; i++) {
    			Thread wt = new Thread(new NRTWThread(nrt));
    			wt.start();
    		}
    		for (int i = 0; i < 10; i++) {
    			Thread rt = new Thread(new NRTRThread(nrt));
    			rt.start();
    		}
    		//
    		Thread ct = new Thread(new NRTCommitThread(nrt));
    		ct.setDaemon(true);
    		ct.start();
    	}
    
    }
    
    /**
     * 
     * @author huangfox
     * @data 2012-8-21
     * @email huangfox009@126.com
     * @desc 添加文档的线程
     */
    class NRTWThread implements Runnable {
    
    	NRTTest nrt;
    
    	public NRTWThread(NRTTest nrt) {
    		super();
    		this.nrt = nrt;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			nrt.addDoc();
    			try {
    				TimeUnit.MILLISECONDS.sleep(100);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    }
    
    /**
     * 
     * @author huangfox
     * @data 2012-8-21
     * @email huangfox009@126.com
     * @desc 检索线程
     */
    class NRTRThread implements Runnable {
    	NRTTest nrt;
    
    	public NRTRThread(NRTTest nrt) {
    		super();
    		this.nrt = nrt;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			nrt.search();
    			try {
    				TimeUnit.MILLISECONDS.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    }
    
    /**
     * 
     * @author huangfox
     * @data 2012-8-21
     * @email huangfox009@126.com
     * @desc 提交线程
     */
    class NRTCommitThread implements Runnable {
    	NRTTest nrt;
    
    	public NRTCommitThread(NRTTest nrt) {
    		super();
    		this.nrt = nrt;
    	}
    
    	@Override
    	public void run() {
    		while (true) {
    			nrt.commit();
    			System.out.println("commit");
    			try {
    				TimeUnit.SECONDS.sleep(3);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    }
    

    Under the same read/write load, however, this code shows no such search-performance problem.

    Let's first go over the key points of this code:

    1. NRTManager

    NRTManager extends ReferenceManager; the following is excerpted from the API docs.

    Utility class to manage sharing near-real-time searchers across multiple searching threads. The difference vs SearcherManager is that this class enables individual requests to wait until specific indexing changes are visible.

    You must create an IndexWriter, then create a NRTManager.TrackingIndexWriter from it, and pass that to the NRTManager. You may want to create two NRTManagers, one that always applies deletes on refresh and one that does not. In this case you should use a single NRTManager.TrackingIndexWriter instance for both.

    Then, use ReferenceManager.acquire() to obtain the IndexSearcher, and ReferenceManager.release(G) (ideally, from within a finally clause) to release it.

    NOTE: to use this class, you must call ReferenceManager.maybeRefresh() periodically. The NRTManagerReopenThread is a simple class to do this on a periodic basis, and reopens more quickly if a request is waiting. If you implement your own reopener, be sure to call addWaitingListener(org.apache.lucene.search.NRTManager.WaitingListener) so your reopener is notified when a caller is waiting for a specific generation searcher.

    2. NRTManagerReopenThread

    Utility class that runs a reopen thread to periodically reopen the NRT searchers in the provided NRTManager.

    That is, it periodically reopens the NRT searchers through the NRTManager.

    NRTManagerReopenThread essentially just calls, on a period:

    manager.maybeRefresh();
    
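    A reopen loop of this shape can be sketched in plain Java with a ScheduledExecutorService. This is only an analogue, not NRTManagerReopenThread itself — the real class also reopens sooner when a caller is waiting on a specific generation, which this sketch omits; the Runnable passed in stands in for a call to nrtM.maybeRefresh().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal analogue of NRTManagerReopenThread: run a refresh action on a fixed period.
public class ReopenLoop {
    private final ScheduledExecutorService exec =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "nrt reopen thread");
                t.setDaemon(true); // like nmrT.setDaemon(true) in the code above
                return t;
            });

    // periodMillis plays roughly the role of the thread's target-stale interval.
    public void start(Runnable maybeRefresh, long periodMillis) {
        exec.scheduleWithFixedDelay(maybeRefresh, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    public void stop() { exec.shutdownNow(); }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger refreshes = new AtomicInteger();
        ReopenLoop loop = new ReopenLoop();
        loop.start(refreshes::incrementAndGet, 50); // "refresh" every 50 ms
        Thread.sleep(300);
        loop.stop();
        System.out.println("refreshed at least twice: " + (refreshes.get() >= 2));
    }
}
```

    In the NRTTest program this loop would take the place of nmrT, with maybeRefresh supplied by the NRTManager.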

      

    3. NRTManager's acquire() and release()

    Obtain the current reference. You must match every call to acquire with one call to release(G); it's best to do so in a finally clause, and set the reference to null to prevent accidental usage after it has been released.

    The docs for ReferenceManager.maybeRefresh() add: you must call it periodically if you want acquire() to return refreshed instances.

    Threads: it's fine for more than one thread to call this at once. Only the first thread will attempt the refresh; subsequent threads will see that another thread is already handling refresh and will return immediately. Note that this means if another thread is already refreshing then subsequent threads will return right away without waiting for the refresh to complete.

    If this method returns true it means the calling thread either refreshed or that there were no changes to refresh. If it returns false it means another thread is currently refreshing.
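    The acquire/release contract above is, at bottom, reference counting. Here is a plain-Java sketch of the idea (invented names — the real logic lives in Lucene's ReferenceManager): the manager holds one reference to the current searcher, each acquire() adds one, and a refresh swaps in a new searcher and drops the manager's reference to the old one, so the old searcher is only "closed" after the last in-flight search releases it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of the ReferenceManager acquire/release contract (invented names).
class RefCounted {
    private final AtomicInteger refs = new AtomicInteger(1); // the manager holds one ref
    volatile boolean closed = false;
    void incRef() { refs.incrementAndGet(); }
    void decRef() { if (refs.decrementAndGet() == 0) closed = true; } // "close" on last ref
}

public class SearcherManagerSketch {
    private volatile RefCounted current = new RefCounted();

    public synchronized RefCounted acquire() {
        current.incRef(); // the caller now shares ownership
        return current;
    }

    public void release(RefCounted s) {
        s.decRef(); // best done in a finally clause, as the docs advise
    }

    // Like maybeRefresh(): install a new searcher and drop our ref to the old one.
    public synchronized void refresh() {
        RefCounted old = current;
        current = new RefCounted();
        old.decRef(); // old closes once all in-flight searches release it
    }

    public static void main(String[] args) {
        SearcherManagerSketch m = new SearcherManagerSketch();
        RefCounted s = m.acquire(); // a search is in flight
        m.refresh();                // a new searcher is installed meanwhile
        System.out.println(s.closed); // false: still held by the in-flight search
        m.release(s);
        System.out.println(s.closed); // true: last reference gone
    }
}
```

    This is why forgetting a release() leaks searchers, and why release belongs in a finally block.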

    4. How does NRT decide whether a "refresh" is needed?

    infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any()

    In other words, the reader is still current only while its SegmentInfos version matches the writer's and the writer has neither buffered documents nor buffered deletes; once any of these changes, the next maybeRefresh() reopens the searcher.

      

    A simple test:

    Setup:

    5 writer threads, each adding one doc and then sleeping 100 ms;

    50 reader threads, each running one search and then sleeping 10 ms;

    one background commit thread, committing every 3 seconds.

    The results:

    25565:0ms
    25565:1ms
    25565:3ms
    25565:1ms
    25565:20ms
    25565:0ms
    25565:0ms
    25565:22ms
    25565:2ms
    25565:82ms
    25565:26ms
    25565:50ms
    25565:0ms
    25565:29ms
    25565:1ms
    25565:31ms
    25565:37ms
    25565:34ms
    25565:0ms
    25565:0ms
    25565:2ms
    25565:41ms
    25565:42ms
    25565:0ms
    25565:56ms
    25565:44ms
    

    The number before the colon is the total hit count; the number after it is the search time.

    Let's increase the document count and run again:

    170378:5ms
    170378:184ms
    170378:8ms
    170378:8ms
    170378:84ms
    170378:6ms
    170378:5ms
    170378:192ms
    170378:6ms
    170378:190ms
    170378:189ms
    170378:5ms
    170378:140ms
    170378:179ms
    170378:6ms
    170378:7ms
    170378:152ms
    170378:157ms
    170378:190ms
    170378:176ms
    170378:159ms
    170378:183ms
    170378:12ms
    170378:18ms
    170378:5ms
    170378:6ms
    

    Efficiency still looks a bit problematic, though this was only a quick test on a 32-bit PC.

    Still, no matter how powerful the machine, once data volume and concurrency grow large enough, a single machine can hardly cope. Hence distributed search was born!

  • Original article: https://www.cnblogs.com/huangfox/p/2649055.html