zoukankan      html  css  js  c++  java
  • Elasticsearch 分页查询:使用 Scroll API 遍历全部命中结果

    函数(通过 Scroll API 分批拉取命中结果,并对每条命中执行回调):

    /**
     * Iterates over every hit matching {@code query} using the scroll API and hands each hit
     * to {@code function}, page by page.
     *
     * <p>Only the {@code venderId} source field is fetched. Iteration stops when a page comes
     * back empty or as soon as {@code function} returns {@code false} for a hit. The scroll
     * context is always released before returning, including when an exception is raised.
     *
     * @param query    the query whose hits are scrolled
     * @param size     maximum number of hits returned per scroll page
     * @param function callback invoked with a zero-based running index and the hit; returning
     *                 {@code false} stops the scroll early
     * @return the total hit count reported by the initial search response, or {@code 0} if the
     *         search, a scroll request or the callback threw an exception (the exception is
     *         logged, not rethrown)
     */
    public long queryByScroll(QueryBuilder query, int size, HitFunction function) {
        // Tracked outside the try so the finally block can release the context on every path.
        String scrollId = null;
        try {
            String[] includeFields = new String[]{"venderId"};
            // NOTE(review): the Elasticsearch docs state scrolls are fastest when sorted by _doc;
            // consider .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC) if order is irrelevant.
            SearchResponse response = client
                    .prepareSearch(INDEX)
                    .setTypes(TYPE)
                    .setScroll(TimeValue.timeValueSeconds(DEFAULT_TIME_VALUE_IN_SECONDS))
                    .setQuery(query)
                    .setFetchSource(includeFields, null)
                    .setSize(size) // max of {size} hits will be returned for each scroll
                    .get();
            scrollId = response.getScrollId();

            long totalHits = response.getHits().getTotalHits();
            log.info("get from es, size:{}", totalHits);

            int nextIndex = 0;
            // Zero hits mark the end of the scroll; checking up front avoids a useless
            // follow-up scroll request when the very first page is already empty.
            scroll:
            while (response.getHits().getHits().length != 0) {
                if (log.isInfoEnabled()) {
                    // Guarded: serializing the whole response is expensive.
                    log.info("enter while, response:{}, totalSize:{}", JSON.toJSONString(response), response.getHits().getTotalHits());
                }
                for (SearchHit hit : response.getHits().getHits()) {
                    int index = nextIndex++;
                    if (!function.apply(index, hit)) {
                        // Log the index that was actually passed to the callback.
                        log.info("index scroll break at index: {}, id: {}", index, hit.getId());
                        break scroll;
                    }
                }
                response = client
                        .prepareSearchScroll(scrollId)
                        .setScroll(TimeValue.timeValueSeconds(DEFAULT_TIME_VALUE_IN_SECONDS))
                        .execute()
                        .actionGet();
                // Always continue with (and later clear) the most recently returned scroll id.
                scrollId = response.getScrollId();
                if (log.isInfoEnabled()) {
                    log.info("before exit while, response:{}", JSON.toJSONString(response));
                }
            }
            return totalHits;
        } catch (Exception e) {
            log.error("queryByScroll error", e);
            return 0;
        } finally {
            clearScrollQuietly(scrollId);
        }
    }

    /**
     * Releases the scroll context identified by {@code scrollId}, logging instead of throwing on failure.
     *
     * <p>Search contexts expire on their own after the scroll timeout, but keeping them open has
     * a cost, so they are cleared explicitly as soon as they are no longer needed.
     *
     * @param scrollId the scroll id to clear; {@code null} means no scroll was opened and nothing is done
     */
    private void clearScrollQuietly(String scrollId) {
        if (scrollId == null) {
            return;
        }
        try {
            ClearScrollResponse clearScrollResponse = client
                    .prepareClearScroll()
                    .addScrollId(scrollId)
                    .get();
            log.info("Clear scroll response:{}", clearScrollResponse.isSucceeded());
        } catch (Exception e) {
            // A failed cleanup must not mask the outcome of the scroll itself.
            log.error("clear scroll error", e);
        }
    }
    

     调用处:

    venderCrowdEsDao.queryByScroll(
            QueryBuilders.boolQuery().must(QueryBuilders.termQuery("venderMode", "POP")),
            200,
            (index, hit) -> {
                // Read venderId from the hit's source, then look up the vendor's phone and e-mail via RPC.
                // NOTE(review): the original note said the operations owner's phone is preferred, falling
                // back to the shop owner's phone, and nothing is stored when both are empty. That fallback
                // is not visible in this lambda; confirm whether the RPC implements it.
                Object rawVenderId = hit.getSourceAsMap().get("venderId");
                Long venderId = Long.valueOf(String.valueOf(rawVenderId));
                log.info("function applyL:{}", venderId);

                VenderContactPO fetched = venderServiceRpc.getContactsInfoByVenderId(venderId);
                // The object that is finally written to the data table.
                VenderContactPO toPersist = new VenderContactPO();

                // Populate the row for vender_contact_info.
                // NOTE(review): the original note mentioned encryption; it is not performed here.
                if (fetched == null) {
                    // No contact info returned: store empty contact fields under the ES venderId.
                    toPersist.setEmail("");
                    toPersist.setPhoneNum("");
                    toPersist.setVenderId(venderId);
                } else {
                    toPersist.setVenderId(fetched.getVenderId());
                    toPersist.setEmail(fetched.getEmail());
                    toPersist.setPhoneNum(fetched.getPhoneNum());
                }
                toPersist.setShopName(venderServiceRpc.getShopNameByVenderId(venderId));

                venderContactService.insertIfNotPresent(toPersist);
                // Always continue with the next hit.
                return true;
            })
    

      

  • 相关阅读:
    JZ-C-01
    C/C++学习笔记
    15、排序:选择类排序和归并排序
    玩转Spring——Spring事务
    玩转Spring——Spring整合JDBC
    玩转Spring——Spring AOP
    玩转Spring——Spring IOC/DI
    Java算法——递归思想
    Java排序算法——堆排序
    Java排序算法——归并排序
  • 原文地址:https://www.cnblogs.com/zhima-hu/p/14042608.html
Copyright © 2011-2022 走看看