pom 文件中添加:
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.3.2</version> </dependency>
如果是SpringBoot工程(这里不是SpringBoot工程,是自己写的简单Demo),需要在pom文件的<properties>标签中添加<elasticsearch.version>6.3.2</elasticsearch.version>(与上面依赖的版本保持一致),否则可能会导致ElasticSearch依赖包的版本不一致,使程序无法正常运行。
注意:这里使用的版本是6.3.2;6.1.4版本不支持创建索引。
Log2ESUtil代码:
package com.sux.demo.utils; import org.apache.http.HttpHost; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.UUID; public class Log2ESUtil { private static final Logger log = LoggerFactory.getLogger(Log2ESUtil.class); RestHighLevelClient client = null; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ"); public void initES() { try { client = new RestHighLevelClient( RestClient.builder( new HttpHost("34.8.8.93", 24100, "http"), new HttpHost("34.8.8.94", 24100, "http"), new HttpHost("34.8.8.95", 24100, "http"), new HttpHost("34.8.8.96", 24100, "http"), new HttpHost("34.8.8.98", 24100, "http"), new HttpHost("34.8.8.99", 24100, "http")) .setMaxRetryTimeoutMillis(5 * 60 * 1000));//超时时间设为5分钟 log.info("Log2ESService init 成功"); } catch (Exception e) { log.error("Log2ESService init 失败", e); } } public void closeES() { try { client.close(); log.info("Log2ESService close 成功"); } catch (Exception e) { log.error("Log2ESService close 失败", e); } } public void log2ES(boolean success, String index, String app, String msg) throws Exception { try { String doc = "doc"; String id = UUID.randomUUID().toString(); //保存到ES的数据 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("app", app); if (success) { jsonMap.put("operation_result", "成功"); } else { 
jsonMap.put("operation_result", "失败"); } jsonMap.put("message", msg); jsonMap.put("log_time", simpleDateFormat.format(new Date())); IndexRequest indexRequest = new IndexRequest(index, doc, id) .source(jsonMap); client.index(indexRequest); //log.info("Log2ESService log2ES 成功,数据:" + jsonMap.toString()); } catch (Exception e) { log.error("Log2ESService log2ES 失败", e); } } public boolean indexExists(String indexName) throws IOException { GetIndexRequest getIndexRequest = new GetIndexRequest(); getIndexRequest.indices(indexName); return client.indices().exists(getIndexRequest); } public boolean createIndex(String indexName) throws IOException { CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); // 配置映射关系 Map<String, Object> mappings = new HashMap<>(); Map<String, Object> type = new HashMap<>(); mappings.put("doc", type); type.put("dynamic", false); //说明: Map<String, Object> properties = new HashMap<>(); type.put("properties", properties); //文档的id映射 Map<String, Object> idProperties = new HashMap<>(); idProperties.put("type", "integer"); idProperties.put("store", "true"); properties.put("id", idProperties); // 文档的其他字段映射 Map<String, Object> moreProperties = new HashMap<>(); moreProperties.put("type", "text"); //说明: moreProperties.put("store", "true"); //说明: properties.put("app", moreProperties); moreProperties = new HashMap<>(); moreProperties.put("type", "text"); moreProperties.put("store", "true"); properties.put("operation_result", moreProperties); moreProperties = new HashMap<>(); moreProperties.put("type", "text"); moreProperties.put("store", "true"); properties.put("message", moreProperties); moreProperties = new HashMap<>(); moreProperties.put("type", "date"); moreProperties.put("store", "true"); moreProperties.put("format", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"); properties.put("log_time", moreProperties); createIndexRequest.mapping("doc", mappings); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); return 
createIndexResponse.isAcknowledged(); } public boolean deleteIndex(String indexName) throws IOException { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); return deleteIndexResponse.isAcknowledged(); } }
测试代码:
创建索引:
package com.sux.demo;

import com.sux.demo.utils.Log2ESUtil;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Demo entry point that creates the test index if it does not exist yet.
 */
public class TestES_CreateIndex {
    private static final Logger log = LoggerFactory.getLogger(TestES_CreateIndex.class);

    private static Log2ESUtil log2ESUtil = new Log2ESUtil();
    private static String indexName = "sux-test";

    /**
     * Configures log4j, opens the Elasticsearch client, creates the index and closes the
     * client again. Any exception is logged rather than propagated.
     */
    public static void main(String[] args) throws Exception {
        try {
            PropertyConfigurator.configure("src/main/resources/log4j.properties");
            log2ESUtil.initES();
            try {
                createIndex();
            } finally {
                // Always release the client, even when index creation throws.
                log2ESUtil.closeES();
            }
        } catch (Exception e) {
            log.error("TestES_CreateIndex 出错", e);
        }
    }

    /**
     * Creates the index unless it already exists and logs the outcome.
     *
     * @throws IOException if a request to Elasticsearch fails
     */
    private static void createIndex() throws IOException {
        if (log2ESUtil.indexExists(indexName)) {
            log.info("索引{}已存在,不需要创建", indexName);
            return;
        }

        if (log2ESUtil.createIndex(indexName)) {
            log.info("创建索引{}成功!", indexName);
        } else {
            log.error("创建索引{}失败!", indexName);
        }
    }
}
删除索引:
package com.sux.demo;

