zoukankan      html  css  js  c++  java
  • SQL数据同步到ElasticSearch(三)- 使用Logstash+LastModifyTime同步数据

    在系列开篇,我提到了四种将SQL SERVER数据同步到ES中的方案,本文将采用最简单的一种方案,即使用LastModifyTime来追踪DB中在最近一段时间发生了变更的数据。

    安装Java

    安装部分的官方文档在这里:https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

    可以直接查看官方文档。

    我这里使用的还是之前文章中所述的CentOS来进行安装。

    首先需要安装Java(万物源于Java)

    输入命令找到的OpenJDK 1.8.X版本(截止我尝试时,在Java11上会有问题):

    yum search java | grep -i --color JDK

    使用Yum进行安装:

    yum install java-1.8.0-openjdk

    配置环境变量JAVA_HOME、CLASSPATH、PATH。

    打开/etc/profile文件:

    vi /etc/profile

    将下面几行代码粘贴到该文件的最后:

    --这句要自己到/usr/lib/jvm下面找对应的目录,不能直接copy
    export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/ 
    export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$PATH:$JAVA_HOME/bin

    保存并关闭,然后执行下列命令让设置立即生效。

    source /etc/profile

    可以输入下面的命令查看是否已生效:

    java –-version
    echo $JAVA_HOME
    echo $CLASSPATH
    echo $PATH

    安装LogStash

    首先注册ELK官方的GPG-KEY:

    然后cd /etc/yum.repos.d/文件夹下,创建一个logstash.repo文件,并将下面一段内容粘贴到该文件中保存:

    [logstash-7.x]
    name=Elastic repository for 7.x packages
    baseurl=https://artifacts.elastic.co/packages/7.x/yum
    gpgcheck=1
    gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
    enabled=1
    autorefresh=1
    type=rpm-md

    然后执行安装命令:

    sudo yum install logstash

    以上步骤可能比较慢,还有另外一种办法,就是通过下载来安装LogStash:

    官方文档在这里:https://www.elastic.co/cn/downloads/logstash

    首先在上面的链接中下载LogStash的tar.gz包,这个过程有可能也很慢,我的解决方案是在自己机器上使用迅雷进行下载,完事儿Copy到Linux服务器中。

    下载完成后,执行解压操作:

    sudo tar -xvf logstash-7.2.0.tar.gz

    解压完成后,进入解压后的logstash-7.2.0文件夹。

    接着我们安装Logstash-input-jdbc插件:

    bin/logstash-plugin install logstash-input-jdbc

    下载SQL SERVER jbdc组件,这里我们从微软官网下载:https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-2017 ,当然这个链接只是目前的,如果你在尝试时这个链接失效了,那就自行百度搜索吧~

    下载完成后,解压到logstash下面的lib目录下,这里我自己为了方便,把微软默认给jdbc外面包的一层语言名称的文件夹给去掉了。

    接着,我们到/config文件夹,新建一个logstash.conf文件,内容大概如下:

    下面的每一个参数含义都可以在官方文档中找到:

    input {
        jdbc {
            jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar" // 这里请灵活应变,能找到我们上一步下载的jdbc jar包即可
            jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" // 这个名字是固定的
            jdbc_connection_string => "jdbc:sqlserver: //数据库ServerIP:1433;databaseName=数据库名;"
            jdbc_user => "数据库账号"
            jdbc_password => "数据库密码"
            schedule => "* * * * *" // Corn 表达式,请自行百度写法
            jdbc_default_timezone => "Asia/Shanghai"
            jdbc_page_size => "500" // 每一批传输的数量
            record_last_run => "true" //是否保存状态 
            use_column_value => "true" //设置为时true,使用定义的 tracking_column值作为:sql_last_value。设置为时false,:sql_last_value反映上次执行查询的时间。
            tracking_column => "LastModificationTime" //配合use_column_value使用
            last_run_metadata_path => "/usr/opt/logstash/config/last_id" //记录:sql_last_value的文件
            lowercase_column_names => "false" //将DB中的列名自动转换为小写
            tracking_column_type => "timestamp" //tracking_column的数据类型,只能是numberic和timestamp
            clean_run => "false" //是否应保留先前的运行状态,其实我也不知道这个字段干啥用的~~
            statement => "SELECT * FROM 表 WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value" //从DB中抓数据的SQL脚本
        }
    }
    output {
        elasticsearch {
            index => "test"  //ES集群的索引名称     
            document_id => "%{Id}" //Id是表里面的主键,为了拿这个主键在ES中生成document ID
            hosts => ["http://192.168.154.135:9200"]// ES集群的地址
        }
    }

    上面的被注释搞的乱糟糟的,给你们一个可以复制的版本吧:

    input {
        jdbc {
            jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar"
            jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            jdbc_connection_string => "jdbc:sqlserver://SERVER_IP:1433;databaseName=DBName;"
            jdbc_user => "xxx"
            jdbc_password => "password"
            schedule => "* * * * *"
            jdbc_default_timezone => "Asia/Shanghai"
            jdbc_page_size => "50000"
            record_last_run => "true"
            use_column_value => "true"
            tracking_column => "LastModificationTime"
            last_run_metadata_path => "/usr/local/logstash-7.2.0/config/last_id"
            lowercase_column_names => "false"
            tracking_column_type => "timestamp"
            clean_run => "false"
            statement => "SELECT * FROM xxx WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value"
        }
    }
    output {
        elasticsearch {
            index => "item"       
            document_id => "%{Id}"
            hosts => ["http://ES集群IP:9200"]
        }
    }

    Logstash 整体思路

    回头来说一下这个LogStash的整体思路吧,其实我的理解,LogStash就是一个数据搬运工,他的搬运数据,分为三个大的阶段:

    1. 读取数据(input)
    2. 过滤数据(filter)
    3. 输出数据(output)

    对应的官方文档:https://www.elastic.co/guide/en/logstash/current/pipeline.html

    而这每一个阶段,都是通过一些插件来实现的,比如在上述的配置文件中,我们有:

    • 读取数据即input部分,这部分由于我们是需要从数据库读取数据,所以使用了一个可以执行SQL语句的jdbc-input插件,这里如果我们的数据源是其他的部分,就需要使用其他的一些插件来实现。
    • 也有输出数据部分,这部分我们是将数据写入到ElasticSearch,所以我们使用了一个elasticsearch-output插件。这里也可以将数据写入到kafka等其他的一些产品中,也是需要一些插件即可搞定。
    • 可以发现我们上面的部分没有涉及到filter插件,其实如果我们想对数据做一些过滤、规范化处理等,都可以使用filter插件来进行处理,具体的还需要进一步去探索啦~

    执行数据同步

    剩下的部分就简单了,切换目录到logstash的目录下,执行命令:

    bin/logstash -f config/logstash.conf

    最后执行的效果图大概如下:

    image

    可以使用Elasticsearch-Head等插件来查看是否同步正常:

    image

    image

    大概就是这样啦,后续我这边会继续尝试使用其他方式来进行数据同步,欢迎大家关注~

  • 相关阅读:
    服务器×××上的MSDTC不可用解决办法
    安装VS2010后,更改iis的asp.net版本
    刷新后 页面 保持滚动条位置
    Atitit.java 反编译 工具  attilax 总结
    Atitit.收银系统模块架构attilax 总结
    Atitit.论垃圾文件的识别与清理 文档类型垃圾文件 与api概要设计pa6.doc
    atitit.guice3 绑定方式打总结生成非单例对象toInstance toProvider区别 v2 pb29
    Atitit. Derby的使用总结attilax
    Atitit.attilax的 case list 项目经验 案例列表
    Atitit.收银系统pos 以及打印功能的行业标准
  • 原文地址:https://www.cnblogs.com/baiyunchen/p/11253404.html
Copyright © 2011-2022 走看看