zoukankan      html  css  js  c++  java
  • Elasticsearch:应用 Nodejs 来访问 Elasticsearch【转载】

    Nodejs Elasticsearch 教程示例是今天的主要主题。 Elasticsearch 是一个开源搜索引擎,由于其高性能和分布式架构而变得非常流行。 Elasticsearch构建于 Apache Lucene 之上,后者是一个高性能的文本搜索引擎库。 虽然 Elasticsearch 可以执行数据的存储和检索,但其主要目的不是用作数据库。 相反,它是一个搜索引擎(服务器),其主要目标是索引,搜索和提供有关数据的实时统计信息。

    在本教程中,我们将与 Node.js 集成并使用它来索引和搜索数据。 导入数据后,它立即可用于搜索。 Elasticsearch 是 schema-less,可以将数据存储在 JSON 文档中,并且可以自动检测数据结构及其类型。

    Elasticsearch 也完全由 API 驱动。 这意味着几乎所有操作都可以通过使用 HTTP上 的 JSON 数据的简单 RESTful API 完成。 它有几乎所有编程语言的客户端库,包括 JavaScript。 在此示例中,我们将使用 官方客户端库

    第一步

    请按照文章 “如何在Linux,MacOS及Windows上进行安装Elasticsearch” 进行安装 Elasticsearch。针对 Mac 用户,你也可以使用如下的命令来安装 Elasticsearch:

    $ brew install elasticsearch
    复制代码

    我们可以在 Mac 上用如下的方式运行 Elasticsearch:

    $ brew services start elasticsearch
    复制代码

    我们按照那篇文章介绍的那样,把 Elasticsearch 服务器运行起来。

    第二步

    我们在我们的电脑上创建一个目录,比如说:

    $ mkdir node-elastic
    $ cd node-elastic
    复制代码

    在这个目录中,我们可以通过如下的命令来创建我们的 nodejs 项目的 package.json 文件。

    $ npm init
    复制代码

    我通过选默认的选项

    {
      "name": "elastic",
      "version": "1.0.0",
      "description": "",
      "main": "index.js",
      "scripts": {
        "start": "node index.js"
      },
      "author": "",
      "license": "ISC",
      "dependencies": {
      }
    复制代码

    这里,我对 scripts 部分做了一点小的修改。这样我们就可以通过 npm start 来运行我们的 index.js 文件。

    就像上面显示的那样,我们来创建一个叫做 index.js 的文件。它的内容如下:

    // index.js
    
    const express = require('express');
    const elasticsearch = require('elasticsearch');
    const fs = require('fs');
    const app = express();
    
    const PORT = 5000;
    
    const client = new elasticsearch.Client({
        host: '127.0.0.1:9200',
        log: 'error'
     });
    
    app.listen(PORT, function() {
        console.log('Server is running on PORT:',PORT);
    });
    复制代码

    在我们的上面的代码中我们需要 express 及 elasticsearch 两个模块。我们需要通过的如下的方法来进行安装:

    $ npm install express --save
    $ npm install elasticsearch --save
    复制代码

    通过 --save 选项,我们把我们需要的模块存于到我们当前的目录中。

    我们可以通过如下的方法来运行我们的应用:

    $ node index.js
    复制代码

    或者:

    $ npm start
    复制代码

    如果我们的 index.js 运行成功的话,它将在 PORT:5000 口地址上。而我们的 Elasticsearch 服务器运行在 PORT:9200 上。显示的结果如下:

    Server is running on PORT: 5000
    复制代码

    创建一个样本数据

    我们需要创建一个样本数据。这个数据将被读入进来并存于 Elasticsearch 之中。一旦数据被录入到 Elasticsearch 中,我们就可以对它们进行搜索。我们在项目的根目录中创建一个叫做 data.json 的文件,并把如下的内容输入进去:

    [
      {
        "id": "2",
        "user" : "双榆树-张三",
        "message" : "今儿天气不错啊,出去转转去",
        "uid" : 2,
        "age" : 20,
        "city" : "北京",
        "province" : "北京",
        "country" : "中国",
        "address" : "中国北京市海淀区",
        "location" : {
            "lat" : "39.970718",
            "lon" : "116.325747"
          }
      },
      {
        "id": "3",
        "user" : "东城区-老刘",
        "message" : "出发,下一站云南!",
        "uid" : 3,
        "age" : 30,
        "city" : "北京",
        "province" : "北京",
        "country" : "中国",
        "address" : "中国北京市东城区台基厂三条3号",
        "location" : {
          "lat" : "39.904313",
          "lon" : "116.412754"
        }
      },
      {
        "id": "4",
        "user" : "东城区-李四",
        "message" : "happy birthday!",
        "uid" : 4,
        "age" : 30,
        "city" : "北京",
        "province" : "北京",
        "country" : "中国",
        "address" : "中国北京市东城区",
        "location" : {
          "lat" : "39.893801",
          "lon" : "116.408986"
        }
      },
      {
        "id": "7",
        "user" : "虹桥-老吴",
        "message" : "好友来了都今天我生日,好友来了,什么 birthday happy 就成!",
        "uid" : 7,
        "age" : 90,
        "city" : "上海",
        "province" : "上海",
        "country" : "中国",
        "address" : "中国上海市闵行区",
        "location" : {
          "lat" : "31.175927",
          "lon" : "121.383328"
        }
      }
    ]
    复制代码

    它是一个由4个元素组成的数组。 因此,当我们索引数据时,如果我们正确地索引它,那么数据的长度是4,我们将证明它。

    在实际的应用中,这些数据可能来自实时数据或数据等各种数据源。

    把样本数据加入进来进行 indexing

    所有的数据在加入到 Elasticsearch 中都需要进行一项 indexing 的工作。我们继续修改我们的 index.js 文件如下:

    const express = require('express');
    var elasticsearch = require('elasticsearch');
    const fs = require('fs');
    const app = express();
    
    const PORT = 5000;
    
    const client = new elasticsearch.Client({
        host: '127.0.0.1:9200',
        log: 'error'
    });
    
    client.ping({ requestTimeout: 30000 }, function(error) {
        if (error) {
            console.error('elasticsearch cluster is down!');
        } else {
            console.log('Everything is ok');
        }
    });
    
    
    const bulkIndex = function bulkIndex(index, type, data) {
        let bulkBody = [];
    
        data.forEach(item => {
            bulkBody.push({
                index: {
                    _index: index,
                    _type: type,
                    _id: item.id
                }
            });
    
            bulkBody.push(item);
        });
    
        client.bulk({body: bulkBody})
            .then(response => {
                let errorCount = 0;
                response.items.forEach(item => {
                    if (item.index && item.index.error) {
                        console.log(++errorCount, item.index.error);
                    }
                });
                console.log(
                    `Successfully indexed ${data.length - errorCount}
             out of ${data.length} items`
                );
            })
            .catch(console.err);
    };
    
    async function indexData() {
        const twittersRaw = await fs.readFileSync('./data.json');
        const twitters = JSON.parse(twittersRaw);
        console.log(`${twitters.length} items parsed from data file`);
        bulkIndex('twitter2', '_doc', twitters);
    };
    
    indexData();
    
    app.listen(PORT, function() {
        console.log('Server is running on PORT:',PORT);
    });
    复制代码

    这里,我们可以看到在 indexData() 方法里,我们首先通过 readFileSync 来把我们的 data.json 中的数据读入进来。通过 JSON.parse 把数据转化为 Javascript 的 object。最终,我们通过 bulkIndex 来引用 elasticsearch 的 API 来完成数据的录入。就像我们之前所说的那样,这个数据在实际的应用中可能来自其它的种类的数据源。如果大家想了解什么是 bulk API,可以参照我之前的教程 “开始使用Elasticsearch (1)”。它可以很快地将很多文档在一起通过一次的 REST 接口调用,并完成数据的 indexing。

    验证录入的数据

    在上面的一节中,我们已经通过 bulk API 接口把我们的数据录入到系统中了。接下来我们看看如何通过 elasticsearch 的方法来把我们的录入的数据读出来。我们在当前目录创建一个新的文件 verify.js

    // verify.js
    
    const elasticsearch = require('elasticsearch');
    
    const client = new elasticsearch.Client({
        host: '127.0.0.1:9200',
        log: 'error'
     });
    
    
    function indices() {
        return client.cat.indices({v: true})
        .then(console.log)
        .catch(err => console.error(`Error connecting to the es client: ${err}`));
      }
    
    module.exports = function verify() {
        console.log(`elasticsearch indices information:`);
        indices();
    }
    复制代码

    在这里,我们通过 client.cat.indices 接口来显示所有目前的 index。为了使用刚才建立的 verify.js 文件,我们需要更进一步修改我们之前的 index.js:

    const express = require('express');
    var elasticsearch = require('elasticsearch');
    const fs = require('fs');
    const app = express();
    
    const PORT = 5000;
    const verify = require('./verify');
    
    const client = new elasticsearch.Client({
        host: '127.0.0.1:9200',
        log: 'error'
    });
    
    client.ping({ requestTimeout: 30000 }, function(error) {
        if (error) {
            console.error('elasticsearch cluster is down!');
        } else {
            console.log('Everything is ok');
        }
    });
    
    
    const bulkIndex = function bulkIndex(index, type, data) {
        let bulkBody = [];
    
        data.forEach(item => {
            bulkBody.push({
                index: {
                    _index: index,
                    _type: type,
                    _id: item.id
                }
            });
    
            bulkBody.push(item);
        });
    
        client.bulk({body: bulkBody})
            .then(response => {
                let errorCount = 0;
                response.items.forEach(item => {
                    if (item.index && item.index.error) {
                        console.log(++errorCount, item.index.error);
                    }
                });
                console.log(
                    `Successfully indexed ${data.length - errorCount}
             out of ${data.length} items`
                );
            })
            .catch(console.err);
    };
    
    async function indexData() {
        const twittersRaw = await fs.readFileSync('./data.json');
        const twitters = JSON.parse(twittersRaw);
        console.log(`${twitters.length} items parsed from data file`);
        bulkIndex('twitter2', '_doc', twitters);
    };
    
    indexData();
    verify();
    
    app.listen(PORT, function() {
        console.log('Server is running on PORT:',PORT);
    });
    复制代码

    注意前面的

    const verify = require('./verify');
    复制代码

    及后面的 verify() 调用

    ...
    indexData();
    verify();
    ...
    复制代码

    我们重新运行我们的 index.js。显示的结果如下:

    我们可以看到有4个文档被导入到 Elasticsearch,并有一个叫做 twitter2 的 index 被创建。这意味着我们已经成功地把数据录入到 Elasticsearch 中。在接下来的章节中,我们来搜索我们其中的数据。

    搜索 Elasticsearch 索引中的数据

    首先,我们来搜索所有的 twitter 文档。我们在项目的目录中创建一个新的文件叫做 search.js。它的内容如下:

    // search.js
    
    const elasticsearch = require('elasticsearch');
    
    const client = new elasticsearch.Client({
        host: '127.0.0.1:9200',
        log: 'error'
    });
    
    const search = function search(index, body) {
        return client.search({index: index, body: body});
    };
    
    module.exports =  function searchData() {
        let body = {
            size: 4,
            from: 0,
            query: {
            match_all: {}
            }
        };
    
        search('twitter2', body)
            .then(results => {
                console.log(`found ${results.hits.total} items in ${results.took}ms`);
                console.log(`returned twitters:`);
                results.hits.hits.forEach(
                    (hit, index) => console.log(
                        hit._source
                    )
                )
            })
            .catch(console.error);
    };
    复制代码

    这是一个崭新的模块。因为我们已经为我们的数据进行了索引,我只需要把数据从 Elasticsearch 中读出来。我们对 search() 方法传入了两个参数:

    1. index 名称
    2. 查询的 DSL body

    如果大家对 DSL 不是很熟的话,请参阅我之前的教程“开始使用Elasticsearch (2)”。这里的 body 含有一个我们需要返回的 size 已经进行查询所需要的 query 语句。

    为了使得我们的search.js能够被使用,我们更进一步修改我们的 index.js 文件:

    const express = require('express');
    var elasticsearch = require('elasticsearch');
    const fs = require('fs');
    const app = express();
    
    const PORT = 5000;
    const verify = require('./verify');
    const searchData = require('./search');
    
    const client = new elasticsearch.Client({
        host: '127.0.0.1:9200',
        log: 'error'
    });
    
    client.ping({ requestTimeout: 30000 }, function(error) {
        if (error) {
            console.error('elasticsearch cluster is down!');
        } else {
            console.log('Everything is ok');
        }
    });
    
    
    const bulkIndex = function bulkIndex(index, type, data) {
        let bulkBody = [];
    
        data.forEach(item => {
            bulkBody.push({
                index: {
                    _index: index,
                    _type: type,
                    _id: item.id
                }
            });
    
            bulkBody.push(item);
        });
    
        client.bulk({body: bulkBody})
            .then(response => {
                let errorCount = 0;
                response.items.forEach(item => {
                    if (item.index && item.index.error) {
                        console.log(++errorCount, item.index.error);
                    }
                });
                console.log(
                    `Successfully indexed ${data.length - errorCount}
             out of ${data.length} items`
                );
            })
            .catch(console.err);
    };
    
    async function indexData() {
        const twittersRaw = await fs.readFileSync('./data.json');
        const twitters = JSON.parse(twittersRaw);
        console.log(`${twitters.length} items parsed from data file`);
        bulkIndex('twitter2', '_doc', twitters);
    };
    
    // indexData();
    // verify();
    searchData();
    
    app.listen(PORT, function() {
        console.log('Server is running on PORT:',PORT);
    });
    复制代码

    请注意文件开头的部分:

    const searchData = require('./search');
    复制代码

    以及下面部分的修改:

    ...
    // indexData();
    // verify();
    searchData();
    ...
    复制代码

    这次运行时,我们只做 search 的动作,所以把之前的 index 及 verify 部分代码注释掉。存下我们的代码,并重新运行我们的应用:

    returned twitters:
    {
      id: '2',
      user: '双榆树-张三',
      message: '今儿天气不错啊,出去转转去',
      uid: 2,
      age: 20,
      city: '北京',
      province: '北京',
      country: '中国',
      address: '中国北京市海淀区',
      location: { lat: '39.970718', lon: '116.325747' }
    }
    {
      id: '3',
      user: '东城区-老刘',
      message: '出发,下一站云南!',
      uid: 3,
      age: 30,
      city: '北京',
      province: '北京',
      country: '中国',
      address: '中国北京市东城区台基厂三条3号',
      location: { lat: '39.904313', lon: '116.412754' }
    }
    {
      id: '4',
      user: '东城区-李四',
      message: 'happy birthday!',
      uid: 4,
      age: 30,
      city: '北京',
      province: '北京',
      country: '中国',
      address: '中国北京市东城区',
      location: { lat: '39.893801', lon: '116.408986' }
    }
    {
      id: '7',
      user: '虹桥-老吴',
      message: '好友来了都今天我生日,好友来了,什么 birthday happy 就成!',
      uid: 7,
      age: 90,
      city: '上海',
      province: '上海',
      country: '中国',
      address: '中国上海市闵行区',
      location: { lat: '31.175927', lon: '121.383328' }
    }
    复制代码

    搜索特定术语

    我们在项目的根目录下创建一个新的文件 search_term.js,并且加入如下的代码:

    // search_term.js
    
    const elasticsearch = require('elasticsearch');
    
    const client = new elasticsearch.Client({
        host: '127.0.0.1:9200',
        log: 'error'
    });
    
    const search = function search(index, body) {
        return client.search({index: index, body: body});
    };
    
    module.exports = function searchTerm() {
        let body = {
            size: 4,
            from: 0,
            query: {
                match: {
                    city: {
                        query: '北京'
                    }
                }
            }
        };
    
        console.log(`retrieving documents whose twitter matches '${body.query.match.city.query}' (displaying ${body.size} items at a time)...`);
        search('twitter2', body)
            .then(results => {
                console.log(`found ${results.hits.total} items in ${results.took}ms`);
                if (results.hits.total > 0) console.log(`returned twitters:`);
                results.hits.hits.forEach(hit => console.log(hit._source));
            })
            .catch(console.error);
    };
    复制代码

    在这里,我们使用如下的 query:

    query: {
            match: {
              city: {
                query: '北京'
              }
            }
          }
    复制代码

    我寻找所有城市是 “北京” 的所有的文档。

    同样,我们需要对我们的 index.js 做相应的改动:

    const express = require('express');
    var elasticsearch = require('elasticsearch');
    const fs = require('fs');
    const app = express();
    
    const PORT = 5000;
    const verify = require('./verify');
    const searchData = require('./search');
    const searctTerm = require('./search_term');
    
    const client = new elasticsearch.Client({
        host: '127.0.0.1:9200',
        log: 'error'
    });
    
    client.ping({ requestTimeout: 30000 }, function(error) {
        if (error) {
            console.error('elasticsearch cluster is down!');
        } else {
            console.log('Everything is ok');
        }
    });
    
    
    const bulkIndex = function bulkIndex(index, type, data) {
        let bulkBody = [];
    
        data.forEach(item => {
            bulkBody.push({
                index: {
                    _index: index,
                    _type: type,
                    _id: item.id
                }
            });
    
            bulkBody.push(item);
        });
    
        client.bulk({body: bulkBody})
            .then(response => {
                let errorCount = 0;
                response.items.forEach(item => {
                    if (item.index && item.index.error) {
                        console.log(++errorCount, item.index.error);
                    }
                });
                console.log(
                    `Successfully indexed ${data.length - errorCount}
             out of ${data.length} items`
                );
            })
            .catch(console.err);
    };
    
    async function indexData() {
        const twittersRaw = await fs.readFileSync('./data.json');
        const twitters = JSON.parse(twittersRaw);
        console.log(`${twitters.length} items parsed from data file`);
        bulkIndex('twitter2', '_doc', twitters);
    };
    
    // indexData();
    // verify();
    // searchData();
    searctTerm();
    
    app.listen(PORT, function() {
        console.log('Server is running on PORT:',PORT);
    });
    复制代码

    请注意在 index.js 的前面的这行代码:

    const searctTerm = require('./search_term');
    复制代码

    以及后面的这个部分:

    ...
    // indexData();
    // verify();
    // searchData();
    searctTerm();
    ...
    复制代码

    运行我们的应用,我们可以看到如下的结果:

    Found [object Object] items in 3ms
    {
      id: '2',
      user: '双榆树-张三',
      message: '今儿天气不错啊,出去转转去',
      uid: 2,
      age: 20,
      city: '北京',
      province: '北京',
      country: '中国',
      address: '中国北京市海淀区',
      location: { lat: '39.970718', lon: '116.325747' }
    }
    {
      id: '3',
      user: '东城区-老刘',
      message: '出发,下一站云南!',
      uid: 3,
      age: 30,
      city: '北京',
      province: '北京',
      country: '中国',
      address: '中国北京市东城区台基厂三条3号',
      location: { lat: '39.904313', lon: '116.412754' }
    }
    {
      id: '4',
      user: '东城区-李四',
      message: 'happy birthday!',
      uid: 4,
      age: 30,
      city: '北京',
      province: '北京',
      country: '中国',
      address: '中国北京市东城区',
      location: { lat: '39.893801', lon: '116.408986' }
    }
    复制代码

    在这里我们看到 city 为北京的所有的文档。其中的一个上海的文档是不在这里面。

    到这里,我们已经完成了所有的这个教程。如果你想了解更多关于 Elastic Stack 的知识,请参阅我们的官方网站:www.elastic.co/guide/index…

    这个教程的源码可以在地址找到:github.com/liu-xiao-gu…


    作者:Elasticsearch
    链接:https://juejin.cn/post/6959743781658705956
    来源:稀土掘金
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    python——numpy (二)
    python——numpy(一)
    python——matplotlib
    redis
    图片验证码识别技术——Tesseraact
    Linux 环境变量PROMPT_COMMAND
    maven项目管理工具
    Log4J日志组件
    java中的泛型,注解
    数据库备份还原
  • 原文地址:https://www.cnblogs.com/xiaozhumaopao/p/15409842.html
Copyright © 2011-2022 走看看