zoukankan      html  css  js  c++  java
  • Elasticsearch 5.4.3实战--Java API调用:批量写入数据

    这个其实比较简单,直接上代码.

    注意:代码中与业务相关的部分逻辑(例如 SKU 的查询方式、写入索引的字段)可以换成你自己的逻辑。

      1 package com.cs99lzzs.elasticsearch.service.imp;
      2 
      3 import java.sql.Timestamp;
      4 import java.text.DecimalFormat;
      5 import java.text.SimpleDateFormat;
      6 import java.util.ArrayList;
      7 import java.util.Date;
      8 import java.util.HashMap;
      9 import java.util.List;
     10 import java.util.Map;
     11 
     12 import javax.annotation.Resource;
     13 
     14 import org.apache.commons.lang.StringUtils;
     15 import org.apache.log4j.Logger;
     16 import org.elasticsearch.action.bulk.BulkRequestBuilder;
     17 import org.elasticsearch.action.bulk.BulkResponse;
     18 import org.elasticsearch.client.Client;
     19 import org.springframework.beans.factory.annotation.Value;
     20 import org.springframework.stereotype.Service;
     21 
     22 
     23 @Service("productIndexService")
     24 public class ProductIndexServiceImp implements ProductIndexService {
     25 
     26     @Resource
     27     private TradeService               tradeService;
     28 
     29     @Resource
     30     private ProductService             productService;
     31     
     32     @Resource
     33     private ShopService                shopService;
     34     
     35     @Resource(name="esClient")
     36     Client esClient;
     37 
     38     @Value("${elasticsearch.index}")
     39     private String CLUSTER_INDEX;
     40 
     41     @Value("${elasticsearch.type}")
     42     private String CLUSTER_TYPE;
     43     
     44 
     45     private static final int           _DEFAULT_PAGE_SIZE = 50;
     46 
     47     private static final Logger        logger             = Logger.getLogger(ProductIndexServiceImp.class);
     48 
     49     @Override
     50     public void createIndex(Timestamp updateTime) {
     51         DecimalFormat decimalFormat = new DecimalFormat("#0.0");
     52         int page = 1;
     53         List<SKU> skus = null;
     54         
     55         long startTime = System.currentTimeMillis();
     56         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");
     57         logger.info("elasticsearch索引构建开始,开始时间:" + sdf.format(new Date()));
     58         
     59         while (true) {
     60             //获取当页的sku列表
     61             StringBuffer sb = new StringBuffer(1000);
     62             skus = productService.getSkus(updateTime, page, _DEFAULT_PAGE_SIZE);
     63             if(skus == null|| skus.isEmpty()){
     64                 break;
     65             }
     66             //批量写入
     67             BulkRequestBuilder bulkRequest = esClient.prepareBulk();
     68             
     69             for(SKU sku:skus){
     70                 try {
     71                     if(sku:skus == null){
     72                         logger.error("elasticsearch: skuId=" + sku.getId() + "对应的SPU或者Brand或者Cate对象为空,无需构建索引");
     73                         continue;
     74                     }
     75                     Map<String, Object> source = putToMap(sku);
     76                     bulkRequest.add(esClient
     77                             .prepareIndex(CLUSTER_INDEX, CLUSTER_TYPE, "" + elasticseachSku.getId())
     78                             .setSource(source));
     79                     sb.append(sku.getId()).append(",");
     80                 } catch (Exception e) {
     81                     logger.error("更新elasticsearch索引出现异常, skuId=" + sku.getId() + ",exception info is:" 
     82                             + e.getMessage() + ", e.Cause is " + e.getCause());
     83                 }
     84             } 
     85             
     86             BulkResponse response = bulkRequest.execute().actionGet();
     87             if (response == null || response.hasFailures()) {
     88                 logger.error("elasticsearch 批量构建索引失败, failure message is: " + response.buildFailureMessage());
     89             } else {
     90                 logger.info("elasticsearch 批量构建索引成功, skuId list is : " + sb.toString());
     91             }
     92             page ++;
     93         }
     94         logger.info("elasticsearch本次索引构建时间:" + (System.currentTimeMillis() - startTime)/1000 + "秒。");
     95         logger.info("elasticsearch索引构建任务结束,结束时间:" + sdf.format(new Date()));
     96     }
     97 
     98     /**
     99      * @param elasticseachSku
    100      * @return
    101      */
    102     private Map<String, Object> putToMap(Sku elasticseachSku) {
    103         
    104         Map<String, Object> source = new HashMap<String, Object>();
    105         source.put("brandZhName", elasticseachSku.getBrandZhName());
    106         source.put("brandEnName", elasticseachSku.getBrandEnName());
    107         source.put("brandAliases", elasticseachSku.getBrandAliases());
    108         source.put("aliases", elasticseachSku.getAliases());
    109         source.put("zhName", elasticseachSku.getZhName());
    110         source.put("enName", elasticseachSku.getEnName());
    111         source.put("brandZhName", elasticseachSku.getBrandZhName());
    112         
    113         /* suggester */
    114         List<String> nameList = new ArrayList<String>();
    115         if (StringUtils.isNotEmpty(elasticseachSku.getZhName())) {
    116             nameList.add(elasticseachSku.getZhName());
    117         }
    118         if (StringUtils.isNotEmpty(elasticseachSku.getBrandZhName())) {
    119             nameList.add(elasticseachSku.getBrandZhName());
    120         }
    121         if (StringUtils.isNotEmpty(elasticseachSku.getAliases())) {
    122             nameList.add(elasticseachSku.getAliases());
    123         }
    124         if (StringUtils.isNotEmpty(elasticseachSku.getEnName())) {
    125             nameList.add(elasticseachSku.getEnName());
    126         }
    127         if (StringUtils.isNotEmpty(elasticseachSku.getBrandEnName())) {
    128             nameList.add(elasticseachSku.getBrandEnName());
    129         }
    130         if (StringUtils.isNotEmpty(elasticseachSku.getBrandAliases())) {
    131             nameList.add(elasticseachSku.getBrandAliases());
    132         }
    133         if (nameList.size() > 0) {
    134             source.put("suggestName", nameList);
    135         }
    136         
    137         return source;
    138     }
    139 }
  • 相关阅读:
    查看windows下指定的端口是否开放
    网易云音乐评论爬虫:爬取歌曲的全部评论
    用 Python 玩转 GitHub 的贡献板
    用python实现linux口令破解
    Python 音频数据扩充的技巧
    教你使用python+Opencv完成人脸解锁
    opencv+Python特征检测及K-最近邻匹配
    opencv+python 统计及绘制直方图
    学会用这个工具做分析,1年积累3年工作经验
    15分钟,教你用Python爬网站数据,并用BI可视化分析!
  • 原文地址:https://www.cnblogs.com/cs99lzzs/p/7212474.html
Copyright © 2011-2022 走看看