zoukankan      html  css  js  c++  java
  • Java ElasticSearch 操作

    pom 文件中添加:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.3.2</version>
    </dependency>
    View Code

    如果是SpringBoot工程(这里不是SpringBoot工程,是自己写的简单Demo),需要在pom文件的<properties>标签中添加<elasticsearch.version>6.3.2</elasticsearch.version>(与上面依赖的客户端版本保持一致),否则SpringBoot默认管理的ElasticSearch依赖包版本可能与客户端版本不一致,导致程序无法正常运行。

    注意:这里使用的客户端版本是6.3.2,6.1.4版本的客户端不支持创建索引。

    Log2ESUtil代码:

    package com.sux.demo.utils;
    
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
    import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
    import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
    import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * Thin wrapper around {@link RestHighLevelClient} that writes simple log documents to
     * Elasticsearch and creates, checks and deletes the index they live in.
     *
     * <p>Call {@link #initES()} (or {@link #initES(HttpHost...)}) before any other method and
     * {@link #closeES()} when finished.
     *
     * <p>The timestamp formatter is kept per thread, so {@link #log2ES} does not share a
     * {@link SimpleDateFormat} between callers. NOTE(review): concurrent use additionally relies on
     * the underlying client being safe to share between threads — confirm against the client docs.
     */
    public class Log2ESUtil {
        private static final Logger log = LoggerFactory.getLogger(Log2ESUtil.class);

        /** Pattern of the {@code log_time} field; used both when formatting and in the index mapping. */
        private static final String LOG_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ";

        /** Mapping type name used for every document written by this class. */
        private static final String DOC_TYPE = "doc";

        /**
         * One formatter per thread: {@link SimpleDateFormat} is not thread-safe, and
         * {@link #log2ES} is called concurrently from a thread pool.
         */
        private static final ThreadLocal<SimpleDateFormat> LOG_TIME_FORMAT =
                new ThreadLocal<SimpleDateFormat>() {
                    @Override
                    protected SimpleDateFormat initialValue() {
                        return new SimpleDateFormat(LOG_TIME_PATTERN);
                    }
                };

        RestHighLevelClient client = null;

        /**
         * Creates the client against the default set of cluster nodes.
         *
         * <p>Failures are logged, not thrown; {@code client} stays {@code null} in that case.
         */
        public void initES() {
            initES(
                    new HttpHost("34.8.8.93", 24100, "http"),
                    new HttpHost("34.8.8.94", 24100, "http"),
                    new HttpHost("34.8.8.95", 24100, "http"),
                    new HttpHost("34.8.8.96", 24100, "http"),
                    new HttpHost("34.8.8.98", 24100, "http"),
                    new HttpHost("34.8.8.99", 24100, "http"));
        }

        /**
         * Creates the client against the given cluster nodes.
         *
         * <p>Failures are logged, not thrown; {@code client} keeps its previous value in that case.
         *
         * @param hosts the Elasticsearch HTTP endpoints to connect to
         */
        public void initES(HttpHost... hosts) {
            try {
                client = new RestHighLevelClient(
                        RestClient.builder(hosts)
                                // Retry timeout of 5 minutes.
                                .setMaxRetryTimeoutMillis(5 * 60 * 1000));
                log.info("Log2ESService init 成功");
            } catch (Exception e) {
                log.error("Log2ESService init 失败", e);
            }
        }

        /**
         * Closes the client. Does nothing (apart from a warning) when the client was never created;
         * failures while closing are logged, not thrown.
         */
        public void closeES() {
            if (client == null) {
                log.warn("Log2ESService close skipped: client was never initialized");
                return;
            }
            try {
                client.close();
                log.info("Log2ESService close 成功");
            } catch (Exception e) {
                log.error("Log2ESService close 失败", e);
            }
        }

        /**
         * Writes one log document under a random UUID.
         *
         * <p>This is best-effort: any failure is logged and swallowed, so callers are not
         * interrupted by a failed write. The {@code throws} clause is kept only so existing
         * callers continue to compile unchanged.
         *
         * @param success whether the logged operation succeeded; stored as {@code operation_result}
         * @param index   name of the target index
         * @param app     application name stored in the {@code app} field
         * @param msg     text stored in the {@code message} field
         * @throws Exception never thrown by the current implementation
         */
        public void log2ES(boolean success, String index, String app, String msg) throws Exception {
            try {
                String id = UUID.randomUUID().toString();

                // Document body.
                Map<String, Object> jsonMap = new HashMap<>();
                jsonMap.put("app", app);
                jsonMap.put("operation_result", success ? "成功" : "失败");
                jsonMap.put("message", msg);
                jsonMap.put("log_time", LOG_TIME_FORMAT.get().format(new Date()));

                IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE, id)
                        .source(jsonMap);
                client.index(indexRequest);
            } catch (Exception e) {
                log.error("Log2ESService log2ES 失败", e);
            }
        }

        /**
         * Checks whether an index exists.
         *
         * @param indexName name of the index
         * @return {@code true} if the cluster reports that the index exists
         * @throws IOException if the request fails
         */
        public boolean indexExists(String indexName) throws IOException {
            GetIndexRequest getIndexRequest = new GetIndexRequest();
            getIndexRequest.indices(indexName);
            return client.indices().exists(getIndexRequest);
        }

        /**
         * Creates an index with the fixed mapping used by {@link #log2ES}: {@code id},
         * {@code app}, {@code operation_result}, {@code message} and {@code log_time}.
         *
         * @param indexName name of the index to create
         * @return {@code true} if the cluster acknowledged the creation
         * @throws IOException if the request fails
         */
        public boolean createIndex(String indexName) throws IOException {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);

            Map<String, Object> properties = new HashMap<>();
            // Mapping for an "id" field. Note that log2ES does not write such a field; the
            // document id it generates is passed to IndexRequest, not stored in the source.
            properties.put("id", storedField("integer"));
            properties.put("app", storedField("text"));
            properties.put("operation_result", storedField("text"));
            properties.put("message", storedField("text"));

            Map<String, Object> logTime = storedField("date");
            // Must stay identical to the pattern log2ES formats timestamps with.
            logTime.put("format", LOG_TIME_PATTERN);
            properties.put("log_time", logTime);

            Map<String, Object> type = new HashMap<>();
            // Dynamic mapping is switched off for this type; see the Elasticsearch "dynamic"
            // mapping parameter documentation for the exact effect on unmapped fields.
            type.put("dynamic", false);
            type.put("properties", properties);

            Map<String, Object> mappings = new HashMap<>();
            mappings.put(DOC_TYPE, type);

            createIndexRequest.mapping(DOC_TYPE, mappings);

            CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
            return createIndexResponse.isAcknowledged();
        }

        /**
         * Deletes an index.
         *
         * @param indexName name of the index to delete
         * @return {@code true} if the cluster acknowledged the deletion
         * @throws IOException if the request fails
         */
        public boolean deleteIndex(String indexName) throws IOException {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest);
            return deleteIndexResponse.isAcknowledged();
        }

        /** Builds the mapping entry of a field of the given type with {@code store} set to "true". */
        private static Map<String, Object> storedField(String fieldType) {
            Map<String, Object> field = new HashMap<>();
            field.put("type", fieldType);
            field.put("store", "true");
            return field;
        }

    }
    View Code

    测试代码:

    创建索引:

    package com.sux.demo;
    
    import com.sux.demo.utils.Log2ESUtil;
    import org.apache.log4j.PropertyConfigurator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * Demo entry point: creates the {@code sux-test} index if it does not exist yet.
     */
    public class TestES_CreateIndex {
        private static final Logger log = LoggerFactory.getLogger(TestES_CreateIndex.class);

        private static final Log2ESUtil log2ESUtil = new Log2ESUtil();

        private static final String indexName = "sux-test";

        /**
         * Configures log4j, connects, creates the index and always closes the client afterwards.
         *
         * @param args unused
         */
        public static void main(String[] args) throws Exception {
            try {
                PropertyConfigurator.configure("src/main/resources/log4j.properties");

                log2ESUtil.initES();

                createIndex();
            } catch (Exception e) {
                log.error("TestES_CreateIndex 出错", e);
            } finally {
                // Close in finally so the client is released even when index creation fails.
                log2ESUtil.closeES();
            }
        }

        /**
         * Creates the index unless it already exists and logs the outcome.
         *
         * @throws IOException if a request to the cluster fails
         */
        private static void createIndex() throws IOException {
            if (log2ESUtil.indexExists(indexName)) {
                log.info("索引" + indexName + "已存在,不需要创建");
                return;
            }

            if (log2ESUtil.createIndex(indexName)) {
                log.info("创建索引" + indexName + "成功!");
            } else {
                log.error("创建索引" + indexName + "失败!");
            }
        }
    }
    View Code

    删除索引:

    package com.sux.demo;
    
    import com.sux.demo.utils.Log2ESUtil;
    import org.apache.log4j.PropertyConfigurator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * Demo entry point: deletes the {@code sux-test} index if it exists.
     */
    public class TestES_DeleteIndex {
        private static final Logger log = LoggerFactory.getLogger(TestES_DeleteIndex.class);

        private static final Log2ESUtil log2ESUtil = new Log2ESUtil();

        private static final String indexName = "sux-test";

        /**
         * Configures log4j, connects, deletes the index when present and always closes the
         * client afterwards.
         *
         * @param args unused
         */
        public static void main(String[] args) throws Exception {
            try {
                PropertyConfigurator.configure("src/main/resources/log4j.properties");

                log2ESUtil.initES();

                if (!log2ESUtil.indexExists(indexName)) {
                    log.info("索引" + indexName + "不存在,不需要删除!");
                } else if (log2ESUtil.deleteIndex(indexName)) {
                    log.info("删除索引" + indexName + "成功!");
                } else {
                    log.error("删除索引" + indexName + "失败!");
                }
            } catch (Exception e) {
                log.error("TestES_DeleteIndex 出错", e);
            } finally {
                // Close in finally so the client is released even when a request fails.
                log2ESUtil.closeES();
            }
        }

    }
    View Code

    单线程数据写入:

    package com.sux.demo;
    
    import com.sux.demo.utils.Log2ESUtil;
    import com.sux.demo.utils.Speed;
    import org.apache.log4j.PropertyConfigurator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * Demo entry point: writes 200 documents sequentially from the main thread and prints the
     * resulting insert rate.
     */
    public class TestES_SingleInsert {
        // Logger is bound to this class (it was previously bound to TestES_Insert by mistake,
        // which attributed every log line of this program to the wrong class).
        private static final Logger log = LoggerFactory.getLogger(TestES_SingleInsert.class);

        private static final Log2ESUtil log2ESUtil = new Log2ESUtil();

        private static final String indexName = "sux-test";

        private static final String app = "sux-test";

        /**
         * Configures log4j, connects, performs the inserts, prints the rate and always closes the
         * client afterwards.
         *
         * @param args unused
         */
        public static void main(String[] args) throws Exception {
            try {
                PropertyConfigurator.configure("src/main/resources/log4j.properties");

                log2ESUtil.initES();

                long startTime = System.currentTimeMillis();

                int n = 200;
                for (int i = 1; i <= n; i++) {
                    log2ESUtil.log2ES(true, indexName, app, "单线程插入数据" + i);
                    // Progress line every 50 documents.
                    if (i % 50 == 0) {
                        log.info("count=" + i);
                    }
                    Speed.addCount();
                }

                long endTime = System.currentTimeMillis();

                // Documents per second over the elapsed wall-clock time.
                double elapsedSeconds = (endTime - startTime) / 1000.0;
                double speed = Speed.getCount() / elapsedSeconds;
                System.out.println(" 数据插入速度:" + (int) speed + " 条/秒");
            } catch (Exception e) {
                log.error("TestES_SingleInsert 出错", e);
            } finally {
                // Close in finally so the client is released even when the run fails.
                log2ESUtil.closeES();
            }
        }
    }
    View Code

    多线程数据写入:

    package com.sux.demo;
    
    import com.sux.demo.utils.Log2ESUtil;
    import com.sux.demo.utils.Speed;
    import org.apache.log4j.PropertyConfigurator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * Demo entry point: writes 10000 documents through a fixed pool of 50 threads and prints the
     * resulting insert rate.
     */
    public class TestES_Insert {
        private static final Logger log = LoggerFactory.getLogger(TestES_Insert.class);

        private static final Log2ESUtil log2ESUtil = new Log2ESUtil();

        private static final String indexName = "sux-test";

        private static final String app = "sux-test";

        private static final ThreadPoolExecutor threadPool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(50);

        /**
         * Configures log4j, connects, submits one task per document, waits for all of them,
         * prints the rate, and always shuts the pool down and closes the client afterwards.
         *
         * @param args unused
         */
        public static void main(String[] args) throws Exception {
            try {
                PropertyConfigurator.configure("src/main/resources/log4j.properties");

                log2ESUtil.initES();

                long startTime = System.currentTimeMillis();

                int n = 10000;
                // Each task counts the latch down once, so await() returns when all n are done.
                CountDownLatch countDownLatch = new CountDownLatch(n);
                for (int i = 1; i <= n; i++) {
                    threadPool.submit(new ESInsertRunnable(countDownLatch, log2ESUtil, i, "多线程插入数据"));
                }

                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up can still observe it.
                    Thread.currentThread().interrupt();
                    log.error("TestES_Insert interrupted while waiting for insert tasks", e);
                }

                long endTime = System.currentTimeMillis();

                // Documents per second over the elapsed wall-clock time.
                double elapsedSeconds = (endTime - startTime) / 1000.0;
                double speed = Speed.getCount() / elapsedSeconds;
                System.out.println(" 数据插入速度:" + (int) speed + " 条/秒");
            } catch (Exception e) {
                log.error("TestES_Insert 出错", e);
            } finally {
                // Always release the pool: its threads are non-daemon and would otherwise keep
                // the JVM alive after a failure. Then release the client.
                threadPool.shutdown();
                log2ESUtil.closeES();
            }
        }
    }
    
    /**
     * Task that writes a single document through {@link Log2ESUtil} and then counts the shared
     * latch down exactly once.
     */
    class ESInsertRunnable implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(ESInsertRunnable.class);

        private static final String indexName = "sux-test";

        private static final String app = "sux-test";

        private final CountDownLatch countDownLatch;

        private final Log2ESUtil log2ESUtil;

        private final int num;

        private final String msg;

        /**
         * @param countDownLatch latch counted down once when this task finishes
         * @param log2ESUtil     helper used to write the document
         * @param num            sequence number appended to the message
         * @param msg            message prefix
         */
        public ESInsertRunnable(CountDownLatch countDownLatch, Log2ESUtil log2ESUtil, int num, String msg) {
            this.countDownLatch = countDownLatch;
            this.log2ESUtil = log2ESUtil;
            this.num = num;
            this.msg = msg;
        }

        /**
         * Writes the document, logs progress whenever the remaining latch count is a multiple of
         * 500, and bumps the shared counter. Exceptions are logged, not propagated.
         */
        @Override
        public void run() {
            try {
                log2ESUtil.log2ES(true, indexName, app, msg + num);
                long remaining = countDownLatch.getCount();
                if (remaining % 500 == 0) {
                    log.info("count=" + remaining);
                }
                Speed.addCount();
            } catch (Exception e) {
                log.error("TestES_Insert 异常", e);
            } finally {
                // Count down in finally: if anything other than an Exception escapes the body,
                // the thread waiting on the latch would otherwise block forever.
                countDownLatch.countDown();
            }
        }
    }
    View Code

    实际测试性能(使用现网es集群测试):

    单线程:20条/秒

    线程池(50个线程):500条/秒

  • 相关阅读:
    centos7 安装kafka Manager
    MySql Table错误:is marked as crashed and last (automatic?) 和 Error: Table "mysql"."innodb_table_stats" not found
    安装prometheus+grafana监控mysql redis kubernetes等
    centos7 安装kubernetes1.4
    linux ip 转发设置 ip_forward
    开启Tomcat远程调试(转)
    SSH自动断开连接的原因、配置(转)
    解决mysql启动时报The server quit without updating PID file 的错误(转)
    supervisor的集中化管理搭建
    supervisor安装配置
  • 原文地址:https://www.cnblogs.com/s0611163/p/14839755.html
Copyright © 2011-2022 走看看