zoukankan      html  css  js  c++  java
  • elasticsearch 连接配置模板

    package com.kedacom.ctsp.authz.oauth2.configuration;
    
    
    import lombok.Setter;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.ArrayList;
    
    /**
     * init es client config
     */
    @Configuration
    @ConfigurationProperties(prefix = "spring.elasticsearch.jest")
    @ConditionalOnProperty(prefix = "commons.oauth2.server",name = "esEnable",havingValue = "true")
    @Slf4j
    public class ElasticsearchConfig {
    
        @Setter
        private static String uris = ""; // 集群地址,多个用,隔开
    
        private static ArrayList<HttpHost> hostList = null;
        private static int connectTimeOut = 2000; // 连接超时时间
        private static int socketTimeOut = 40000; // 连接超时时间
        private static int connectionRequestTimeOut = 1000; // 获取连接的超时时间
        private static int maxConnectNum = 100; // 最大连接数
        private static int maxConnectPerRoute = 100; // 最大路由连接数
        @Setter
        private static String username;
        @Setter
        private static String password;
    
    
        private static ArrayList<HttpHost> getHostList() {
            if (hostList == null) {
                synchronized (ElasticsearchConfig.class) {
                    if (hostList == null) {
                        hostList = new ArrayList<>();
                        if (StringUtils.isNotEmpty(uris)) {
                            String[] hostStrs = uris.split(",");
                            for (String host : hostStrs) {
                                hostList.add(HttpHost.create(host));
                            }
                        }
                    }
                }
            }
            return hostList;
        }
    
        @Bean
        public RestHighLevelClient restHighLevelClient() {
            RestClientBuilder builder = RestClient.builder(getHostList().toArray(new HttpHost[0]));
            // 异步httpclient连接延时配置
            builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                @Override
                public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                    requestConfigBuilder.setConnectTimeout(connectTimeOut);
                    requestConfigBuilder.setSocketTimeout(socketTimeOut);
                    requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
                    return requestConfigBuilder;
                }
            });
            // 异步httpclient连接数配置
            builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    if (StringUtils.isNotEmpty(username)) {
                        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(AuthScope.ANY,
                                new UsernamePasswordCredentials(username, password));  //es账号密码(默认用户名为elastic)
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                    httpClientBuilder.setMaxConnTotal(maxConnectNum);
                    httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                    return httpClientBuilder;
                }
            });
            RestHighLevelClient client = new RestHighLevelClient(builder);
            return client;
        }
    
        @Bean
        public BulkProcessor bulkProcessor() {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    log.info("序号:{} ,开始执行 {} 条数据批量操作。", executionId, request.numberOfActions());
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    // 在每次执行BulkRequest后调用,通过此方法可以获取BulkResponse是否包含错误
                    if (response.hasFailures()) {
                        log.info("Bulk {} executed with failures,{}", executionId, response.buildFailureMessage());
                    } else {
                        log.info("序号:{} ,执行 {} 条数据批量操作成功,共耗费{}毫秒。", executionId, request.numberOfActions(), response.getTook().getMillis());
                    }
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    
                    log.warn("序号:{} 批量操作失败,总记录数:{} ,报错信息为:{}", executionId, request.numberOfActions(), failure.getMessage());
                }
            };
    
            return BulkProcessor.builder((request, bulkListener) ->
                    restHighLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
                    .setBulkActions(2000)
                    .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                    .setFlushInterval(TimeValue.timeValueSeconds(5))
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .setConcurrentRequests(2)
                    .build();
        }
    
    }
    
    
    
  • 相关阅读:
    20145223《信息安全系统设计基础》第7周学习总结
    20145223《信息安全系统设计基础》第6周学习总结
    20145223《信息安全系统设计基础》第5周学习总结
    20145223《信息安全系统设计基础》第3周学习总结
    20145223《信息安全系统设计基础》第2周学习总结
    20145223《信息安全系统设计基础》第1周学习总结
    node小爬虫
    node知识
    css之单行缩略..以及多行缩略
    html两大布局
  • 原文地址:https://www.cnblogs.com/liuyupen/p/14005967.html
Copyright © 2011-2022 走看看