zoukankan      html  css  js  c++  java
  • golang 操作es 批量索引数据 Bulk

    减少开销 提高效率 现有130万条数据 一条一条索引的话需要不停的跑需要两天左右 如果使用bulk 五分钟就完事儿了 

    func IndexPrice() {
    	es := tool.ES{
    		Index: "financials.us.gama",
    		Type:  "esstockprice",
    	}
    	var MaxId int
    	MaxId = 0
    	var price *model.PriceRecord
    	// 每次取两万条数据索引两万条 记录最大id 下一次根据maxid查询
    	for {
    		bulkRequest := tool.Client.Bulk()
    		// 每次只取2万条数据
    		priceList := price.GetPrice2(MaxId)
    		if len(priceList) == 0 {
    			break
    		}
    		var upDateList []int
    		for _, i := range priceList {
    			upDateList = append(upDateList, i.ID)
    			if i.ID > MaxId {
    				MaxId = i.ID
    			}
    			id := i.Symbol + "_" + i.DateTime
    			var esPrice *ESPrice
    			esPrice = &ESPrice{
    				ShortName:    i.Symbol,
    				TradeDate:    i.DateTime,
    				StockPriceId: id,
    				IsTradeDate:  1,
    				Open:         i.Open,
    				Close:        i.Close,
    				Volume:       i.Volume,
    				High:         i.High,
    				Low:          i.Low,
    			}
    			req := elastic.NewBulkIndexRequest().
    				Index(es.Index).
    				Type(es.Type).
    				Id(id).
    				Doc(esPrice)
    			bulkRequest = bulkRequest.Add(req)
    		}
    		bulkResponse, err := bulkRequest.Do(context.Background())
    		if err != nil {
    			fmt.Println(err)
    		}else{
    			// 索引成功就修改数据库状态 is_index = 1
    			if !price.UpdatePriceList(upDateList){
    				fmt.Println("更新数据库状态错误")
    			}
    		}
    		fmt.Println("耗时:",bulkResponse.Took, "索引了:",len(bulkResponse.Items))
    	}
    }
    

      

  • 相关阅读:
    1 绪论
    3.4 向量空间及其子空间的的基与维数
    3.3 极大线性无关组以及&向量的秩
    3.2 线性相关与线性无关的向量组
    3.1 n维向量空间及其子空间
    2.6 拉普拉斯定理
    2.5 克拉默法则
    2.4 行列式按行(列)展开
    2.3 行列式的性质
    2.2 n阶行列式的定义
  • 原文地址:https://www.cnblogs.com/sumafan/p/13048882.html
Copyright © 2011-2022 走看看