zoukankan      html  css  js  c++  java
  • logstash中将kafka数据直接存储到es中

    下载

    建议到官网下载最新版
    https://www.elastic.co/cn/downloads/logstash
    本文使用logstash7.0.0
    https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz

    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gz
    tar -xzvf logstash-7.0.0.tar.gz
    mv logstash-7.0.0.tar.gz /usr/local/logstash



    如图,Logstash Pipeline中input和output为必须元素,filter为可选元素,input插件使用来自源的数据,filter插件在您指定时修改数据output插件将数据写入目标。

    我们可以通过运行最简单Logstash Pipeline来进行测试,步骤如下:

    进入Logstash的解压目录,通过-e参数运行一个管道

    cd logstash-7.0.0
    ./bin/logstash -e 'input { stdin { } } output { stdout {} }'
    1
    2
    -e可以直接从命令行指定配置。通过在命令行指定配置,可以快速测试配置,而无需在迭代之间编辑文件。示例中的管道从标准输入stdin获取输入数据,并以结构化格式将输入数据移动到标准输出stdout 。




    input {
        kafka {
        bootstrap_servers => ["192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092"]
        group_id => "elastic_group"
        topics => ["elastic_test"]
        consumer_threads => 12
        decorate_events => true
        }
    }
    output {
        elasticsearch {
        hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"]
        index => "elastic_test"
        password => "XXX"
        user => "elastic"
        }
    }
    

      



    logstash版本为5.5.3,kafka版本为2.11,此版本默认内置了kafka插件,可直接配置使用,不需要重新安装插件;注意logstash5.x版本前后配置不太一样,注意甄别,必要时可去elasticsearch官网查看最新版配置参数的变化,例如logstash5.x版本以前kafka插件配置的是zookeeper地址,5.x以后配置的是kafka实例地址。
    
    input{
          kafka{
            bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
            client_id => "test"
            group_id => "test"
            auto_offset_reset => "latest" //从最新的偏移量开始消费
            consumer_threads => 5
            decorate_events => true //此属性会将当前topic、offset、group、partition等信息也带到message中
            topics => ["logq","loge"] //数组类型,可配置多个topic
            type => "bhy" //所有插件通用属性,尤其在input里面配置多个数据源时很有用
          }
    }
     使用了decorate_events属性,注意看logstash控制台打印的信息,会输出如下
    
    "kafka":{"consumer_group":"test","partition":0,"offset":10430232,"topic":"logq","key":null}
     另外一个input里面可设置多个kafka,
    
    input{
          kafka{
            bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
            client_id => "test1"
            group_id => "test1"
            auto_offset_reset => "latest"
            consumer_threads => 5
            decorate_events => true
            topics => ["loge"]
            type => "classroom"
          }
          kafka{
            bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
            client_id => "test2"
            group_id => "test2"
            auto_offset_reset => "latest"
            consumer_threads => 5
            decorate_events => true
            topics => ["logq"]
            type => "student"
          }
    }
     假如你在filter模块中还要做其他过滤操作,并且针对input里面的每个数据源做得操作不一样,那你就可以根据各自定义的type来匹配
    
    filter{
            if[type] == "classroom"{
                grok{
                   ........
                }
            }
            if[type] == "student"{
                mutate{
                   ........
                }
            }
    }
     不只filter中可以这样,output里面也可以这样;并且当output为elasticsearch的时候,input里面的定义的type将会成为elasticsearch的你定义的index下的type
    
    output {
            if[type] == "classroom"{
              elasticsearch{
                   hosts => ["192.168.110.31:9200"]
                   index => "school"
                   timeout => 300
                   user => "elastic"
                   password => "changeme"
              }
    
            }
            if[type] == "student"{
                ........
            }
     }
     对于第一个存储到elasticsearch的路径为localhost:9200/school/classroom;第二个存储到elasticsearch的路径为localhost:9200/school/student。假如从来没有定义过type,默认的type为logs,访问路径为第一个存储到elasticsearch的路径为localhost:9200/school/logs,默认的type也可不加。
    
    

     

    读取文件直接发送到es

    • 修改/usr/local/logstash/config/logstash-sample.conf
    # Sample Logstash configuration for creating a simple
    # Beats -> Logstash -> Elasticsearch pipeline.
    
    input {
      #beats {
       # port => 5044
      #}
      file {
        path => "/var/log/httpd/access_log"
        start_position => beginning
      }
    }
    
    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "%{[@metadata][logstash]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        #user => "elastic"
        #password => "changeme"
      }
    }
    
    • 检查配置文件是否正确:(假设当前目录为/usr/local/logstash/config/)
    ../bin/logstash -t -f logstash-sample.conf
    启动:
    ../bin/logstash -f logstash-sample.conf
    加载本文件夹所有配置文件启动:
    ../bin/logstash -f ./
    或后台启动:
    nohup ../bin/logstash -f config/ &
    
      • 常用命令参数
        -f:通过这个命令可以指定Logstash的配置文件,根据配置文件配置logstash
        -e:后面跟着字符串,该字符串可以被当做logstash的配置(如果是“” 则默认使用stdin作为输入,stdout作为输出)
        -l:日志输出的地址(默认就是stdout直接在控制台中输出)
        -t:测试配置文件是否正确,然后退出。
  • 相关阅读:
    LPT算法--时间调度问题
    Java语法学习1
    用JS动态显示文本
    用JS动态创建一个有序表(根据输入添加子列表项)
    邻接表链式结构的实现和顺序结构的实现
    HDU 1242 特殊化带结构体BFS
    POJ 1562深搜判断连体油田个数
    Uva 8道比较水的数论 (练练英语阅读理解)
    HDU 2024 C语言合法标识符(笑)
    再做POJ2406 KMPnext数组的运用
  • 原文地址:https://www.cnblogs.com/ExMan/p/14959639.html
Copyright © 2011-2022 走看看