  • Storing Kafka data directly into ES with Logstash

    Download

    It is recommended to download the latest release from the official site:
    https://www.elastic.co/cn/downloads/logstash
    This article uses Logstash 7.0.0:
    https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz

    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz
    tar -xzvf logstash-7.0.0.tar.gz
    mv logstash-7.0.0 /usr/local/logstash
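
    To confirm the unpacked install works, a quick sanity check is to print the version (the path assumes the move above):

    # should print something like "logstash 7.0.0"
    /usr/local/logstash/bin/logstash --version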



    As shown in the figure, input and output are required elements of a Logstash pipeline and filter is optional: input plugins consume data from a source, filter plugins modify the data as you specify, and output plugins write the data to a destination.

    We can test this by running the simplest possible Logstash pipeline:

    Enter the Logstash installation directory and start a pipeline with the -e flag:

    cd logstash-7.0.0
    ./bin/logstash -e 'input { stdin { } } output { stdout {} }'

    The -e flag lets you specify a configuration directly on the command line, which is handy for quickly testing a configuration without editing a file between iterations. The pipeline in this example takes input from standard input (stdin) and writes it to standard output (stdout) in a structured format.
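
    For example, after the pipeline starts, typing a line produces structured output along these lines (the exact fields and formatting vary by version and codec):

    hello world
    {
          "@version" => "1",
        "@timestamp" => 2019-05-01T12:00:00.000Z,
              "host" => "localhost",
           "message" => "hello world"
    }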




    input {
      kafka {
        bootstrap_servers => ["192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092"]
        group_id => "elastic_group"
        topics => ["elastic_test"]
        consumer_threads => 12
        decorate_events => true
      }
    }
    output {
      elasticsearch {
        hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"]
        index => "elastic_test"
        password => "XXX"
        user => "elastic"
      }
    }
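
    To run this pipeline, save it to a file and point Logstash at it (kafka_to_es.conf below is just an example name):

    # kafka_to_es.conf is a hypothetical file name for the config above
    ./bin/logstash -f kafka_to_es.conf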
    

      



    The following example uses Logstash 5.5.3 with Kafka 2.11; this Logstash version ships with the kafka plugin built in, so it can be configured and used directly without installing the plugin separately. Note that the configuration differs before and after Logstash 5.x, so check carefully which applies; when necessary, consult the Elastic website for the latest parameter changes. For example, before 5.x the kafka plugin was configured with the ZooKeeper address, while 5.x and later are configured with the Kafka broker addresses.
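
    For reference, a minimal sketch of that difference (option names as used by the older and newer kafka input plugin versions respectively; the addresses are placeholders):

    # before Logstash 5.x: the kafka input pointed at ZooKeeper
    input {
      kafka {
        zk_connect => "192.168.110.31:2181"
        topic_id   => "logq"
      }
    }

    # Logstash 5.x and later: the kafka input points at the brokers
    input {
      kafka {
        bootstrap_servers => "192.168.110.31:9092"
        topics            => ["logq"]
      }
    }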
    
    input {
      kafka {
        bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
        client_id => "test"
        group_id => "test"
        auto_offset_reset => "latest"   # start consuming from the latest offset
        consumer_threads => 5
        decorate_events => true         # adds topic, offset, group, partition, etc. to the event
        topics => ["logq","loge"]       # array type; multiple topics can be configured
        type => "bhy"                   # common to all plugins; especially useful when one input has multiple data sources
      }
    }
     With the decorate_events option enabled, note what Logstash prints to the console; it will include output like the following:
    
    "kafka":{"consumer_group":"test","partition":0,"offset":10430232,"topic":"logq","key":null}
     Also, multiple kafka blocks can be configured within a single input:
    
    input{
          kafka{
            bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
            client_id => "test1"
            group_id => "test1"
            auto_offset_reset => "latest"
            consumer_threads => 5
            decorate_events => true
            topics => ["loge"]
            type => "classroom"
          }
          kafka{
            bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
            client_id => "test2"
            group_id => "test2"
            auto_offset_reset => "latest"
            consumer_threads => 5
            decorate_events => true
            topics => ["logq"]
            type => "student"
          }
    }
     If you also need additional processing in the filter block, and the processing differs per data source defined in input, you can match on the type you defined for each, as in the sketch after this block:
    
    filter{
            if[type] == "classroom"{
                grok{
                   ........
                }
            }
            if[type] == "student"{
                mutate{
                   ........
                }
            }
    }
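
     As a concrete illustration (the grok pattern and field names below are hypothetical, since the original elides them):

    filter {
      if [type] == "classroom" {
        grok {
          # hypothetical pattern for lines like "101 math 35"
          match => { "message" => "%{NUMBER:room} %{WORD:subject} %{NUMBER:students}" }
        }
      }
      if [type] == "student" {
        mutate {
          # hypothetical field rename
          rename => { "sname" => "student_name" }
        }
      }
    }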
     Not only filter can branch this way; output can as well. Moreover, when the output is elasticsearch, the type defined in input becomes the document type under the index you define in Elasticsearch.
    
    output {
            if[type] == "classroom"{
              elasticsearch{
                   hosts => ["192.168.110.31:9200"]
                   index => "school"
                   timeout => 300
                   user => "elastic"
                   password => "changeme"
              }
    
            }
            if[type] == "student"{
                ........
            }
     }
     So the first kind of document is stored in Elasticsearch under localhost:9200/school/classroom, and the second under localhost:9200/school/student. If no type was ever defined, the default type is logs, giving the path localhost:9200/school/logs; the type can also simply be omitted.
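
     To verify, the documents can be queried directly (host and credentials taken from the output config above):

    curl -u elastic:changeme "http://192.168.110.31:9200/school/classroom/_search?pretty"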
    
    

     

    Reading a file and sending it directly to ES

    • Modify /usr/local/logstash/config/logstash-sample.conf
    # Sample Logstash configuration for creating a simple
    # Beats -> Logstash -> Elasticsearch pipeline.
    
    input {
      #beats {
       # port => 5044
      #}
      file {
        path => "/var/log/httpd/access_log"
        start_position => "beginning"
      }
    }
    
    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        # the file input does not set the beats-style @metadata fields,
        # so use a fixed index name instead (httpd-access here is an example)
        index => "httpd-access-%{+YYYY.MM.dd}"
        #user => "elastic"
        #password => "changeme"
      }
    }
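
    Once Logstash is running, you can confirm the index was created (assuming the local Elasticsearch from the config above):

    curl "http://localhost:9200/_cat/indices?v"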
    
    • Check that the configuration file is valid (assuming the current directory is /usr/local/logstash/config/):
    ../bin/logstash -t -f logstash-sample.conf
    Start:
    ../bin/logstash -f logstash-sample.conf
    Start with every configuration file in this directory loaded:
    ../bin/logstash -f ./
    Or start in the background:
    nohup ../bin/logstash -f ./ &
    
      • Common command-line flags
        -f: specify the Logstash configuration file; Logstash is configured from that file
        -e: followed by a string that is used as the Logstash configuration (with an empty string "", stdin is used as input and stdout as output by default)
        -l: where log output is written (by default it goes straight to the console via stdout)
        -t: test whether the configuration file is valid, then exit.