说明
本篇笔记部分案例使用ElasticSearch官方教程案例。
什么是文档?
ElasticSearch是面向文档的,它可以存储整个对象或文档,并且索引每个文档的内容使其可以被搜索。其使用Json作为文档序列化格式,Json目前已经被大多语言支持,并且成为NoSQL领域的标准格式,具有简洁、易读的特点。如下:
{
"name": "John Smith",
"age": 42,
"confirmed": true,
"join_date": "2014-06-01",
"home": {
"lat": 51.5,
"lon": 0.1
},
"accounts": [
{
"type": "facebook",
"id": "johnsmith"
},
{
"type": "twitter",
"id": "johnsmith"
}
]
}
在大多数应用中,实体与对象多可以被序列化为包含键值对的JSON对象。在 Elasticsearch 中,文档 有着特定的含义。它是指最顶层或者根对象, 这个根对象被序列化成 JSON 并存储到 Elasticsearch 中,指定了唯一 ID。
文档元数据
一个文档不仅仅包含它的数据,也包含了元数据,ElasticSearch中有三个较为重要的元素分别是:
元素名 | 含义 |
---|---|
_index | 文档在哪保存 |
_type | 文档表示的对象类别 |
_id | 文档的唯一标识 |
如果你在Elasticsearch中搜索过数据的话,应该对这三个元素不陌生,下面是我在我的数据中取的案例:
{
"_index" : "index_name",
"_type" : "_doc",
"_id" : "11",
"_score" : 1.0,
"_source" : {
"tags" : [
"gamegroup"
],
"update_time" : 1607156354,
"@timestamp" : "2021-02-18T03:40:01.235Z",
"id" : 11,
"name" : "对马岛之魂",
"status" : 0,
"create_time" : 1603073095,
"@version" : "1",
}
},
上面是我使用get在Elasticsearch返回数据中截取的一个文档数据。
_index:
一个 索引 应该是因共同的特性被分组到一起的文档集合。 例如,你可能存储所有的产品在索引 products 中,而存储所有销售的交易到索引 sales 中。 虽然也允许存储不相关的数据到一个索引中,但这通常看作是一个反模式的做法。
_type:
数据可能在索引中只是松散的组合在一起,但是通常明确定义一些数据中的子分区是很有用的。 例如,所有的产品都放在一个索引中,但是你有许多不同的产品类别,比如 "electronics" 、 "kitchen" 和 "lawn-care"。
这些文档共享一种相同的(或非常相似)的模式:他们有一个标题、描述、产品代码和价格。他们只是正好属于“产品”下的一些子类。
_id:
ID 是一个字符串,当它和 _index 以及 _type 组合就可以唯一确定 Elasticsearch 中的一个文档。 当你创建一个新的文档,要么提供自己的 _id ,要么让 Elasticsearch 帮你生成。
创建文档(索引文档)
前面说到,_index、_type、_id可以唯一确定Elasticsearch中的一个文档,同时_id可以自己指定,也可以由Elasticsearch自动生成。(自动生成的话是 URL-safe、 基于 Base64 编码且长度为20个字符的 GUID 字符串。 这些 GUID 字符串由可修改的 FlakeID 模式生成,这种模式允许多个节点并行生成唯一 ID ,且互相之间的冲突概率几乎为零。)
下面介绍一种创建文档的方法:
PUT /{index}/{type}/{id}
{
"key": "value",
...
}
采用官方文档案例进行说明,如果我们的索引称为 website ,类型称为 blog ,并且选择 123 作为 ID ,那么索引请求应该是下面这样(其中的title、text、date就是文档保存的内容):
PUT /website/blog/123
{
"title": "My first blog entry",
"text": "Just trying this out...",
"date": "2014/01/01"
}
Elasticsearch的响应体如下:
{
"_index": "website",
"_type": "blog",
"_id": "123",
"_version": 1,
"created": true
}
如果你不自己指定_id的话,可以不指定,如下(Elasticsearch返回的_id将会是由其自动生成的):
POST /website/blog/
{
"title": "My second blog entry",
"text": "Still trying this out...",
"date": "2014/01/01"
}
获取文档
前面介绍了创建文档的方法,这里介绍获取文档信息的方法。
我们依然使用_index、_type、_id组成的url:
GET /index_name/_doc/11
返回结果:
{
"_index" : "index_name",
"_type" : "_doc",
"_id" : "11",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"tags" : [
"gamegroup"
],
"update_time" : 1607156354,
"@timestamp" : "2021-02-18T03:40:01.235Z",
"id" : 11,
"name" : "对马岛之魂",
"status" : 0,
"create_time" : 1603073095,
"@version" : "1"
}
}
如果你指定的数据,无法找到相应的文档的话,将会返回:
{
"_index" : "index_name",
"_type" : "_doc",
"_id" : "11",
"found" : false
}
指定返回元素:
从上面可以看到,Elasticsearch会默认发挥文档中的所有元素,但是在实际的应用中可能很少有这种需要获取到所有元素的情况,所以我们需要指定返回的元素:
如只需要获取_source的数据:
GET /index_name/_doc/11/_source
返回结果:
{
"tags" : [
"gamegroup"
],
"update_time" : 1607156354,
"@timestamp" : "2021-02-18T03:40:01.235Z",
"id" : 11,
"name" : "对马岛之魂",
"status" : 0,
"create_time" : 1603073095,
"@version" : "1"
}
如_source中我们只需要获取id与name信息的话:
GET /index_name/_doc/11?_source=id,name
返回结果:
{
"_index" : "index_name",
"_type" : "_doc",
"_id" : "11",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"name" : "对马岛之魂",
"id" : 11
}
}
注意:
高版本的Elasticsearch可能会收到一条warning信息,如下:
#! Deprecation: [types removal] Specifying types in document get requests is deprecated, use the /{index}/_doc/{id} endpoint instead.
# !在文档get请求中指定类型已被弃用,使用/{index}/_doc/{id}端点代替。
检测文档存在
在某一些场景中,我们可能只需要知道文档是否存在即可,而不需要获取文档内容,这时可以使用HEAD方法,如下:
HEAD /index_name/_doc/11
返回结果(文档存在,状态码200):
200 - OK
返回结果(文档不存在,状态码404):
{
"statusCode":404,
"error":"Not Found",
"message":"404 - Not Found"
}
更新文档
Elasticicsearch提供了文档更新的功能,但是实际在Elasticsearch中文档是不可改变的,它的更新只是创建了一个新的文档。
如下:
PUT /website/blog/123
{
"title": "My first blog entry",
"text": "I am starting to get the hang of this...",
"date": "2014/01/02"
}
返回结果:
{
"_index" : "website",
"_type" : "blog",
"_id" : "123",
"_version" : 2,
"created": false
}
相比一开始的创建文档返回结果,_version信息从1变成了2,created字段从true变成了false。在内部,Elasticsearch 已将旧文档标记为已删除,并增加一个全新的文档。 尽管你不能再对旧版本的文档进行访问,但它并不会立即消失。当继续索引更多的数据,Elasticsearch 会在后台清理这些已删除文档。
创建新文档
前面介绍了创建文档和更新文档,但是存在一个问题,就是创建与更新都是使用的相同的方法。如果我们需要的业务是没有才需要创建的呢?Elasticsearch官方提供了以下两种方法:
第一种方法使用 op_type 查询-字符串参数:
PUT /website/blog/123?op_type=create
{ ... }
第二种方法是在 URL 末端使用 /_create :
PUT /website/blog/123/_create
{ ... }
如果创建成功的话,Elasticsearch将会返回201状态码。不然的话,将会返回409错误码以及错误信息,如:
{
"error": {
"root_cause": [
{
"type": "document_already_exists_exception",
"reason": "[blog][123]: document already exists",
"shard": "0",
"index": "website"
}
],
"type": "document_already_exists_exception",
"reason": "[blog][123]: document already exists",
"shard": "0",
"index": "website"
},
"status": 409
}
删除文档
删除文档的方法也特别的简单,如下:
DELETE /website/blog/123
删除成功的话,将会返回:
{
"found" : true,
"_index" : "website",
"_type" : "blog",
"_id" : "123",
"_version" : 3
}
其中的found为true,表示查找到了这个文档,并且删除。
如果删除失败的话,将会返回404状态码以及如下消息体:
{
"found" : false,
"_index" : "website",
"_type" : "blog",
"_id" : "123",
"_version" : 4
}
注意:
即使文档不存在( Found 是 false ), _version 值仍然会增加。这是 Elasticsearch 内部记录本的一部分,用来确保这些改变在跨多节点时以正确的顺序执行。
处理冲突
这里说的冲突主要是并发情况下出现的变更丢失问题,我们更新文档时是读取原始文档,进行修改,再重新索引新的文档。如果我们遇到了高并发的抢购场景,就很有可能会出现如下情况:
这样就是造成实际卖出了2个,但是你的库存只减少了1个。这样会导致用户抢到了但是没货发,这可就是十分严重得问题了,在数据库领域,有两种方法通常被用来确保并发时的数据可靠性:
悲观并发控制:
这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。
乐观并发控制:
Elasticsearch 中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操作。 然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。 例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。
乐观并发控制
Elasticsearch 是分布式的。当文档创建、更新或删除时, 新版本的文档必须复制到集群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许 顺序是乱的 。 Elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本。
前面介绍到了Elasticsearch在每一次修改的时候都会添加_version的值,Elasticsearch也就是使用这个 _version 号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略,但是好像这个并不能解决前面说的并发问题。
Elasticsearch在执行更新操作的时候,可以在请求中指定操作的版本号,如下:
PUT /website/blog/1?version=1
{
"title": "My first blog entry",
"text": "Starting to get the hang of this..."
}
这样的话,只有在当前版本为1的情况下,才会更新成功,返回:
{
"_index": "website",
"_type": "blog",
"_id": "1",
"_version": 2
"created": false
}
不然的话,将会发挥失败信息:
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[blog][1]: version conflict, current [2], provided [1]",
"index": "website",
"shard": "3"
}
],
"type": "version_conflict_engine_exception",
"reason": "[blog][1]: version conflict, current [2], provided [1]",
"index": "website",
"shard": "3"
},
"status": 409
}
这告诉我们在 Elasticsearch 中这个文档的当前 _version 号是 2 ,但我们指定的更新版本号为 1 。这种方法可以在代码中使用乐观的并发控制,这是一种明智的做法。
除此之外,还可以通过外部系统使用版本控制。
PUT /website/blog/2?version=5&version_type=external
{
"title": "My first external blog entry",
"text": "Starting to get the hang of this..."
}
外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同, Elasticsearch 不是检查当前 _version 和请求中指定的版本号是否相同, 而是检查当前 _version 是否 小于 指定的版本号。 如果请求成功,外部的版本号作为文档的新 _version 进行存储。
通过外部系统使用版本控制-官方文档链接
文档部分更新
前面介绍了文档更新的方法,但是很多时候我们其实都是只需要更新文档中的一部分数据,这样的话我们可以使用如下方法(url添加_update):
POST /website/blog/1/_update
{
"doc" : {
"tags" : [ "testing" ],
"views": 0
}
}
这种请求,它只是与现有的文档进行合并。对象被合并到一起,覆盖现有的字段,增加新的字段。
同时也可以使用脚本部分来进行文档的更新(将views的值+1):
POST /website/blog/1/_update
{
"script" : "ctx._source.views+=1"
}
更为复杂一点,我们可以自己在请求中添加参数:
POST /website/blog/1/_update
{
"script" : "ctx._source.tags+=new_tag",
"params" : {
"new_tag" : "search"
}
}
我们甚至可以选择通过设置 ctx.op 为 delete 来删除基于其内容的文档:
POST /website/blog/1/_update
{
"script" : "ctx.op = ctx._source.views == count ? 'delete' : 'none'",
"params" : {
"count": 1
}
}
文档不存在
有一些时候,可能更新时文档不存在的话,需要新创建。比如我们需要保存每一个页面的浏览量,那如果这个网页是一个新的网页的话,Elasticsearch应该是没它的数据的。
在这样的情况下,我们可以使用 upsert 参数,指定如果文档不存在就应该先创建它:
POST /website/pageviews/1/_update
{
"script" : "ctx._source.views+=1",
"upsert": {
"views": 1
}
}
我们第一次运行这个请求时, upsert 值作为新文档被索引,初始化 views 字段为 1 。 在后续的运行中,由于文档已经存在, script 更新操作将替代 upsert 进行应用,对 views 计数器进行累加。
注意:
更新操作时,为了避免数据丢失, update API 在 检索 步骤时检索得到文档当前的 _version 号,并传递版本号到 重建索引 步骤的 index 请求。 如果另一个进程修改了处于检索和重新索引步骤之间的文档,那么 _version 号将不匹配,更新请求将会失败。
对于部分更新的很多使用场景,文档已经被改变也没有关系。我们可以设置重试次数,让它失败之后自动重试,如下:
POST /website/pageviews/1/_update?retry_on_conflict=5
{
"script" : "ctx._source.views+=1",
"upsert": {
"views": 0
}
}
这样的话,将在失败之后重试5次,5次都失败了才返回失败。
批量获取
Elasticsearch的查询速度很快,如果我们需要获取多个文档,分次获取的话将会在网络上花销掉较多的时间,如果你需要从 Elasticsearch 检索很多文档,那么使用 multi-get 或者 mget API 来将这些检索请求放在一个请求中,将比逐个文档请求更快地检索到全部文档。
mget API 要求有一个 docs 数组作为参数,每个元素包含需要检索文档的元数据, 包括 _index 、 _type 和 _id 。如果你想检索一个或者多个特定的字段,那么你可以通过 _source 参数来指定这些字段的名字
GET /_mget
{
"docs" : [
{
"_index" : "website",
"_type" : "blog",
"_id" : 2
},
{
"_index" : "website",
"_type" : "pageviews",
"_id" : 1,
"_source": "views"
}
]
}
返回结果:
{
"docs" : [
{
"_index" : "website",
"_id" : "2",
"_type" : "blog",
"found" : true,
"_source" : {
"text" : "This is a piece of cake...",
"title" : "My first external blog entry"
},
"_version" : 10
},
{
"_index" : "website",
"_id" : "1",
"_type" : "pageviews",
"found" : true,
"_version" : 2,
"_source" : {
"views" : 2
}
}
]
}
如果你想要查询的文档都在同一个_index,甚至在相同的_type中,可以直接在url中指定_index与_type:
GET /website/blog/_mget
{
"ids" : [ "2", "1" ]
}
也可以在请求体内部指定_type:
GET /website/blog/_mget
{
"docs" : [
{ "_id" : 2 },
{ "_type" : "pageviews", "_id" : 1 }
]
}
如果我们查找的文档中有一些不存在的话,返回情况如下:
{
"docs" : [
{
"_index" : "website",
"_type" : "blog",
"_id" : "2",
"_version" : 10,
"found" : true,
"_source" : {
"title": "My first external blog entry",
"text": "This is a piece of cake..."
}
},
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"found" : false
}
]
}
注意
任何文档未能找到并不妨碍其它文档被检索到。每个文档都是单独检索和报告的。即使请求 没有 找到任何文档,它的状态码依然是 200 --因为 mget 请求本身已经成功执行。 为了确定某个文档查找是成功或者失败,你需要检查 found 标记。
批量操作
正如mget可以批量获取一般,bulk API 允许在单个步骤中进行多次 create 、 index 、 update 或 delete 请求。 如果你需要索引一个数据流比如日志事件,它可以排队和索引数百或数千批次。其语句格式如下:
{ action: { metadata }}
{ request body }
{ action: { metadata }}
{ request body }
...
注意
每行一定要以换行符( )结尾, 包括最后一行 。这些换行符被用作一个标记,可以有效分隔行。
这些行不能包含未转义的换行符,因为他们将会对解析造成干扰。这意味着这个 JSON 不 能使用 pretty 参数打印。
格式解析
action/metadata行指定哪一个文档做什么操作。action 必须是以下选项之一:
create:如果文档不存在,那么就创建它。
index:创建一个新文档或者替换一个现有的文档。
update:部分更新一个文档。
delete:删除一个文档。
下面贴上一个官方案例:
POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "My first blog post" }
{ "index": { "_index": "website", "_type": "blog" }}
{ "title": "My second blog post" }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} }
注意
delete 动作不能有请求体,它后面跟着的是另外一个操作。
贴上一个官方的返回案例:
{
"took": 3,
"errors": true,
"items": [
{ "create": {
"_index": "website",
"_type": "blog",
"_id": "123",
"status": 409,
"error": "DocumentAlreadyExistsException
[[website][4] [blog][123]:
document already exists]"
}},
{ "index": {
"_index": "website",
"_type": "blog",
"_id": "123",
"_version": 5,
"status": 200
}}
]
}
注意
这也意味着 bulk 请求不是原子的: 不能用它来实现事务控制。每个请求是单独处理的,因此一个请求的成功或失败不会影响其他的请求。
和其它请求一样,bulk请求也接受url中的_index与_type参数:
POST /website/log/_bulk
{ "index": {}}
{ "event": "User logged in" }
{ "index": { "_type": "blog" }}
{ "title": "Overriding the default type" }
批量操作需要注意的一点:一个好的批量操作在开始处理后占有物理大小应该在5MB-15MB。