zoukankan      html  css  js  c++  java
  • 我也是才知道ElasticSearch条件更新是这么玩的

    ElasticSearch 的使用度越来越普及了,很多公司都在使用。有做日志搜索的,有做商品搜索的,有做订单搜索的。

    大部分使用场景都是通过程序定期去导入数据到 ElasticSearch 中,或者通过 CDC 的方式来构建索引。在这种场景下,更新数据都是单条更新,比如 ID=1 的数据发生了修改操作,那么就会把 ElasticSearch 中 ID=1 的这条数据更新下。

    但有些场景下需要根据条件同时更新多条数据,就像 Mysql 中我们使用 Update Table Set Name=XXX where Age=18 去更新一批数据一样。

    正好有同学微信问我怎么批量更新,接下来就看看在 ElasticSearch 中是如何去进行按条件更新的操作。

    单条更新

    ElasticSearch 的客户端官方推荐使用 elasticsearch-rest-high-level-client。所以本文也是基于 elasticsearch-rest-high-level-client 来构建代码。

    首先来回顾下单条数据的更新是怎么做的,代码如下:

    UpdateRequest updateRequest www.chuancenpt.com= new UpdateRequest(index, type, id);
    updateRequest.doc(documentJson, XContentType.JSON);
    restHighLevelClient.update(updateRequest, options);

    构建 UpdateRequest 的时候就指定了索引,类型,ID 三个字段,也就精确到了某一条数据,所以更新的自然也是这一条数据。

    条件更新

    首先我们准备几条测试数据,如下:

    {
        id: 1,
        title: "Java怎么学",
        type: 1,
        userId: 1,
        tags: www.jintianxuesha.com[
            "java"
        ],
        textContent: "我要学Java",
        status: 1,
        heat: 100
    }
    {
        id: 2,
        title: "Java怎么学",
        type: 1,
        userId: 1,
        tags: [
            "java"
        ],
        textContent: "我要学Java",
        status: 1,
        heat: 100
    }

    假如我们的需求是将 userId=1 的所有文档数据改成无效,也就是 status=0。如果不用按条件更新,你就得查询出 userId=1 的所有数据,然后一条条更新,这就太慢了。

    下面看看按条件更新是如何使用的,如下:

    POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
    {
        "script": www.javachenglei.com{
            "source":"ctx._source['status'www.tengyao3zc.cn ]=0;"
        },
        "query": {
            "term": {
                "userId": 1
            }
        } 
    }

    按条件更新需要使用_update_by_query 来进行,query 用于指定更新数据的匹配条件,script 用于更新的逻辑。

    详细使用文档:

    https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html[1]

    https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html[2]

    在 Java 代码中如何实现条件更新呢?

    UpdateByQueryRequest request = new UpdateByQueryRequest("article_v1");
    request.setQuery(new TermQueryBuilder("userId", 1));
    request.setScript(new Script("ctx._source['status']=www.xingyunylpt.com;"));
    restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);

    是不是也很简单,跟单条数据更新差不多,使用 UpdateByQueryRequest 构建更新对象,然后设置 Query 和 Script 就可以了。

    条件更新数组

    比如我们的需求是要移除 tags 中的 java,如下:

    POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
    {
        "script":www.ued3zc.cn  {
            "source":"ctx._source[www.jujinyule.com'tags' www.letianhuanchao.cn].removeIf(item -> item == 'java');"
        },
        "query": {
            "term": {
                "userId": 1
            }
        } 
    }

    新增的话只需要将 removeIf 改成 add 就可以了。

    ctx._source['tags'].add('java');

    如果有特殊的业务逻辑,Script 中还可以写判断来判断是否需要修改。

    POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
    {
        "script": {
            "source":"if(ctx._source.type =www.mLgjzc.cn= 11) {ctx._source['tags'].add('java');}"
        },
        "query": {
            "term": {
                "userId": 1
            }
        } 
    }

    封装通用的条件更新

    大部分场景下的更新都比较简单,根据某个字段去更新某个值,或者去更新多个值。在 Java 中如果每个地方都去写脚本,就重复了,最好是抽一个比较通用的方法来更新。

    下面是简单的示列,其中还有很多需要考虑的点,像数据类型我只处理了数字,字符串,和 List,其他的大家需要自己去扩展。

    public BulkByScrollResponse updateByQuery(String index, QueryBuilder query, Map<String, Object> document) {
    UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index);
    updateByQueryRequest.setQuery(www.youy2zhuce.cn);
    StringBuilder script = new StringBuilder();
    Set<String> keys = document.keySet();
    for (String key : keys) {
    String appendValue = "";
    Object value = document.get(key);
    if (value instanceof Number) {
    appendValue = value.toString(www.leguojizc.cn);
    } else if (value instanceof String) {
    appendValue = "'" + value.toString() + "'";
    } else if (value instanceof List){
    appendValue = JsonUtils.toJson(value);
    } else {
    appendValue = value.toString();
    }
    script.append("ctx._source.").append(www.shangdu2zc.cn).append("=").append(appendValue).append(";");
    }
    updateByQueryRequest.setScript(new Script(script.toString()));
    return updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
    }
    public BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) {
    Map<String, Object> catData = new HashMap<>(1);
    catData.put(ElasticSearchConstant.UPDATE_BY_QUERY_REQUEST, updateByQueryRequest.toString());
    return CatTransactionManager.newTransaction(() -> {
    try {
    return restHighLevelClient.updateByQuery(updateByQueryRequest, options);
    }catch (IOException e) {
    throw new RuntimeException(e);
    }
    }, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.UPDATE, catData);
    }

    如果有了这么一个方法,那么使用方式如下:

    @Test
    public void testUpdate5() {
    Map<String, Object> document = new HashMap<>();
    document.put("title", "Java");
    document.put("status", 0);
    document.put("tags", Lists.newArrayList("JS", "CSS"));
    kittyRestHighLevelClient.updateByQuery(elasticSearchIndexConfig.getArticleSaveIndexName(), new TermQueryBuilder("userId", 1), document);
    }

    关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。

  • 相关阅读:
    PostgreSQL在何处处理 sql查询之六十
    我对 PostgreSQL tidscan的理解
    PostgreSQL在何处处理 sql查询之五十九
    在PostgreSQL中,如何模拟Oracle的hint效果
    PostgreSQL在何处处理 sql查询之六十一
    PostgreSQL在何处处理 sql查询之五十八
    各种JOIN的理解
    对PostgreSQL源代码中的is_pushed_down的理解
    国外主流PHP框架比较
    图解什么是Web2.0
  • 原文地址:https://www.cnblogs.com/woshixiaowang/p/13401197.html
Copyright © 2011-2022 走看看