zoukankan      html  css  js  c++  java
  • Elasticsearch【JAVA REST Client】客户端操作

    ES系统作为集群,环境搭建非常方便简单。 现在在我们的应用中,如何对这个集群进行操作呢?

    我们利用ES系统,通常都是下面的架构:

    在这里,客户端的请求通过LB进行负载均衡,因为操作任何一个ES的实例,最终在ES集群系统中内容都是一样的,所以,考虑各个ES节点的负载问题以及容灾问题,上述的架构是当下最流行的方式。

    下面,将要介绍ES JAVA REST API方式操作ES集群。

    若是通过maven项目操作,相对比较简单的,只需要在pom.xml下面加入下面的配置

    1 <dependency>
    2     <groupId>org.elasticsearch.client</groupId>
    3     <artifactId>rest</artifactId>
    4     <version>5.0.0-rc1</version>
    5 </dependency>

    但是,我们公司网络让人蛋疼,maven仓库基本连接不上,所以,我们不得不采用Dynamic Web Project的方式,所以,需要的ES客户端jar包需要自己下载了。大家可以在maven的网站上一个个的下载,主要是依赖下载,可以在rest的下载地址下面,找到这个包的依赖文件,其实,就是httpclient的一些jar包。

    1 commons-codec
    2 commons-logging
    3 httpclient
    4 httpcore
    5 httpasyncclient
    6 httpcore-nio

    这些包,都不大,下载后放在自己的工程lib下面即可。

    我在项目TKSearch里面创建了一个测试用的controller文件SearchProducerController,然后在service下面创建一个接口文件IESRestService.java,并做相关实现ESRestService.java. 代码如下:

    IESRestService.java

     1 /**
     2  * @author "shihuc"
     3  * @date   2016年10月26日
     4  */
     5 package com.tk.es.search.service;
     6 
     7 import org.apache.http.HttpEntity;
     8 
     9 /**
    10  * @author chengsh05
    11  *
    12  */
    13 public interface IESRestService {
    14 
    15     /**
    16      * 同步方式向ES集群写入数据
    17      * 
    18      * @param index
    19      * @param type
    20      * @param id
    21      * @param entity
    22      * @return
    23      */
    24     public boolean putSync(String index, String type, String id, HttpEntity entity);
    25     
    26     /**
    27      * 异步的方式向ES写入数据
    28      * 
    29      * @param index
    30      * @param type
    31      * @param id
    32      * @param entity
    33      * @return
    34      */
    35     public void putAsync(String index, String type, String id, HttpEntity entity);
    36         
    37 }

    ESRestService.java

     1 /**
     2  * @author "shihuc"
     3  * @date   2016年10月26日
     4  */
     5 package com.tk.es.search.service.impl;
     6 
     7 import java.io.IOException;
     8 import java.util.Collections;
     9 
    10 import javax.annotation.PostConstruct;
    11 import javax.annotation.PreDestroy;
    12 
    13 import org.apache.http.HttpEntity;
    14 import org.apache.http.HttpHost;
    15 import org.apache.http.entity.ContentType;
    16 import org.apache.http.entity.StringEntity;
    17 import org.elasticsearch.client.Response;
    18 import org.elasticsearch.client.ResponseListener;
    19 import org.elasticsearch.client.RestClient;
    20 import org.springframework.stereotype.Service;
    21 
    22 import com.tk.es.search.service.IESRestService;
    23 
    24 /**
    25  * @author chengsh05
    26  *
    27  */
    28 @Service
    29 public class ESRestService implements IESRestService{
    30 
    31     private RestClient restClient = null;
    32     
    33     /* (non-Javadoc)
    34      * @see com.tk.es.search.service.IESRestService#putSync(java.lang.String, java.lang.String, java.lang.String, org.apache.http.HttpEntity)
    35      */
    36     @Override
    37     public boolean putSync(String index, String type, String id, HttpEntity entity) {        
    38         Response indexResponse = null;
    39         try {
    40             indexResponse = restClient.performRequest(
    41                     "PUT",
    42                     "/" + index + "/" + type + "/" + id,
    43                     Collections.<String, String>emptyMap(),
    44                     entity);
    45         } catch (IOException e) {
    46             e.printStackTrace();
    47         }
    48         return (indexResponse != null);
    49     }
    50     
    51     @PostConstruct
    52     public void init(){
    53         restClient = RestClient.builder(new HttpHost("10.90.7.2", 9201, "http")).build();     #这里builder的参数,可以是很多个HttpHost的数组,若采用博文开篇的架构图的话,这里的HttpHost就是LB的的地址
    54     }
    55     
    56     @PreDestroy
    57     public void destroy(){
    58         if(restClient != null){
    59             try {
    60                 restClient.close();
    61             } catch (IOException e) {                
    62                 e.printStackTrace();
    63             }
    64         }
    65     }
    66 
    67     /* (non-Javadoc)
    68      * @see com.tk.es.search.service.IESRestService#putAsync(java.lang.String, java.lang.String, java.lang.String, org.apache.http.HttpEntity)
    69      */
    70     @Override
    71     public void putAsync(String index, String type, String id, HttpEntity entity) {
    72 
    73         restClient.performRequestAsync(
    74                 "PUT",                        #HTTP的方法,可以是PUT,POST,DELETE,HEAD,GET等
    75                 "/" + index + "/" + type + "/" + id,       #endpoint, 这个就是指数据在ES中的位置,由index,type以及id确定
    76                 Collections.<String, String>emptyMap(),      #是一个map
    77                 entity,                        #指的是操作数,即目标数据,这个例子里面表示要存入ES的数据对象
    78                 new ResponseListener() {              #异步操作的监听器,在这里,注册listener,对操作成功或者失败进行后续的处理,比如在这里向前端反馈执行后的结果状态
    79                     @Override
    80                     public void onSuccess(Response response) {
    81                         System.out.println(response);                            
    82                     }
    83 
    84                     @Override
    85                     public void onFailure(Exception exception) {
    86                         System.out.println("failure in async scenrio");
    87                     }
    88                });
    89         
    90     }
    91 
    92 }

    SearchProducerController.java

     1 /**
     2  * @author "shihuc"
     3  * @date   2016年10月26日
     4  */
     5 package com.tk.es.search.controller;
     6 
     7 import java.util.HashMap;
     8 
     9 import javax.annotation.Resource;
    10 import javax.servlet.http.HttpServletRequest;
    11 import javax.servlet.http.HttpServletResponse;
    12 
    13 import org.apache.http.HttpEntity;
    14 import org.apache.http.entity.ContentType;
    15 import org.apache.http.entity.StringEntity;
    16 import org.springframework.stereotype.Controller;
    17 import org.springframework.web.bind.annotation.RequestMapping;
    18 import org.springframework.web.bind.annotation.ResponseBody;
    19 
    20 import com.google.gson.Gson;
    21 import com.tk.es.search.service.impl.ESRestService;
    22 
    23 /**
    24  * @author chengsh05
    25  *
    26  */
    27 @Controller
    28 public class SearchProducerController {
    29     
    30     @Resource
    31     private ESRestService esRestService;
    32     
    33     @RequestMapping(value="/articleSync")
    34     @ResponseBody
    35     public String prepareArticleSync(HttpServletRequest req, HttpServletResponse rsp){
    36         HttpEntity entity = new StringEntity(
    37                 "{
    " +
    38                 "    "user" : "kimchy",
    " +
    39                 "    "post_date" : "2009-11-15T14:12:12",
    " +
    40                 "    "message" : "trying out Elasticsearch"
    " +
    41                 "}", ContentType.APPLICATION_JSON);
    42         
    43         esRestService.putSync("rest_index", "client", "1", entity);
    44         Gson gson = new Gson();
    45         HashMap<String, String> res = new HashMap<String, String>();
    46         res.put("result", "successful");
    47         return gson.toJson(res);
    48     }
    49     
    50     @RequestMapping(value="/articleAsync")
    51     @ResponseBody
    52     public String prepareArticleAsync(HttpServletRequest req, HttpServletResponse rsp){
    53         HttpEntity entity = new StringEntity(
    54                 "{
    " +
    55                 "    "user" : "shihuc",
    " +
    56                 "    "post_date" : "2016-10-26T13:30:12",
    " +
    57                 "    "message" : "Demo REST Client to operate Elasticsearch"
    " +
    58                 "}", ContentType.APPLICATION_JSON);
    59         
    60         esRestService.putAsync("rest_index", "client", "2", entity);
    61         Gson gson = new Gson();
    62         HashMap<String, String> res = new HashMap<String, String>();
    63         res.put("result", "successful");
    64         return gson.toJson(res);
    65     }
    66 }

    启动项目,在地址栏输入http://10.90.9.20:8080/TKSearch/articleSync将会以同步的方式在ES集群中创建一条记录。

    输入http://10.90.9.20:8080/TKSearch/articleAsync,将会以异步的方式创建一个记录。

    将RestClient以一个Service进行包装,在Spring启动的时候,注入bean过程中进行初始化,在bean销毁前进行连接的关闭操作。利用Spring支持的两个注解@PostConstruct和@PreDestroy完成连接的建立和销毁,结构干净简单。

    到此,客户端以Java REST Client的方式操作ES集群的demo就演示结束。 后续将介绍Elasticsearch Java API的方式操作ES集群的实施过程。

  • 相关阅读:
    酷狗大数据平台架构是如何重构的
    分层架构,前后端分离有啥坏处?
    Redis快速入门及应用
    Kafka不只是个消息系统
    提高代码质量:如何编写函数
    jquery 回车事件
    程序员的薪资是如何确定出来的?
    HTTP 错误 500.21
    制作手机浏览器显示格式的HTML页面
    操作必须使用一个可更新的查询。
  • 原文地址:https://www.cnblogs.com/shihuc/p/6000725.html
Copyright © 2011-2022 走看看