zoukankan      html  css  js  c++  java
  • Elasticsearch Java High Level REST Client(Bulk API)

    https://segmentfault.com/a/1190000019792172

    针对ES 批量写入, 提供了3种方式,在 high-rest-client 中分别是 bulk  bulkAsync  bulkProcessor 3种方式。

    BulkProcessor

    BulkProcessor是一个线程安全的批量处理类,允许方便地设置刷新 一个新的批量请求 (基于消息大小,时间,消息数量), 容易控制并发批量的数量, 请求允许并行执行。

    此processer的含义为如果消息数量到达20000 或者消息大小到大10M 或者时间达到300s 任意条件满足,客户端就会把当前的数据提交到服务端处理。减少网路开销, 效率很高。

    /**
         * 实例化 BulkProcessor
         * @param client
         * @return
         */
        public static BulkProcessor bulkProcessor(RestHighLevelClient client) {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    //bulk请求前执行
                    int numberOfActions = request.numberOfActions();
                    logger.info("ES Executing bulk [{}] with {} request ", executionId, numberOfActions);
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    //bulk请求后执行
                    if (response.hasFailures()) {
                        logger.error("ES Bulk [{}] executed with failures ", +executionId);
                    } else {
                        logger.info("ES Bulk [{}] completed in {} milliseconds ", executionId, response.getTook().getMillis());
                    }
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    // 失败后执行
                    logger.error("ES Bulk Failed to execute bulk ", failure);
                }
            };
    
            BulkProcessor bulkProcessor = BulkProcessor.builder(
                    (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    listener)
                    //  达到刷新的条数
                    .setBulkActions(20000)
                    // 达到 刷新的大小
                    .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                    // 固定刷新的时间频率
                    .setFlushInterval(TimeValue.timeValueSeconds(300))
                    //并发线程数
                    .setConcurrentRequests(5)
                    // 重试补偿策略
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();
            return bulkProcessor;
        }

    awaitClose()方法可以用来等待,直到所有的请求都被处理完毕或者指定的等待时间过去.

    boolean b = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

    如果所有bulk请求都已经完成,则该方法返回true,如果所有bulk请求完成之前的等待时间已经过去,则返回false.
    close()方法可用于立即关闭bulkProcessor.

    这两种方法都在关闭处理器之前刷新添加到处理器的请求,并且禁止向处理器添加任何新请求.


    使用方法:

    Map<String, Object> m = new HashMap<String, Object>(16);
            m.put("id", "544345");
            m.put("area_id", 1);
            m.put("camera_id", 1);
            m.put("log_time", new Date().toString());
            m.put("age", 1);
            BulkProcessor bulkProcessor = EsClientUtil.getBulkProcessor();
            IndexRequest one = new IndexRequest("posts").id("1").source(m);
            bulkProcessor.add(one);
  • 相关阅读:
    demo 集合
    iOS12、iOS11、iOS10、iOS9常见适配
    gem install cocoapods ERROR: While executing gem ... (Gem::FilePermissionError)
    ios LaunchScreen.storyboard 适配启动图
    加载资源文件读取以及转换成字符串的方法
    [UIApplication sharedApplication].keyWindow和[[UIApplication sharedApplication].delegate window]区别
    婚庆手机APP
    从一部电视剧开始
    论一次使用代理模式实现共用导出报表的功能
    MySql中使用EXPLAIN查看sql的执行计划
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/13435101.html
Copyright © 2011-2022 走看看