https://segmentfault.com/a/1190000019792172
针对ES 批量写入, 提供了3种方式,在 high-rest-client 中分别是 bulk bulkAsync bulkProcessor 3种方式。
BulkProcessor
BulkProcessor是一个线程安全的批量处理类,允许方便地设置刷新 一个新的批量请求 (基于消息大小,时间,消息数量), 容易控制并发批量的数量, 请求允许并行执行。
此processer的含义为如果消息数量到达20000 或者消息大小到大10M 或者时间达到300s 任意条件满足,客户端就会把当前的数据提交到服务端处理。减少网路开销, 效率很高。
/** * 实例化 BulkProcessor * @param client * @return */ public static BulkProcessor bulkProcessor(RestHighLevelClient client) { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { //bulk请求前执行 int numberOfActions = request.numberOfActions(); logger.info("ES Executing bulk [{}] with {} request ", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { //bulk请求后执行 if (response.hasFailures()) { logger.error("ES Bulk [{}] executed with failures ", +executionId); } else { logger.info("ES Bulk [{}] completed in {} milliseconds ", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // 失败后执行 logger.error("ES Bulk Failed to execute bulk ", failure); } }; BulkProcessor bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener) // 达到刷新的条数 .setBulkActions(20000) // 达到 刷新的大小 .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB)) // 固定刷新的时间频率 .setFlushInterval(TimeValue.timeValueSeconds(300)) //并发线程数 .setConcurrentRequests(5) // 重试补偿策略 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build(); return bulkProcessor; }
awaitClose()方法可以用来等待,直到所有的请求都被处理完毕或者指定的等待时间过去.
boolean b = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
如果所有bulk请求都已经完成,则该方法返回true,如果所有bulk请求完成之前的等待时间已经过去,则返回false.
close()方法可用于立即关闭bulkProcessor.
这两种方法都在关闭处理器之前刷新添加到处理器的请求,并且禁止向处理器添加任何新请求.
使用方法:
Map<String, Object> m = new HashMap<String, Object>(16); m.put("id", "544345"); m.put("area_id", 1); m.put("camera_id", 1); m.put("log_time", new Date().toString()); m.put("age", 1); BulkProcessor bulkProcessor = EsClientUtil.getBulkProcessor(); IndexRequest one = new IndexRequest("posts").id("1").source(m); bulkProcessor.add(one);