zoukankan      html  css  js  c++  java
  • es数据库查询封装

    package cn.com.nike.dao.impl;

    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;

    import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
    import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
    import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchType;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.rest.RestStatus;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;
    import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Repository;

    import cn.com.nike.dao.EntityDao;
    import cn.com.nike.util.Locker;
    import cn.com.nike.util.Pagination;
    import cn.com.nike.util.UtilDateTime;
    import cn.com.nike.util.Constants.ConstantsData;
    import net.sf.json.JSONObject;

    @Repository("EntityDao")
    public class EntityDaoImpl implements EntityDao {

    private static final Logger logger = LoggerFactory.getLogger(EntityDaoImpl.class);

    @Autowired
    private Client client;

    @Override
    public String createEntity(String index, String type, Map<String, Object> entity) {

    entity.put(ConstantsData.CREATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));
    entity.put(ConstantsData.UPDATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));

    IndexResponse response = client.prepareIndex(index, type).setSource(entity).get();
    String result = "error";
    if (RestStatus.OK.equals(response.status()) || RestStatus.CREATED.equals(response.status())) {
    result = response.getId();
    }
    return result;
    }

    @Override
    public int updateEntity(String index, String type, String id, Map<String, Object> entity) {

    entity.put(ConstantsData.UPDATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));

    UpdateResponse response = client.prepareUpdate(index, type, id).setDoc(entity).get();
    int result = -1;
    if (RestStatus.OK.equals(response.status())) {
    result = 1;
    }
    return result;
    }

    @Override
    public int deleteEntity(String index, String type, String id) {
    DeleteResponse response = client.prepareDelete(index, type, id).get();
    int result = -1;
    if (RestStatus.OK.equals(response.status())) {
    result = 1;
    }
    return result;
    }

    @Override
    public Map<String, Object> findEntityById(String index, String type, String id) {
    GetResponse response = client.prepareGet(index, type, id).get();
    return response.getSource();
    }

    @Override
    public String createJson(String index, String type, List<JSONObject> list) {
    Locker lock = new Locker();
    lock.lock();
    List<JSONObject> objects = list;
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    BulkResponse bulkResponse = null;
    int count = 0;
    for (int i = 0; i < objects.size(); i++) {

    objects.get(i).put(ConstantsData.CREATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));
    objects.get(i).put(ConstantsData.UPDATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));

    String data = objects.get(i).toString();

    bulkRequest.add(client.prepareIndex(index, type).setId(Integer.toString(i + 1)).setSource(data));
    count++;
    if (count % 1000 == 0) {
    bulkRequest.execute().actionGet();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + index + "已经提交了"
    + count + "条数据");
    }
    }
    bulkRequest.execute().actionGet();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "插入完毕");
    bulkResponse = bulkRequest.get();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "是否正确:"
    + !bulkResponse.hasFailures());
    if (bulkResponse.hasFailures()) {
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "error");
    logger.info(bulkResponse.buildFailureMessage());
    return "error";
    } else {
    Map<String, Object> entity = new HashMap<String, Object>();
    entity.put("id", Integer.toString(0));
    // createEntity(index.substring(0, index.length() - 4), type,
    // entity);
    boolean result = exindex(index.substring(0, index.length() - 4));
    logger.info("result" + result);
    if (result == true) {
    delete(index.substring(0, index.length() - 4), type);
    }
    String copyresult = copyFromTemp(index);
    if (copyresult == "success") {
    delete(index);
    lock.unLock();
    return "success";
    } else {
    lock.unLock();
    return "error";
    }
    }
    }

    public String createJson(String index, String type, String id, List<JSONObject> list) {
    Locker lock = new Locker();
    lock.lock();
    List<JSONObject> objects = list;
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    BulkResponse bulkResponse = null;
    int count = 0;
    for (int i = 0; i < objects.size(); i++) {

    objects.get(i).put(ConstantsData.CREATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));
    objects.get(i).put(ConstantsData.UPDATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));

    String data = objects.get(i).toString();
    String ID = objects.get(i).get(id).toString();
    bulkRequest.add(client.prepareIndex(index, type).setId(ID).setSource(data));
    count++;
    if (count % 1000 == 0) {
    bulkRequest.execute().actionGet();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + index + "已经提交了"
    + count + "条数据");
    }
    }
    bulkRequest.execute().actionGet();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "插入完毕");
    bulkResponse = bulkRequest.get();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "是否正确:"
    + !bulkResponse.hasFailures());
    if (bulkResponse.hasFailures()) {
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "error");
    logger.info(bulkResponse.buildFailureMessage());
    return "error";
    } else {
    boolean result = exindex(index.substring(0, index.length() - 4));
    if (result == true) {
    delete(index.substring(0, index.length() - 4), type);
    }
    try {
    Thread.sleep(1000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    String copyresult = copyFromTemp(index);
    try {
    Thread.sleep(1000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    if (copyresult == "success") {
    delete(index);
    lock.unLock();
    return "success";
    } else {
    lock.unLock();
    return "error";
    }
    }
    }

    @Override
    public String createObj(String index, String type, List<Object> objects) throws Exception {
    List<JSONObject> list = new ArrayList<>();
    for (Object object : objects) {
    JSONObject jsonobject = (JSONObject) object;
    jsonobject.put(ConstantsData.CREATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));
    jsonobject.put(ConstantsData.UPDATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));
    list.add(jsonobject);
    }
    String result = createJson(index, type, list);
    return result;
    }

    @Override
    public boolean delete(String index, String type) {
    QueryBuilder queryBuilder = null;
    SearchResponse searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.DEFAULT).setScroll(new TimeValue(60000)).setSize(1000).get();
    SearchHits hits = searchResponse.getHits();
    SearchHit[] searchHits = hits.hits();
    while (true) {
    for (SearchHit s : searchHits) {
    deleteEntity(index, type, s.getId());
    }
    searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(new TimeValue(600000))
    .get();
    if (searchResponse.getHits().getHits().length == 0) {
    break;
    }
    }
    return true;
    }

    @Override
    public String copyFromTemp(String index) {
    String indexFrom = index;
    String indexTo = indexFrom.substring(0, indexFrom.length() - 4);
    int timeMillis = 60000;
    SearchResponse scrollResp = client.prepareSearch(indexFrom).setScroll(new TimeValue(timeMillis)).setSize(1000)
    .execute().actionGet();
    while (true) {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    SearchHit[] hits = scrollResp.getHits().getHits();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "里面有" + hits.length
    + "条数据");
    if (hits.length <= 0) {
    return "error";
    } else if (hits.length > 0) {
    int count = 0;
    for (int i = 0; i < hits.length; i++) {
    count++;
    bulkRequest.add(client.prepareIndex(indexTo, hits[i].getType(), hits[i].getId())
    .setSource(hits[i].getSource()));
    if (count % 1000 == 0) {
    bulkRequest.execute().actionGet();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "已经提交了"
    + count + "条数据");
    }
    }
    bulkRequest.execute().actionGet();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "插入完毕");
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(timeMillis))
    .execute().actionGet();
    if (scrollResp.getHits().getHits().length == 0) {
    break;
    }
    }
    }
    return "success";
    }

    @Override
    public Map<String, Object> searcher(String index, String type, Pagination page, QueryBuilder queryBuilder)
    throws Exception {
    SearchResponse searchResponse = null;
    Map<String, Object> map = new HashMap<>();
    if (page == null) {
    searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.DEFAULT).setScroll(new TimeValue(60000)).setSize(10000).get();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询:" + index + "下的 "
    + queryBuilder);
    List<JSONObject> list = new ArrayList<>();
    SearchHits hits = searchResponse.getHits();
    long total = hits.getTotalHits();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询到:" + total + "条。");
    while (true) {
    SearchHit[] searchHits = hits.hits();
    for (SearchHit s : searchHits) {
    String result = s.getSourceAsString().toString();
    JSONObject jsonObject = JSONObject.fromObject(result);
    jsonObject.put("id", s.getId());
    list.add(jsonObject);
    }
    searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
    .setScroll(new TimeValue(600000)).get();
    if (searchResponse.getHits().getHits().length == 0) {
    break;
    }
    }
    map.put("total", total);
    map.put("result", list);
    return map;
    } else {
    Map<String, Object> map2 = searcherbypagebyqueryBuilder(index, type, page, queryBuilder);
    return map2;
    }
    }

    public Map<String, Object> searcherbypagebyqueryBuilder(String index, String type, Pagination page,
    QueryBuilder queryBuilder) throws Exception {
    Map<String, Object> map = new HashMap<>();
    Integer pageNo = 0;
    Integer pageSize = 0;
    Integer pageno = page.getPageNo();
    pageSize = page.getPageSize();
    if (pageno <= 0) {
    pageNo = 1;
    } else {
    pageNo = pageno;
    }
    SearchResponse searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.QUERY_THEN_FETCH).setFrom((pageNo - 1) * pageSize).setSize(pageSize).execute()
    .get();
    List<JSONObject> list = new ArrayList<>();
    SearchHits hits = searchResponse.getHits();
    long total = hits.getTotalHits();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询到:" + total + "条。");
    SearchHit[] searchHits = hits.hits();
    for (SearchHit s : searchHits) {
    String result = s.getSourceAsString().toString();
    JSONObject jsonObject = JSONObject.fromObject(result);
    jsonObject.put("id", s.getId());
    list.add(jsonObject);
    }
    map.put("total", total);
    map.put("result", list);
    return map;
    }

    public boolean exindex(String index) {
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "判断:" + index + "是否存在");
    IndicesExistsRequest request = new IndicesExistsRequest(index);
    IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
    if (response.isExists()) {
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询到:" + index + "存在。");
    return true;
    } else {
    logger.info(
    new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询到:" + index + "不存在。");
    return false;
    }
    }

    @Override
    public boolean delete(String index) {
    DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
    if (dResponse.isAcknowledged()) {
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + index + "删除成功");
    return true;
    } else {
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + index + "删除失败");
    return false;
    }
    }

    @Override
    public Map<String, Object> searcher(String index, String type, Pagination page, Map<String, String> require)
    throws Exception {
    Integer pageNo = null;
    Integer pageSize = null;
    if (page == null) {
    pageNo = 1;
    pageSize = 1000;
    } else {
    pageNo = page.getPageNo();
    pageSize = page.getPageSize();
    pageNo = null == pageNo ? 1 : pageNo;
    }
    QueryBuilder queryBuilder = null;
    Map<String, Object> map = new HashMap<>();
    List<QueryBuilder> querys = new ArrayList<>();
    if (require == null) {
    queryBuilder = null;
    } else {
    if (require.size() == 1) {
    Iterator<?> iterator = require.entrySet().iterator();
    Entry<?, ?> entry = (Entry<?, ?>) iterator.next();
    String string1 = entry.getKey().toString();
    String string2 = entry.getValue().toString();
    if (string2 == null || "".equals(string2)) {
    queryBuilder = null;
    }
    if (string2.contains("*")) {
    queryBuilder = QueryBuilders.wildcardQuery(string1, string2.toLowerCase());
    } else {
    queryBuilder = QueryBuilders.matchPhraseQuery(string1, string2.toLowerCase());
    }
    } else if (require.size() > 1) {
    queryBuilder = QueryBuilders.boolQuery();
    for (int i = 0; i < require.size(); i++) {
    Iterator<?> iterator = require.entrySet().iterator();
    while (iterator.hasNext()) {
    Entry<?, ?> entry = (Entry<?, ?>) iterator.next();
    String string1 = (String) entry.getKey();
    String string2 = (String) entry.getValue();
    if (string2 != null || "".equals(string2)) {
    if (string2.contains("*")) {
    QueryBuilder query = QueryBuilders.wildcardQuery(string1, string2.toLowerCase());
    querys.add(query);
    } else {
    QueryBuilder query = QueryBuilders.matchPhraseQuery(string1, string2.toLowerCase());
    querys.add(query);
    }
    }
    }
    }
    for (int i = 0; i < querys.size(); i++) {
    ((BoolQueryBuilder) queryBuilder).must(querys.get(i));
    }
    }
    }
    SearchResponse searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.QUERY_THEN_FETCH).setFrom((pageNo - 1) * pageSize).setSize(pageSize).execute()
    .get();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询:" + index + "下的"
    + require + "条件");
    List<JSONObject> list = new ArrayList<>();
    SearchHits hits = searchResponse.getHits();
    long total = hits.getTotalHits();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询到:" + total + "条。");
    SearchHit[] searchHits = hits.hits();
    for (SearchHit s : searchHits) {
    String result = s.getSourceAsString().toString();
    JSONObject jsonObject = JSONObject.fromObject(result);
    jsonObject.put("id", s.getId());
    list.add(jsonObject);
    }
    map.put("total", total);
    map.put("result", list);
    return map;
    }

    @SuppressWarnings("unused")
    @Override
    public int sumQTY(String index, String type, String name) throws Exception {
    int sum = 0;
    QueryBuilder queryBuilder = null;
    SearchResponse searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.DEFAULT).setScroll(new TimeValue(60000)).setSize(1000).get();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询:" + index + "下的");
    List<JSONObject> list = new ArrayList<>();
    SearchHits hits = searchResponse.getHits();
    long total = hits.getTotalHits();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询到:" + total + "条。");
    SearchHit[] searchHits = hits.hits();
    while (true) {
    for (int i = 0; i < searchHits.length; i++) {
    String result = searchHits[i].getSourceAsString().toString();
    JSONObject jsonObject = JSONObject.fromObject(result);
    String QTY = jsonObject.get(name).toString();
    logger.debug("在第" + (i + 1) + "条记录中" + name + "数量" + QTY);
    sum += Integer.parseInt(QTY);
    }
    searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(new TimeValue(600000))
    .execute().actionGet();
    if (searchResponse.getHits().getHits().length == 0) {
    break;
    }
    }
    return sum;
    }

    @SuppressWarnings("null")
    @Override
    public String[] queryQTYAndAmount(String index, String type, Pagination page, QueryBuilder queryBuilder)
    throws Exception {
    List<String> list = null;
    String[] datas = new String[2];
    int sumQTY = 0;
    float sumPrice = 0;
    String sumq = null;
    String sump = null;
    SearchResponse searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.DEFAULT).setScroll(new TimeValue(60000)).setSize(1000).get();
    SearchHits hits = searchResponse.getHits();
    SearchHit[] searchHits = hits.hits();
    while (true) {
    for (SearchHit s : searchHits) {
    String result = (String) s.getSourceAsString().toString();
    JSONObject jsonObject = JSONObject.fromObject(result);
    sumQTY += Integer.valueOf(jsonObject.get("QTY").toString());
    float number = 0;
    float price = 0;
    number = Integer.valueOf(jsonObject.get("QTY").toString());
    price = Float.valueOf(jsonObject.get("Unit Price").toString());
    sumPrice += number * price;
    }
    searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(new TimeValue(600000))
    .get();
    if (searchResponse.getHits().getHits().length == 0) {
    break;
    }
    }
    if (sumQTY != 0) {
    sumq = Integer.toString(sumQTY);
    } else {
    sumq = "";
    }
    if (sumPrice != 0) {
    sump = Float.toString(sumPrice);
    } else {
    sump = "";
    }
    datas[0] = sumq;
    datas[1] = sump;

    return datas;
    }

    @SuppressWarnings("unused")
    @Override
    public Map<String, Object> searchErrorType(String index, String type, Pagination page, Map<String, String> require)
    throws Exception {
    Integer pageNo = null;
    Integer pageSize = null;
    Map<String, Object> map = new HashMap<>();

    String requireValue1 = require.get("PO_No");
    String requireValue2 = require.get("Error Type");

    QueryBuilder queryBuilder = null;
    if (!(requireValue1 == null || requireValue1.equals(""))) {
    queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.wildcardQuery("PO_No", requireValue1))
    .must(QueryBuilders.wildcardQuery("Error Type", requireValue2));
    } else if ("T".equals(requireValue2)) {
    queryBuilder = QueryBuilders.matchQuery("Error Type", "");
    } else if ("F".equals(requireValue2)) {
    queryBuilder = QueryBuilders.wildcardQuery("Error Type", "*");
    } else {
    queryBuilder = QueryBuilders.wildcardQuery("Error Type", requireValue2);
    }

    SearchResponse searchResponse = null;
    if (page == null) {
    searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.QUERY_THEN_FETCH).execute().get();
    } else {
    pageNo = page.getPageNo();
    pageSize = page.getPageSize();
    pageNo = null == pageNo ? 1 : pageNo;
    searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.QUERY_THEN_FETCH).setFrom((pageNo - 1) * pageSize).setSize(pageSize)
    .execute().get();
    }

    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询:" + index + "下的"
    + require + "条件");
    List<JSONObject> list = new ArrayList<>();
    SearchHits hits = searchResponse.getHits();
    long total = hits.getTotalHits();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "查询到:" + total + "条。");
    SearchHit[] searchHits = hits.hits();
    for (SearchHit s : searchHits) {
    String result = s.getSourceAsString().toString();
    Object object = result;
    JSONObject jsonObject = JSONObject.fromObject(result);
    jsonObject.put("id", s.getId());
    list.add(jsonObject);
    }
    map.put("total", total);
    map.put("result", list);
    return map;
    }

    public boolean updatedatebyname(String fromindex, String fromtype, String fromname, String toindex, String totype,
    String toname) {
    QueryBuilder queryBuilder = null;
    SearchResponse searchResponse = client.prepareSearch(fromindex).setTypes(fromtype).setQuery(queryBuilder)
    .setSearchType(SearchType.DEFAULT).setScroll(new TimeValue(60000)).setSize(1000).get();
    SearchHits hits = searchResponse.getHits();
    SearchHit[] searchHits = hits.hits();
    while (true) {
    for (SearchHit s : searchHits) {
    String result = (String) s.getSourceAsString().toString();
    JSONObject jsonObject = JSONObject.fromObject(result);
    Map<String, Object> entity = new HashMap<String, Object>();
    entity.put(toname, jsonObject.get(fromname));
    boolean extendidresult = extendid(toindex, totype, s.getId());
    if (extendidresult == true) {
    updateEntity(toindex, totype, s.getId(), entity);
    }
    }
    searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(new TimeValue(600000))
    .get();
    if (searchResponse.getHits().getHits().length == 0) {
    break;
    }
    }
    return true;
    }

    public int count(String index, String type, QueryBuilder queryBuilder) {
    SearchResponse searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.DEFAULT).setScroll(new TimeValue(60000)).setSize(1000).get();
    SearchHits hits = searchResponse.getHits();
    SearchHit[] searchHits = hits.hits();
    while (true) {
    searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(new TimeValue(600000))
    .get();
    if (searchResponse.getHits().getHits().length == 0) {
    break;
    }
    }
    return searchHits.length;
    }

    @Override
    public int sumQTYandPrice(String index, String type, String price, String QTY, QueryBuilder queryBuilder) {
    int sum = 0;
    SearchResponse searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.DEFAULT).setScroll(new TimeValue(60000)).setSize(1000).get();
    SearchHits hits = searchResponse.getHits();
    SearchHit[] searchHits = hits.hits();
    while (true) {
    for (SearchHit s : searchHits) {
    String result = (String) s.getSourceAsString().toString();
    JSONObject jsonObject = JSONObject.fromObject(result);
    sum += Integer.valueOf(jsonObject.get("QTY").toString())
    * Integer.valueOf(jsonObject.get("Unit Price").toString());
    }
    searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(new TimeValue(600000))
    .get();
    if (searchResponse.getHits().getHits().length == 0) {
    break;
    }
    }
    return sum;
    }

    public boolean extendid(String index, String type, String id) {
    GetResponse response = client.prepareGet(index, type, id).get();
    return response.isExists();
    }

    @SuppressWarnings("unused")
    @Override
    public List<JSONObject> serarchNoPage(String index, String type, QueryBuilder queryBuilder) {

    List<JSONObject> list = new ArrayList<>();
    SearchResponse searchResponse = null;

    searchResponse = client.prepareSearch(index).setTypes(type).setQuery(queryBuilder)
    .setSearchType(SearchType.DEFAULT).setScroll(new TimeValue(60000)).setSize(10000).get();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "search DB: " + index
    + " " + queryBuilder);
    SearchHits hits = searchResponse.getHits();
    long total = hits.getTotalHits();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "Total: " + total);
    while (true) {
    SearchHit[] searchHits = hits.hits();
    for (SearchHit s : searchHits) {
    String result = s.getSourceAsString().toString();
    JSONObject jsonObject = JSONObject.fromObject(result);
    jsonObject.put("id", s.getId());
    list.add(jsonObject);
    }
    searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(new TimeValue(600000))
    .get();
    if (searchResponse.getHits().getHits().length == 0) {
    break;
    }
    }
    return list;
    }

    /* qap添加数据分季度 index为年 type为季度 */
    @Override
    public String qapcreateJson(String index, String type, List<JSONObject> list) throws Exception {
    Locker lock = new Locker();
    lock.lock();
    List<JSONObject> objects = list;
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    BulkResponse bulkResponse = null;
    int count = 0;
    String newindex = index.substring(0, index.length() - 4);
    int nain = Integer.valueOf(newindex);
    String oldnaian = Integer.toString(nain - 1);
    boolean cunzai = exindex(oldnaian);
    for (int i = 0; i < objects.size(); i++) {
    if (cunzai == true) {
    boolean idresult = extendid(oldnaian, type, objects.get(i).get("Product Code").toString());
    if (idresult == false) {
    objects.get(i).put("newProductCode", "Y");
    } else {
    objects.get(i).put("newProductCode", "N");
    }
    } else {
    objects.get(i).put("newProductCode", "Y");
    }
    objects.get(i).put(ConstantsData.CREATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));
    objects.get(i).put(ConstantsData.UPDATE_TIME, UtilDateTime.nowDateString(UtilDateTime.DATE_TIME_FORMAT));
    String data = objects.get(i).toString();
    String ID = objects.get(i).get("Product Code").toString();
    bulkRequest.add(client.prepareIndex(index, type).setId(ID).setSource(data));
    count++;
    if (count % 1000 == 0) {
    bulkRequest.execute().actionGet();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + index + "已经提交了"
    + count + "条数据");
    }
    }
    bulkRequest.execute().actionGet();
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "插入完毕");
    bulkResponse = bulkRequest.get();
    if (bulkResponse.hasFailures()) {
    logger.info(new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss").format(new Date()) + " " + "error");
    logger.info(bulkResponse.buildFailureMessage());
    return "error";
    } else {
    String copyresult = copyFromTemp(index);
    System.out.println(index);
    if (copyresult == "success") {
    delete(index);
    lock.unLock();
    return "success";
    } else {
    lock.unLock();
    return "error";
    }
    }
    }

    /**
    * 分组查询
    *
    * @param indexName
    * 库名
    * @param typeName表名
    * @param aggRequire别名+agg
    * @param fieldName列名
    * @param query查询条件
    * @return
    * @throws Exception
    */
    @SuppressWarnings({ "rawtypes", "unused", "unchecked" })
    public List<String> groupByBuilder(String indexName, String typeName, String aggRequire, String fieldName,
    QueryBuilder query) throws Exception {
    logger.debug(
    UtilDateTime.getTimestamp(System.currentTimeMillis())
    + ".groupByBuilder.start.入参indexName={},aggRequire={},fieldName={},query={}",
    indexName, aggRequire, fieldName, query);
    List<String> list = new ArrayList();
    try {
    aggRequire = aggRequire + "agg";
    fieldName = fieldName + ".keyword";
    list = new ArrayList<>();
    AbstractAggregationBuilder aggregation = AggregationBuilders.terms(aggRequire).field(fieldName).size(10000);
    SearchResponse response = client.prepareSearch(indexName).setTypes(typeName).setQuery(query)
    .setSearchType(SearchType.QUERY_THEN_FETCH).addAggregation(aggregation).execute().actionGet();
    SearchHits hits = response.getHits();
    Terms terms = response.getAggregations().get(aggRequire);
    List<Bucket> materialBuckets = terms.getBuckets();
    for (Bucket item : materialBuckets) {
    list.add((String) item.getKey());
    }
    } catch (Exception e) {
    logger.error(UtilDateTime.getTimestamp(System.currentTimeMillis()) + ".groupByBuilder.exception...异常信息:{}",
    e);
    throw new Exception(
    UtilDateTime.getTimestamp(System.currentTimeMillis()) + ".groupByBuilder.exception...异常信息:{}", e);
    }
    logger.debug(UtilDateTime.getTimestamp(System.currentTimeMillis()) + ".groupByBuilder.end.出参list.size={}",
    list);
    return list;
    }

    }

  • 相关阅读:
    OC基础框架
    协议代理
    内存管理
    重写init或自定义init方法
    iOS输入框UITextField输入限制
    iOS 打包FrameWork
    iOS 持续往文件写入数据。
    ld: library not found for -lxxx 问题的解决办法
    iOS 侧滑返回过程中导航栏的黑色问题解决办法
    iOS 蓝牙分包发送数据
  • 原文地址:https://www.cnblogs.com/songyunxinQQ529616136/p/6834775.html
Copyright © 2011-2022 走看看