zoukankan      html  css  js  c++  java
  • (三)ELK logstash input

    一,input模块

    Logstash由三个组件构造成,分别是input、filter以及output。我们可以吧Logstash三个组件的工作流理解为:input收集数据,filter处理数据,output输出数据。

    1、文件类型:file{}

    文件类型,顾名思义,文件数据源,我们可以使用input组件的file插件来获取数据
    input{
        file{
            #path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
            path => [‘pathA’,‘pathB’]
            #表示多就去path路径下查看是够有新的文件产生。默认是15秒检查一次。
            discover_interval => 15
            #排除那些文件,也就是不去读取那些文件
            exclude => [‘fileName1’,‘fileNmae2’]
            #被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
            close_older => 3600
            #在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
            ignore_older => 86400
            #logstash 每隔多 久检查一次被监听文件状态( 是否有更新) , 默认是 1 秒。
            stat_interval => 1
            #sincedb记录数据上一次的读取位置的一个index
            sincedb_path => ’$HOME/. sincedb‘
            #logstash 从什么 位置开始读取文件数据, 默认是结束位置 也可以设置为:beginning 从头开始
            start_position => ‘beginning’
            #注意:这里需要提醒大家的是,如果你需要每次都从同开始读取文件的话,关设置start_position => beginning是没有用的,你可以选择sincedb_path 定义为 /dev/null
        }            
    
    }
    

    2、数据库类型

    数据库类型的数据源,input组件通过input组件的JDBC插件jdbc{}获取数据源

    input{
        jdbc{
        #jdbc sql server 驱动,各个数据库都有对应的驱动,需自己下载
        jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar"
        #jdbc class 不同数据库有不同的 class 配置
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        #配置数据库连接 ip 和端口,以及数据库    
        jdbc_connection_string => "jdbc:sqlserver://200.200.0.18:1433;databaseName=test_db"
        #配置数据库用户名
        jdbc_user =>   
        #配置数据库密码
        jdbc_password =>
        #上面这些都不重要,要是这些都看不懂的话,你的老板估计要考虑换人了。重要的是接下来的内容。
        # 定时器 多久执行一次SQL,默认是一分钟
        # schedule => 分 时 天 月 年  
        # schedule => * 22  *  *  * 表示每天22点执行一次
        schedule => "* * * * *"
        #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
        #是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称,
        #此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
        use_column_value => true
        #如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
        tracking_column => create_time
        #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        record_last_run => true
        #们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值
        last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info"
        #是否将字段名称转小写。
        #这里有个小的提示,如果你这前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
        #因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
        lowercase_column_names => false
        #你的SQL的位置,当然,你的SQL也可以直接写在这里。
        #statement => SELECT * FROM tabeName t WHERE  t.creat_time > :last_sql_value
        statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"
        #数据类型,标明你属于那一方势力。单了ES哪里好给你安排不同的山头。
        type => "my_info"
        }
        #注意:外载的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,
        #如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。
    }
    

    3、filebeat 

    input {
      beats {
        #接受数据端口
        port => 5044
        #数据类型
        type => "logs"
      }
      #这个插件需要和filebeat进行配和
    }
    

    4、标准输入:stdin{}

    input{
        stdin{
            add_field => {"key" => "value"} #向事件添加一个字段
            codec => "plain" #默认是line, 可通过这个参数设置编码方式
            tags => ["std"] #添加标记
            type => "std" #添加类型
            id => 1 #添加一个唯一的ID, 如果没有指定ID, 那么将生成一个ID
            enable_metric => true #是否开启记录日志, 默认true
        }
    }
    

    5、TCP/UDP输入:tcp/udp{}

    input{
        tcp{
           port => 8888 #端口
           mode => "server" #操作模式, server:监听客户端连接, client:连接到服务器
           host => "0.0.0.0" #当mode为server, 指定监听地址, 当mode为client, 指定连接地址, 默认0.0.0.0
           ssl_enable => false #是否启用SSL, 默认false
           ssl_cert => "" #SSL证书路径
           ssl_extra_chain_certs => [] #将额外的X509证书添加到证书链中
           ssl_key => "" #SSL密钥路径
           ssl_key_passphrase => "nil" #SSL密钥密码, 默认nil
           ssl_verify => true #核实与CA的SSL连接的另一端的身份
           tcp_keep_alive => false #TCP是否保持alives
        }
    }
    input{
        udp{
           buffer_size => 65536 #从网络读取的最大数据包大小, 默认65536
           host => 0.0.0.0 #监听地址
           port => 8888 #端口
           queue_size => 2000 #在内存中保存未处理的UDP数据包的数量, 默认2000
           workers => 2 #处理信息包的数量, 默认2
        }
    }
    

    6、syslog输入 syslog{}

    input{
        syslog{
           host => 0.0.0.0 #监听地址, 默认0.0.0.0
           port => "8888" #端口
        }
    }
    
  • 相关阅读:
    软件测试技术实战 设计、工具及管理(51Testing软件测试网作品系列)
    MATLAB智能算法超级学习手册
    HTML与CSS入门经典(第9版)
    深入理解Android 5 源代码
    中文版Dreamweaver CS6基础培训教程(第2版)
    可用性测试手册(第2版)
    网络综合布线系统与施工技术第4版
    PHP核心技术与最佳实践(第2版)
    [OC Foundation框架
    [OC Foundation框架
  • 原文地址:https://www.cnblogs.com/xiao2er/p/10491020.html
Copyright © 2011-2022 走看看