zoukankan      html  css  js  c++  java
  • logstash 安装 配置

    1.Logstash 
    安装:

    在产生日志的服务器上安装 Logstash
    1.安装java环境
    # yum install java-1.8.0-openjdk.x86_64
    2.安装logstash(使用yum安装)
    # rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearc
      添加此文件和内容
    # vi /etc/yum.repos.d/logstash.repo
    [logstash-6.x]
    name=Elastic repository for 6.x packages
    baseurl=https://artifacts.elastic.co/packages/6.x/yum
    gpgcheck=1
    gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
    enabled=1
    autorefresh=1
    type=rpm-md

    安装logstash
    # yum install logstash
    程序启动文件位置
    /usr/share/logstash/bin/logstash
    建立软连接到环境变量位置,方便日后使用
    # ln -s /usr/share/logstash/bin/logstash /usr/local/bin/logstash



    配置(定义管道):

     1.采用最直接的模式

     先测试一下 

    bin/logstash -e 'input { stdin { } } output { stdout {} }'

    输入 hello world     结果如下

    Hello World(输入)经过 Logstash 管道(过滤)变成:

    2018-04-12T02:20:05.744Z cacti hello world (输出)。

     

     

    在生产环境中,Logstash 的管道要复杂很多,可能需要配置多个输入、过滤器和输出插件。

     

    因此,需要一个配置文件管理输入、过滤器和输出相关的配置。配置文件内容格式如下:

    # 输入
    input {
      ...
    }
    # 过滤器
    filter {
      ...
    }
    # 输出
    output {
      ...
    }

    根据自己的需求在对应的位置配置 输入插件过滤器插件输出插件 和 编码解码插件 即可。

     

     

    插件用法

     

    在使用插件之前,我们先了解一个概念:事件。

     

    Logstash 每读取一次数据的行为叫做事件。

     

    在 Logstach 目录中创建一个配置文件,名为 logstash.conf(名字任意)。

     

    输入插件   input 模块

     

    输入插件允许一个特定的事件源可以读取到 Logstash 管道中,配置在 input {} 中,且可以设置多个。

     

    input {
        # file为常用文件插件,插件内选项很多,可根据需求自行判断
        file {
            path => "/var/lib/mysql/slow.log"
            # 要导入的文件的位置,可以使用*,例如/var/log/nginx/*.log
            Excude =>”*.gz”
            # 要排除的文件
            start_position => "beginning"
            # 从文件开始的位置开始读,end表示从结尾开始读
            ignore_older => 0  
            # 多久之内没修改过的文件不读取,0为无限制,单位为秒
            sincedb_path => "/dev/null"
            # 记录文件上次读取位置,输出到null表示每次都从文件首行开始解析
            type => "mysql-slow"
            # type字段,可表明导入的日志类型
        }   
    }

     

     

     

    修改配置文件:

    input {
        # 从文件读取日志信息
        file {
            path => "/var/log/messages"
            type => "system"
            start_position => "beginning"
        }
    }
    # filter {
    #
    # }
    output {
        # 标准输出
        stdout { codec => rubydebug }
    }

    其中,messages 为系统日志。

    保存文件。键入:

    logstash -f logstash.conf

    在控制台结果如下:

           "message" => "Apr 10 00:53:29 cacti systemd: logstash.service: main process exited, code=exited, status=1/FAILURE",
        "@timestamp" => 2018-04-12T02:33:17.825Z,
              "host" => "cacti",
          "@version" => "1",
              "type" => "system",
              "path" => "/var/log/messages"
    }
    {
           "message" => "Apr 10 00:53:29 cacti systemd: Unit logstash.service entered failed state.",
        "@timestamp" => 2018-04-12T02:33:17.825Z,
              "host" => "cacti",
          "@version" => "1",
              "type" => "system",
              "path" => "/var/log/messages"

     

    从redis输入 

    input {
        # redis插件为常用插件,插件内选项很多,可根据需求自行判断
        redis {
            batch_count => 1 
            # EVAL命令返回的事件数目,设置为5表示一次请求返回5条日志信息
            data_type => "list" 
            # logstash redis插件工作方式
            key => "logstash-test-list" 
            # 监听的键值
            host => "127.0.0.1" 
            # redis地址
            port => 6379 
            # redis端口号
            password => "123qwe" 
            # 如果有安全认证,此项为认证密码
            db => 0 
            # 如果应用使用了不同的数据库,此为redis数据库的编号,默认为0。
            threads => 1 
            # 启用线程数量
          }
    }

    常用的 input 插件其实有很多,这里只举例了两种。其他还有 kafka,tcp 等等

     

     

    输出插件

    输出插件将事件数据发送到特定的目的地,配置在 output {} 中,且可以设置多个。

     

    output {
        # tdout { codec => "rubydebug" }
        # 筛选过滤后的内容输出到终端显示
    
        elasticsearch {  # 导出到es,最常用的插件
            codec => "json"
            # 导出格式为json
            hosts => ["127.0.0.1:9200"]
            # ES地址+端口
            index => "logstash-slow-%{+YYYY.MM.dd}"
            # 导出到index内,可以使用时间变量
            user => "admin"
            password => "xxxxxx"
            # ES如果有安全认证就使用账号密码验证,无安全认证就不需要
            flush_size => 500
            # 默认500,logstash一次性攒够500条的数据在向es发送
            idle_flush_time => 1
            # 默认1s,如果1s内没攒够500,还是会一次性把数据发给ES
        }   
    }

     

    输出到redis

    output {
         redis{  # 输出到redis的插件,下面选项根据需求使用
             batch => true
             # 设为false,一次rpush,发一条数据,true为发送一批
             batch_events => 50
             # 一次rpush发送多少数据
             batch_timeout => 5
             # 一次rpush消耗多少时间
             codec => plain
             # 对输出数据进行codec,避免使用logstash的separate filter
             congestion_interval => 1
             # 多长时间进项一次拥塞检查
             congestion_threshold => 5
             # 限制一个list中可以存在多少个item,当数量足够时,就会阻塞直到有其他消费者消费list中的数据
             data_type => list
             # 使用list还是publish
             db => 0
             # 使用redis的那个数据库,默认为0号
             host => ["127.0.0.1:6379"]
             # redis 的地址和端口,会覆盖全局端口
             key => xxx
             # list或channel的名字
             password => xxx
             # redis的密码,默认不使用
             port => 6379
             # 全局端口,默认6379,如果host已指定,本条失效
             reconnect_interval => 1
             # 失败重连的间隔,默认为1s
             timeout => 5
             # 连接超时的时间
             workers => 1
             # 工作进程
         }
    }

     

     

     

    filter 模块

    filter {  # 插件很多,这里选取我使用过的插件做讲述
        if ([message] =~ "正则表达式")  {  drop {}  }
        # 正则匹配=~,!~,包含判断in,not in ,字符串匹配==,!=,等等,匹配之后可以做任何操作,这里过滤掉匹配行,除了做过滤操作,if后面可以作任意操作,甚至可以为匹配到的任意行做单独的正则分割操作
    
        multiline {
            pattern => "正则表达式"
            negate => true
            what => "previous"
            # 多行合并,由于一些日志存在一条多行的情况,这个模块可以进行指定多行合并,通过正则匹配,匹配到的内容上面的多行合并为一条日志。
        }   
    
        grok {
            match => { "message" => "正则表达式"
             # 正则匹配日志,可以筛选分割出需要记录的字段和值
            }   
            remove_field => ["message"]
            # 删除不需要记录的字段
       }   
    
        date {
            match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] 
           # 记录@timestamp时间,可以设置日志中自定的时间字段,如果日志中没有时间字段,也可以自己生成
            target=>“@timestamp”
            # 将匹配的timestamp字段放在指定的字段 默认是@timestamp
        }
    
        ruby {
            code => "event.timestamp.time.localtime"
            # timestamp时区锁定
        }   
    }

     

     

     

  • 相关阅读:
    小三学算术
    The Balance
    扫描线
    2019牛客暑期多校训练营(第九场)
    后缀数组
    Manacher
    局部变量和全局变量的区别
    2386:Lake Counting-poj
    简单背包问题(0032)-swust oj
    编程中易犯错误集锦-持续更新。。。。。
  • 原文地址:https://www.cnblogs.com/centos2017/p/9329982.html
Copyright © 2011-2022 走看看