函数:
public long queryByScroll(QueryBuilder query, int size, HitFunction function) { try { String[] includeFields = new String[]{"venderId"}; SearchRequestBuilder builder = client .prepareSearch(INDEX) .setTypes(TYPE) // .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC) // Scroll requests have optimizations that make them faster when the sort order is _doc. If you want to iterate over all documents regardless of the order, this is the most efficient option: .setScroll(TimeValue.timeValueSeconds(DEFAULT_TIME_VALUE_IN_SECONDS)) .setQuery(query) .setFetchSource(includeFields, null) .setSize(size); // max of {size} hits will be returned for each scroll SearchResponse response = builder.get(); long totalHits = response.getHits().getTotalHits(); log.info("get from es, size:{}", totalHits); // Scroll until no hits are returned AtomicInteger counter = new AtomicInteger(0); scroll: do { log.info("enter while, response:{}, totalSize:{}", JSON.toJSONString(response), response.getHits().getTotalHits()); for (SearchHit hit : response.getHits().getHits()) { if (!function.apply(counter.getAndIncrement(), hit)) { if (log.isInfoEnabled()) { log.info("index scroll break at index: {}, id: {}", counter.get(), hit.getId()); } break scroll; } } response = client .prepareSearchScroll(response.getScrollId()) .setScroll(TimeValue.timeValueSeconds(DEFAULT_TIME_VALUE_IN_SECONDS)) .execute() .actionGet(); log.info("before exit while, response:{}", JSON.toJSONString(response)); } while (response.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop. // Search context are automatically removed when the scroll timeout has been exceeded. However keeping scrolls open has a cost, as discussed in the previous section so scrolls should be explicitly cleared as soon as the scroll is not being used anymore using the clear-scroll API: ClearScrollResponse clearScrollResponse = client .prepareClearScroll() .addScrollId(response.getScrollId()) .get(); if (log.isInfoEnabled()) { log.info("Clear scroll response:{}", clearScrollResponse.isSucceeded()); } return totalHits; } catch (Exception e) { log.error("queryByScroll error", e); } return 0; }
调用处:
venderCrowdEsDao.queryByScroll(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("venderMode", "POP")), 200, (index, hit) -> { //获取venderId,通过venderId调用外呼获取电话、邮箱,首先获取运营负责人的电话,若为空,则使用店铺负责人的电话,若都为空则不存储。 Long venderId = Long.valueOf(String.valueOf(hit.getSourceAsMap().get("venderId"))); log.info("function applyL:{}", venderId); VenderContactPO contactResult = venderServiceRpc.getContactsInfoByVenderId(venderId); VenderContactPO contactPO = new VenderContactPO();//最终要存储到数据表中的对象 //加密,插入vender_contact_info表 if(contactResult != null){ contactPO.setVenderId(contactResult.getVenderId()); contactPO.setEmail(contactResult.getEmail()); contactPO.setPhoneNum(contactResult.getPhoneNum()); }else { contactPO.setEmail(""); contactPO.setPhoneNum(""); contactPO.setVenderId(venderId); } contactPO.setShopName(venderServiceRpc.getShopNameByVenderId(venderId)); venderContactService.insertIfNotPresent(contactPO); return true; })