zoukankan      html  css  js  c++  java
  • 通过Logstash同步数据到ElasticSearch的两种方式

    通过Logstash同步json文件数据到ElasticSearch

    最短可以设为每秒同步一次,可以说是实时的。

    1,生成需要同步到 es 的 json 文件,json文件生成时建议用 utf-8 格式,和 logstash 保持一致。

    2,在logstash配置目录:/etc/logstash/conf.d 新建配置文件bs_domain.conf,并如下配置:

    input中设置的文件*.json、.sincedb、file.log等,建议把权限都设置为chmod 777 filename,防止logstash没有读写权限引起的错误。

    filter过滤格式化数据时,logstash会默认添加两个字段:host、path,如果不需要可以过滤掉。

    # 1.读取json文件
    input {
        file {
    		# 必选项,配置文件路径,可定义多个,也可模糊匹配;
            path => "/home/ldy/logstash/bs_domain/*.json"
    		# path => ["name1.json","name2.json", "name3.json"]
    	
    		# 选择logstash开始读取文件的位置,begining或者end
    		start_position => "beginning"
    	
            # 设置编码
            codec => json { charset => "UTF-8" }
    
    		# 可选项,Logstash多久检查一下path下有新文件,默认15s;
            #discover_interval => 15
    
    		# 可选项,logstash多久检查一次被监听文件的变化,默认1s;
            #stat_interval => 1
    	
    		# 可选项,记录文件以及文件读取信息位置的数据文件;
            #sincedb_path => "/home/ldy/logstash/bs_domain/.sincedb"
    
    		# 可选项,logstash多久写一次sincedb文件,默认15s;
            #sincedb_write_interval => 15
    
            #mode => "read"
            #file_completed_action => "log"
            #file_completed_log_path => "/var/log/logstash/bs_domain/file.log"
        }
    }
    
    
    # 2.过滤格式化数据阶段
    filter {
        mutate {
            # 删除无效的字段
            remove_field => ["_id", "host", "path", "@version", "@timestamp"]
        }
        # 新增timestamp字段,将@timestamp时间增加8小时
        # ruby {code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"}
    
    }
    
    
    # 3.数据输出到ES阶段
    output {
        stdout {
            codec => rubydebug
        }
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            # user => "username"
            # password => "123456"
            document_id => "%{name}"  # 用json的name字段代替es记录的_id
            index => "bs_domain"
        }
    }
    

    3,重启 logstash,service logstash restart。logstash 每秒检查一次文件变化,同步文件内容。

    Logstash jdbc插件实现数据增量更新到ElasticSearch

    不足:最短同步间隔为一分钟,不是实时的。

    # 输入阶段
    input {
        stdin {
        }
        jdbc {
          jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/new_website_stage"
          jdbc_user => "postgres"
          jdbc_password => "123456"
          
          # PG驱动,在PG官网下载,下载前查清和logstash版本对应关系
          jdbc_driver_library => "/etc/logstash/dirve/postgresql-42.2.12.jar"
          jdbc_driver_class => "org.postgresql.Driver"
          
          jdbc_paging_enabled => "true"
          jdbc_page_size => "300000"
          
          # 实现数据增量同步,sql_last_value是最后被同步记录的update_date字段值
          statement => "select * from website_news where update_time > :sql_last_value"
    	  # statement => "select * from website_news"
          use_column_value => "true"
          tracking_column => "update_time"     # 指定用于被跟踪的列
          tracking_column_type => "timestamp"  # 被跟踪列的类型
          
          # 每分钟同步一次
          schedule => "* * * * *"
          type => "jdbc"
          jdbc_default_timezone =>"Asia/Shanghai"
        }
    }
    
    
    # 数据输出到ES阶段
    output {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]  # ES的IP地址及端口
            index => "website_news"      # 索引名称
            document_id => "%{id}"       # 用id字段作为es _id
        }
        stdout {
            codec => json_lines  # JSON格式输出
        }
    }
    
  • 相关阅读:
    加密模块
    Flask_Blueprint(蓝图)
    Python中__get__ ,__getattr__ ,__getattribute__用法与区别?
    为什么要使用数据库连接池?以及用法(DBUtils)
    Flask_配置文件
    CRM知识点汇总(未完💩💩💩💩💩)
    popUp
    Django_调查问卷
    Django_form
    Numpy
  • 原文地址:https://www.cnblogs.com/ldy-miss/p/13099421.html
Copyright © 2011-2022 走看看