zoukankan      html  css  js  c++  java
  • elasticsearch 乐观并发控制

    一、参考

    Elasticsearch: 权威指南 处理冲突

    Index API

    Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You

    二、两种并发更新策略

    2.1 悲观并发控制

    这种方法被关系型数据库广泛应用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突

    一个典型的例子是,读取一行数据之前先锁住,确保只有放置了锁的线程可以对这行数据进行操作

    2.2 乐观并发控制

    elasticsearch 使用这种方式的并发控制,假定冲突是不可发生的,并且不会阻塞当前正在尝试的操作

    然而,如果源数据在读写过程中被修改了,更新将会失败,应用程序决定接下来如何解决冲突,例如:

    重试更新、使用新的数据、将相关情况报告给用户

    三、乐观并发控制

    elasticsearch 是分布式的,

    当文档创建、更新或者删除时候,新版本的文档必须复制到集群中的其他节点

    elasticsearch 是异步和并发的,

    这意味着这些复制到其他节点的请求,被并发发送,并且到达目的节点的顺序是乱序的

    elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本

    即使用 _version保证

    3.1 _version

    每个文档都有一个版本号_version, 当文档被修改的时候,版本号递增,使用该版本号确保变更顺序正确,

    如果旧版本的文档在新版本之后到达,它可以被简单的忽略

    # (1) 创建文档
    PUT yztest/_doc/1
    {
      "f1": 1
    }
    
    # 创建返回值
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 1, # 初始的版本号为1
      "result" : "created",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 0,
      "_primary_term" : 1
    }
    
    # (2) 更新文档
    POST yztest/_doc/1
    {
      "f1": 11
    }
    
    # 更新的返回值
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 2, # 更新后,版本号递增
      "result" : "updated",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 1,
      "_primary_term" : 1
    }
    
    # (3) 删除文档
    DELETE yztest/_doc/1
    
    # 删除的返回值
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 3, # 删除后,版本号递增
      "result" : "deleted",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 2,
      "_primary_term" : 1
    }
    
    # (4) 删除后重新创建相同id的文档
    POST yztest/_doc/1
    {
      "f1": 11
    }
    
    # 重新创建的返回值
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 4, # 版本号递增,而不是为1
      "result" : "created",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 3,
      "_primary_term" : 1
    }
    
    

    3.2 如何修改版本号?

    refresh 接口,将从最近一个 refresh 到目前所有的操作刷新到缓存中,即 refresh 后,新入的文档可以被查询到,删除、更新(即先删除后添加)的文档被更新

    # 查看refresh interval
    GET yztest/_settings?include_defaults
    
    # 更新refresh interval
    PUT yztest/_settings
    {
      "index" : {
        "refresh_interval" : "300s"
      }
    }
    
    # 更新返回值
    {
      "acknowledged" : true
    }
    
    

    设置 refresh interval 后,查看创建、更新是否可以被发现?

    # (1) 创建新的文档(重建索引并设置refresh interval)
    POST yztest/_doc/1
    {
      "f1": 1
    }
    
    # 创建返回值
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 1, # 新建的索引,新的文档版本号为1
      "result" : "created",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 0,
      "_primary_term" : 1
    }
    
    # (2) 更新文档
    POST yztest/_doc/1
    {
      "f1": 2
    }
    
    # 更新返回值
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 2, # 新的版本号为2
      "result" : "updated",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 1,
      "_primary_term" : 1
    }
    
    # (3) 查看是否更新
    GET yztest/_search?version=true
    
    # 查询返回值
    {
      "took" : 0,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yztest",
            "_type" : "_doc",
            "_id" : "1",
            "_version" : 1, # 因为还没有refresh, 所以版本号没有更新到2
            "_score" : 1.0,
            "_source" : {
              "f1" : 1
            }
          }
        ]
      }
    }
    
    # (4) refresh
    POST yztest/_refresh
    
    # refresh返回值
    {
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      }
    }
    
    # (5) refresh后查看文档
    GET yztest/_search?version=true
    
    # refresh后,查询的返回值
    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yztest",
            "_type" : "_doc",
            "_id" : "1",
            "_version" : 2, # 现在的版本号为最新的版本号
            "_score" : 1.0,
            "_source" : {
              "f1" : 2
            }
          }
        ]
      }
    }
    
    # (6) 更新文档
    
    POST yztest/_doc/1
    {
      "f1": 3
    }
    
    # 更新的返回结果
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 3, # 版本号会递增,即使没有refresh(即查询到的版本号为1)
      "result" : "updated",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 2,
      "_primary_term" : 1
    }
    
    
    
    
    

    结论:

    文档的版本号,在相同的索引中,一直递增

    refresh 的作用是将文档操作刷新到内存中,即能被检索到(最新的文档),对于版本号没有影响

    3.3 internal version

    # (1) 创建文档,指定版本信息
    POST yztest/_doc/1?version=2
    {
      "f1": 1
    }
    
    # 返回结果,可以发现不能指定 internal类型的version
    {
      "error" : {
        "root_cause" : [
          {
            "type" : "action_request_validation_exception",
            "reason" : "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
          }
        ],
        "type" : "action_request_validation_exception",
        "reason" : "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
      },
      "status" : 400
    }
    
    # (2) 创建文档
    POST yztest/_doc/1
    {
      "f1": 1
    }
    
    # 创建返回值
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 1, # 新的文档
      "result" : "created",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 0,
      "_primary_term" : 1
    }
    
    

    结论: internal(内部)类型的版本号,无法通过指定特殊值更新文档的版本信息,详见

    Deprecated usage of internal versioning for optimistic concurrency controledit

    3.4 external version

    常用的场景为:

    使用其他的数据库当作主数据库,使用 ES 当作数据检索,这意味着所有主数据库的所有修改发生时候,都需要被复制到 ES,如果有多个进程负责这个数据的同步,可能会遇到并发一致性问题

    如果主数据库中已经有了版本号(一个能作为版本号的字段值或者 timestamp),那么可以通过在 ES 中添加 version_type=external 的方式使用主数据库的版本信息,

    外部版本号是一个  JAVA 中的 long 正值

    # (1) 创建一个新的文档,指定外部版本号为3
    PUT yztest/_doc/1?version=3&version_type=external
    {
      "f1": 1
    }
    
    # 创建的返回值
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 3, # 通过指定外部版本号,设置文档版本号为3
      "result" : "created",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 0,
      "_primary_term" : 1
    }
    
    
    

    外部版本号的处理方式:

    检查当前的_version 是否小于指定的版本号,而不是检查当前的_version 和请求中指定的版本号是否相同,如果请求成功,指定的外部版本号作为文档的新的_version进行存储

    外部版本号可以出现在下面的操作中:

    (1) 创建新的文档

    (2) 更新文档

    (3) 删除文档

    # (1) 获取文档当前的版本信息
    GET yztest/_search?version=true
    
    # 查询返回值
    {
      "took" : 0,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yztest",
            "_type" : "_doc",
            "_id" : "1",
            "_version" : 3, # 文档的版本号为3
            "_score" : 1.0,
            "_source" : {
              "f1" : 1
            }
          }
        ]
      }
    }
    
    # (2) 更新文档,指定新的版本号为5
    POST yztest/_doc/1?version=5&version_type=external
    {
      "f1": 5
    }
    
    # 更新的结果
    {
      "_index" : "yztest",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 5, # 设置新的版本号为5
      "result" : "updated",
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "failed" : 0
      },
      "_seq_no" : 1,
      "_primary_term" : 1
    }
    
    

    3.5 版本冲突

    # (1) 查看文档的version
    GET yztest/_search?version=true
    
    # 查询的结果
    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "yztest",
            "_type" : "_doc",
            "_id" : "1",
            "_version" : 3, # 当前的版本号为3
            "_score" : 1.0,
            "_source" : {
              "f1" : 1
            }
          }
        ]
      }
    }
    
    # (2) 更新为新的版本号
    POST yztest/_doc/1?version=2&version_type=external
    {
      "f1": 2
    }
    
    # 更新报错
    {
      "error" : {
        "root_cause" : [
          {
            "type" : "version_conflict_engine_exception",
            "reason" : "[1]: version conflict, current version [3] is higher or equal to the one provided [2]",
            "index_uuid" : "oDT5p07YRUiVwglKdUyQww",
            "shard" : "0",
            "index" : "yztest"
          }
        ],
        "type" : "version_conflict_engine_exception",
        "reason" : "[1]: version conflict, current version [3] is higher or equal to the one provided [2]",
        "index_uuid" : "oDT5p07YRUiVwglKdUyQww",
        "shard" : "0",
        "index" : "yztest"
      },
      "status" : 409
    }
    
    
    
    

    结论:更新时候,不能指定低于当前文档版本号

  • 相关阅读:
    String类型作为方法的形参
    [转] 为什么说 Java 程序员必须掌握 Spring Boot ?
    Centos打开、关闭、结束tomcat,及查看tomcat运行日志
    centos中iptables和firewall防火墙开启、关闭、查看状态、基本设置等
    防火墙没有关导致外部访问虚拟机的tomcat遇到的问题和解决方法
    可以ping通ip地址,但是访问80,或者8080报错
    JAVA的非对称加密算法RSA——加密和解密
    CA双向认证的时候,如果一开始下载的证书就有问题的,怎么保证以后的交易没有问题?
    图解HTTPS协议加密解密全过程
    https单向认证服务端发送到客户端到底会不会加密?
  • 原文地址:https://www.cnblogs.com/thewindyz/p/14464921.html
Copyright © 2011-2022 走看看