zoukankan      html  css  js  c++  java
  • ElasticSearch7.x系列三:Logstash的使用

    前言

    Logstash一般用来收集整理日志,但是也可以做数据的同步

    我希望我的数据库的数据全部存到ElasticSearch之后,我的数据库做的增删改查都可以增量的更新到ElasticSearch里面

    Logstash配置文件

    数据库驱动

    首先,下载JDBC,我使用的是SQLserver,所以直接搜索

    Microsoft SQL Server JDBC

    然后下载即可,我目前是放在了Logstash的bin文件目录下

    配置文件编写:看看就行,重点在多表同步配置

    还记得安装篇,我写的运行Logstash吗?命令是这样的,在bin目录下执行

    .logstash -e 'input { stdin { } } output { stdout {} }'
    

    注意到input和output了吧,一个是输入,一个是输出,也就是说Logstash的启动,得需要一个配置文件,这个配置文件有输入和输出.这就好办了,输入源有很多,我这里使用SQLserver,输出呢也有很多,我这里介绍两个

    配置文件,读取SQLserver数据库导出到txt和ElasticSearch

    input {
        jdbc {
          jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
          jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
          jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
          jdbc_user => "sa"
          jdbc_password => "666666"
           # clean_run => true  #开启了sql_last_value就从0开始了
          schedule => "* * * * *"
          statement => "select Id,Name as name,CreateDate as createDate from Solution where SolutionId > :sql_last_value"
    	  lowercase_column_names => false
          use_column_value => true
          tracking_column => "solutionid"   #注意,这里必须全部小写
        }
    }
    
    output { 
        file{
          path => "D:Desktopoutput.txt"
        }
        
        # elasticsearch {
        #       hosts => ["192.168.3.8:9200"]
        #       index => "solution"
        # }
    }
    

    我必须讲解一下这个配置文件,因为有坑,我踩了

    首先,我希望读取数据库一张表的数据,导出到txt文本,我希望数据库有新加入的数据的时候,能增量的给我导出到txt里面去

    所以我们查数据库的时候,导出的做一个记录,比如Id,下次我再导出的时候,只导出Id比上一次大的即可,但是使用Id作为增量有一个问题,就是我数据库更新了以前的数据,logstash不知道,所以增量的判断字段必须得是时间,这个在多表更新的那里讲

    输入源解释

    1. jdbc_driver_library : 没啥说的,数据库驱动,也可以不写死路径,可以配置在环境变量里,不过我懒得弄了
    2. jdbc_driver_class : 没啥说的,驱动类别
    3. jdbc_connection_string : 你的数据库地址
    4. jdbc_user : 数据库用户名
    5. jdbc_password : 数据库密码
    6. clean_run : 值为true的话sql_last_value就从0开始了,sql_last_value下面讲
    7. schedule : 更新频率,五个星星代表 分 时 天 月 年,例如"* * * * *"就是每分钟, " * /1 * * " 就是每小时, "/10 * * * *" /10 是每十分钟 不加斜杠 10 是每小时的第10分,默认是最小单位是分,也就是全部5个就是每分钟执行一次,其实也可以秒级执行的,这样写
     schedule => "*/5 * * * * *"
    

    只要在前面再加一个* 单位就是秒,这里就是每5s执行一次

    1. use_column_value : 是否开启增量字段
    2. tracking_column : 指定增量字段,一般情况下都是表的Id或者创建时间这俩字段,但是我推荐使用时间字段,而且必须要注意,Logstash默认把数据库字段全部转成小写了,所以我们得关闭,使用lowercase_column_names => false
    3. lowercase_column_names => false 关闭默认的小写
    4. statement : 数据库查询语句,也可以写到sql文件里面,读取文件,不过简单的话直接写语句挺好的,这里做了一个增量查询,我每查询一次,就把我的增量字段赋值给JDBC自带的sql_last_value变量,所以我查询的时候大于sql_last_value就实现了增量更新了

    现在知道clean_run 的作用了吧,自己测试的时候,测了几次,sql_last_value变的很大了,又想从头测起,就把sql_last_value清0即可
    在正式服务器上使用的时候,clean_run 关了

    输出解释

    file{
      path => "D:Desktopoutput.txt"
    }
    

    这个就是导出到txt

    elasticsearch {
        hosts => ["192.168.100.100:9200"]
        index => "solution"
    }
    

    这个就是导出到ElasticSearch,Index是solution

    启动

    还是来到bin目录下,启动命令框,输入

    logstash -f jdbcconfig/jdbc.conf
    

    我在bin目录下新建了一个文件夹叫jdbcconfig,所以换成你们自己的文件夹名.我的配置文件叫jdbc.conf,同样也换成你们自己的

    多表同步 : 重要,非常重要

    这一块是重点,我踩了很多的坑,找了很多的资料,尝试了很多次,终于找到了我想要的解决方案

    上面讲的配置是单表的,但是实际应用的时候都是多表导入到ElasticSearch的,这个时候有两种方法,我目前已知的2种

    1. 多写几个配置文件,多启动几次
    2. 一个配置文件里写多个输入源和多个输出
    input {
        jdbc {
          jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
          jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
          jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
          jdbc_user => "sa"
          jdbc_password => "666666"
          schedule => "* * * * *"
          clean_run => true
          statement => "select ArticleID as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate from Article where ApproveState = 1 and UpdateDate > :sql_last_value"
          use_column_value => true
          tracking_column => "updateDate"
          tracking_column_type => "timestamp"
          type => "article"
        }
        jdbc {
          jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
          jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
          jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
          jdbc_user => "sa"
          jdbc_password => "666666"
          schedule => "* * * * *"
          clean_run => true
          statement => "select SolutionId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate,Tags,ClickCount from Solution where UpdateDate > :sql_last_value"
          use_column_value => true
          tracking_column => "updatedate"
          tracking_column_type => "timestamp"
          type => "solution"
        }
        jdbc {
          jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
          jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
          jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
          jdbc_user => "sa"
          jdbc_password => "666666"
          schedule => "* * * * *"
          # clean_run => true
          statement => "select FileId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from DownloadFile where ApproveState = 1 and UpdateDate > :sql_last_value"
          use_column_value => true
          tracking_column => "updateDate"
          tracking_column_type => "timestamp"
          type => "downloadfile"
        }
        jdbc {
          jdbc_driver_library => "D:VaeElasticSearchlogstash-7.6.2logstash-7.6.2injdbcconfigmssql-jdbc-8.2.2.jre8.jar"
          jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
          jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
          jdbc_user => "sa"
          jdbc_password => "666666"
          schedule => "* * * * *"
          clean_run => true
          statement => "select VideoId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from Video where UpdateDate > :sql_last_value"
          use_column_value => true
          tracking_column => "updateDate"
          tracking_column_type => "timestamp"
          type => "video"
        }
    }
    
    filter {
        mutate {
                add_field => {
                        "[@metadata][articleid]" => "%{articleid}"
                }
                add_field => {
                        "[@metadata][solutionid]" => "%{solutionid}"
                }
                add_field => {
                        "[@metadata][fileid]" => "%{fileid}"
                }
                add_field => {
                        "[@metadata][videoid]" => "%{videoid}"
                }
        }  
    }
    
    output {
      # stdout {
      #     codec => json_lines
      # }
     if [type] == "article"{
          elasticsearch {
            hosts  => "192.168.100.100:9200"
            index => "article"
            action => "index"
            document_id => "%{[@metadata][articleid]}"
          }
      }
      if [type] == "solution"{
          elasticsearch {
            hosts  => "192.168.100.100:9200"
            index => "solution"
            action => "index"
            document_id => "%{[@metadata][solutionid]}"
          }
      }
        if [type] == "downloadfile"{
          elasticsearch {
            hosts  => "192.168.100.100:9200"
            index => "downloadfile"
            action => "index"
            document_id => "%{[@metadata][fileid]}"
          }
      }
      if [type] == "video"{
          elasticsearch {
            hosts  => "192.168.100.100:9200"
            index => "video"
            action => "index"
            document_id => "%{[@metadata][videoid]}"
          }
      }
    
    }
    

    配置讲解

    如果是Id就这样写

    use_column_value => true
    tracking_column => "videoid"
    

    如果是时间就这样写,多了一个时间类型

    use_column_value => true
    tracking_column => "updatedate"
    tracking_column_type => "timestamp"
    

    想要做增量的更新,包括插入和修改,就最好使用一个updateDate字段

    filter是干嘛的?

    这一块我现在还很模糊,不过我现在会使用的是定义变量,我定义一个变量,使用

    filter {
        mutate {
          add_field => {
                  "[@metadata][videoid]" => "%{videoid}"
          }
        }
    }
    

    这个意思就是,定义了一个[@metadata][videoid]的变量,值是上面的查询的表里面的videoid

    然后我下边的output里面就可以直接"%{[@metadata][videoid]}"来调用了

    所以目前我所知的filter作用,就是变量沟通input和output,当然还有其他的,我还没看

    sql语句

    我input里面写的sql语句,都是把列名都写出来了,这样好,可以重命名,一定要注意,数据库里面的type字段一定要重命名,不然会冲突

    而且时间我都转了一下

    CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate
    

    因为网上看到有很多时区的问题,所以我直接转成字符串就解决时区的问题了吧

    数据库Type字段问题

    数据库字段不能有type,因为Logstash的配置文件使用了type,巨坑,所以数据库里面的type字段自己重命名

    数据库字段必须驼峰命名

    我数据查询的时候,记得一定要重命名成驼峰的格式,不然ES不能映射给Model,例如

    select VideoId as Id,Name as name,Title as title,CreateDate as createDate from ...
    

    注意,单个单词的全部小写,CreateDate这种两个大写字母的,驼峰命名,前面小写,后面大写

    如果你实在不想使用驼峰命名,每个字段去as一次,也有其他办法,就是在你接收的Model上加上 [Text(Name = "CreateDate")]也行

    类型错误

    expected:'String Begin Token', actual:'10', at offset:597
    expected:'String Begin Token', actual:'22', at offset:332

    这种错都一样,都是类型的错误,比如ES里面是long,但是你的Model接收的是string,我遇到的情况是我的数据ID,有的是int类型的,有的是string类型的,我都用一个string类型的Id去接收,这就报错了

  • 相关阅读:
    洛谷 P1032 子串变换
    RCTF 2017 easyre153
    SUCTF 2016 : dMd
    南邮 base64全家桶
    洛谷 P1908 逆序对
    2019中山大学程序设计竞赛 Triangle
    WhiteHat Contest 11 : re1100
    P1010 幂次方
    洛谷 P1088 火星人
    南邮 骚年来一发吗
  • 原文地址:https://www.cnblogs.com/yunquan/p/12934445.html
Copyright © 2011-2022 走看看