zoukankan      html  css  js  c++  java
  • go对elasticsearch的增删改查

    环境

    elasticsearch 6.8 (6.x版本应该都没问题)

    go客户端sdk: github.com/elastic/go-elasticsearch/v6

    其实自己封装api也行,反正elasticsearch对外交互的协议是restful接口

    注意点

    发起的请求,只要没有返回error(无论HTTP状态码是否表示成功),一定要记得关闭返回Response的Body,否则会一直占用一个连接。

    全局变量和函数

    // c is the package-level Elasticsearch client used by the example functions
    // in this file; it is assigned once in init.
    var c *elasticsearch.Client
    
    // init builds the client for the node at http://127.0.0.1:9200 and stores it
    // in the package-level variable c. A construction error is handed to
    // checkError.
    func init() {
        client, err := elasticsearch.NewClient(elasticsearch.Config{
            Addresses: []string{"http://127.0.0.1:9200"},
        })
        checkError(err)
        c = client
    }
    
    // checkError prints err to standard output and terminates the process with
    // exit status 1 when err is non-nil. A nil err is a no-op.
    func checkError(err error) {
        if err == nil {
            return
        }
        fmt.Println(err)
        os.Exit(1)
    }
    

    创建索引

    func createIndex() {
        body := map[string]interface{}{
            "mappings": map[string]interface{}{
                "test_type": map[string]interface{}{
                    "properties": map[string]interface{}{
                        "str": map[string]interface{}{
                            "type": "keyword",   // 表示这个字段不分词
                        },  
                    },  
                },  
            },  
        }
        jsonBody, _ := json.Marshal(body)
        req := esapi.IndicesCreateRequest{
            Index: "test_index",
            Body:  bytes.NewReader(jsonBody),
        }
        res, err := req.Do(context.Background(), c)
        checkError(err)
        defer res.Body.Close()
        fmt.Println(res.String())
    }
    

    [200 OK] {"acknowledged":true,"shards_acknowledged":true,"index":"test_index"}

    删除索引

    // deleteIndex deletes the index "test_index" and prints the raw response.
    // A request error is handed to checkError.
    func deleteIndex() {
        resp, err := esapi.IndicesDeleteRequest{
            Index: []string{"test_index"},
        }.Do(context.Background(), c)
        checkError(err)
        // Close the body so the underlying connection is released.
        defer resp.Body.Close()
        fmt.Println(resp.String())
    }
    

    [200 OK] {"acknowledged":true}

    往索引插入数据

    插入单条数据

    func insertSingle() {
        body := map[string]interface{}{
            "num": 0,
            "v":   0,
            "str": "test",
        }
        jsonBody, _ := json.Marshal(body)
    
        req := esapi.CreateRequest{    // 如果是esapi.IndexRequest则是插入/替换
            Index:        "test_index",
            DocumentType: "test_type",
            DocumentID:   "test_1",
            Body:         bytes.NewReader(jsonBody),
        }
        res, err := req.Do(context.Background(), c)
        checkError(err)
        defer res.Body.Close()
        fmt.Println(res.String())
    }
    

    [201 Created] {"_index":"test_index","_type":"test_type","_id":"test_1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1}

    批量插入(很明显,也可以批量做其他操作)

    func insertBatch() {
        var bodyBuf bytes.Buffer
        for i := 2; i < 10; i++ {
            createLine := map[string]interface{}{
                "create": map[string]interface{}{
                    "_index": "test_index",
                    "_id":    "test_" + strconv.Itoa(i),
                    "_type":  "test_type",
                },
            }
            jsonStr, _ := json.Marshal(createLine)
            bodyBuf.Write(jsonStr)
            bodyBuf.WriteByte('
    ')
    
            body := map[string]interface{}{
                "num": i % 3,
                "v":   i,
                "str": "test" + strconv.Itoa(i),
            }
            jsonStr, _ = json.Marshal(body)
            bodyBuf.Write(jsonStr)
            bodyBuf.WriteByte('
    ')
        }
    
        req := esapi.BulkRequest{
            Body: &bodyBuf,
        }
        res, err := req.Do(context.Background(), c)
        checkError(err)
        defer res.Body.Close()
        fmt.Println(res.String())
    }
    

    [200 OK] {"took":31,"errors":false,"items":[{"create":{"_index":"test_index","_type":"test_type","_id":"test_2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_3","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_4","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_5","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_6","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_7","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_8","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_9","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1,"status":201}}]}

    查询

    通过sql查询

    func selectBySql() {
        query := map[string]interface{}{
            "query": "select count(*) as cnt, max(v) as value, num from test_index where num > 0 group by num",
        }
        jsonBody, _ := json.Marshal(query)
        req := esapi.XPackSQLQueryRequest{
            Body: bytes.NewReader(jsonBody),
        }
        res, err := req.Do(context.Background(), c)
        checkError(err)
        defer res.Body.Close()
        fmt.Println(res.String())
    }
    

    [200 OK] {"columns":[{"name":"cnt","type":"long"},{"name":"value","type":"long"},{"name":"num","type":"long"}],"rows":[[2,7.0,1],[3,8.0,2]],"cursor":"q47zAgFjAQp0ZXN0X2luZGV4igEBAQljb21wb3NpdGUHZ3JvdXBieQEDbWF4Ajg2AAD/AQF2AAAA/wAA/wEAAjc4AQNudW0AAAH/AADoBwEKAQI3OAIAAAAAAAAAAgACAQAAAAABAP////8PAAAAAAEFcmFuZ2U/gAAAAANudW0BAAAAAP8AAAAAAAAAAAAAAAABWgMAAgIAAAAAAAH/////DwMBawI3OAEBWgABbQI4NgV2YWx1ZQAAAVoBawI3OAABWgABBw=="}

    通过Search Api查询

    func selectBySearch() {
        query := map[string]interface{}{
            "query": map[string]interface{}{
                "bool": map[string]interface{}{
                    "filter": map[string]interface{}{
                        "range": map[string]interface{}{
                            "num": map[string]interface{}{
                                "gt": 0,
                            },  
                        },  
                    },  
                },  
            },  
            "size": 0,
            "aggs": map[string]interface{}{
                "num": map[string]interface{}{
                    "terms": map[string]interface{}{
                        "field": "num",
                        //"size":  1,
                    },  
                    "aggs": map[string]interface{}{
                        "max_v": map[string]interface{}{
                            "max": map[string]interface{}{
                                "field": "v",
                            },  
                        },  
                    },  
                },  
            },  
        }   
        jsonBody, _ := json.Marshal(query)
        
        req := esapi.SearchRequest{
            Index:        []string{"test_index"},
            DocumentType: []string{"test_type"},
            Body:         bytes.NewReader(jsonBody),
        }   
        res, err := req.Do(context.Background(), c)
        checkError(err)
        defer res.Body.Close()
        fmt.Println(res.String())
    }
    

    [200 OK] {"took":10,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":5,"max_score":0.0,"hits":[]},"aggregations":{"num":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":2,"doc_count":3,"max_v":{"value":8.0}},{"key":1,"doc_count":2,"max_v":{"value":7.0}}]}}}

    但是elasticsearch对聚合查询分页并不是很友好,基本上都是得自己手动分页。

    局部更新(批量更新略)

    根据id更新

    func updateSingle() {          
        body := map[string]interface{}{
            "doc": map[string]interface{}{
                "v": 100,     
            },
        }                     
        jsonBody, _ := json.Marshal(body)
        req := esapi.UpdateRequest{
            Index:        "test_index",
            DocumentType: "test_type",
            DocumentID:   "test_1",
            Body:         bytes.NewReader(jsonBody),
        }                     
                              
        res, err := req.Do(context.Background(), c)
        checkError(err)       
        defer res.Body.Close()
        fmt.Println(res.String())                
    }
    

    [200 OK] {"_index":"test_index","_type":"test_type","_id":"test_1","_version":2,"result":"updated","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1}

    除了doc方式之外,还有script方式

    根据条件更新

    func updateByQuery() {
        body := map[string]interface{}{
            "script": map[string]interface{}{
                "lang": "painless",
                "source": `
                    ctx._source.v = params.value;
                `,
                "params": map[string]interface{}{
                    "value": 101,
                },
            },
            "query": map[string]interface{}{
                "match_all": map[string]interface{}{},
            },
        } 
        jsonBody, _ := json.Marshal(body)
        req := esapi.UpdateByQueryRequest{
            Index: []string{"test_index"},
            Body:  bytes.NewReader(jsonBody),
        } 
        res, err := req.Do(context.Background(), c)
        checkError(err)
        defer res.Body.Close()
        fmt.Println(res.String())
    }
    

    [200 OK] {"took":109,"timed_out":false,"total":9,"updated":9,"deleted":0,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}

    删除

    根据id删除

    // deleteSingle deletes the document "test_1" from "test_index"/"test_type"
    // and prints the raw response. A request error is handed to checkError.
    func deleteSingle() {
        resp, err := esapi.DeleteRequest{
            Index:        "test_index",
            DocumentType: "test_type",
            DocumentID:   "test_1",
        }.Do(context.Background(), c)
        checkError(err)
        // Close the body so the underlying connection is released.
        defer resp.Body.Close()
        fmt.Println(resp.String())
    }
    

    [200 OK] {"_index":"test_index","_type":"test_type","_id":"test_1","_version":6,"result":"deleted","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":7,"_primary_term":1}

    根据条件删除

    func deleteByQuery() { 
        body := map[string]interface{}{
            "query": map[string]interface{}{
                "match_all": map[string]interface{}{},
            },
        } 
        jsonBody, _ := json.Marshal(body)
        req := esapi.DeleteByQueryRequest{
            Index: []string{"test_index"},
            Body:  bytes.NewReader(jsonBody),
        } 
        res, err := req.Do(context.Background(), c)
        checkError(err)
        defer res.Body.Close()
        fmt.Println(res.String())
    }
    

    [200 OK] {"took":17,"timed_out":false,"total":9,"deleted":9,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}

  • 相关阅读:
    oracle的wm_concat()方法与的排序问题,Oracle的 listagg 函数
    sql sever 常用的存储过程的写法或者说与Oracle中存过的异同点
    Oracle游标的使用
    oracle与sql sever的财务月份归属的问题
    sql sever使用习惯
    sqlsever 的存储过程的调试
    sql sever与Oracle的异同点
    单例模式
    线程 ---- 锁(生产者、消费者)
    IO 流理解实例
  • 原文地址:https://www.cnblogs.com/Me1onRind/p/11534544.html
Copyright © 2011-2022 走看看