zoukankan      html  css  js  c++  java
  • java-es查询

    前释:此为结合项目中的应用场景自己想的开发方案,项目在自己本机搭建,自定义模拟数据(不是海量数据)。

    1:应用背景。

      全国300多城市的小区,及多方发布的房源数据的一个检索功能。

      全国的房源小区数据搜索。(多条件模糊查询,经纬度查询)

    2:技术实现:ELK(elasticSearch+logstash+kibana)+ filebeat + kafka

    3:数据来源:

      1. 原始数据

      2. 其它平台的定时推送过来的房源数据

      3. 平台本身用户发布的房源数据

    4:数据源处理方案:

      4.1. 原始数据采用sqoop处理。

        实现:这里不说明如何实现,本章主要实现的是es在java中的检索应用。

      4.2. 定时推送的数据定的有2种解决方案,一种是采用binlog方式,二是采用接收数据的接口在装数时候传数据到kafka一份。

        本人想采用kafka方式,原因:对binlog不熟。

        实现:如同4.3中的数据。

      4.3. 对于本身的平台数据。

        3.1:将发布房源的数据写入log的目录文件中(写入的格式及文件名称形式可自定义)。

        3.2:filebeat采集日志信息写入kafka中。

        3.3:logstash消费kafka中的数据并解析数据output到es中。

        实现:这里不说明如何实现,本章主要实现的是es在java中的检索应用。

    5:数据的检索实现。(实现之后未能及时的随笔记录,有些点可能已经忘记了)

      5.1:版本及客户端使用。

          我的版本使用的是6.4.2版本。

          客户端有transport client和rest client,我选择的是rest方式。(在8.0版本已经弃用了transport方式)。

      5.2:jar包引入。

          

      5.3:客户端的连接类。

    @Configuration
    public class EsConfiguration {
     
        private static String hosts = "192.168.147.101,192.168.147.102,192.168.147.103"; // 集群地址,多个用,隔开
        private static int port = 9200; // 使用的端口号
        private static String schema = "http"; // 使用的协议
        private static ArrayList<HttpHost> hostList = null;
     
        private static int connectTimeOut = 1000; // 连接超时时间
        private static int socketTimeOut = 30000; // 连接超时时间
        private static int connectionRequestTimeOut = 500; // 获取连接的超时时间
     
        private static int maxConnectNum = 100; // 最大连接数
        private static int maxConnectPerRoute = 100; // 最大路由连接数
     
        static {
            hostList = new ArrayList<>();
            String[] hostStrs = hosts.split(",");
            for (String host : hostStrs) {
                hostList.add(new HttpHost(host, port, schema));
            }
        }
     
        @Bean
        public RestHighLevelClient client() {
            RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
            // 异步httpclient连接延时配置
            builder.setRequestConfigCallback(new RequestConfigCallback() {
                @Override
                public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                    requestConfigBuilder.setConnectTimeout(connectTimeOut);
                    requestConfigBuilder.setSocketTimeout(socketTimeOut);
                    requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
                    return requestConfigBuilder;
                }
            });
            // 异步httpclient连接数配置
            builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setMaxConnTotal(maxConnectNum);
                    httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                    return httpClientBuilder;
                }
            });
            RestHighLevelClient client = new RestHighLevelClient(builder);
            return client;
        }
    }

      5.4:实体类。

        

           

      5.5:重点,整合java使用的测试类。在搜索的方法中有测试场景时候的情况的注释。

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = { Demo01Application.class })
    public class Demo01ApplicationTests {
    
        @Autowired
        private RestHighLevelClient client;
    
    
        public static String INDEX_TEST = null;
        public static String TYPE_TEST = null;
        public static Tests tests = null;
        public static List<Tests> testsList = null;
    
        @Test
        public void contextLoads() throws IOException, CloneNotSupportedException {
    //        // 判断是否存在索引
    //        existsIndex("indexName");
    //        // 创建索引
    //        createIndex("house", "house-info");
    //        // 删除索引
    //        deleteIndex("house");
    //        // 判断数据是否存在
    //        exists("user", "user-info", "JO3hP24BlvWqEof7y5BF");
    //        // 根据ID获取数据
    //        get("index01", "type", "201");
    //
    //        List<String> idList = null;
    //        List<Map<String, Object>> dataList = null;
    //        // //批量更新
    //        bulkUpdate("index_name", "index_type", idList, dataList);
    //        // 批量添加
    //        bulkAdd("index_name", "index_type", idList, dataList);
    //        // 批量删除
    //       List<Map<String, Object>> dataList = null;
    //       List<String> idList = new ArrayList<String>();
    //       idList.add("ZsSfRW4B3jWdK-k5x4lo");
    //       idList.add("ZcSZRW4B3jWdK-k5E4ld");
    //        bulkDelete("user", "user-info", idList, dataList);
    //        
    //        //坐标范围查询
    //        searchPoint();
    //        //关键字查询
    //        search("蒋", "user-info", "user");
            
            client.close();
        }
    
        /**
         * 创建索引
         * 
         * @param index_name
         * @param index_type
         * @throws IOException
         */
        private void createIndex(String index_name, String index_type) throws IOException {
            CreateIndexRequest request = new CreateIndexRequest(index_name);// 创建索引
            // 创建的每个索引都可以有与之关联的特定设置===================设置主片与副本数======================
            //主片的个数(默认5个)确定是不会变的,副本数(默认1个)是可以改变的
            request.settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 1));
            XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
            xContentBuilder.startObject()
                    // 索引库名(类似数据库中的表),可以指定字段的类型,String中text是分词的,keyword不分词
                    .startObject(index_type).startObject("properties")
                    .startObject("doc.haCode").field("type", "keyword").endObject()
                    .startObject("doc.haStatus").field("type", "integer").field("index", false).endObject()
                    .startObject("doc.haName").field("type", "text").endObject()
                    .startObject("doc.haAddr").field("type", "text").endObject()
                    .startObject("doc.haPrice").field("type", "float").field("index", false).endObject()
                    .startObject("doc.location").field("type", "geo_point").endObject()
                    //不参与索引创建
                    .startObject("doc.haImage").field("type", "text").field("index", false).endObject()
                    .startObject("doc.haDate").field("type", "date").endObject()
                    .endObject().endObject().endObject();
            // 创建索引时创建文档类型映射==========================定义mapping,指定字段类型,如一些特殊字段=========================
            request.mapping(index_type, xContentBuilder);
    
            // 异步执行
            // 异步执行创建索引请求需要将CreateIndexRequest实例和ActionListener实例传递给异步方法:
            // CreateIndexResponse的典型监听器如下所示:
            // 异步方法不会阻塞并立即返回。
            ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse createIndexResponse) {
                    // 如果执行成功,则调用onResponse方法;
                }
    
                @Override
                public void onFailure(Exception e) {
                    // 如果失败,则调用onFailure方法。
                }
            };
            CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
            System.out.println("createIndex: " + JSON.toJSONString(createIndexResponse));
            xContentBuilder.close();
            // client.indices().createAsync(request, RequestOptions.DEFAULT,
            // listener);//要执行的CreateIndexRequest和执行完成时要使用的ActionListener
        }
    
        /**
         * 判断索引是否存在
         * 
         * @param index_name
         * @return
         * @throws IOException
         */
        public boolean existsIndex(String index_name) throws IOException {
            GetIndexRequest request = new GetIndexRequest();
            request.indices(index_name);
            boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
            System.out.println("existsIndex: " + exists);
            return exists;
        }
    
        /**
         * 删除索引
         * 
         * @param index
         * @return
         * @throws IOException
         */
        public void deleteIndex(String index_name) throws IOException {
            DeleteIndexRequest request = new DeleteIndexRequest(index_name);
            DeleteIndexResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
            System.out.println("deleteIndex: " + JSON.toJSONString(deleteIndexResponse));
        }
    
        /**
         * 判断记录是否存在
         * 
         * @param index
         * @param type
         * @param tests
         * @return
         * @throws IOException
         */
        public boolean exists(String index_name, String index_type, String index_id) throws IOException {
            GetRequest getRequest = new GetRequest(index_name, index_type, index_id);
            getRequest.fetchSourceContext(new FetchSourceContext(false));
            getRequest.storedFields("_none_");
            boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
            System.out.println("exists: " + exists);
            return exists;
        }
    
        /**
         * 根据ID获取记录信息
         * 
         * @param index
         * @param type
         * @param id
         * @throws IOException
         */
        public void get(String index_name, String index_type, String index_id) throws IOException {
            GetRequest getRequest = new GetRequest(index_name, index_type, index_id);
            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
            System.out.println("get: " + JSON.toJSONString(getResponse));
        }
    
        /**
         * 批量增加 addTestList方法封装list数据
         * 
         * @throws IOException
         */
        private void bulkAdd(String index_name, String index_type, List<String> idList, List<Map<String, Object>> dataList)
                throws IOException {
            BulkRequest bulkAddRequest = new BulkRequest();
            for (int i = 0; i < dataList.size(); i++) {
                IndexRequest indexRequest = new IndexRequest(index_name, index_type, idList.get(i));
                indexRequest.source(JSON.toJSONString(dataList.get(i)), XContentType.JSON);
                bulkAddRequest.add(indexRequest);
            }
            BulkResponse bulkAddResponse = client.bulk(bulkAddRequest, RequestOptions.DEFAULT);
            System.out.println("bulkAddResponse: " + JSON.toJSONString(bulkAddResponse));
        }
        
        /**
         * 批量更新
         * 
         * @param index
         * @param type
         * @param tests
         * @throws IOException
         */
        public void bulkUpdate(String index_name, String index_type, List<String> idList,
                List<Map<String, Object>> dataList) throws IOException {
            BulkRequest bulkUpdateRequest = new BulkRequest();
            for (int i = 0; i < dataList.size(); i++) {
                UpdateRequest updateRequest = new UpdateRequest(index_name, index_type, idList.get(i));
                updateRequest.doc(JSON.toJSONString(dataList.get(i)), XContentType.JSON);
                bulkUpdateRequest.add(updateRequest);
            }
            BulkResponse bulkUpdateResponse = client.bulk(bulkUpdateRequest, RequestOptions.DEFAULT);
            System.out.println("bulkUpdate: " + JSON.toJSONString(bulkUpdateResponse));
        }
    
        /**
         * 删除记录
         * 
         * @param index
         * @param type
         * @param id
         * @throws IOException
         */
        public void bulkDelete(String index_name, String index_type, List<String> idList,
                List<Map<String, Object>> dataList) throws IOException {
            BulkRequest bulkDeleteRequest = new BulkRequest();
            for (int i = 0; i < idList.size(); i++) {
                DeleteRequest deleteRequest = new DeleteRequest(index_name, index_type, idList.get(i));
                bulkDeleteRequest.add(deleteRequest);
            }
            BulkResponse bulkDeleteResponse = client.bulk(bulkDeleteRequest, RequestOptions.DEFAULT);
            System.out.println("bulkDelete: " + JSON.toJSONString(bulkDeleteResponse));
        }
    
        /**
         * 搜索
         * 
         * @param index 要搜索的索引库
         * @param type 要搜索的索引库类型
         * @param name  要搜索的关键字
         * @throws IOException
         */
        public void search(String name, String type, String index) throws IOException {
            //query查询:
                //match查询:知道分词器存在,会对查询的关键字分词;
                //team一个关键词/teams多个关键词查询:不知道分词器,不会对查询的关键字分词; 较精确的查询
                //例子:条件是‘我你’ team查询的是含有‘我你’的,match查询含有‘我你’‘我’‘你’的都能查询出 
                //QueryBuilders.matchQuery("name", name)
            
            //filter查询:不计算相关性,且有cache,速度比query查询快
                // boolBuilder多条件查询:must相当于and, should相当于or,mustnot不符合条件的
            
            //聚合查询
                // sum min max avg cardinality基数(类似去重之后的数量) teams分组
            
            // matchQuery(提高召回率,关键字会被分词), 
            // matchPhraseQuery(关键字不会分词), match_phrase提高精准度
            // matchQuery单一查询QueryBuilders.matchQuery("name", name),
            // multiMatchQuery匹配多列查询QueryBuilders.multiMatchQuery("music","name","interest"),
            // wildcardQuery模糊匹配查询QueryBuilders.wildcardQuery("name", "*jack*") *多个 ?一个
            // QueryBuilders.matchPhraseQuery("字段名字", "前缀");//前缀查询
            // QueryBuilders.fuzzyQuery("字段名字", "关键字");    //模糊查询,跟关键字类似的都可以查询出来 --> 关键字:tet 有可能会搜寻出 test text等
            // rangeQuery区间查询
            // geoDistanceQuery经纬度范围商家
            // 排序:ScoreSortBuilder,FieldSortBuilder
            //关键字查询
            BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
            // boolBuilder.must(QueryBuilders.matchQuery("name", name)); //
            // 这里可以根据字段进行搜索,must表示符合条件的,相反的mustnot表示不符合条件的
            // boolBuilder.must(QueryBuilders.matchQuery("sex", name));
            // boolBuilder.must(QueryBuilders.matchQuery("id", tests.getId().toString()));
    
            // boolQueryBuilder.must(QueryBuilders.termQuery("field","value"));
            // boolQueryBuilder.must(QueryBuilders.wildcardQuery("field","value"));
            // boolQueryBuilder.must(QueryBuilders.rangeQuery("field").gt("value"));
            // boolQueryBuilder.must(QueryBuilders.termsQuery("field","value"));
    
    //        boolBuilder.should(boolBuilder.filter(QueryBuilders.matchPhraseQuery("name", name)));
    //        boolBuilder.should(boolBuilder.filter(QueryBuilders.matchPhraseQuery("sex", name)));
    //        boolBuilder.should(QueryBuilders.matchPhraseQuery("sex", name));
            boolBuilder.should(QueryBuilders.matchPhraseQuery("doc.username", name));
            // RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("id");
            // //区间查询,都是闭区间
            // rangeQueryBuilder.gte(1);
            // rangeQueryBuilder.lte(1);
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(boolBuilder);
            // sourceBuilder.query(rangeQueryBuilder);
            sourceBuilder.from(0);
            sourceBuilder.size(100); // 获取记录数,默认10
            // sourceBuilder.sort("id", SortOrder.ASC);
    //        sourceBuilder.fetchSource(new String[] { "id", "name", "sex", "age" }, new String[] {}); // 第一个是获取字段,第二个是过滤的字段,默认获取全部
            sourceBuilder.fetchSource(new String[] { "doc.username", "doc.password"}, new String[] {}); // 第一个是获取字段,第二个是过滤的字段,默认获取全部
            SearchRequest searchRequest = new SearchRequest(index);
            searchRequest.types(type);
            searchRequest.source(sourceBuilder);
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println("search: " + JSON.toJSONString(response));
            SearchHits hits = response.getHits();
            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                System.out.println("search -> " + hit.getSourceAsString());
                System.out.println("search -> " + hit.getSourceAsMap().get("doc"));
                UserEntity parseObject = JSONObject.parseObject(JSONObject.toJSONString(hit.getSourceAsMap().get("doc")), UserEntity.class);
                System.out.println("search -> " + parseObject.getUsername());
            }
        }
    
        /**
         * 范围查询,左右都是闭集
         * 
         * @param fieldKey
         * @param start
         * @param end
         * @return
         */
        public RangeQueryBuilder rangeMathQuery(String fieldKey, String start, String end) {
            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(fieldKey);
            rangeQueryBuilder.gte(start);
            rangeQueryBuilder.lte(end);
            return rangeQueryBuilder;
        }
    
        //坐标查询 范围查询
        public void searchPoint() throws IOException {
            double a = 40.215193;
            double b = 116.680852;
            GeoDistanceQueryBuilder builder = QueryBuilders.geoDistanceQuery("location");
            builder.point(a, b);
            builder.distance(1000, DistanceUnit.MILES);
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(builder);
            GeoDistanceSortBuilder sort = SortBuilders.geoDistanceSort("location", a, b).order(SortOrder.ASC)
                    .unit(DistanceUnit.KILOMETERS);
            sourceBuilder.sort("id", SortOrder.ASC);
            SearchRequest searchRequest = new SearchRequest("index1");
            searchRequest.types("type1");
            searchRequest.source(sourceBuilder);
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = response.getHits();
            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                System.out.println("search -> " + hit.getSourceAsString());
            }
        }
    
        /**
         * 创建新增数据
         * 
         * @throws IOException
         */
        public void addTestList() throws IOException {
            double lat = 38.929986;
            double lon = 113.395645;
            for (long i = 201; i < 202; i++) {
                double max = 0.00001;
                double min = 0.000001;
                Random random = new Random();
                double s = random.nextDouble() % (max - min + 1) + max;
                DecimalFormat df = new DecimalFormat("######0.000000");
                // System.out.println(s);
                String lons = df.format(s + lon);
                String lats = df.format(s + lat);
                Double dlon = Double.valueOf(lons);
                Double dlat = Double.valueOf(lats);
                Tests tests = new Tests();
                tests.setId(i);
                tests.setName("名字啊" + i);
                tests.setSex("电话啊" + i);
                GeoPoint location = new GeoPoint();
                location.setLat(dlat);
                location.setLon(dlon);
                tests.setLocation(location);
                testsList.add(tests);
            }
        }
    }

    6:注意的点

      6.1:当时有想到可不可以关联查询?

          首先不建议关联查询,尽量一个表表述完整。原因关联查询会慢几倍-几百倍,失去了快速查询的意义。

          这个是关联查询的方法:https://blog.csdn.net/tuposky/article/details/80988915

          这个是验证关联会慢的 :http://www.matools.com/blog/190652134

      6.2:如果说有海量的数据,且数据的字段很多怎么处理?

          目前我能想到的就是将海量数据存入hbase中,在es中存入要检索的关键信息。能力一般水平有限,还希望各路大神指点。

      6.3:es的数据存储字段定义的问题?

          es的数据是存储在 "_source" 下面的,es本身会有很多属性,所以如果字段中有type,host,path等值,那么数据插入不进去。

          解决办法:logstash中的output配置target => "doc"  将数据放在  _source下的doc中,数据就是doc.xxx

          但是由于有经纬度数据,经纬度类型数据无法报存到上述方法的doc中,未找到原因,所以字段就尽量避开type,host,path等值就好了。

  • 相关阅读:
    C/C++多文件之间的变量定义
    PKU POJ 2186 Popular Cows 强连通分量
    重载函数
    ZOJ 2763 Prison Break
    201357 训练赛总结
    hdu 4467 Graph 构造
    201356 训练赛总结
    201353 NEERC 2012, Eastern subregional contest
    2013512 CF 183 总结
    一道动态规划
  • 原文地址:https://www.cnblogs.com/dblog/p/12157769.html
Copyright © 2011-2022 走看看