zoukankan      html  css  js  c++  java
  • 第八篇 elasticsearch链接mysql自动更新数据库

    增量更新

    input {
      jdbc {
        jdbc_driver_library => "D:	oolsmysqlmysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/canyin?characterEncoding=UTF-8&useSSL=false"
        jdbc_user => "root"
        jdbc_password => "228151"
        statement => "SELECT * FROM goods"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        schedule => "* * * * *"
        type => "foods"
        record_last_run => true
        last_run_metadata_path => ""
        clean_run => false
    
    
      }
    }
    
    filter {
       json {
            source => "message"
            remove_field => ["message"]
        }
    }
    
    output {
      stdout {
        codec => rubydebug
      }
      elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "goods"
        document_type=>"foods"  
        document_id=>"%{id}"  
      }        
    } 

    增量更新的参数介绍:

    input {
        stdin {
        
        }
        jdbc {
            # 数据库地址  端口  数据库名
            jdbc_connection_string => "jdbc:mysql://localhost:3306/shen"
            # 数据库用户名      
            jdbc_user => "root"
            # 数据库密码
            jdbc_password => "rootroot"
            # mysql java驱动地址 
            jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar"
            # 驱动类的名称
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            
            jdbc_paging_enabled => "true"
            jdbc_page_size => "50000"
    
            #是否记录上次运行的结果
            record_last_run => true
            #记录上次运行结果的文件位置
            last_run_metadata_path => ""
            #是否使用数据库某一列的值,
            use_column_value => true
            tracking_column => "id"
            #numeric或者timestamp
            tracking_column_type => "numeric"
            
            #如果为true则会清除 last_run_metadata_path 的记录,即重新开始同步数据
            clean_run => false
    
            #sql_last_value根据tracking类型,默认为0或者1970-1-1
            statement => "SELECT * FROM TABLE WHERE id > :last_sql_value"
            # sql 语句文件,对于复杂的查询,可以放在文件中。
            # statement_filepath => "filename.sql"
            # 设置监听间隔,语法与Linux系统Cron相同
            schedule => "* * * * *"
        }
    }
    output {
         stdout {
            codec => json_lines
        }
       elasticsearch {
            hosts  => "localhost:9200"
            index => "contacts"
         document_type => "contact"
            document_id => "%{id}"
        }
    }

    chedule现在设置成每分钟都执行一次,是为了方便观察行为。statefile这一句是一定要加的。$metrics.lastexecutionstart就是这个脚本的关键所在了,这个指的是上一次脚本执行的时间,可以通过比较这个时间和数据库里的字段来判断是否要更新。
    参考文献:http://www.cnblogs.com/cocowool/p/mysql_data_to_elasticsearch_via_logstash.html
    http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_logstash.html
    配置Logstash https://www.elastic.co/guide/en/logstash/current/configuration.html#

  • 相关阅读:
    Read-Copy Update Implementation For Non-Cache-Coherent Systems
    10 华电内部文档搜索系统 search04
    10 华电内部文档搜索系统 search05
    lucene4
    10 华电内部文档搜索系统 search01
    01 lucene基础 北风网项目培训 Lucene实践课程 索引
    01 lucene基础 北风网项目培训 Lucene实践课程 系统架构
    01 lucene基础 北风网项目培训 Lucene实践课程 Lucene概述
    第五章 大数据平台与技术 第13讲 NoSQL数据库
    第五章 大数据平台与技术 第12讲 大数据处理平台Spark
  • 原文地址:https://www.cnblogs.com/yu-hailong/p/8087387.html
Copyright © 2011-2022 走看看