  • Elasticsearch Java API: Using Bulk Processor

    Using Bulk Processor

    The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.

    To use it, first create a BulkProcessor instance:

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    
    BulkProcessor bulkProcessor = BulkProcessor.builder(
            client,  
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId,
                                       BulkRequest request) { ... } 
    
                @Override
                public void afterBulk(long executionId,
                                      BulkRequest request,
                                      BulkResponse response) { ... } 
    
                @Override
                public void afterBulk(long executionId,
                                      BulkRequest request,
                                      Throwable failure) { ... } 
            })
            .setBulkActions(10000) 
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
            .setFlushInterval(TimeValue.timeValueSeconds(5)) 
            .setConcurrentRequests(1) 
            .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
            .build();

    The arguments and builder options in this example mean the following:

    • client: your Elasticsearch client.
    • beforeBulk(executionId, request): called just before a bulk request is executed. You can, for example, read the number of actions with request.numberOfActions().
    • afterBulk(executionId, request, response): called after a bulk request has been executed. You can, for example, check whether any item failed with response.hasFailures().
    • afterBulk(executionId, request, failure): called when the bulk request as a whole failed and raised a Throwable.
    • setBulkActions(10000): execute the bulk every 10,000 requests.
    • setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)): flush the bulk every 5 MB.
    • setFlushInterval(TimeValue.timeValueSeconds(5)): flush the bulk every 5 seconds, whatever the number of requests.
    • setConcurrentRequests(1): set the number of concurrent requests. A value of 0 means the flush is executed synchronously, so only a single request is allowed to be executed at a time. A value of 1 means 1 concurrent request is allowed to be executed while new bulk requests are being accumulated.
    • setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)): set a custom backoff policy that initially waits 100 ms, increases the wait exponentially, and retries up to three times. A retry is attempted whenever one or more bulk item requests have failed with an EsRejectedExecutionException, which indicates that too few compute resources were available to process the request. To disable backoff, pass BackoffPolicy.noBackoff().
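
    To make the count and size thresholds concrete, here is a minimal sketch of a buffer that flushes as soon as either limit is reached. It is a simplified model written for illustration, not Elasticsearch code: the class ThresholdBuffer and its methods are hypothetical, the time-based flush is left out, and "sending" a batch only records its size.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of count- and size-based flushing (not Elasticsearch code). */
final class ThresholdBuffer {
    private final int maxActions;   // plays the role of setBulkActions
    private final long maxBytes;    // plays the role of setBulkSize
    private final List<String> pending = new ArrayList<>();
    private final List<Integer> flushedBatchSizes = new ArrayList<>();
    private long pendingBytes = 0;

    ThresholdBuffer(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffers one document and flushes as soon as either threshold is reached. */
    void add(String doc) {
        pending.add(doc);
        pendingBytes += doc.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Sends whatever is buffered; in this sketch "sending" just records the batch size. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatchSizes.add(pending.size());
        pending.clear();
        pendingBytes = 0;
    }

    List<Integer> flushedBatchSizes() {
        return flushedBatchSizes;
    }

    public static void main(String[] args) {
        ThresholdBuffer buffer = new ThresholdBuffer(3, 1024);
        for (int i = 0; i < 7; i++) {
            buffer.add("{\"id\":" + i + "}");
        }
        buffer.flush(); // flush the remainder explicitly
        System.out.println(buffer.flushedBatchSizes()); // prints [3, 3, 1]
    }
}
```

    With a limit of 3 actions, seven small documents leave the buffer as two full batches plus a final batch of one that only goes out on the explicit flush. This is why the remaining documents have to be flushed or the processor closed at the end of a load.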

    By default, BulkProcessor:

    • sets bulkActions to 1000
    • sets bulkSize to 5mb
    • does not set flushInterval
    • sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
    • sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
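
    The idea of a bounded number of retries with growing delays can be sketched as follows. This is an illustration only: the class BackoffSketch is hypothetical, and the doubling factor is an assumption, so the delays it prints are not the ones Elasticsearch's BackoffPolicy produces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of bounded retries with exponentially growing delays. */
final class BackoffSketch {

    /** Returns the wait before each retry: initialMillis first, then doubling, maxRetries entries in total. */
    static List<Long> delays(long initialMillis, int maxRetries) {
        List<Long> result = new ArrayList<>();
        long delay = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            result.add(delay);
            delay *= 2; // assumption: growth factor 2; Elasticsearch's own curve may differ
        }
        return result;
    }

    /** Upper bound on the time spent waiting if every retry is needed. */
    static long totalWaitMillis(long initialMillis, int maxRetries) {
        long total = 0;
        for (long delay : delays(initialMillis, maxRetries)) {
            total += delay;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(delays(100, 3));          // prints [100, 200, 400]
        System.out.println(totalWaitMillis(100, 3)); // prints 700
    }
}
```

    The total is the worst case: it is only reached when every retry is rejected, after which the failure is reported to the listener.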

    Add requests

    Then you can simply add your requests to the BulkProcessor:

    bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
    bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));

    Closing the Bulk Processor

    Once all documents have been added to the BulkProcessor, close it with either the awaitClose or the close method:

    bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

    or

    bulkProcessor.close();

    Both methods flush any remaining documents and cancel any further scheduled flushes that were set up through flushInterval. If concurrent requests were enabled, awaitClose waits up to the specified timeout for all bulk requests to complete and then returns true; if the timeout elapses before all bulk requests complete, it returns false. The close method does not wait for remaining bulk requests to complete and returns immediately.
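
    The true/false contract of awaitClose can be illustrated with plain java.util.concurrent primitives. The sketch below is a hypothetical model, not Elasticsearch code: InFlightTracker, submit and its awaitClose method are names invented for this example, and a semaphore stands in for whatever bookkeeping the real processor uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of "wait for in-flight requests, with a timeout" (not Elasticsearch code). */
final class InFlightTracker {
    private final int maxConcurrent;
    private final Semaphore permits;

    InFlightTracker(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Runs the task on a new thread; blocks while maxConcurrent tasks are already running. */
    void submit(Runnable task) {
        permits.acquireUninterruptibly();
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } finally {
                permits.release(); // the task is no longer in flight
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Returns true if all running tasks finished within the timeout, false otherwise. */
    boolean awaitClose(long timeout, TimeUnit unit) {
        try {
            return permits.tryAcquire(maxConcurrent, timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        InFlightTracker tracker = new InFlightTracker(1);
        CountDownLatch release = new CountDownLatch(1);
        tracker.submit(() -> {
            try {
                release.await(); // simulates a slow bulk request
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(tracker.awaitClose(50, TimeUnit.MILLISECONDS)); // false: still running
        release.countDown();
        System.out.println(tracker.awaitClose(5, TimeUnit.SECONDS));       // true: finished in time
    }
}
```

    In real code, a false result from awaitClose is worth logging or handling, because it means some bulk requests may still have been running when the wait gave up.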

    Using Bulk Processor in tests

    If you are running tests with Elasticsearch and use the BulkProcessor to populate your dataset, set the number of concurrent requests to 0 so that the flush operation of the bulk is executed synchronously:

    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
            .setBulkActions(10000)
            .setConcurrentRequests(0)
            .build();
    
    // Add your requests
    bulkProcessor.add(/* Your requests */);
    
    // Flush any remaining requests
    bulkProcessor.flush();
    
    // Or close the bulkProcessor if you don't need it anymore
    bulkProcessor.close();
    
    // Refresh your indices
    client.admin().indices().prepareRefresh().get();
    
    // Now you can start searching!
    client.prepareSearch().get();

    Global Parameters

    Global parameters can be specified on the BulkRequest as well as on the BulkProcessor, similar to the REST API. These global parameters serve as defaults and can be overridden by local parameters specified on each sub request. Some parameters (index, type) have to be set before any sub request is added, so you have to specify them when creating the BulkRequest or BulkProcessor. Others (pipeline, routing) are optional and can be specified at any point before the bulk is sent.

    try (BulkProcessor processor = initBulkProcessorBuilder(listener)
            .setGlobalIndex("tweets")
            .setGlobalType("_doc")
            .setGlobalRouting("routing")
            .setGlobalPipeline("pipeline_id")
            .build()) {
    
    
        processor.add(new IndexRequest() 
            .source(XContentType.JSON, "user", "some user"));
        processor.add(new IndexRequest("blogs").id("1") 
            .source(XContentType.JSON, "title", "some title"));
    }
     

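    In this example:

    • new IndexRequest() sets nothing locally, so the global parameters are applied to this sub request.
    • new IndexRequest("blogs").id("1") sets its own index, and a local parameter set on a sub request overrides the corresponding global parameter.

    The same rule applies when a global pipeline is set directly on a BulkRequest: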

    BulkRequest request = new BulkRequest();
    request.pipeline("globalId");
    
    request.add(new IndexRequest("test").id("1")
        .source(XContentType.JSON, "field", "bulk1")
        .setPipeline("perIndexId")); 
    
    request.add(new IndexRequest("test").id("2")
        .source(XContentType.JSON, "field", "bulk2"));
     

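    In this example:

    • The first sub request calls setPipeline("perIndexId"), so its local pipeline overrides the global pipeline ("globalId") from the BulkRequest.
    • The second sub request sets no pipeline, so the global pipeline from the BulkRequest is applied to it.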
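
    The "global defaults, local overrides" rule can be sketched without any Elasticsearch types. The class ParameterDefaults and its resolve method are hypothetical and model parameters as plain string maps; they only illustrate the precedence described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of "global defaults, local overrides" (not Elasticsearch code). */
final class ParameterDefaults {

    /** Returns the effective parameters: every global default, replaced by the local value where one is set. */
    static Map<String, String> resolve(Map<String, String> globals, Map<String, String> locals) {
        Map<String, String> effective = new LinkedHashMap<>(globals);
        effective.putAll(locals); // local values win over global defaults
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> globals = new LinkedHashMap<>();
        globals.put("index", "tweets");
        globals.put("pipeline", "pipeline_id");

        // A sub request that sets nothing locally inherits every global value.
        System.out.println(resolve(globals, Map.of()));
        // prints {index=tweets, pipeline=pipeline_id}

        // A sub request that sets its own index keeps the global pipeline.
        System.out.println(resolve(globals, Map.of("index", "blogs")));
        // prints {index=blogs, pipeline=pipeline_id}
    }
}
```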

  • Original article: https://www.cnblogs.com/felixzh/p/12356991.html