ES系统作为集群,环境搭建非常方便简单。 现在在我们的应用中,如何对这个集群进行操作呢?
我们利用ES系统,通常都是下面的架构:
在这里,客户端的请求通过LB进行负载均衡,因为操作任何一个ES的实例,最终在ES集群系统中内容都是一样的,所以,考虑各个ES节点的负载问题以及容灾问题,上述的架构是当下最流行的方式。
下面,将要介绍ES JAVA REST API方式操作ES集群。
若是通过maven项目操作,相对比较简单的,只需要在pom.xml下面加入下面的配置
1 <dependency> 2 <groupId>org.elasticsearch.client</groupId> 3 <artifactId>rest</artifactId> 4 <version>5.0.0-rc1</version> 5 </dependency>
但是,我们公司网络让人蛋疼,maven仓库基本连接不上,所以,我们不得不采用Dynamic Web Project的方式,所以,需要的ES客户端jar包需要自己下载了。大家可以在maven的网站上一个个的下载,主要是依赖下载,可以在rest的下载地址下面,找到这个包的依赖文件,其实,就是httpclient的一些jar包。
1 commons-codec 2 commons-logging 3 httpclient 4 httpcore 5 httpasyncclient 6 httpcore-nio
这些包,都不大,下载后放在自己的工程lib下面即可。
我在项目TKSearch里面创建了一个测试用的controller文件SearchProducerController,然后在service下面创建一个接口文件IESRestService.java,并做相关实现ESRestService.java. 代码如下:
IESRestService.java
1 /** 2 * @author "shihuc" 3 * @date 2016年10月26日 4 */ 5 package com.tk.es.search.service; 6 7 import org.apache.http.HttpEntity; 8 9 /** 10 * @author chengsh05 11 * 12 */ 13 public interface IESRestService { 14 15 /** 16 * 同步方式向ES集群写入数据 17 * 18 * @param index 19 * @param type 20 * @param id 21 * @param entity 22 * @return 23 */ 24 public boolean putSync(String index, String type, String id, HttpEntity entity); 25 26 /** 27 * 异步的方式向ES写入数据 28 * 29 * @param index 30 * @param type 31 * @param id 32 * @param entity 33 * @return 34 */ 35 public void putAsync(String index, String type, String id, HttpEntity entity); 36 37 }
ESRestService.java
1 /** 2 * @author "shihuc" 3 * @date 2016年10月26日 4 */ 5 package com.tk.es.search.service.impl; 6 7 import java.io.IOException; 8 import java.util.Collections; 9 10 import javax.annotation.PostConstruct; 11 import javax.annotation.PreDestroy; 12 13 import org.apache.http.HttpEntity; 14 import org.apache.http.HttpHost; 15 import org.apache.http.entity.ContentType; 16 import org.apache.http.entity.StringEntity; 17 import org.elasticsearch.client.Response; 18 import org.elasticsearch.client.ResponseListener; 19 import org.elasticsearch.client.RestClient; 20 import org.springframework.stereotype.Service; 21 22 import com.tk.es.search.service.IESRestService; 23 24 /** 25 * @author chengsh05 26 * 27 */ 28 @Service 29 public class ESRestService implements IESRestService{ 30 31 private RestClient restClient = null; 32 33 /* (non-Javadoc) 34 * @see com.tk.es.search.service.IESRestService#putSync(java.lang.String, java.lang.String, java.lang.String, org.apache.http.HttpEntity) 35 */ 36 @Override 37 public boolean putSync(String index, String type, String id, HttpEntity entity) { 38 Response indexResponse = null; 39 try { 40 indexResponse = restClient.performRequest( 41 "PUT", 42 "/" + index + "/" + type + "/" + id, 43 Collections.<String, String>emptyMap(), 44 entity); 45 } catch (IOException e) { 46 e.printStackTrace(); 47 } 48 return (indexResponse != null); 49 } 50 51 @PostConstruct 52 public void init(){ 53 restClient = RestClient.builder(new HttpHost("10.90.7.2", 9201, "http")).build(); #这里builder的参数,可以是很多个HttpHost的数组,若采用博文开篇的架构图的话,这里的HttpHost就是LB的的地址 54 } 55 56 @PreDestroy 57 public void destroy(){ 58 if(restClient != null){ 59 try { 60 restClient.close(); 61 } catch (IOException e) { 62 e.printStackTrace(); 63 } 64 } 65 } 66 67 /* (non-Javadoc) 68 * @see com.tk.es.search.service.IESRestService#putAsync(java.lang.String, java.lang.String, java.lang.String, org.apache.http.HttpEntity) 69 */ 70 @Override 71 public void putAsync(String index, String type, String id, HttpEntity entity) { 72 73 restClient.performRequestAsync( 74 "PUT", #HTTP的方法,可以是PUT,POST,DELETE,HEAD,GET等 75 "/" + index + "/" + type + "/" + id, #endpoint, 这个就是指数据在ES中的位置,由index,type以及id确定 76 Collections.<String, String>emptyMap(), #是一个map 77 entity, #指的是操作数,即目标数据,这个例子里面表示要存入ES的数据对象 78 new ResponseListener() { #异步操作的监听器,在这里,注册listener,对操作成功或者失败进行后续的处理,比如在这里向前端反馈执行后的结果状态 79 @Override 80 public void onSuccess(Response response) { 81 System.out.println(response); 82 } 83 84 @Override 85 public void onFailure(Exception exception) { 86 System.out.println("failure in async scenrio"); 87 } 88 }); 89 90 } 91 92 }
SearchProducerController.java
1 /** 2 * @author "shihuc" 3 * @date 2016年10月26日 4 */ 5 package com.tk.es.search.controller; 6 7 import java.util.HashMap; 8 9 import javax.annotation.Resource; 10 import javax.servlet.http.HttpServletRequest; 11 import javax.servlet.http.HttpServletResponse; 12 13 import org.apache.http.HttpEntity; 14 import org.apache.http.entity.ContentType; 15 import org.apache.http.entity.StringEntity; 16 import org.springframework.stereotype.Controller; 17 import org.springframework.web.bind.annotation.RequestMapping; 18 import org.springframework.web.bind.annotation.ResponseBody; 19 20 import com.google.gson.Gson; 21 import com.tk.es.search.service.impl.ESRestService; 22 23 /** 24 * @author chengsh05 25 * 26 */ 27 @Controller 28 public class SearchProducerController { 29 30 @Resource 31 private ESRestService esRestService; 32 33 @RequestMapping(value="/articleSync") 34 @ResponseBody 35 public String prepareArticleSync(HttpServletRequest req, HttpServletResponse rsp){ 36 HttpEntity entity = new StringEntity( 37 "{ " + 38 " "user" : "kimchy", " + 39 " "post_date" : "2009-11-15T14:12:12", " + 40 " "message" : "trying out Elasticsearch" " + 41 "}", ContentType.APPLICATION_JSON); 42 43 esRestService.putSync("rest_index", "client", "1", entity); 44 Gson gson = new Gson(); 45 HashMap<String, String> res = new HashMap<String, String>(); 46 res.put("result", "successful"); 47 return gson.toJson(res); 48 } 49 50 @RequestMapping(value="/articleAsync") 51 @ResponseBody 52 public String prepareArticleAsync(HttpServletRequest req, HttpServletResponse rsp){ 53 HttpEntity entity = new StringEntity( 54 "{ " + 55 " "user" : "shihuc", " + 56 " "post_date" : "2016-10-26T13:30:12", " + 57 " "message" : "Demo REST Client to operate Elasticsearch" " + 58 "}", ContentType.APPLICATION_JSON); 59 60 esRestService.putAsync("rest_index", "client", "2", entity); 61 Gson gson = new Gson(); 62 HashMap<String, String> res = new HashMap<String, String>(); 63 res.put("result", "successful"); 64 return gson.toJson(res); 65 } 66 }
启动项目,在地址栏输入http://10.90.9.20:8080/TKSearch/articleSync将会以同步的方式在ES集群中创建一条记录。
输入http://10.90.9.20:8080/TKSearch/articleAsync,将会以异步的方式创建一个记录。
将RestClient以一个Service进行包装,在Spring启动的时候,注入bean过程中进行初始化,在bean销毁前进行连接的关闭操作。利用Spring支持的两个注解@PostConstruct和@PreDestroy完成连接的建立和销毁,结构干净简单。
到此,客户端以Java REST Client的方式操作ES集群的demo就演示结束。 后续将介绍Elasticsearch Java API的方式操作ES集群的实施过程。