注意:这个演示也是读取数据库中的某个数据表的数据,并不是整个数据库中的所有表的数据
注意:@timestamp时间戳是utc时间,若想换成东八区时间,需要新增其他设置,具体看如下示例
系统配置
- MySQL:5.5.53
- Elasticsearch:7.5.0
- Logstash:7.5.0
- Java:1.8.0_221
- JDBC 输入插件:v4.3.19
- JDBC 连接器:Connector/J 8.0.20
下载连接mysql的驱动包,放到指定目录下
在地址https://dev.mysql.com/downloads/connector/j/下载最新的Connector。下载完这个Connector后,把这个connector存入到Logstash安装目录下的如下子目录中:
logstash-core/lib/jars/
同步步骤整体概览
从概念上讲,Logstash 的 JDBC 输入插件会运行一个循环来定期对 MySQL 进行轮询,从而找出在此次循环的上次迭代后插入或更改的记录。如要让其正确运行,必须满足下列条件:
1.在将 MySQL 中的文档写入 Elasticsearch 时,Elasticsearch 中的 "_id" 字段必须设置为 MySQL 中的 "id" 字段。这可在 MySQL 记录与 Elasticsearch 文档之间建立一个直接映射关系。如果在 MySQL 中更新了某条记录,那么将会在 Elasticsearch 中覆盖整条相关记录。请注意,在 Elasticsearch 中覆盖文档的效率与更新操作的效率一样高,因为从内部原理上来讲,更新便包括删除旧文档以及随后对全新文档进行索引。
2.当在 MySQL 中插入或更新数据时,该条记录必须有一个包含更新或插入时间的字段。通过此字段,便可允许 Logstash 仅请求获得在轮询循环的上次迭代后编辑或插入的文档。Logstash 每次对 MySQL 进行轮询时,都会保存其从 MySQL 所读取最后一条记录的更新或插入时间。在下一次迭代时,Logstash 便知道其仅需请求获得符合下列条件的记录:更新或插入时间晚于在轮询循环中的上一次迭代中所收到的最后一条记录。
第一个条件的意思是把MySQL数据表中的id字段值作为es中索引的_id
的值,这样两者形成一一对应的关系
第二个条件是MySQL数据表中有一个表示当前时间的字段,属性是根据当前时间戳进行更新
MySQL设置
创建数据表
CREATE TABLE `es_table` (
`id` bigint(20) unsigned NOT NULL,
`client_name` varchar(32) NOT NULL,
`modification_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `unique_id` (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
注意:表中id字段不是自增的,当然也可以设置成自增;modification_time字段即时上一步所说的第二个条件。
- es_table:这是 MySQL 数据表的名称,数据会从这里读取出来并同步到 Elasticsearch。
- id:这是该条记录的唯一标识符。请注意 “id” 已被定义为 PRIMARY KEY(主键)和 UNIQUE KEY(唯一键)。这能确保每个 “id” 仅在当前表格中出现一次。其将会转换为 “_id”,以用于更新 Elasticsearch 中的文档及向 Elasticsearch 中插入文档。
这个即是用来满足上一步所说的第一个条件 - client_name:此字段表示在每条记录中所存储的用户定义数据。可以添加多个,为了演示使用就这使用这一个
- modification_time:在 MySQL 中插入或更改任何记录时,都会将这个所定义字段的值设置为编辑时间。有了这个编辑时间,我们便能提取自从上次 Logstash 请求从 MySQL 获取记录后被编辑的任何记录。
这个即是用来满足上一步所说的第二个条件
MySQL 操作
增加数据:INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);
更新数据:UPDATE es_table SET client_name = <new client name> WHERE id=<id>;
更新/插入:INSERT INTO es_table (id, client_name) VALUES (<id>, <client name when created> ON DUPLICATE KEY UPDATE client_name=<client name when updated>;
同步代码
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.0.145:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_driver_library => ""
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
}
filter {
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs","timestamp"]
}
}
output {
elasticsearch {
hosts => ["192.168.75.21:9200"]
index => "rdbms_sync_idx"
document_id => "%{[@metadata][_id]}"
user => "elastic"
password => "GmSjOkL8Pz8IwKJfWgLT"
}
}
字段详解:
- tracking_column:此字段会指定 “unix_ts_in_secs” 字段(用于跟踪 Logstash 从 MySQL 读取的最后一个文档),其存储在 .logstash_jdbc_last_run 中的磁盘上。该值将会用来确定 Logstash 在其轮询循环的下一次迭代中所请求文档的起始值。在 .logstash_jdbc_last_run 中所存储的值可以作为 “:sql_last_value” 通过 SELECT 语句进行访问。
- unix_ts_in_secs:这是一个由上述 SELECT 语句生成的字段,包含可作为标准 Unix 时间戳(自 Epoch 起秒数)的 “modification_time”。我们刚讨论的 “tracking column” 会引用该字段。Unix 时间戳用于跟踪进度,而非作为简单的时间戳;如将其作为简单时间戳,可能会导致错误,因为在 UMT 和本地时区之间正确地来回转换是一个十分复杂的过程。
- sql_last_value:这是一个内置参数,包括 Logstash 轮询循环中当前迭代的起始点,上面 JDBC 输入配置中的 SELECT 语句便会引用这一参数。该字段会设置为 “unix_ts_in_secs”(读取自 .logstash_jdbc_last_run)的最新值。在 Logstash 轮询循环内所执行的 MySQL 查询中,其会用作所返回文档的起点。通过在查询中加入这一变量,能够确保不会将之前传播到 Elasticsearch 的插入或更新内容重新发送到 Elasticsearch。
- schedule:其会使用 cron 语法来指定 Logstash 应当以什么频率对 MySQL 进行轮询以查找变更。这里所指定的 "*/5 * * * * *" 会告诉 Logstash 每 5 秒钟联系一次 MySQL。
- modification_time < NOW():用来解决从 MySQL 读取数据时丢失数据或者会从 MySQL 读取冗余数据
- filter:将 MySQL 记录中的 “id” 值复制到名为 “_id” 的元数据字段,因为我们之后输出时会引用这一字段,以确保写入 Elasticsearch 的每个文档都有正确的 “_id” 值。通过使用元数据字段,可以确保这一临时值不会导致创建新的字段。我们还从文档中删除了 “id”、“@version” 和 “unix_ts_in_secs” 字段,因为我们不希望将这些字段写入到 Elasticsearch 中。
- output:指定每个文档都应当写入 Elasticsearch,指定索引名,还需为其分配一个 “_id”(需从我们在筛选部分所创建的元数据字段提取出来)。
测试
新增三条数据:
INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');
查询:GET rdbms_sync_idx/_search
使用下列命令更新在 MySQL 中对应至 _id=1 的文档:
UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id=1;
通过运行下列命令直接查看 Elasticsearch 中的文档:
GET rdbms_sync_idx/_doc/1
请注意 _version 现已设置为 2,并且 client_name 字段已正确更新至新值。在本例中,@timestamp 字段的用处并不大,由 Logstash 默认添加。
MySQL 中的更新/插入 (upsert) 可通过下列命令完成,您可以验证正确信息是否会反映在 Elasticsearch 中:
INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new') ON DUPLICATE KEY UPDATE client_name='Bob exists already';
在Elasticsearch中,查看索引rdbms_sync_idx:
GET rdbms_sync_idx/_search
删除数据
如果从 MySQL 中删除一个文档,那么这一删除操作并不会传播到 Elasticsearch。可以考虑通过下列方法来解决这一问题:
1.MySQL 记录可以包含一个 "is_deleted" 字段,用来显示该条记录是否仍有效。这一方法被称为“软删除”。正如对 MySQL 中的记录进行其他更新一样,"is_deleted" 字段将会通过 Logstash 传播至 Elasticsearch。如果实施这一方法,则需要编写 Elasticsearch 和 MySQL 查询,从而将 "is_deleted" 为 “true”(正)的记录/文档排除在外。 最后,可以通过后台作业来从 MySQL 和 Elastic 中移除此类文档。
2.另一种方法是确保负责从 MySQL 中删除记录的任何系统随后也会执行一条命令,从而直接从 Elasticsearch 中删除相应文档。