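  • Elasticsearch Series: Java Client Code Demo

    Preface

    The previous 33 articles in this series issued every ES request as a RESTful call from Kibana, and no Java or Python client code was published. RESTful requests are the most direct way to use and understand ES's core features, but real projects send requests and process data from Java or Python. Once the core features are understood, the Java API is straightforward to use; for APIs not covered here, consult the official documentation.

    Overview

    This article walks through some examples of developing against the Elasticsearch client API, mainly in Java, covering the most commonly used core APIs.

    Code Examples

    Adding Dependencies

    Taking a Maven project as an example, add the following dependencies: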

    <dependency>
    	<groupId>org.elasticsearch</groupId>
    	<artifactId>elasticsearch</artifactId>
    	<version>6.3.1</version>
    </dependency>
    <dependency>
    	<groupId>org.elasticsearch.client</groupId>
    	<artifactId>transport</artifactId>
    	<version>6.3.1</version>
    </dependency>
    <dependency>
    	<groupId>log4j</groupId>
    	<artifactId>log4j</artifactId>
    	<version>1.2.17</version>
    </dependency>
    <dependency>
    	<groupId>org.apache.logging.log4j</groupId>
    	<artifactId>log4j-core</artifactId>
    	<version>2.12.1</version>
    </dependency>
    

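    Connecting to ES

    1. Create a Settings object and specify the cluster name.
    2. Create a TransportClient object and specify the IP and port manually.

    Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
    		
    // In ES 6.x the address class is TransportAddress (InetSocketTransportAddress was removed in 6.0)
    TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));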
    

    When the cluster has many nodes, specifying the IP and port of every node is impractical. The client can discover the cluster's nodes automatically (sniffing) instead:

    // Setting client.transport.sniff to true enables automatic discovery of cluster nodes
    Settings settings = Settings.builder().put("client.transport.sniff", true).put("cluster.name", "elasticsearch").build();
    
    // Only one node needs to be specified
    TransportClient client = new PreBuiltTransportClient(settings);
    client.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));
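
    In practice the seed addresses often come from configuration rather than being hard-coded. The following is a minimal sketch using only the JDK; the class name SeedNodes and the comma-separated "host:port" format are assumptions made for illustration and are not part of the ES API. Each resulting pair could then be passed to new TransportAddress(InetAddress.getByName(host), port).

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of the ES client): parses "host:port,host:port" seed lists.
class SeedNodes {
    // 9300 is the transport port used throughout this article.
    static final int DEFAULT_TRANSPORT_PORT = 9300;

    /** Returns (host, port) pairs; entries without a port fall back to 9300. IPv6 literals are not handled. */
    static List<Map.Entry<String, Integer>> parse(String csv) {
        List<Map.Entry<String, Integer>> nodes = new ArrayList<>();
        for (String part : csv.split(",")) {
            String entry = part.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = entry.lastIndexOf(':');
            if (colon < 0) {
                nodes.add(new SimpleImmutableEntry<>(entry, DEFAULT_TRANSPORT_PORT));
            } else {
                String host = entry.substring(0, colon);
                int port = Integer.parseInt(entry.substring(colon + 1));
                nodes.add(new SimpleImmutableEntry<>(host, port));
            }
        }
        return nodes;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> node : parse("192.168.17.137:9300, localhost")) {
            System.out.println(node.getKey() + " -> " + node.getValue());
        }
    }
}
```

    With sniffing enabled, one reachable seed is enough; listing more than one only guards against that seed being down when the client starts.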
    
    

    Basic CRUD

    The most basic CRUD code, which can serve as an introductory demo:

    /**
    	 * Create an employee record (index one document)
    	 * @param client
    	 */
    	private static void createEmployee(TransportClient client) throws Exception {
    		IndexResponse response = client.prepareIndex("company", "employee", "1")
    				.setSource(XContentFactory.jsonBuilder()
    						.startObject()
    							.field("name", "jack")
    							.field("age", 27)
    							.field("position", "technique")
    							.field("country", "china")
    							.field("join_date", "2017-01-01")
    							.field("salary", 10000)
    						.endObject())
    				.get();
    		System.out.println(response.getResult()); 
    	}
    	
    	/**
    	 * Fetch an employee record
    	 * @param client
    	 * @throws Exception
    	 */
    	private static void getEmployee(TransportClient client) throws Exception {
    		GetResponse response = client.prepareGet("company", "employee", "1").get();
    		System.out.println(response.getSourceAsString()); 
    	}
    	
    	/**
    	 * Update an employee record
    	 * @param client
    	 * @throws Exception
    	 */
    	private static void updateEmployee(TransportClient client) throws Exception {
    		UpdateResponse response = client.prepareUpdate("company", "employee", "1") 
    				.setDoc(XContentFactory.jsonBuilder()
    							.startObject()
    								.field("position", "technique manager")
    							.endObject())
    				.get();
    		System.out.println(response.getResult());  
     	}
    	
    	/**
    	 * Delete an employee record
    	 * @param client
    	 * @throws Exception
    	 */
    	private static void deleteEmployee(TransportClient client) throws Exception {
    		DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
    		System.out.println(response.getResult());  
    	}
    

    Search

    The searches we previously ran through RESTful requests can now be implemented in Java. The original RESTful example:

    GET /company/employee/_search
    {
      "query": {
        "bool": {
          "must": [
            {
              "match": {
                "position": "technique"
              }
            }
          ],
          "filter": {
            "range": {
              "age": {
                "gte": 30,
                "lte": 40
              }
            }
          }
        }
      },
      "from": 0,
      "size": 1
    }
    

    This is equivalent to the following Java code:

    SearchResponse response = client.prepareSearch("company")
            .setTypes("employee")
            .setQuery(QueryBuilders.boolQuery()
                    .must(QueryBuilders.matchQuery("position", "technique"))      // must: match query
                    .filter(QueryBuilders.rangeQuery("age").gte(30).lte(40)))     // filter: range on age
            .setFrom(0).setSize(1)
            .get();
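
    The from and size values are usually derived from a page number. The sketch below shows that arithmetic in plain Java; the class name PageWindow and its method are hypothetical. It also rejects windows beyond 10,000 hits, the default of the index.max_result_window setting, assuming that setting has not been changed. Deeper traversal is what the scroll request later in this article is for.

```java
// Hypothetical helper: converts a 1-based page number into the "from" offset of a search request.
class PageWindow {
    // Default value of the index.max_result_window index setting (assumed unchanged here).
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    static int fromOffset(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size; // long arithmetic avoids int overflow
        if (from + size > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + DEFAULT_MAX_RESULT_WINDOW);
        }
        return (int) from;
    }

    public static void main(String[] args) {
        // Page 1 with size 1 matches the request above: from = 0, size = 1.
        System.out.println(fromOffset(1, 1));
        // Page 3 with 20 hits per page skips the first 40 hits.
        System.out.println(fromOffset(3, 20));
    }
}
```

    The returned value would be passed to setFrom(...), with the page size going to setSize(...).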
    

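    Aggregation Queries

    Aggregation queries are somewhat more involved: both building the request and parsing the response depend on the actual structure returned. Take the following query as an example:

    Requirements:

    1. Group by country.
    2. Within each country group, group again by the year of the join date.
    3. Finally, compute the average salary in each group.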
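
    To make the three steps concrete, the same grouping can be sketched over an in-memory list with plain Java streams. This only illustrates what the aggregation computes, not how ES executes it; the class NestedAggregationSketch, its Employee type, and the sample figures are made up for illustration.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// In-memory illustration of: group by country -> group by join year -> average salary.
class NestedAggregationSketch {
    static final class Employee {
        private final String country;
        private final LocalDate joinDate;
        private final double salary;

        Employee(String country, String joinDate, double salary) {
            this.country = country;
            this.joinDate = LocalDate.parse(joinDate); // ISO format, e.g. 2017-01-01
            this.salary = salary;
        }

        String getCountry() { return country; }
        int getJoinYear() { return joinDate.getYear(); }
        double getSalary() { return salary; }
    }

    static Map<String, Map<Integer, Double>> averageSalary(List<Employee> employees) {
        return employees.stream().collect(
                Collectors.groupingBy(Employee::getCountry,               // step 1: by country
                        Collectors.groupingBy(Employee::getJoinYear,      // step 2: by join year
                                Collectors.averagingDouble(Employee::getSalary)))); // step 3: average
    }

    public static void main(String[] args) {
        List<Employee> staff = Arrays.asList(
                new Employee("china", "2017-01-01", 10000),
                new Employee("china", "2017-06-15", 12000),
                new Employee("usa", "2018-03-01", 20000));
        System.out.println(averageSalary(staff));
    }
}
```

    Each level of the resulting map corresponds to one level of buckets in the ES response, which is why the response has to be unpacked one level at a time.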

    The RESTful request is as follows:

    GET /company/employee/_search
    {
      "size": 0,
      "aggs": {
        "group_by_country": {
          "terms": {
            "field": "country"
          },
          "aggs": {
            "group_by_join_date": {
              "date_histogram": {
                "field": "join_date",
                "interval": "year"
              },
              "aggs": {
                "avg_salary": {
                  "avg": {
                    "field": "salary"
                  }
                }
              }
            }
          }
        }
      }
    }
    

    The same request written in Java:

    SearchResponse searchResponse = client.prepareSearch("company")
        .setTypes("employee")
        .setSize(0)
        .addAggregation(
            AggregationBuilders.terms("group_by_country").field("country")
            .subAggregation(AggregationBuilders.dateHistogram("group_by_join_date")
                .field("join_date")
                .dateHistogramInterval(DateHistogramInterval.YEAR)
                .subAggregation(AggregationBuilders.avg("avg_salary").field("salary"))
            )
        )
        .execute().actionGet();
    

    Handling the response means drilling down through the data one level at a time:

    Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();
    	StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
    	Iterator<Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();
    	
    	while(groupByCountryBucketIterator.hasNext()) {
    		Bucket groupByCountryBucket = groupByCountryBucketIterator.next();
    		
    		System.out.println(groupByCountryBucket.getKey() + "	" + groupByCountryBucket.getDocCount()); 
    		
    		Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date"); 
    		Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();
    		 
    		while(groupByJoinDateBucketIterator.hasNext()) {
    			org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();
    			
    			System.out.println(groupByJoinDateBucket.getKey() + "	" + groupByJoinDateBucket.getDocCount()); 
    			
    			Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");
    			System.out.println(avgSalary.getValue()); 
    		}
    	}
    	
    	client.close();
    

    Upsert Request

    private static void upsert(TransportClient transport) {
    	try {
    		IndexRequest index = new IndexRequest("book_shop", "books", "2").source(
    				XContentFactory.jsonBuilder().startObject()
    						.field("name", "mysql从入门到删库跑路")
    						.field("tags", "mysql")
    						.field("price", 32.8)
    						.endObject());
    
    		UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
    				.doc(XContentFactory.jsonBuilder()
    						.startObject().field("price", 31.8)
    						.endObject())
    				.upsert(index);
    		UpdateResponse response = transport.update(update).get();
    		System.out.println(response.getVersion());
    	} catch (IOException e) {
    		e.printStackTrace();
    	} catch (InterruptedException e) {
    		e.printStackTrace();
    	} catch (ExecutionException e) {
    		e.printStackTrace();
    	}
    }
    

    mget Request

    public static void mget(TransportClient transport) {
    	MultiGetResponse res = transport.prepareMultiGet()
    			.add("book_shop", "books", "1")
    			.add("book_shop", "books", "2")
    			.get();
    	for (MultiGetItemResponse item : res.getResponses()) {
    		System.out.println(item.getResponse());
    	}
    }
    

    Bulk Request

    public static void bulk(TransportClient transport) {
    	try {
    		BulkRequestBuilder bulk = transport.prepareBulk();
    		bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(
    				XContentFactory.jsonBuilder().startObject()
    						.field("name", "设计模式从入门到拷贝代码")
    						.field("tags", "设计模式")
    						.field("price", 55.9)
    						.endObject()));
    		bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(
    				XContentFactory.jsonBuilder().startObject()
    						.field("name", "架构设计从入门到google搜索")
    						.field("tags", "架构设计")
    						.field("price", 68.9)
    						.endObject()));
    		bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc((XContentFactory.jsonBuilder()
    				.startObject().field("price", 32.8)
    				.endObject())));
    
    		BulkResponse bulkRes = bulk.get();
    		if (bulkRes.hasFailures()) {
    			System.out.println("Error...");
    		}
    	} catch (IOException e) {
    		e.printStackTrace();
    	}
    }
    

    Scroll Request

    public static void scroll(TransportClient client) {
    	SearchResponse bookShop = client.prepareSearch("book_shop").setScroll(new TimeValue(60000)).setSize(1).get();
    
    	int batchCnt = 0;
    	do {
    	    // Keep fetching batches with the scroll id until a batch comes back empty
    		for(SearchHit hit: bookShop.getHits().getHits()) {
    			System.out.println("batchCnt:" + ++batchCnt);
    			System.out.println(hit.getSourceAsString());
    		}
    		bookShop = client.prepareSearchScroll(bookShop.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
    	} while (bookShop.getHits().getHits().length != 0);
    }
    
    

    Search Templates

    public static void searchTemplates(TransportClient client) {
    	Map<String,Object> params = new HashMap<>(10);
    	params.put("from",0);
    	params.put("size",10);
    	params.put("tags","java");
    
    	SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
    			.setScript("page_query_by_tags")
    			.setScriptType(ScriptType.STORED)
    			.setScriptParams(params)
    			.setRequest(new SearchRequest())
    			.get();
    
    	for(SearchHit hit:str.getResponse().getHits().getHits()) {
    		System.out.println(hit.getSourceAsString());
    	}
    }
    

    Combining Multiple Query Conditions

    public static void otherSearch(TransportClient client) {
    	SearchResponse response1 = client.prepareSearch("book_shop").setQuery(QueryBuilders.termQuery("tags", "java")).get();
    	SearchResponse response2 = client.prepareSearch("book_shop").setQuery(QueryBuilders.multiMatchQuery("32.8", "price","tags")).get();
    	SearchResponse response3 = client.prepareSearch("book_shop").setQuery(QueryBuilders.commonTermsQuery("name", "入门")).get();
    	SearchResponse response4 = client.prepareSearch("book_shop").setQuery(QueryBuilders.prefixQuery("name", "java")).get();
    
    	System.out.println(response1.getHits().getHits()[0].getSourceAsString());
    	System.out.println(response2.getHits().getHits()[0].getSourceAsString());
    	System.out.println(response3.getHits().getHits()[0].getSourceAsString());
    	System.out.println(response4.getHits().getHits()[0].getSourceAsString());
    
    	// Combine multiple conditions in one bool query
    	SearchResponse response5 = client.prepareSearch("book_shop").setQuery(QueryBuilders.boolQuery()
    			.must(QueryBuilders.termQuery("tags", "java"))
    			.mustNot(QueryBuilders.matchQuery("name", "跑路"))
    			.should(QueryBuilders.matchQuery("name", "入门"))
    			.filter(QueryBuilders.rangeQuery("price").gte(23).lte(55))).get();
    
    	System.out.println(response5.getHits().getHits()[0].getSourceAsString());
    }
    

    Geo-location Queries

    public static void geo(TransportClient client) {
    	GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location").setCorners(23, 112, 21, 114);
    
    	List<GeoPoint> points = new ArrayList<>();
    	points.add(new GeoPoint(23,115));
    	points.add(new GeoPoint(25,113));
    	points.add(new GeoPoint(21,112));
    	GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location",points);
    
    	GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location").point(22.523375, 113.911231).distance(500, DistanceUnit.METERS);
    
    
    	SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
    	for(SearchHit hit:response.getHits().getHits()) {
    		System.out.println(hit.getSourceAsString());
    	}
    }
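
    To get a feel for what the bounding-box and distance conditions above select, the checks can be approximated in plain Java. This is a sketch under stated assumptions: the class GeoSketch is hypothetical, the haversine formula with a mean Earth radius stands in for whatever ES computes internally, and the box check ignores boxes that cross the antimeridian.

```java
// Plain-Java approximation of the geo conditions used above (illustration only).
class GeoSketch {
    // Mean Earth radius in meters; the constant ES uses internally may differ slightly.
    static final double EARTH_RADIUS_METERS = 6_371_008.8;

    /** Great-circle distance between two lat/lon points, in meters (haversine formula). */
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.sqrt(a));
    }

    /** Same argument order as setCorners(top, left, bottom, right). */
    static boolean inBoundingBox(double lat, double lon,
                                 double top, double left, double bottom, double right) {
        return lat <= top && lat >= bottom && lon >= left && lon <= right;
    }

    public static void main(String[] args) {
        double centerLat = 22.523375, centerLon = 113.911231; // the point used in query3
        // A point 0.001 degrees of latitude further north is roughly 111 meters away.
        double d = haversineMeters(centerLat, centerLon, centerLat + 0.001, centerLon);
        System.out.println("within 500 m: " + (d <= 500));
        System.out.println("inside box: " + inBoundingBox(centerLat, centerLon, 23, 112, 21, 114));
    }
}
```

    Note that the method above builds three queries but only runs query3; passing query1 or query2 to setQuery would apply the bounding-box or polygon condition instead.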
    

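    Summary

    A quick skim of the demos above is enough. If you are already developing an ES-based project, rely on the official API documentation, which has detailed API descriptions and usage demos: https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.3/index.html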

  • Original article: https://www.cnblogs.com/huangying2124/p/12944429.html