一、ES使用,以及客户端
1、pom引用
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>5.4.3</version> </dependency>
如果测试@Test还需增加一下
<dependency> <groupId>org.elasticsearch.plugin</groupId> <artifactId>transport-netty4-client</artifactId> <version>5.4.3</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>5.4.3</version> </dependency>
2、日志添加
同时需要添加日志包引用log4j
<dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency>
或者使用slf4j
<dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.24</version> </dependency>
github地址:URL
二、index索引操作
常用查看
查看集群健康:http://127.0.0.1:9200/_cat/health?v
查看集群节点:http://127.0.0.1:9200/_cat/nodes?v
查看索引信息:http://127.0.0.1:9200/_cat/indices?v
Elasticsearch提供了一个完整的Java API来处理管理任务。要访问它们,您需要从客户端调用admin()方法以获取AdminClient:
AdminClient adminClient = client.admin();
2.1、java API 索引的创建
方式一、自动创建索引
需要在elasticsearch中创建自动索引,配置yml
action.auto_create_index: .security,.monitoring*,.watches,.triggered_watches,.watcher-history*,app-a-*,app-b-*
方式二、手工创建索引及基本操作
索引:要访问索引Java API,您需要从AdminClient调用indices()方法:
IndicesAdminClient indicesAdminClient = client.admin().indices();
基础帮助类【客户端、增加索引、删除索引、是否存在索引】
文档地址:https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/java-docs.html
索引API允许将类型化的JSON文档索引到特定索引中并使其可搜索。
查看github文档中的索引映射操作
使用header工具查看索引:http://localhost:9200/twitter_index/_mapping/tweet_type
2.2、生成json操作
es操作多半使用nosql操作,即一般使用json格式字符串。
注意:在内部,每种类型都转换为byte [](因此String转换为byte [])。因此,如果对象已经是这种形式,那么使用它。 jsonBuilder是高度优化的JSON生成器,可直接构造byte []。
方式一、自己写一个json String或者json byte[]
String json = "{" + ""user":"kimchy"," + ""postDate":"2013-01-30"," + ""message":"trying out Elasticsearch"" + "}";
注意:如果有日期类型,需要进行定义格式
方式二、使用能转化成json的map
Map<String, Object> json = new HashMap<String, Object>(); json.put("user","kimchy"); json.put("postDate",new Date()); json.put("message","trying out Elasticsearch");
方式三、使用序列化类库Jackson等
import com.fasterxml.jackson.databind.*; // instance a json mapper ObjectMapper mapper = new ObjectMapper(); // create once, reuse // generate json byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
方式四、使用内置帮助器XContentFactory.jsonBuilder()
import static org.elasticsearch.common.xcontent.XContentFactory.*; XContentBuilder builder = jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject()
查看内容
String json = builder.string();
请注意,您还可以使用startArray(String)和endArray()方法添加数组。顺便说一下,field方法接受许多对象类型。您可以直接传递数字,日期甚至其他XContentBuilder对象。
2.3、操作线程
索引API允许设置线程模型,当在同一节点上执行API的实际执行时将执行操作(API在同一服务器上分配的分片上执行)。
选项是在不同的线程上执行操作,或者在调用线程上执行它(注意API仍然是异步的)。默认情况下,operationThreaded设置为true,这意味着操作在不同的线程上执行。
更多消息可以参看
三、文档操作
1)Document内容新增
方式一、对应上文方式四
import static org.elasticsearch.common.xcontent.XContentFactory.*; IndexResponse response = client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) .get();
方式二、序列化成String
String json = "{" + ""user":"kimchy"," + ""postDate":"2013-01-30"," + ""message":"trying out Elasticsearch"" + "}"; IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json) .get();
IndexResponse
对象会有一个响应报告
// 索引名 String _index = response.getIndex(); // 类型名 String _type = response.getType(); // 文档ID 主键ID String _id = response.getId(); // Version (if it's the first time you index this document, you will get: 1) long _version = response.getVersion(); // status has stored current instance statement. RestStatus status = response.status();
单条记录,批量插入等请参看
4)通过ID查询记录
GetResponse response = client.prepareGet("indexName", "type", "id").get();
默认情况下,operationThreaded设置为true,这意味着操作在不同的线程上执行。这是一个将其设置为false的示例:
GetResponse response = client.prepareGet("twitter", "tweet", "1") .setOperationThreaded(false) .get();
5)通过ID删除文档数据
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
默认情况下,operationThreaded设置为true,这意味着操作在不同的线程上执行。这是一个将其设置为false的示例:
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1") .setOperationThreaded(false) .get();
6)根据条件删除记录
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) //query .source("persons") //index .get(); // excute long deleted = response.getDeleted(); // number
因为它可以是一个长时间运行的操作,如果你想异步地执行它,你可以调用execute而不是get并提供一个监听器,如:
DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .execute(new ActionListener<BulkIndexByScrollResponse>() { //listener @Override public void onResponse(BulkIndexByScrollResponse response) { long deleted = response.getDeleted(); } @Override public void onFailure(Exception e) { // Handle the exception } });
7)通过ID更新
方式一、通过UpdateRequest
UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("index"); updateRequest.type("type"); updateRequest.id("1"); updateRequest.doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
方式二、prepareUpdate方法
脚本方式
client.prepareUpdate("ttl", "doc", "1") .setScript(new Script("ctx._source.gender = "male"" , ScriptService.ScriptType.INLINE, null, null)) .get();
关于脚本:它也可以是本地存储的脚本名称。在这种情况下,您需要使用ScriptService.ScriptType.FILE
doc方式
client.prepareUpdate("ttl", "doc", "1") .setDoc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .get();
更新API还支持传递部分文档,该部分文档将合并到现有文档中(简单的递归合并,对象的内部合并,替换核心“键/值”和数组)。例如:
选用其中一种即可,无法同时提供脚本和doc。
8)upsert 更新插入
对upsert的支持。如果文档不存在,则upsert元素的内容将用于索引新文档:
IndexRequest indexRequest = new IndexRequest("index", "type", "1") .source(jsonBuilder() .startObject() .field("name", "Joe Smith") .field("gender", "male") .endObject()); UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .upsert(indexRequest); client.update(updateRequest).get();
如果文档不存在,将添加indexRequest中的文档
9)多ID获取
MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("another", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); } }
10)批量插入API
import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item }
11)批量操作API
BulkProcessor类提供了一个简单的接口,可根据请求的数量或大小自动刷新批量操作,或者在给定时间段之后。
https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/java-docs-bulk-processor.html