zoukankan      html  css  js  c++  java
  • Elasticsearch Java High Level REST Client(Bulk API)

    https://segmentfault.com/a/1190000019792172

    针对ES 批量写入, 提供了3种方式,在 high-rest-client 中分别是 bulk  bulkAsync  bulkProcessor 3种方式。

    BulkProcessor

    BulkProcessor是一个线程安全的批量处理类,允许方便地设置刷新 一个新的批量请求 (基于消息大小,时间,消息数量), 容易控制并发批量的数量, 请求允许并行执行。

    此processer的含义为如果消息数量到达20000 或者消息大小到大10M 或者时间达到300s 任意条件满足,客户端就会把当前的数据提交到服务端处理。减少网路开销, 效率很高。

    /**
         * 实例化 BulkProcessor
         * @param client
         * @return
         */
        public static BulkProcessor bulkProcessor(RestHighLevelClient client) {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    //bulk请求前执行
                    int numberOfActions = request.numberOfActions();
                    logger.info("ES Executing bulk [{}] with {} request ", executionId, numberOfActions);
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    //bulk请求后执行
                    if (response.hasFailures()) {
                        logger.error("ES Bulk [{}] executed with failures ", +executionId);
                    } else {
                        logger.info("ES Bulk [{}] completed in {} milliseconds ", executionId, response.getTook().getMillis());
                    }
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    // 失败后执行
                    logger.error("ES Bulk Failed to execute bulk ", failure);
                }
            };
    
            BulkProcessor bulkProcessor = BulkProcessor.builder(
                    (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    listener)
                    //  达到刷新的条数
                    .setBulkActions(20000)
                    // 达到 刷新的大小
                    .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                    // 固定刷新的时间频率
                    .setFlushInterval(TimeValue.timeValueSeconds(300))
                    //并发线程数
                    .setConcurrentRequests(5)
                    // 重试补偿策略
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();
            return bulkProcessor;
        }

    awaitClose()方法可以用来等待,直到所有的请求都被处理完毕或者指定的等待时间过去.

    boolean b = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

    如果所有bulk请求都已经完成,则该方法返回true,如果所有bulk请求完成之前的等待时间已经过去,则返回false.
    close()方法可用于立即关闭bulkProcessor.

    这两种方法都在关闭处理器之前刷新添加到处理器的请求,并且禁止向处理器添加任何新请求.


    使用方法:

    Map<String, Object> m = new HashMap<String, Object>(16);
            m.put("id", "544345");
            m.put("area_id", 1);
            m.put("camera_id", 1);
            m.put("log_time", new Date().toString());
            m.put("age", 1);
            BulkProcessor bulkProcessor = EsClientUtil.getBulkProcessor();
            IndexRequest one = new IndexRequest("posts").id("1").source(m);
            bulkProcessor.add(one);
  • 相关阅读:
    浅谈协方差矩阵
    Android开发之Http通信HttpClient接口
    Android开发之XML文件的解析的三种方法
    Android开发之Http通信HttpURLConnection接口
    [Android] SurfaceView使用实例
    Android开发之初识Camera图像采集
    Android开发之SurfaceView
    基于android的远程视频监控系统
    Android编程9:蓝牙测试
    Android--PendingIntent 实现发送通知notification
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/13435101.html
Copyright © 2011-2022 走看看