zoukankan      html  css  js  c++  java
  • 如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步

    MySQL 设置

        CREATE DATABASE es_db;
        USE es_db;
        DROP TABLE IF EXISTS es_table;
        CREATE TABLE es_table (
          id BIGINT(20) UNSIGNED NOT NULL,
          PRIMARY KEY (id),
          UNIQUE KEY unique_id (id),
          client_name VARCHAR(32) NOT NULL,
          modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
          insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        );

    在上面的 MySQL 配置中,有几个参数需要特别注意:

    • es_table这是 MySQL 数据表的名称,数据会从这里读取出来并同步到 Elasticsearch。
    • id:这是该条记录的唯一标识符。请注意 “id” 已被定义为 PRIMARY KEY(主键)和 UNIQUE KEY(唯一键)。这能确保每个 “id” 仅在当前表格中出现一次。其将会转换为 “_id”,以用于更新 Elasticsearch 中的文档及向 Elasticsearch 中插入文档。
    • client_name此字段表示在每条记录中所存储的用户定义数据。在本篇博文中,为简单起见,我们只有一个包含用户定义数据的字段,但您可以轻松添加更多字段。我们要更改的就是这个字段,从而向大家演示不仅新插入的 MySQL 记录被复制到了 Elasticsearch 中,而且更新的记录也被正确传播到了 Elasticsearch 中。
    • modification_time在 MySQL 中插入或更改任何记录时,都会将这个所定义字段的值设置为编辑时间。有了这个编辑时间,我们便能提取自从上次 Logstash 请求从 MySQL 获取记录后被编辑的任何记录。
    • insertion_time此字段主要用于演示目的,并非正确进行同步需满足的严格必要条件。我们用其来跟踪记录最初插入到 MySQL 中的时间。

     MySQL 操作

     完成上述配置后,可以通过下列语句向 MySQL 中写入记录:

    INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);

    可以通过下列命令更新 MySQL 中的记录:

    UPDATE es_table SET client_name = <new client name> WHERE id=<id>;

    可以通过下列语句完成 MySQL 更新/插入操作 (upsert):

    INSERT INTO es_table (id, client_name) VALUES (<id>, <client name when created> ON DUPLICATE KEY UPDATE client_name=<client name when updated>;

    同步代码

    下列 Logstash 管道会实施在前一部分中所描述的同步代码:

    input {
      jdbc {
        jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
        jdbc_user => <my username>
        jdbc_password => <my password>
        jdbc_paging_enabled => true
        tracking_column => "unix_ts_in_secs"
        use_column_value => true
        tracking_column_type => "numeric"
        schedule => "*/5 * * * * *"
        statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
      }
    }
    filter {
      mutate {
        copy => { "id" => "[@metadata][_id]"}
        remove_field => ["id", "@version", "unix_ts_in_secs"]
      }
    }
    output {
      # stdout { codec =>  "rubydebug"}
      elasticsearch {
          index => "rdbms_sync_idx"
          document_id => "%{[@metadata][_id]}"
      }
    }

    如果大家不是很熟悉如何配置jdbc input来访问MySQL,请参阅我之前的文章“Logstash:把MySQL数据导入到Elasticsearch中”。

    在上述管道中,应该重点强调几个区域:

    • tracking_column此字段会指定 “unix_ts_in_secs” 字段(用于跟踪 Logstash 从 MySQL 读取的最后一个文档,下面会进行描述),其存储在 .logstash_jdbc_last_run 中的磁盘上。该值将会用来确定 Logstash 在其轮询循环的下一次迭代中所请求文档的起始值。在 .logstash_jdbc_last_run 中所存储的值可以作为 “:sql_last_value” 通过 SELECT 语句进行访问。
    • unix_ts_in_secs这是一个由上述 SELECT 语句生成的字段,包含可作为标准 Unix 时间戳(自 Epoch 起秒数)的 “modification_time”。我们刚讨论的 “tracking column” 会引用该字段。Unix 时间戳用于跟踪进度,而非作为简单的时间戳;如将其作为简单时间戳,可能会导致错误,因为在 UMT 和本地时区之间正确地来回转换是一个十分复杂的过程。
    • sql_last_value这是一个内置参数,包括 Logstash 轮询循环中当前迭代的起始点,上面 JDBC 输入配置中的 SELECT 语句便会引用这一参数。该字段会设置为 “unix_ts_in_secs”(读取自 .logstash_jdbc_last_run)的最新值。在 Logstash 轮询循环内所执行的 MySQL 查询中,其会用作所返回文档的起点。通过在查询中加入这一变量,能够确保不会将之前传播到 Elasticsearch 的插入或更新内容重新发送到 Elasticsearch。
    • schedule其会使用 cron 语法来指定 Logstash 应当以什么频率对 MySQL 进行轮询以查找变更。这里所指定的 "*/5 * * * * *" 会告诉 Logstash 每 5 秒钟联系一次 MySQL。
    • modification_time < NOW():SELECT 中的这一部分是一个较难解释的概念,我们会在下一部分详加解释。
    • filter:在这一部分,我们只需简单地将 MySQL 记录中的 “id” 值复制到名为 “_id” 的元数据字段,因为我们之后输出时会引用这一字段,以确保写入 Elasticsearch 的每个文档都有正确的 “_id” 值。通过使用元数据字段,可以确保这一临时值不会导致创建新的字段。我们还从文档中删除了 “id”、“@version” 和 “unix_ts_in_secs” 字段,因为我们不希望将这些字段写入到 Elasticsearch 中。
    • output在这一部分,我们指定每个文档都应当写入 Elasticsearch,还需为其分配一个 “_id”(需从我们在筛选部分所创建的元数据字段提取出来)。还会有一个包含被注释掉代码的 rubydebug 输出,启用此输出后能够帮助您进行故障排查。

     转载自:https://blog.csdn.net/UbuntuTouch/article/details/103874185

  • 相关阅读:
    从Objective-C到Swift,你必须会的(四)DLog
    从Objective-C到Swift,你必须会的(三)init的顺序
    从Objective-C到Swift,你必须会的(二)组合options
    从Objective-C到Swift,你必须会的(一)#pragma mark
    swift的可选值(optional)
    一个通用的DataGridView导出Excel扩展方法(支持列数据格式化)
    C#定时执行一个操作
    用Dictionary替换switch case
    Web Service 通过BinaryFormatter序列化和反序列化泛型List
    开源.NET FTP组件edtFTPnet 用法
  • 原文地址:https://www.cnblogs.com/fat-girl-spring/p/14303372.html
Copyright © 2011-2022 走看看