zoukankan      html  css  js  c++  java
  • Elasticsearch Java High Level REST Client工具类

    pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Build descriptor for the ultiwill-es7 module: Java 8, Elasticsearch 7.8.0 REST clients, SLF4J + Logback. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.ultiwill</groupId>
        <artifactId>ultiwill-es7</artifactId>
        <version>1.0-SNAPSHOT</version>

        <build>
            <plugins>
                <!-- Compile sources and emit bytecode for Java 8 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>

        <properties>
            <logback.version>1.2.3</logback.version>
            <slf4j.version>1.7.26</slf4j.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>

            <!-- Elasticsearch clients: the three artifacts are pinned to the same version -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>7.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>7.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>7.8.0</version>
            </dependency>

            <!-- Logging -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
                <version>${logback.version}</version>
            </dependency>
            <!-- logback-classic carries the SLF4J binding; with logback-core alone SLF4J has no
                 backend and silently discards every log statement. -->
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>${logback.version}</version>
            </dependency>
            <!-- JSON -->
            <!-- NOTE(review): fastjson 1.2.32 is an old release and is not referenced by EsClientUtil;
                 confirm it is needed and upgrade it before parsing untrusted JSON. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.32</version>
            </dependency>
        </dependencies>



    </project>
    View Code

    EsClientUtil:

    package com.ultiwill.utils;
    
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.elasticsearch.action.bulk.*;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Static helper that owns one shared {@link RestHighLevelClient} and one shared
     * {@link BulkProcessor} for the Elasticsearch nodes configured in this class.
     *
     * <p>Both objects are created lazily on first use and then cached, because (per the original
     * author's note) building a client takes roughly two seconds. They stay open until
     * {@link #closeClient()} is called; after that the next call to {@link #getClient()} or
     * {@link #getBulkProcessor()} builds fresh instances.
     *
     * <p>Creation and shutdown are serialized on the {@code EsClientUtil.class} monitor.
     *
     * @author chong.zuo
     * @date 2020/8/3 17:18
     */
    public class EsClientUtil {
        private static final Logger logger = LoggerFactory.getLogger(EsClientUtil.class);

        /** Upper bound, in seconds, that {@link #closeClient()} waits for pending bulk requests. */
        private static final long BULK_CLOSE_TIMEOUT_SECONDS = 30L;

        /**
         * Cached client. Declared {@code volatile} because it is read outside the lock in
         * {@link #getClient()}; without it another thread could observe a partially published
         * instance.
         */
        private static volatile RestHighLevelClient client;

        /** Cached bulk processor, bound to {@link #client}; {@code volatile} for the same reason. */
        private static volatile BulkProcessor bulkProcessor;

        /**
         * Builds the list of Elasticsearch nodes the client connects to.
         *
         * @return the two hard-coded HTTP endpoints
         */
        private static HttpHost[] assembleESAddress() {
            return new HttpHost[]{
                    new HttpHost("192.168.100.110", 9201, "http"),
                    new HttpHost("192.168.100.110", 9202, "http")
            };
        }

        /**
         * Returns the shared client, creating it on first use.
         *
         * @return the cached client, or {@code null} if it could not be created (the failure is logged)
         */
        public static RestHighLevelClient getClient() {
            RestHighLevelClient cached = client;
            if (cached != null) {
                // Fast path: already initialised, no locking needed.
                return cached;
            }
            synchronized (EsClientUtil.class) {
                // Re-check under the lock: another thread may have created it meanwhile.
                if (client == null) {
                    try {
                        // NOTE(review): credentials and hosts are hard-coded; move them to
                        // external configuration before using this outside a demo.
                        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(AuthScope.ANY,
                                new UsernamePasswordCredentials("elastic", "123456"));
                        // Attach basic-auth credentials to the underlying async HTTP client.
                        RestClientBuilder builder = RestClient.builder(assembleESAddress())
                                .setHttpClientConfigCallback(httpClientBuilder ->
                                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
                        client = new RestHighLevelClient(builder);
                    } catch (Exception e) {
                        logger.error("EsClient创建失败...." + client, e);
                    }
                }
                return client;
            }
        }


        /**
         * Shuts down the shared bulk processor (if one was created) and then the shared client,
         * and clears both cached references so that later calls build new instances instead of
         * handing out closed ones.
         *
         * <p>The bulk processor is closed first because it sends its remaining requests through
         * the client; this call waits up to {@value #BULK_CLOSE_TIMEOUT_SECONDS} seconds for them.
         */
        public static void closeClient() {
            synchronized (EsClientUtil.class) {
                BulkProcessor processor = bulkProcessor;
                if (processor != null) {
                    try {
                        boolean drained = processor.awaitClose(BULK_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                        if (!drained) {
                            logger.warn("ES BulkProcessor did not finish pending requests within {} seconds",
                                    BULK_CLOSE_TIMEOUT_SECONDS);
                        }
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for the caller and carry on closing the client.
                        Thread.currentThread().interrupt();
                        logger.error("ES BulkProcessor close interrupted", e);
                    } catch (Exception e) {
                        logger.error("ES BulkProcessor close failed", e);
                    } finally {
                        bulkProcessor = null;
                    }
                }

                RestHighLevelClient current = client;
                if (current != null) {
                    try {
                        current.close();
                        logger.info("ES Client 关闭成功...");
                    } catch (Exception e) {
                        logger.error("ES Client关闭失败...", e);
                    } finally {
                        client = null;
                    }
                }
            }
        }


        /**
         * Indexes a single document synchronously. Failures are logged, not thrown.
         *
         * @param index target index name
         * @param id    document id
         * @param m     document source as field name to value
         */
        private static void saveData(String index, String id, Map<String, Object> m) {
            RestHighLevelClient esClient = getClient();
            if (esClient == null) {
                logger.error("ES save skipped, client is unavailable: index={}, id={}", index, id);
                return;
            }
            try {
                IndexRequest indexRequest = new IndexRequest(index)
                        .id(id)
                        .source(m);
                esClient.index(indexRequest, RequestOptions.DEFAULT);
            } catch (Exception e) {
                logger.error("ES save failed: index={}, id={}", index, id, e);
            }
        }


        /**
         * Returns the shared {@link BulkProcessor}, creating it on first use on top of
         * {@link #getClient()}.
         *
         * @return the cached processor, or {@code null} if it (or the client) could not be created
         */
        public static BulkProcessor getBulkProcessor() {
            BulkProcessor cached = bulkProcessor;
            if (cached != null) {
                return cached;
            }
            synchronized (EsClientUtil.class) {
                if (bulkProcessor == null) {
                    try {
                        RestHighLevelClient esClient = getClient();
                        if (esClient == null) {
                            // Do not cache a processor wrapped around a missing client; every
                            // flush would fail and the broken instance would never be replaced.
                            logger.error("BulkProcessor创建失败....ES client is unavailable");
                        } else {
                            bulkProcessor = bulkProcessor(esClient);
                        }
                    } catch (Exception e) {
                        logger.error("BulkProcessor创建失败...." + bulkProcessor, e);
                    }
                }
                return bulkProcessor;
            }
        }


        /**
         * Builds a new {@link BulkProcessor} that sends its batches asynchronously through the
         * given client and logs the outcome of every batch.
         *
         * <p>A batch is flushed when it reaches 20000 actions or 10 MB, or every 300 seconds;
         * up to 5 batches may be in flight concurrently, and the back-off policy is exponential
         * starting at 100 ms with at most 3 retries.
         *
         * @param client client used to execute the bulk requests
         * @return a newly built processor (not cached by this method)
         */
        public static BulkProcessor bulkProcessor(RestHighLevelClient client) {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    // Runs just before a batch is sent.
                    int numberOfActions = request.numberOfActions();
                    logger.info("ES Executing bulk [{}] with {} request ", executionId, numberOfActions);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    // Runs after a batch got a response; individual items may still have failed.
                    if (response.hasFailures()) {
                        logger.error("ES Bulk [{}] executed with failures: {}",
                                executionId, response.buildFailureMessage());
                    } else {
                        logger.info("ES Bulk [{}] completed in {} milliseconds ", executionId, response.getTook().getMillis());
                    }
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    // Runs when the whole batch could not be executed.
                    logger.error("ES Bulk Failed to execute bulk ", failure);
                }
            };

            return BulkProcessor.builder(
                    (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    listener)
                    // flush once this many actions are queued
                    .setBulkActions(20000)
                    // flush once the queued payload reaches this size
                    .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                    // flush at this fixed interval regardless of queue size
                    .setFlushInterval(TimeValue.timeValueSeconds(300))
                    // number of batches allowed to execute concurrently
                    .setConcurrentRequests(5)
                    // retry policy for rejected batches
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();
        }


        /**
         * Demo entry point: indexes one sample document into {@code global_house_list} and then
         * closes the shared client.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            String id = "12356789";
            Map<String, Object> m = new HashMap<>(16);
            m.put("id", id);
            m.put("area_id", 1);
            m.put("camera_id", 1);
            m.put("log_time", new Date().toString());
            m.put("age", 1);
            EsClientUtil.saveData("global_house_list", id, m);
            EsClientUtil.closeClient();

            // Bulk usage example (run it before closeClient()):
            /*BulkProcessor bulkProcessor = EsClientUtil.getBulkProcessor();
            IndexRequest one = new IndexRequest("posts").id("1").source(m);
            bulkProcessor.add(one);*/
        }


    }
    View Code

    saveToEsBulk:

      /**
        * Queues one index request per message on the BulkProcessor returned by
        * IceBockEsConfig.getBulkProcessor(taskName).
        *
        * For every message the "indexName" and "indexType" entries, and "indexId" when it is
        * present, are removed from the map and used as the request's index, type and id; the
        * entries left in the map become the document source. When messages is empty nothing
        * happens and no processor is requested.
        *
        * @param messages documents to write; each map is modified in place as described above
        * @param taskName argument forwarded to IceBockEsConfig.getBulkProcessor
        */
      def save2ESByBulkProcessor(messages:util.ArrayList[util.Map[String,Any]],taskName:String):Unit={
        if (!messages.isEmpty()) {
          val processor = IceBockEsConfig.getBulkProcessor(taskName)
          val cursor = messages.iterator()
          while (cursor.hasNext()) {
            val doc = cursor.next()
            val request = new IndexRequest()
            // Routing fields are taken out of the map so they do not end up in the stored document.
            val targetIndex = doc.remove("indexName").toString
            request.index(targetIndex)
            val targetType = doc.remove("indexType").toString
            request.`type`(targetType)
            request.opType(DocWriteRequest.OpType.INDEX)
            // The id is optional: it is only set when the message carries one.
            if (doc.containsKey("indexId")) {
              val docId = doc.remove("indexId").toString
              request.id(docId)
            }
            request.source(doc)
            processor.add(request)
          }
        }
      }
    View Code
  • 相关阅读:
    idea主题更换pycharm/intellij
    随机生成n张扑克牌。
    JAVA生成6个1-8的随机数,要求无重复。
    一道简单 的循环
    linux虚拟机互访
    linux中grep命令
    vi和vim编辑器
    文件压缩打包以及备份
    文件内容查询
    目录相关操作
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/13435042.html
Copyright © 2011-2022 走看看