zoukankan      html  css  js  c++  java
  • Logstash的配置

    MySql 到 Elasticsearch

    input {
        jdbc {
            # 驱动jar包的位置
            jdbc_driver_library => "/usr/local/logstash/jdbc-driver-library/mysql-connector-java-8.0.19.jar"
            # 驱动类名
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"        
            # 数据库连接        
            jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/base_db?characterEncoding=utf8&autoReconnect=true&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull"
            # mysql用户名
            jdbc_user => "root"
            # mysql密码
            jdbc_password => "123456"        
            # 数据库重连尝试次数        
            connection_retry_attempts => "3"
            # 开启分页查询(默认false不开启)
            jdbc_paging_enabled => true
            # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
            jdbc_page_size => "2"
            # 如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;
            statement_filepath => "/usr/local/logstash/sql/t_sys_loginperson.sql"
            # 需要记录查询结果某字段的值时,此字段为true
            use_column_value => true        
            # 需要记录的字段,用于增量同步
            tracking_column => "last_modify_time"
            # 字段类型
            tracking_column_type => "timestamp"
            # 记录上一次运行记录
            record_last_run => true
            # 上一次运行记录值的存放文件路径
            last_run_metadata_path => "/usr/local/logstash/last-run-metadata/t_sys_loginperson.txt"
            # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
            clean_run => false
            # 设置时区 如果设置会将 last_modify_time 增加8小时
            #jdbc_default_timezone => "Asia/Shanghai"
            # 同步频率(分 时 天 月 年),默认每分钟同步一次;
            schedule => "* * * * *"
        }    
    }
    filter {
        json {
            source => "message"
            remove_field => ["message","@version"]
        }
    }
    output {        
        elasticsearch { 
            hosts => "127.0.0.1:9200"
            index => "%{table_name}" 
            document_id => "%{id}"
        }
    } 

    MySql 到 Kafka

    input {
        jdbc {
            # 驱动jar包的位置
            jdbc_driver_library => "/usr/local/logstash/jdbc-driver-library/mysql-connector-java-8.0.19.jar"
            # 驱动类名
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"        
            # 数据库连接        
            jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/base_db?characterEncoding=utf8&autoReconnect=true&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull"
            # mysql用户名
            jdbc_user => "root"
            # mysql密码
            jdbc_password => "123456"        
            # 数据库重连尝试次数        
            connection_retry_attempts => "3"
            # 开启分页查询(默认false不开启)
            jdbc_paging_enabled => true
            # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
            jdbc_page_size => "2"
            # 如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;
            statement_filepath => "/usr/local/logstash/sql/t_sys_loginperson.sql"
            # 需要记录查询结果某字段的值时,此字段为true
            use_column_value => true        
            # 需要记录的字段,用于增量同步
            tracking_column => "last_modify_time"
            # 字段类型
            tracking_column_type => "timestamp"
            # 记录上一次运行记录
            record_last_run => true
            # 上一次运行记录值的存放文件路径
            last_run_metadata_path => "/usr/local/logstash/last-run-metadata/t_sys_loginperson.txt"
            # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false
            clean_run => false
            # 设置时区 如果设置会将 last_modify_time 增加8小时
            #jdbc_default_timezone => "Asia/Shanghai"
            # 同步频率(分 时 天 月 年),默认每分钟同步一次;
            schedule => "* * * * *"        
        }        
    }
    filter {
        mutate {
            # 删除字段
            remove_field => ["@timestamp","@version"]
        }
    }
    output {    
        kafka {        
            bootstrap_servers => "10.10.6.202:9092"
            topic_id => "base-db"            
            codec => "json"
            client_id => "kafkaOutPut"
        }
    } 

    Kafka 到 Elasticsearch

    input {
        kafka {
            bootstrap_servers => "10.10.6.202:9092"
            client_id => "kafkaInPut"
            # 从最新一条开始读取
            auto_offset_reset => "latest"
            # 消费线程,一般是这个队列的partitions数
            consumer_threads => 3
            decorate_events => true
            # 队列名称
            topics => ["base-db"]
            codec => "json"        
        }
    }
    filter {
        mutate {
            # 删除字段
            remove_field => ["@timestamp","@version"]
        }
    }
    output {    
        elasticsearch {
            hosts => "10.10.6.202:9200"
            index => "%{table_name}"
            document_id => "%{id}"
        }    
    }

    t_sys_loginperson.sql 文件

    select id,person_name,date_format(create_time, '%Y-%m-%d %H:%i:%s') as create_time,date_format(last_modify_time, '%Y-%m-%d %H:%i:%s') as last_modify_time,'t_sys_loginperson' as table_name from t_sys_loginperson where last_modify_time>:sql_last_value
    注意 所有的日期类型需转为字符串类型
    #带时分秒
    date_format(create_time, '%Y-%m-%d %H:%i:%s')
    #不带时分秒
    date_format(create_time, '%Y-%m-%d')

    启动

    #单配置文件启动(后台启动)
    nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/kafka2elasticsearch.conf > /dev/null 2>&1 &

    多个pipeline启动多个配置文件

    配置pipelines.yml

    vi /usr/local/logstash/config/pipelines.yml
    - pipeline.id: mysql
       pipeline.workers: 1
       pipeline.batch.size: 100
       path.config: "/usr/local/logstash/customize-config/mysql2kafka.conf"
     - pipeline.id: es
       queue.type: persisted
       path.config: "/usr/local/logstash/customize-config/kafka2elasticsearch.conf"

    启动

    #无需指定配置文件,默认走pipelines.yml的配置,如果使用-e或-f时,Logstash会忽略pipelines.yml文件
    nohup /usr/local/logstash/bin/logstash > /dev/null 2>&1 &

    多个pipeline备忘
    https://blog.csdn.net/UbuntuTouch/article/details/100995868?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task

    https://segmentfault.com/a/1190000016592277

  • 相关阅读:
    解决点击链接自动置顶问题
    ie6 下遮罩层 height 不显示100%的解决方法
    【转帖】微软分布式缓存框架Volocity资源推荐
    Tips 2 MVC实现多个按钮提交的几种方法
    .NET Framework 4 中的新增功能
    Memcached 汇总 不断更新
    理解敏捷Agile
    GPIOPS中断成功,问题仍旧存在 ZEDBOARD,ZYNQ7000
    Xilinx驱动API的一个重要BUG,ZEDBOARD,ZYNQ7000
    RelativeLayout相对布局
  • 原文地址:https://www.cnblogs.com/kgdxpr/p/12453616.html
Copyright © 2011-2022 走看看