zoukankan      html  css  js  c++  java
  • 利用logstash从mysql同步数据到ElasticSearch

    前面一篇已经把logstash和logstash-input-jdbc安装好了。

    下面就说下具体怎么配置。

    1.先在安装目录bin下面(一般都是在bin下面)新建两个文件jdbc.conf和jdbc.sql

    2.配置jdbc.conf

     1 input {
     2       stdin {
     3        }
     4       jdbc {
     5         # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
     6         jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/microstorage_backend?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
     7         jdbc_user => "root"
     8         jdbc_password => "123456"
     9         # 下载连接数据库的驱动包,建议使用绝对地址
    10        jdbc_driver_library => "/usr/local/Cellar/logstash/6.5.4/libexec/logstash-core/lib/jars/mysql-connector-java-5.1.42.jar"
    11  
    12        jdbc_driver_class => "com.mysql.jdbc.Driver"
    13        jdbc_paging_enabled => "true"
    14        jdbc_page_size => "50000"
    15        codec => plain { charset => "UTF-8"}
    16  
    17         #使用其它字段追踪,而不是用时间
    18       #use_column_value => true   //这里如果是用时间追踪比如:数据的更新时间或创建时间等和时间有关的这里一定不能是true, 切记切记切记,我是用update_time来追踪的
    19         #追踪的字段
    20      tracking_column => update_time
    21      record_last_run => true
    22      #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值  这里说是必须指定初始值,我没指定默认是1970-01-01 08:00:00 
    23      last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run"  //这里的lastrun文件夹和.logstash_jdbc_last_run是自己创建的
    24  
    25       jdbc_default_timezone => "Asia/Shanghai"   //设置时区
    26       #statement => SELECT * FROM goods  WHERE update_time > :last_sql_value  //这里要说明一下如果直接写sql语句,前面这种写法肯定不对的
    27                                                 ,加上引号也试过也不对,所以我直接写在jdbc.sql文件中
    28       statement_filepath => "/usr/local/Cellar/logstash/6.5.4/bin/jdbc.sql"
    29  
    30  
    31      #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    32      clean_run => false
    33  
    34        # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 不设置就是1分钟执行一次
    35        schedule => "* * * * *"
    36        type => "std"
    37      }
    38  }
    39 
    40  filter {
    41 
    42     json {
    43 
    44         source => "message"
    45 
    46         remove_field => ["message"]
    47 
    48     }
    49 
    50 }
    51 
    52 output {
    53 
    54     elasticsearch {
    55 
    56         # 要导入到的Elasticsearch所在的主机
    57 
    58         hosts => "127.0.0.1:9200"
    59 
    60         # 要导入到的Elasticsearch的索引的名称
    61 
    62         index => "goods"
    63 
    64         # 类型名称(类似数据库表名)
    65 
    66         document_type => "spu"
    67 
    68         # 主键名称(类似数据库主键)
    69 
    70         document_id => "%{id}"
    71     }
    72 
    73     stdout {
    74 
    75         # JSON格式输出
    76 
    77         codec => json_lines
    78 
    79     }
    80 }

    3.配置jdbc.sql

    1 select id,goods_name,goods_no,price,account_id,create_time,update_time from goods where update_time > :sql_last_value

    4.我们来看下 .logstash_jdbc_last_run文件中的内容(网上讲述该配置的时候都没讲到里面具体的内容写法,导致很多人很迷惑,其中我就是)

    前面的---具体什么意思,我也不太清楚。

    5.启动jdbc.conf配置,开始同步数据

    第一次:因为时间是从1970年开始的所以会全部同步一遍。相当于全量同步了

    从第二次开始,会从上次最新的一次时间同步,既新增和修改都会同步

    遇到的问题:

    1.ES中8小时时差的问题?

    解决方法:从源头解决问题

    在jdbc.conf配置文件中只要是有关时间的字段都手动+8小时

     39 filter {
     40     json {
     41         source => "message"
     42         remove_field => ["message"]
     43     }
     44    // date类型不能省略,不然会报错,       就是把当前字段+8小时后赋值给新的字段,然后再取新字段的值赋值给老的字段,再把新的字段删除
     45     date {
     46       match => ["message","UNIX_MS"]
     47       target => "@timestamp"
     48        }
     49          ruby {
     50                 code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
     51          }      
     52         ruby{   
     53                 code => "event.set('@timestamp',event.get('timestamp'))"
     54         }       
     55         mutate{ 
     56                remove_field => ["timestamp"]
     57         }      
     58         
     59    date {
     60     match => ["message","UNIX_MS"]
     61     target => "create_time"
     62          } 
     63          ruby {
     64                  code => "event.set('@create_time', event.get('create_time').time.localtime + 8*60*60)"
     65          }       
     66         ruby {   
     67                  code => "event.set('create_time',event.get('@create_time'))"
     68          }       
     69         mutate { 
     70          remove_field => ["@create_time"]
     71         }
     72         
     73         date {
     74         match => ["message","UNIX_MS"]
     75         target => "update_time"
     76          }
     77          ruby {
     78                  code => "event.set('@update_time', event.get('update_time').time.localtime + 8*60*60)"
     79          }       
     80         ruby {   
     81                  code => "event.set('update_time',event.get('@update_time'))"
     82          }       
     83         mutate {
     84          remove_field => ["@update_time"]
     85         }
    86 }

    总结:主要是配置,有什么问题,先检查配置文件。 

  • 相关阅读:
    6个变态的C语言Hello World程序
    Thread和Runnable差别
    HTML5游戏开发技术基础整理
    Kruskal算法
    PHP手机获取6为不反复验证码
    怎样推断手机用户是移动,电信,联通?
    SQL Server 2012 sa 用户登录 18456 错误
    Phonegap(Cordova)3.4 + Android 环境搭建
    LCA在线算法ST算法
    Swift学习笔记八:枚举
  • 原文地址:https://www.cnblogs.com/wang-yaz/p/10232417.html
Copyright © 2011-2022 走看看