zoukankan      html  css  js  c++  java
  • Bulk API

    承接上文,使用Java High Level REST Client操作elasticsearch

    Bulk API

    高级客户端提供了批量处理器以协助批量请求

    Bulk Request

    BulkRequest可以在一次请求中执行多个索引,更新或者删除操作。一次请求至少需要一个操作。

            //创建BulkRequest实例
            BulkRequest request = new BulkRequest();
            //使用IndexRequest添加三个文档,不清楚用法可以参考Index API
            request.add(new IndexRequest("posts", "doc", "1")
                    .source(XContentType.JSON,"field", "foo"));
            request.add(new IndexRequest("posts", "doc", "2")
                    .source(XContentType.JSON,"field", "bar"));
            request.add(new IndexRequest("posts", "doc", "3")
                    .source(XContentType.JSON,"field", "baz"));

    Bulk API仅支持以JSON或SMILE编码的文档。 提供任何其他格式的文档将导致错误。

    同一个BulkRequest可以添加不同类型的操作。

          // 添加 DeleteRequest到BulkRequest,不清楚用法可以参考Delete API
            request.add(new DeleteRequest("posts", "doc", "3"));
            // 添加 UpdateRequest到BulkRequest,不清楚用法可以参考Update API
            request.add(new UpdateRequest("posts", "doc", "2")
                    .doc(XContentType.JSON, "other", "test"));
            // 添加 一个使用SMILE格式的IndexRequest
            request.add(new IndexRequest("posts", "doc", "4")
                    .source(XContentType.SMILE, "field", "baz"));

    可选参数

    //设置超时,等待批处理被执行的超时时间(使用TimeValue形式)
    request.timeout(TimeValue.timeValueMinutes(2)); 
    //设置超时,等待批处理被执行的超时时间(字符串形式)
    request.timeout("2m"); 
    //刷新策略
    request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy实例方式
    request.setRefreshPolicy("wait_for");//字符串方式
    //设置在执行索引/更新/删除操作之前必须处于活动状态的分片副本数。
    request.waitForActiveShards(2);
    //使用ActiveShardCount方式来提供分片副本数:可以是ActiveShardCount.ALL,ActiveShardCount.ONE或ActiveShardCount.DEFAULT(默认)
    request.waitForActiveShards(ActiveShardCount.ALL);

    同步执行

    BulkResponse bulkResponse = client.bulk(request);

    异步执行

     批量请求的异步执行需要将BulkRequest实例和ActionListener实例传递给异步方法:

    //当BulkRequest执行完成时,ActionListener会被调用
    client.bulkAsync(request, listener);

    异步方法不会阻塞并会立即返回。完成后,如果执行成功完成,则使用onResponse方法回调ActionListener,如果失败则使用onFailure方法。
    BulkResponse 的典型监听器如下所示:

    ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
            //执行成功完成时调用。 response作为参数提供,并包含已执行的每个操作的单个结果列表。 请注意,一个或多个操作可能已失败,然而其他操作已成功执行。
        }
    
        @Override
        public void onFailure(Exception e) {
            //在整个BulkRequest失败时调用。 在这种情况下,exception作为参数提供,并且没有执行任何操作。
        }
    };

    Bulk Response

    返回的BulkResponse包含有关已执行操作的信息,并允许迭代每个结果,如下所示:

        //遍历所有操作结果
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                //获取操作的响应,可以是IndexResponse,UpdateResponse或DeleteResponse,
                // 它们都可以被视为DocWriteResponse实例
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
    
                if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                        || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                    //处理index操作
                    IndexResponse indexResponse = (IndexResponse) itemResponse;
    
                } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                    //处理update操作
                    UpdateResponse updateResponse = (UpdateResponse) itemResponse;
    
                } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                    //处理delete操作
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                }
            }

    批量响应提供了用于快速检查一个或多个操作是否失败的方法:

    if (bulkResponse.hasFailures()) { 
        //该方法只要有一个操作失败都会返回true
    }

    如果想要查看操作失败的原因,则需要遍历所有操作结果:

            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {//判断当前操作是否失败
                    //获取失败对象,拿到了failure对象,想怎么玩都行
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                }
            }

    Bulk Processor

    BulkProcessor通过提供一个工具类来简化Bulk API的使用,允许索引/更新/删除操作在添加到处理器后透明地执行。

    为了执行请求,BulkProcessor需要以下组件:

    RestHighLevelClient
    此客户端用于执行BulkRequest并获取BulkResponse
    BulkProcessor.Listener
    在每次BulkRequest执行之前和之后或BulkRequest失败时调用此监听器
    然后BulkProcessor.builder方法可用于构建新的BulkProcessor:

            //创建BulkProcessor.Listener
            BulkProcessor.Listener listener1 = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    //在每次执行BulkRequest之前调用此方法
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request,BulkResponse response) {
                    //在每次执行BulkRequest之后调用此方法
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    //执行BulkRequest失败时调用此方法
                }
            };
         //通过从BulkProcessor.Builder调用build()方法来创建BulkProcessor。
         //RestHighLevelClient.bulkAsync()方法将用来执行BulkRequest。
            BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener1).build();

    BulkProcessor.Builder提供了配置BulkProcessor应如何处理请求执行的方法:

            BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener1);
            //设置何时刷新新的批量请求,根据当前已添加的操作数量(默认为1000,使用-1禁用它)
            builder.setBulkActions(500);//操作数为500时就刷新请求
            //设置何时刷新新的批量请求,根据当前已添加的操作大小(默认为5Mb,使用-1禁用它)
            builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));//操作大小为1M时就刷新请求
            //设置允许执行的并发请求数(默认为1,使用0只允许执行单个请求)
            builder.setConcurrentRequests(0);//不并发执行
            //设置刷新间隔时间,如果超过了间隔时间,则随便刷新一个BulkRequest挂起(默认为未设置)
            builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
            //设置一个最初等待1秒,最多重试3次的常量退避策略。
            // 有关更多选项,请参阅BackoffPolicy.noBackoff(),BackoffPolicy.constantBackoff()和BackoffPolicy.exponentialBackoff()。
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));

    创建BulkProcessor后,就可以向其添加请求了:

    IndexRequest one = new IndexRequest("posts", "doc", "1").
            source(XContentType.JSON, "title",
                    "In which order are my Elasticsearch queries executed?");
    IndexRequest two = new IndexRequest("posts", "doc", "2")
            .source(XContentType.JSON, "title",
                    "Current status and upcoming changes in Elasticsearch");
    IndexRequest three = new IndexRequest("posts", "doc", "3")
            .source(XContentType.JSON, "title",
                    "The Future of Federated Search in Elasticsearch");
    
    bulkProcessor.add(one);
    bulkProcessor.add(two);
    bulkProcessor.add(three);

    这些请求将由BulkProcessor执行,BulkProcessor负责为每个批量请求调用BulkProcessor.Listener。
    侦听器提供访问BulkRequest和BulkResponse的方法:

           BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    //在每次执行BulkRequest之前调用,通过此方法可以获取将在BulkRequest中执行的操作数
                    int numberOfActions = request.numberOfActions();
                    logger.debug("Executing bulk [{}] with {} requests",
                            executionId, numberOfActions);
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    //在每次执行BulkRequest后调用,通过此方法可以获取BulkResponse是否包含错误
                    if (response.hasFailures()) {
                        logger.warn("Bulk [{}] executed with failures", executionId);
                    } else {
                        logger.debug("Bulk [{}] completed in {} milliseconds",
                                executionId, response.getTook().getMillis());
                    }
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    //如果BulkRequest失败,通过调用此方法可以获取失败
                    logger.error("Failed to execute bulk", failure);
                }
            };

    将所有请求添加到BulkProcessor后,需要使用两种可用的关闭方法之一关闭其实例。

    awaitClose()方法可用于等待所有请求都已处理或过了指定的等待时间:

    boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 

    如果所有批量请求都已完成,则该方法返回true;如果在所有批量请求完成之前等待时间已过,则返回false

    close()方法可用于立即关闭BulkProcessor:

    bulkProcessor.close();

    两种方法在关闭处理器之前会刷新已添加到处理器的请求,并且还会禁止将任何新请求添加到处理器。

    官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html#_optional_arguments_4

  • 相关阅读:
    hdu 6836
    2019 树形—DP
    2020牛客暑期多校训练营(第六场)
    hdu 6756 Finding a MEX 线段树
    2020 Multi-University Training Contest 2
    spring boot maven 打jar包 不能引入外部jar
    git 操作命令
    homestead 安装swoole
    MYSQL-触发器
    再次认知const
  • 原文地址:https://www.cnblogs.com/ginb/p/9407806.html
Copyright © 2011-2022 走看看