这个其实比较简单,直接上代码.
注意部分逻辑可以换成你自己的逻辑
1 package com.cs99lzzs.elasticsearch.service.imp; 2 3 import java.sql.Timestamp; 4 import java.text.DecimalFormat; 5 import java.text.SimpleDateFormat; 6 import java.util.ArrayList; 7 import java.util.Date; 8 import java.util.HashMap; 9 import java.util.List; 10 import java.util.Map; 11 12 import javax.annotation.Resource; 13 14 import org.apache.commons.lang.StringUtils; 15 import org.apache.log4j.Logger; 16 import org.elasticsearch.action.bulk.BulkRequestBuilder; 17 import org.elasticsearch.action.bulk.BulkResponse; 18 import org.elasticsearch.client.Client; 19 import org.springframework.beans.factory.annotation.Value; 20 import org.springframework.stereotype.Service; 21 22 23 @Service("productIndexService") 24 public class ProductIndexServiceImp implements ProductIndexService { 25 26 @Resource 27 private TradeService tradeService; 28 29 @Resource 30 private ProductService productService; 31 32 @Resource 33 private ShopService shopService; 34 35 @Resource(name="esClient") 36 Client esClient; 37 38 @Value("${elasticsearch.index}") 39 private String CLUSTER_INDEX; 40 41 @Value("${elasticsearch.type}") 42 private String CLUSTER_TYPE; 43 44 45 private static final int _DEFAULT_PAGE_SIZE = 50; 46 47 private static final Logger logger = Logger.getLogger(ProductIndexServiceImp.class); 48 49 @Override 50 public void createIndex(Timestamp updateTime) { 51 DecimalFormat decimalFormat = new DecimalFormat("#0.0"); 52 int page = 1; 53 List<SKU> skus = null; 54 55 long startTime = System.currentTimeMillis(); 56 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 57 logger.info("elasticsearch索引构建开始,开始时间:" + sdf.format(new Date())); 58 59 while (true) { 60 //获取当页的sku列表 61 StringBuffer sb = new StringBuffer(1000); 62 skus = productService.getSkus(updateTime, page, _DEFAULT_PAGE_SIZE); 63 if(skus == null|| skus.isEmpty()){ 64 break; 65 } 66 //批量写入 67 BulkRequestBuilder bulkRequest = esClient.prepareBulk(); 68 69 for(SKU sku:skus){ 70 try { 71 if(sku:skus == null){ 72 logger.error("elasticsearch: skuId=" + sku.getId() + "对应的SPU或者Brand或者Cate对象为空,无需构建索引"); 73 continue; 74 } 75 Map<String, Object> source = putToMap(sku); 76 bulkRequest.add(esClient 77 .prepareIndex(CLUSTER_INDEX, CLUSTER_TYPE, "" + elasticseachSku.getId()) 78 .setSource(source)); 79 sb.append(sku.getId()).append(","); 80 } catch (Exception e) { 81 logger.error("更新elasticsearch索引出现异常, skuId=" + sku.getId() + ",exception info is:" 82 + e.getMessage() + ", e.Cause is " + e.getCause()); 83 } 84 } 85 86 BulkResponse response = bulkRequest.execute().actionGet(); 87 if (response == null || response.hasFailures()) { 88 logger.error("elasticsearch 批量构建索引失败, failure message is: " + response.buildFailureMessage()); 89 } else { 90 logger.info("elasticsearch 批量构建索引成功, skuId list is : " + sb.toString()); 91 } 92 page ++; 93 } 94 logger.info("elasticsearch本次索引构建时间:" + (System.currentTimeMillis() - startTime)/1000 + "秒。"); 95 logger.info("elasticsearch索引构建任务结束,结束时间:" + sdf.format(new Date())); 96 } 97 98 /** 99 * @param elasticseachSku 100 * @return 101 */ 102 private Map<String, Object> putToMap(Sku elasticseachSku) { 103 104 Map<String, Object> source = new HashMap<String, Object>(); 105 source.put("brandZhName", elasticseachSku.getBrandZhName()); 106 source.put("brandEnName", elasticseachSku.getBrandEnName()); 107 source.put("brandAliases", elasticseachSku.getBrandAliases()); 108 source.put("aliases", elasticseachSku.getAliases()); 109 source.put("zhName", elasticseachSku.getZhName()); 110 source.put("enName", elasticseachSku.getEnName()); 111 source.put("brandZhName", elasticseachSku.getBrandZhName()); 112 113 /* suggester */ 114 List<String> nameList = new ArrayList<String>(); 115 if (StringUtils.isNotEmpty(elasticseachSku.getZhName())) { 116 nameList.add(elasticseachSku.getZhName()); 117 } 118 if (StringUtils.isNotEmpty(elasticseachSku.getBrandZhName())) { 119 nameList.add(elasticseachSku.getBrandZhName()); 120 } 121 if (StringUtils.isNotEmpty(elasticseachSku.getAliases())) { 122 nameList.add(elasticseachSku.getAliases()); 123 } 124 if (StringUtils.isNotEmpty(elasticseachSku.getEnName())) { 125 nameList.add(elasticseachSku.getEnName()); 126 } 127 if (StringUtils.isNotEmpty(elasticseachSku.getBrandEnName())) { 128 nameList.add(elasticseachSku.getBrandEnName()); 129 } 130 if (StringUtils.isNotEmpty(elasticseachSku.getBrandAliases())) { 131 nameList.add(elasticseachSku.getBrandAliases()); 132 } 133 if (nameList.size() > 0) { 134 source.put("suggestName", nameList); 135 } 136 137 return source; 138 } 139 }