import com.sux.demo.utils.Log2ESUtil;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Demo entry point that deletes the test index if it exists.
 */
public class TestES_DeleteIndex {
    private static final Logger log = LoggerFactory.getLogger(TestES_DeleteIndex.class);

    private static Log2ESUtil log2ESUtil = new Log2ESUtil();
    private static String indexName = "sux-test";

    /**
     * Configures log4j, opens the Elasticsearch client, deletes the index and closes the
     * client again. Any exception is logged rather than propagated.
     */
    public static void main(String[] args) throws Exception {
        try {
            PropertyConfigurator.configure("src/main/resources/log4j.properties");
            log2ESUtil.initES();
            try {
                deleteIndex();
            } finally {
                // Always release the client, even when the deletion throws.
                log2ESUtil.closeES();
            }
        } catch (Exception e) {
            log.error("TestES_DeleteIndex 出错", e);
        }
    }

    /**
     * Deletes the index when it exists and logs the outcome.
     *
     * @throws IOException if a request to Elasticsearch fails
     */
    private static void deleteIndex() throws IOException {
        if (!log2ESUtil.indexExists(indexName)) {
            log.info("索引{}不存在,不需要删除!", indexName);
            return;
        }

        if (log2ESUtil.deleteIndex(indexName)) {
            log.info("删除索引{}成功!", indexName);
        } else {
            log.error("删除索引{}失败!", indexName);
        }
    }
}
单线程数据写入:
package com.sux.demo;

import com.sux.demo.utils.Log2ESUtil;
import com.sux.demo.utils.Speed;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Demo entry point that inserts a fixed number of documents from a single thread and prints
 * the observed insert rate.
 */
public class TestES_SingleInsert {
    // Logger is bound to this class (it previously pointed at TestES_Insert by mistake).
    private static final Logger log = LoggerFactory.getLogger(TestES_SingleInsert.class);

    private static Log2ESUtil log2ESUtil = new Log2ESUtil();
    private static String indexName = "sux-test";
    private static String app = "sux-test";

    /**
     * Configures log4j, opens the Elasticsearch client, writes the documents sequentially,
     * prints the rate and closes the client. Any exception is logged rather than propagated.
     */
    public static void main(String[] args) throws Exception {
        try {
            PropertyConfigurator.configure("src/main/resources/log4j.properties");
            log2ESUtil.initES();
            try {
                long startTime = System.currentTimeMillis();

                int n = 200;
                for (int i = 1; i <= n; i++) {
                    log2ESUtil.log2ES(true, indexName, app, "单线程插入数据" + i);
                    if (i % 50 == 0) {
                        log.info("count=" + i);
                    }
                    Speed.addCount();
                }

                long endTime = System.currentTimeMillis();
                double elapsedSeconds = (endTime - startTime) / 1000.0;
                double speed = Speed.getCount() / elapsedSeconds;
                System.out.println(" 数据插入速度:" + (int) speed + " 条/秒");
            } finally {
                // Always release the client, even when an insert throws.
                log2ESUtil.closeES();
            }
        } catch (Exception e) {
            log.error("TestES_SingleInsert 出错", e);
        }
    }
}
多线程数据写入:
package com.sux.demo; import com.sux.demo.utils.Log2ESUtil; import com.sux.demo.utils.Speed; import org.apache.log4j.PropertyConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class TestES_Insert { private static final Logger log = LoggerFactory.getLogger(TestES_Insert.class); private static Log2ESUtil log2ESUtil = new Log2ESUtil(); private static String indexName = "sux-test"; private static String app = "sux-test"; private static ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(50); public static void main(String[] args) throws Exception { try { PropertyConfigurator.configure("src/main/resources/log4j.properties"); log2ESUtil.initES(); long startTime = System.currentTimeMillis(); int n = 10000; CountDownLatch countDownLatch = new CountDownLatch(n); for (int i = 1; i <= n; i++) { ESInsertRunnable esInsertRunnable = new ESInsertRunnable(countDownLatch, log2ESUtil, i, "多线程插入数据"); threadPool.submit(esInsertRunnable); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); double speed = Speed.getCount() / (double) ((endTime - startTime) / 1000.0); System.out.println(" 数据插入速度:" + (int) speed + " 条/秒"); log2ESUtil.closeES(); threadPool.shutdown(); } catch (Exception e) { log.error("TestES_Insert 出错", e); } } } class ESInsertRunnable implements Runnable { private static final Logger log = LoggerFactory.getLogger(ESInsertRunnable.class); private CountDownLatch countDownLatch; private Log2ESUtil log2ESUtil; private int num; private static String indexName = "sux-test"; private static String app = "sux-test"; private String msg; public ESInsertRunnable(CountDownLatch countDownLatch, Log2ESUtil log2ESUtil, int num, String msg) { this.countDownLatch = countDownLatch; this.log2ESUtil = log2ESUtil; this.num = num; 
this.msg = msg; } public void run() { try { log2ESUtil.log2ES(true, indexName, app, msg + num); if (countDownLatch.getCount() % 500 == 0) { log.info("count=" + countDownLatch.getCount()); } Speed.addCount(); } catch (Exception e) { log.error("TestES_Insert 异常", e); } countDownLatch.countDown(); } }
实际测试性能(使用现网es集群测试):
单线程:20条/秒
线程池(50个线程):500条/秒