zoukankan      html  css  js  c++  java
  • ElasticSearch RestHighLevelClient 通用操作

    项目中使用到ElasticSearch作为搜索引擎。而ES的环境搭建自然是十分简单,且本身就适应于分布式环境,因此这块就不多赘述。而其本身特性和查询语句这篇博文不会介绍,如果有机会会深入介绍。

    ​ 所以这篇博文主要还是介绍Java客户端中如何使用查询搜索引擎中的数据。而使用的Java客户端是官方新推出的RestHighLevelClient,使用Http连接查询结果。但是网上相关资料较少,只有官网的api介绍。所以本文以一个小demo介绍RestHighLevelClient的使用。

    项目依赖:
    dependencies {

    // https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client
    compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '5.6.2'
    }

    一般配置Java Client

    // Java客户端生成工厂
    public class ESClientFactory {
    
        private static final String HOST = "127.0.0.1";
        private static final int PORT = 9200;
        private static final String SCHEMA = "http";
        private static final int CONNECT_TIME_OUT = 1000;
        private static final int SOCKET_TIME_OUT = 30000;
        private static final int CONNECTION_REQUEST_TIME_OUT = 500;
    
        private static final int MAX_CONNECT_NUM = 100;
        private static final int MAX_CONNECT_PER_ROUTE = 100;
    
        private static HttpHost HTTP_HOST = new HttpHost(HOST,PORT,SCHEMA);
        private static boolean uniqueConnectTimeConfig = false;
        private static boolean uniqueConnectNumConfig = true;
        private static RestClientBuilder builder;
        private static RestClient restClient;
        private static RestHighLevelClient restHighLevelClient;
    
        static {
            init();
        }
    
        public static void init(){
            builder = RestClient.builder(HTTP_HOST);
            if(uniqueConnectTimeConfig){
                setConnectTimeOutConfig();
            }
            if(uniqueConnectNumConfig){
                setMutiConnectConfig();
            }
            restClient = builder.build();
            restHighLevelClient = new RestHighLevelClient(restClient);
        }
    
        // 主要关于异步httpclient的连接延时配置
        public static void setConnectTimeOutConfig(){
            builder.setRequestConfigCallback(new RequestConfigCallback() {
    
                @Override
                public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                    requestConfigBuilder.setConnectTimeout(CONNECT_TIME_OUT);
                    requestConfigBuilder.setSocketTimeout(SOCKET_TIME_OUT);
                    requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIME_OUT);
                    return requestConfigBuilder;
                }
            });
        }
        // 主要关于异步httpclient的连接数配置
        public static void setMutiConnectConfig(){
            builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
    
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setMaxConnTotal(MAX_CONNECT_NUM);
                    httpClientBuilder.setMaxConnPerRoute(MAX_CONNECT_PER_ROUTE);
                    return httpClientBuilder;
                }
            });
        }
    
        public static RestClient getClient(){
            return restClient;
        }
    
        public static RestHighLevelClient getHighLevelClient(){
            return restHighLevelClient;
        }
    
        public static void close() {
            if (restClient != null) {
                try {
                    restClient.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
    }
    public class ESClientSpringFactory {
    
        public static int CONNECT_TIMEOUT_MILLIS = 1000;
        public static int SOCKET_TIMEOUT_MILLIS = 30000;
        public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
        public static int MAX_CONN_PER_ROUTE = 10;
        public static int MAX_CONN_TOTAL = 30;
    
        private static HttpHost HTTP_HOST;
        private RestClientBuilder builder;
        private RestClient restClient;
        private RestHighLevelClient restHighLevelClient;
    
        private static ESClientSpringFactory esClientSpringFactory = new ESClientSpringFactory();
    
        private ESClientSpringFactory(){}
    
        public static ESClientSpringFactory build(HttpHost httpHost,
                Integer maxConnectNum, Integer maxConnectPerRoute){
            HTTP_HOST = httpHost;
            MAX_CONN_TOTAL = maxConnectNum;
            MAX_CONN_PER_ROUTE = maxConnectPerRoute;
            return  esClientSpringFactory;
        }
    
        public static ESClientSpringFactory build(HttpHost httpHost,Integer connectTimeOut, Integer socketTimeOut,
                Integer connectionRequestTime,Integer maxConnectNum, Integer maxConnectPerRoute){
            HTTP_HOST = httpHost;
            CONNECT_TIMEOUT_MILLIS = connectTimeOut;
            SOCKET_TIMEOUT_MILLIS = socketTimeOut;
            CONNECTION_REQUEST_TIMEOUT_MILLIS = connectionRequestTime;
            MAX_CONN_TOTAL = maxConnectNum;
            MAX_CONN_PER_ROUTE = maxConnectPerRoute;
            return  esClientSpringFactory;
        }
    
    
        public void init(){
            builder = RestClient.builder(HTTP_HOST);
            setConnectTimeOutConfig();
            setMutiConnectConfig();
            restClient = builder.build();
            restHighLevelClient = new RestHighLevelClient(restClient);
            System.out.println("init factory");
        }
        // 配置连接时间延时
        public void setConnectTimeOutConfig(){
            builder.setRequestConfigCallback(new RequestConfigCallback() {
    
                @Override
                public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                    requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
                    requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
                    requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
                    return requestConfigBuilder;
                }
            });
        }
        // 使用异步httpclient时设置并发连接数
        public void setMutiConnectConfig(){
            builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
    
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
                    httpClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
                    return httpClientBuilder;
                }
            });
        }
    
        public RestClient getClient(){
            return restClient;
        }
    
        public RestHighLevelClient getRhlClient(){
            return restHighLevelClient;
        }
    
        public void close() {
            if (restClient != null) {
                try {
                    restClient.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("close client");
        }
    
    
    }

    两种配置方法从本质上都是对client进行配置,且达到相同目的。

    ​ 由于Client配置为单例模式,在Spring中的生命周期随着容器开始结束而开始结束。在定义bean创建和销毁方法后会自动关闭连接。

    ​ 但是使用一般Java配置时,需要手动关闭。如果在web项目中,可以使用监听器,随着项目的生命周期手动调用开启关闭。

    客户端演示
    接下来就是最简单的几个demo,校验这种客户端的有效性,同时也为大家试验如何使用这种Java客户端:

    数据准备
    首先准备要操作的数据:创建一个index为demo,type为demo的新闻索引。

    /PUT http://{{host}}:{{port}}/demo
    {
        "mappings":{
            "demo":{
                "properties":{
                    "title":{
                        "type":"text"
                    },
                    "tag":{
                        "type":"keyword"
                    },
                    "publishTime":{
                        "type":"date"
                    }
                }
            }
        }
    }
    

      

    插入数据
    API格式

    /POST http://{{host}}:{{port}}/demo/demo/
    {
    "title":"中国产小型无人机的“对手”来了,俄微型拦截导弹便宜量又多",
    "tag":"军事",
    "publishTime":"2018-01-24T23:59:30Z"
    }

    Java Client

    public class News {
    
        private String title;
        private String tag;
        private String publishTime;
    
    
        public News() {
            super();
        }
        public News(String title, String tag, String publishTime) {
            super();
            this.title = title;
            this.tag = tag;
            this.publishTime = publishTime;
        }
        public String getTitle() {
            return title;
        }
        public void setTitle(String title) {
            this.title = title;
        }
        public String getTag() {
            return tag;
        }
        public void setTag(String tag) {
            this.tag = tag;
        }
        public String getPublishTime() {
            return publishTime;
        }
        public void setPublishTime(String publishTime) {
            this.publishTime = publishTime;
        }
    }
    
    @RunWith(SpringJUnit4ClassRunner.class)
    
    @ContextConfiguration("classpath:spring/spring-context.xml")
    public class FreeClientTest {
    
        private String index;
    
        private String type;
    
        @Autowired
        private RestHighLevelClient rhlClient;
    
        @Before
        public void prepare() {
            index = "demo";
            type = "demo";
        }
    
        @Test
        public void addTest() {
            IndexRequest indexRequest = new IndexRequest(index, type);
            News news = new News();
            news.setTitle("中国产小型无人机的“对手”来了,俄微型拦截导弹便宜量又多");
            news.setTag("军事");
            news.setPublishTime("2018-01-24T23:59:30Z");
            String source = JsonUtil.toString(news);
            indexRequest.source(source, XContentType.JSON);
            try {
                rhlClient.index(indexRequest);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    两种方式均可插入数据。如果有大量数据这样插入未免效率太低,接下来看一看批量插入数据。

    批量插入数据
    API格式
    /POST http://{{host}}:{{port}}/_bulk
    {"index":{"_index":"demo","_type":"demo"}}
    {"title":"中印边防军于拉达克举行会晤 强调维护边境和平","tag":"军事","publishTime":"2018-01-27T08:34:00Z"}
    {"index":{"_index":"demo","_type":"demo"}}
    {"title":"费德勒收郑泫退赛礼 进决赛战西里奇","tag":"体育","publishTime":"2018-01-26T14:34:00Z"}
    {"index":{"_index":"demo","_type":"demo"}}
    {"title":"欧文否认拿动手术威胁骑士 兴奋全明星联手詹皇","tag":"体育","publishTime":"2018-01-26T08:34:00Z"}
    {"index":{"_index":"demo","_type":"demo"}}
    {"title":"皇马官方通告拉莫斯伊斯科伤情 将缺阵西甲关键战","tag":"体育","publishTime":"2018-01-26T20:34:00Z"}
    

      Java Client

    @RunWith(SpringJUnit4ClassRunner.class)
    
    @ContextConfiguration("classpath:spring/spring-context.xml")
    public class FreeClientTest {
    
        private String index;
    
        private String type;
    
        @Autowired
        private RestHighLevelClient rhlClient;
    
        @Before
        public void prepare() {
            index = "demo";
            type = "demo";
        }
    
    
        @Test
        public void batchAddTest() {
            BulkRequest bulkRequest = new BulkRequest();
            List<IndexRequest> requests = generateRequests();
            for (IndexRequest indexRequest : requests) {
                bulkRequest.add(indexRequest);
            }
            try {
                rhlClient.bulk(bulkRequest);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        public List<IndexRequest> generateRequests(){
            List<IndexRequest> requests = new ArrayList<>();
            requests.add(generateNewsRequest("中印边防军于拉达克举行会晤 强调维护边境和平", "军事", "2018-01-27T08:34:00Z"));
            requests.add(generateNewsRequest("费德勒收郑泫退赛礼 进决赛战西里奇", "体育", "2018-01-26T14:34:00Z"));
            requests.add(generateNewsRequest("欧文否认拿动手术威胁骑士 兴奋全明星联手詹皇", "体育", "2018-01-26T08:34:00Z"));
            requests.add(generateNewsRequest("皇马官方通告拉莫斯伊斯科伤情 将缺阵西甲关键战", "体育", "2018-01-26T20:34:00Z"));
            return requests;
        }
    
        public IndexRequest generateNewsRequest(String title, String tag, String publishTime){
            IndexRequest indexRequest = new IndexRequest(index, type);
            News news = new News();
            news.setTitle(title);
            news.setTag(tag);
            news.setPublishTime(publishTime);
            String source = JsonUtil.toString(news);
            indexRequest.source(source, XContentType.JSON);
            return indexRequest;
        }
    
    }

    无论通过哪种方式,现在ES中已插入五条文档数据。那么现在就可以通过多种多样的查询方式获得需要的数据了。

    查询数据

    查询目标:2018年1月26日早八点到晚八点关于费德勒的前十条体育新闻的标题

    API 格式
    /POST http://{{host}}:{{port}}/demo/demo/_search
    {
        "from":"0",
        "size":"10",
        "_source":["title"],
        "query":{
            "bool":{
                "must":{
                    "match":{
                        "title":"费德勒"
                    }
                },
                "must":{
                    "term":{"tag":"体育"}
                },
                "must":{
                    "range":{
                        "publishTime":{
                            "gte":"2018-01-26T08:00:00Z",
                            "lte":"2018-01-26T20:00:00Z"
                        }
                    }
                }
            }
        }
    
    }
    

      Java Client

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("classpath:spring/spring-context.xml")
    public class FreeClientTest {
    
        private String index;
    
        private String type;
    
        @Autowired
        private RestHighLevelClient rhlClient;
    
        @Before
        public void prepare() {
            index = "demo";
            type = "demo";
        }
    
    
        @Test
        public void queryTest(){
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.from(0);
            sourceBuilder.size(10);
            sourceBuilder.fetchSource(new String[]{"title"}, new String[]{});
            MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "费德勒");
            TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("tag", "体育");
            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("publishTime");
            rangeQueryBuilder.gte("2018-01-26T08:00:00Z");
            rangeQueryBuilder.lte("2018-01-26T20:00:00Z");
            BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
            boolBuilder.must(matchQueryBuilder);
            boolBuilder.must(termQueryBuilder);
            boolBuilder.must(rangeQueryBuilder);
            sourceBuilder.query(boolBuilder);
            SearchRequest searchRequest = new SearchRequest(index);
            searchRequest.types(type);
            searchRequest.source(sourceBuilder);
            try {
                SearchResponse response = rhlClient.search(searchRequest);
                System.out.println(response);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    更新文档

    如果插入了错误的数据,想要更改或者在文档中新增新的数据,那么就需要更新文档了。

    演示 将费德勒的新闻的tag更改为网球类型:

    API格式
    /POST http://{{host}}:{{port}}/demo/demo/AWE1fnSx00f4t28WJ4D6/_update
    {
        "doc":{
            "tag":"网球"
        }
    }
    

      Java Client

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("classpath:spring/spring-context.xml")
    public class FreeClientTest {
    
        private String index;
    
        private String type;
    
        private String id;
    
        @Autowired
        private RestHighLevelClient rhlClient;
    
        @Before
        public void prepare() {
            index = "demo";
            type = "demo";
            id = "AWE1fnSx00f4t28WJ4D6";
        }
    
    
        @Test
        public void updateTest(){
            UpdateRequest updateRequest = new UpdateRequest(index, type, id);
            Map<String, String> map = new HashMap<>();
            map.put("tag", "网球");
            updateRequest.doc(map);
            try {
                rhlClient.update(updateRequest);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    以上介绍了最简单的doc文档更改

    ID方式删除

    API格式
    /DELETE http://{{host}}:{{port}}/delete_demo/demo/AWExGSdW00f4t28WAPen
    • 1
    Java 客户端
    public class ElkDaoTest extends BaseTest{
    
        @Autowired
        private RestHighLevelClient rhlClient;
    
        private String index;
    
        private String type;
    
        private String id;
    
        @Before
        public void prepare(){
            index = "delete_demo";
            type = "demo";
            id = "AWExGSdW00f4t28WAPeo";
        }
    
        @Test
        public void delete(){
            DeleteRequest deleteRequest = new DeleteRequest(index,type,id);
            DeleteResponse response = null;
            try {
                response = rhlClient.delete(deleteRequest);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(response);
        }
    }

    同样删除成功。

  • 相关阅读:
    剑指offer--03.从尾到头打印链表
    剑指offer--02.替换空格
    剑指offer--01.二维数组中的查找
    JAVA日记之mybatis-3一对一,一对多,多对多xml与注解配置
    SpringBoot 2.x 自定义拦截器并解决静态资源访问被拦截问题
    springboot项目WEB-INF 目录 jsp页面报404
    Spring Boot 配置拦截器方式
    通过idea创建Maven项目整合Spring+spring mvc+mybatis
    idea创建maven项目
    PLSQL操作Oracle创建用户和表
  • 原文地址:https://www.cnblogs.com/WeidLang/p/10245659.html
Copyright © 2011-2022 走看看