运行结果:返回5条数据
参考代码ESTestDocumentAPI.java
package com.dajiangtai.djt_spider.elasticsearch; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.elasticsearch.node.NodeBuilder.*; import static org.elasticsearch.common.xcontent.XContentFactory.*; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.node.Node; import org.elasticsearch.script.Script; import 
org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHits; import org.junit.Before; import org.junit.Test; /** * Document API 操作 * * @author 大讲台 * */ public class ESTestDocumentAPI { private TransportClient client; @Before public void test0() throws UnknownHostException { // 开启client.transport.sniff功能,探测集群所有节点 Settings settings = Settings.settingsBuilder() .put("cluster.name", "escluster") .put("client.transport.sniff", true).build(); // on startup // 获取TransportClient client = TransportClient .builder() .settings(settings) .build() .addTransportAddress( new InetSocketTransportAddress(InetAddress .getByName("master"), 9300)) .addTransportAddress( new InetSocketTransportAddress(InetAddress .getByName("slave1"), 9300)) .addTransportAddress( new InetSocketTransportAddress(InetAddress .getByName("slave2"), 9300)); } /** * 创建索引:use ElasticSearch helpers * * @throws IOException */ @Test public void test1() throws IOException { IndexResponse response = client .prepareIndex("twitter", "tweet", "1") .setSource( jsonBuilder().startObject().field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject()).get(); System.out.println(response.getId()); client.close(); } /** * 创建索引:do it yourself * * @throws IOException */ @Test public void test2() throws IOException { String json = "{" + ""user":"kimchy"," + ""postDate":"2013-01-30"," + ""message":"trying out Elasticsearch"" + "}"; IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json).get(); System.out.println(response.getId()); client.close(); } /** * 创建索引:use map * * @throws IOException */ @Test public void test3() throws IOException { Map<String, Object> json = new HashMap<String, Object>(); json.put("user", "kimchy"); json.put("postDate", new Date()); json.put("message", "trying out Elasticsearch"); IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json).get(); System.out.println(response.getId()); 
client.close(); } /** * 创建索引:serialize your beans * * @throws IOException */ @Test public void test4() throws IOException { User user = new User(); user.setUser("kimchy"); user.setPostDate(new Date()); user.setMessage("trying out Elasticsearch"); // instance a json mapper ObjectMapper mapper = new ObjectMapper(); // create once, reuse // generate json byte[] json = mapper.writeValueAsBytes(user); IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json).get(); System.out.println(response.getId()); client.close(); } /** * 查询索引:get * * @throws IOException */ @Test public void test5() throws IOException { GetResponse response = client.prepareGet("twitter", "tweet", "1").get(); System.out.println(response.getSourceAsString()); client.close(); } /** * 删除索引:delete * * @throws IOException */ @Test public void test6() throws IOException { client.prepareDelete("twitter", "tweet", "1").get(); client.close(); } /** * 更新索引:Update API-UpdateRequest * * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void test7() throws IOException, InterruptedException, ExecutionException { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("twitter"); updateRequest.type("tweet"); updateRequest.id("AVyi3OORot7zkId708s8"); updateRequest.doc(jsonBuilder().startObject().field("gender", "male") .endObject()); client.update(updateRequest).get(); System.out.println(updateRequest.version()); client.close(); } /** * 更新索引:Update API-prepareUpdate()-doc * * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void test8() throws IOException, InterruptedException, ExecutionException { client.prepareUpdate("twitter", "tweet", "AVyikSKIot7zkId708s6") .setDoc(jsonBuilder().startObject().field("gender", "female") .endObject()).get(); client.close(); } /** * 更新索引:Update API-prepareUpdate()-script * 需要开启:script.engine.groovy.inline.update: on * * @throws IOException * @throws 
ExecutionException * @throws InterruptedException */ @Test public void test9() throws IOException, InterruptedException, ExecutionException { client.prepareUpdate("twitter", "tweet", "AVyi4oZfot7zkId708s-") .setScript( new Script("ctx._source.gender = "female"", ScriptService.ScriptType.INLINE, null, null)) .get(); client.close(); } /** * 更新索引:Update API-UpdateRequest-upsert * * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void test10() throws IOException, InterruptedException, ExecutionException { IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1") .source(jsonBuilder() .startObject() .field("name", "Joe Smith") .field("gender", "male") .endObject()); UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1") .doc(jsonBuilder() .startObject() .field("gender", "female") .endObject()).upsert(indexRequest); client.update(updateRequest).get(); client.close(); } /** * 批量查询索引:Multi Get API * * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void test11() throws IOException, InterruptedException, ExecutionException { MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "AVyi4oZfot7zkId708s-", "AVyi3OORot7zkId708s8", "AVyikSKIot7zkId708s6") .add("djt2", "user", "1") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); System.out.println(json); } } client.close(); } /** * 批量操作索引:Bulk API * * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void test12() throws IOException, InterruptedException, ExecutionException { BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests 
bulkRequest.add(client.prepareIndex("twitter", "tweet", "3") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); DeleteRequestBuilder prepareDelete = client.prepareDelete("twitter", "tweet", "AVyikSKIot7zkId708s6"); bulkRequest.add(prepareDelete); BulkResponse bulkResponse = bulkRequest.get(); //批量操作:其中一个操作失败不影响其他操作成功执行 if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item BulkItemResponse[] items = bulkResponse.getItems(); for (BulkItemResponse bulkItemResponse : items) { System.out.println(bulkItemResponse.getFailureMessage()); } }else{ System.out.println("bulk process success!"); } client.close(); } /** * 批量操作索引:Using Bulk Processor * 优化:先关闭副本,再添加副本,提升效率 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void test13() throws IOException, InterruptedException, ExecutionException { BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { public void beforeBulk(long executionId, BulkRequest request) { // TODO Auto-generated method stub System.out.println(request.numberOfActions()); } public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // TODO Auto-generated method stub System.out.println(failure.getMessage()); } public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // TODO Auto-generated method stub System.out.println(response.hasFailures()); } }) .setBulkActions(1000) // 每个批次的最大数量 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔 .setConcurrentRequests(1) //设置多少个并发处理线程 
//可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作 .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); String json = "{" + ""user":"kimchy"," + ""postDate":"2013-01-30"," + ""message":"trying out Elasticsearch"" + "}"; for (int i = 0; i < 1000; i++) { bulkProcessor.add(new IndexRequest("djt6", "user").source(json)); } //阻塞至所有的请求线程处理完毕后,断开连接资源 bulkProcessor.awaitClose(3, TimeUnit.MINUTES); client.close(); } /** * SearchType使用方式 * @throws Exception */ @Test public void test14() throws Exception { SearchResponse response = client.prepareSearch("djt") .setTypes("user") .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) // .setSearchType(SearchType.QUERY_AND_FETCH) .execute() .actionGet(); SearchHits hits = response.getHits(); System.out.println(hits.getTotalHits()); } }
先创建索引库djt1
# Seed the djt1 index with 18 sample user documents (one request per document).
curl -XPUT 'master:9200/djt1/user/1' -d '{"name":"tom","age":18, "info":"tom"}'
curl -XPUT 'master:9200/djt1/user/2' -d '{"name":"jack","age":29, "info":"jack"}'
curl -XPUT 'master:9200/djt1/user/3' -d '{"name":"jetty","age":18, "info":"jetty"}'
curl -XPUT 'master:9200/djt1/user/4' -d '{"name":"daival","age":19, "info":"daival"}'
curl -XPUT 'master:9200/djt1/user/5' -d '{"name":"lilei","age":18, "info":"lilei"}'
curl -XPUT 'master:9200/djt1/user/6' -d '{"name":"lili","age":29, "info":"lili"}'
curl -XPUT 'master:9200/djt1/user/7' -d '{"name":"tom1","age":18, "info":"tom1"}'
curl -XPUT 'master:9200/djt1/user/8' -d '{"name":"tom2","age":19, "info":"tom2"}'
curl -XPUT 'master:9200/djt1/user/9' -d '{"name":"tom3","age":11, "info":"tom3"}'
curl -XPUT 'master:9200/djt1/user/10' -d '{"name":"tom4","age":18, "info":"tom4"}'
curl -XPUT 'master:9200/djt1/user/11' -d '{"name":"tom5","age":18, "info":"tom5"}'
curl -XPUT 'master:9200/djt1/user/12' -d '{"name":"tom6","age":18, "info":"tom6"}'
curl -XPUT 'master:9200/djt1/user/13' -d '{"name":"tom7","age":18, "info":"tom7"}'
curl -XPUT 'master:9200/djt1/user/14' -d '{"name":"tom8","age":18, "info":"tom8"}'
curl -XPUT 'master:9200/djt1/user/15' -d '{"name":"tom9","age":18, "info":"tom9"}'
curl -XPUT 'master:9200/djt1/user/16' -d '{"name":"john","age":18, "info":"john"}'
curl -XPUT 'master:9200/djt1/user/17' -d '{"name":"marry","age":38, "info":"marry"}'
curl -XPUT 'master:9200/djt1/user/18' -d '{"name":"jjk1","age":18, "info":"jjk1"}'
执行程序(这里是条件匹配查询)
这里是把djt1库的所有数据都查出来
对多字段查询
模糊字符匹配查询方法
根据权重来查询
完全匹配查询
从0开始只查询2条数据
对年龄进行升序查询2条数据
查询某个年龄段的10条数据
参考代码 ESTestQuery.java
package com.dajiangtai.djt_spider.elasticsearch; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.sort.SortOrder; import org.junit.Before; import org.junit.Test; /** * Query 操作 * * @author 大讲台 * */ public class ESTestQuery { private TransportClient client; @Before public void test0() throws UnknownHostException { // 开启client.transport.sniff功能,探测集群所有节点 Settings settings = Settings.settingsBuilder() .put("cluster.name", "escluster") .put("client.transport.sniff", true).build(); // on startup // 获取TransportClient client = TransportClient .builder() .settings(settings) .build() .addTransportAddress( new InetSocketTransportAddress(InetAddress .getByName("master"), 9300)) .addTransportAddress( new InetSocketTransportAddress(InetAddress .getByName("slave1"), 9300)) .addTransportAddress( new InetSocketTransportAddress(InetAddress .getByName("slave2"), 9300)); } /** * 查询:query * 分页:from to * 排序:sort * 过滤:filter * * @throws Exception */ @Test public void test1() throws Exception { SearchRequestBuilder builder = client.prepareSearch("djt1"); builder//.setQuery(QueryBuilders.matchQuery("info", "marry and john")) .setQuery(QueryBuilders.matchAllQuery()) // .setQuery(QueryBuilders.multiMatchQuery("john", "name","info")) // .setQuery(QueryBuilders.queryStringQuery("name:tom*")) //.setQuery(QueryBuilders.boolQuery().should(QueryBuilders.matchQuery("name","tom").boost(3.0f)).should(QueryBuilders.matchQuery("age",18).boost(1.0f))) 
//.setQuery(QueryBuilders.termQuery("info", "tom")) .setFrom(0) .setSize(10) .addSort("age", SortOrder.ASC) .setPostFilter(QueryBuilders.rangeQuery("age").from(18).to(32)) .setExplain(false) ; SearchResponse searchResponse = builder.get(); SearchHits hits = searchResponse.getHits(); Map<String, Object> map = new HashMap<String, Object>(); SearchHit[] hits2 = hits.getHits(); for (SearchHit searchHit : hits2) { System.out.println(searchHit.getSource().toString()); } } }