前言
Logstash一般用来收集整理日志,但是也可以做数据的同步
我希望我的数据库的数据全部存到ElasticSearch之后,我的数据库做的增删改查都可以增量的更新到ElasticSearch里面
Logstash配置文件
数据库驱动
首先,下载JDBC,我使用的是SQLserver,所以直接搜索
Microsoft SQL Server JDBC
然后下载即可,我目前是放在了Logstash的bin文件目录下
配置文件编写:看看就行,重点在多表同步配置
还记得安装篇,我写的运行Logstash吗?命令是这样的,在bin目录下执行
.logstash -e 'input { stdin { } } output { stdout {} }'
注意到input和output了吧,一个是输入,一个是输出,也就是说Logstash的启动,得需要一个配置文件,这个配置文件有输入和输出.这就好办了,输入源有很多,我这里使用SQLserver,输出呢也有很多,我这里介绍两个
配置文件,读取SQLserver数据库导出到txt和ElasticSearch
input {
jdbc {
jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
# clean_run => true #开启了sql_last_value就从0开始了
schedule => "* * * * *"
statement => "select Id,Name as name,CreateDate as createDate from Solution where SolutionId > :sql_last_value"
lowercase_column_names => false
use_column_value => true
tracking_column => "solutionid" #注意,这里必须全部小写
}
}
output {
file{
path => "D:Desktopoutput.txt"
}
# elasticsearch {
# hosts => ["192.168.3.8:9200"]
# index => "solution"
# }
}
我必须讲解一下这个配置文件,因为有坑,我踩了
首先,我希望读取数据库一张表的数据,导出到txt文本,我希望数据库有新加入的数据的时候,能增量的给我导出到txt里面去
所以我们查数据库的时候,导出的做一个记录,比如Id,下次我再导出的时候,只导出Id比上一次大的即可,但是使用Id作为增量有一个问题,就是我数据库更新了以前的数据,logstash不知道,所以增量的判断字段必须得是时间,这个在多表更新的那里讲
输入源解释
- jdbc_driver_library : 没啥说的,数据库驱动,也可以不写死路径,可以配置在环境变量里,不过我懒得弄了
- jdbc_driver_class : 没啥说的,驱动类别
- jdbc_connection_string : 你的数据库地址
- jdbc_user : 数据库用户名
- jdbc_password : 数据库密码
- clean_run : 值为true的话sql_last_value就从0开始了,sql_last_value下面讲
- schedule : 更新频率,五个星星代表 分 时 天 月 年,例如"* * * * *"就是每分钟, " * /1 * * " 就是每小时, "/10 * * * *" /10 是每十分钟 不加斜杠 10 是每小时的第10分,默认是最小单位是分,也就是全部5个就是每分钟执行一次,其实也可以秒级执行的,这样写
schedule => "*/5 * * * * *"
只要在前面再加一个* 单位就是秒,这里就是每5s执行一次
- use_column_value : 是否开启增量字段
- tracking_column : 指定增量字段,一般情况下都是表的Id或者创建时间这俩字段,但是我推荐使用时间字段,而且必须要注意,Logstash默认把数据库字段全部转成小写了,所以我们得关闭,使用lowercase_column_names => false
- lowercase_column_names => false 关闭默认的小写
- statement : 数据库查询语句,也可以写到sql文件里面,读取文件,不过简单的话直接写语句挺好的,这里做了一个增量查询,我每查询一次,就把我的增量字段赋值给JDBC自带的sql_last_value变量,所以我查询的时候大于sql_last_value就实现了增量更新了
现在知道clean_run 的作用了吧,自己测试的时候,测了几次,sql_last_value变的很大了,又想从头测起,就把sql_last_value清0即可
在正式服务器上使用的时候,clean_run 关了
输出解释
file{
path => "D:Desktopoutput.txt"
}
这个就是导出到txt
elasticsearch {
hosts => ["192.168.100.100:9200"]
index => "solution"
}
这个就是导出到ElasticSearch,Index是solution
启动
还是来到bin目录下,启动命令框,输入
logstash -f jdbcconfig/jdbc.conf
我在bin目录下新建了一个文件夹叫jdbcconfig,所以换成你们自己的文件夹名.我的配置文件叫jdbc.conf,同样也换成你们自己的
多表同步 : 重要,非常重要
这一块是重点,我踩了很多的坑,找了很多的资料,尝试了很多次,终于找到了我想要的解决方案
上面讲的配置是单表的,但是实际应用的时候都是多表导入到ElasticSearch的,这个时候有两种方法,我目前已知的2种
- 多写几个配置文件,多启动几次
- 一个配置文件里写多个输入源和多个输出
input {
jdbc {
jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
schedule => "* * * * *"
clean_run => true
statement => "select ArticleID as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate from Article where ApproveState = 1 and UpdateDate > :sql_last_value"
use_column_value => true
tracking_column => "updateDate"
tracking_column_type => "timestamp"
type => "article"
}
jdbc {
jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
schedule => "* * * * *"
clean_run => true
statement => "select SolutionId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate,Tags,ClickCount from Solution where UpdateDate > :sql_last_value"
use_column_value => true
tracking_column => "updatedate"
tracking_column_type => "timestamp"
type => "solution"
}
jdbc {
jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
schedule => "* * * * *"
# clean_run => true
statement => "select FileId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from DownloadFile where ApproveState = 1 and UpdateDate > :sql_last_value"
use_column_value => true
tracking_column => "updateDate"
tracking_column_type => "timestamp"
type => "downloadfile"
}
jdbc {
jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
jdbc_user => "sa"
jdbc_password => "666666"
schedule => "* * * * *"
clean_run => true
statement => "select VideoId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from Video where UpdateDate > :sql_last_value"
use_column_value => true
tracking_column => "updateDate"
tracking_column_type => "timestamp"
type => "video"
}
}
filter {
mutate {
add_field => {
"[@metadata][articleid]" => "%{articleid}"
}
add_field => {
"[@metadata][solutionid]" => "%{solutionid}"
}
add_field => {
"[@metadata][fileid]" => "%{fileid}"
}
add_field => {
"[@metadata][videoid]" => "%{videoid}"
}
}
}
output {
# stdout {
# codec => json_lines
# }
if [type] == "article"{
elasticsearch {
hosts => "192.168.100.100:9200"
index => "article"
action => "index"
document_id => "%{[@metadata][articleid]}"
}
}
if [type] == "solution"{
elasticsearch {
hosts => "192.168.100.100:9200"
index => "solution"
action => "index"
document_id => "%{[@metadata][solutionid]}"
}
}
if [type] == "downloadfile"{
elasticsearch {
hosts => "192.168.100.100:9200"
index => "downloadfile"
action => "index"
document_id => "%{[@metadata][fileid]}"
}
}
if [type] == "video"{
elasticsearch {
hosts => "192.168.100.100:9200"
index => "video"
action => "index"
document_id => "%{[@metadata][videoid]}"
}
}
}
配置讲解
如果是Id就这样写
use_column_value => true
tracking_column => "videoid"
如果是时间就这样写,多了一个时间类型
use_column_value => true
tracking_column => "updatedate"
tracking_column_type => "timestamp"
想要做增量的更新,包括插入和修改,就最好使用一个updateDate字段
filter是干嘛的?
这一块我现在还很模糊,不过我现在会使用的是定义变量,我定义一个变量,使用
filter {
mutate {
add_field => {
"[@metadata][videoid]" => "%{videoid}"
}
}
}
这个意思就是,定义了一个[@metadata][videoid]的变量,值是上面的查询的表里面的videoid
然后我下边的output里面就可以直接"%{[@metadata][videoid]}"来调用了
所以目前我所知的filter作用,就是变量沟通input和output,当然还有其他的,我还没看
sql语句
我input里面写的sql语句,都是把列名都写出来了,这样好,可以重命名,一定要注意,数据库里面的type字段一定要重命名,不然会冲突
而且时间我都转了一下
CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate
因为网上看到有很多时区的问题,所以我直接转成字符串就解决时区的问题了吧
坑
数据库Type字段问题
数据库字段不能有type,因为Logstash的配置文件使用了type,巨坑,所以数据库里面的type字段自己重命名
数据库字段必须驼峰命名
我数据查询的时候,记得一定要重命名成驼峰的格式,不然ES不能映射给Model,例如
select VideoId as Id,Name as name,Title as title,CreateDate as createDate from ...
注意,单个单词的全部小写,CreateDate这种两个大写字母的,驼峰命名,前面小写,后面大写
如果你实在不想使用驼峰命名,每个字段去as一次,也有其他办法,就是在你接收的Model上加上 [Text(Name = "CreateDate")]也行
类型错误
expected:'String Begin Token', actual:'10', at offset:597
expected:'String Begin Token', actual:'22', at offset:332
这种错都一样,都是类型的错误,比如ES里面是long,但是你的Model接收的是string,我遇到的情况是我的数据ID,有的是int类型的,有的是string类型的,我都用一个string类型的Id去接收,这就报错了