zoukankan      html  css  js  c++  java
  • logstash.conf 配置:input kafka,filter,output elasticsearch/mysql

    # Tuning and Profiling Logstash Performance:
    # pipeline.workers, pipeline.batch.size, pipeline.batch.delay

    logstash -input:

    input {
    	
    	stdin { codec => plain { charset => "GBK" } }
    	
    	# metadata :
    	# [@metadata][kafka][topic]: Original Kafka topic from where the message was consumed.
    	# [@metadata][kafka][consumer_group]: Consumer group
    	# [@metadata][kafka][partition]: Partition info for this message.
    	# [@metadata][kafka][offset]: Original record offset for this message.
    	# [@metadata][kafka][key]: Record key, if any.
    	# [@metadata][kafka][timestamp]: Timestamp when this message was received by the Kafka broker.
    	# common options:
    	# add_field,codec,enable_metric,id,tags,type
    	# 
    	kafka {
    		id => "my_plugin_id"
    		bootstrap_servers => ["192.168.117.191:9092"]	
    		topics => ["topic_name"]  	# kafka.topic
    		#group_id => "logstash" 	# default logstash 
    		client_id => "cmd"			# default logstash
    		consumer_threads => 5 
    		auto_offset_reset => "latest"	# earliest,latest
    		decorate_events => true		# metadata
    		enable_auto_commit => true	# when the process fails: offset_reset
    		#codec => "json"
    		
    		# request_timeout_ms ,retry_backoff_ms => 
    		
    	}
    }

    logstash-filter:

    filter {
      # common options:add_field,add_tag,id,remove_field,remove_tag
      alter {
        # "field_name": "value1"
    	coalesce => [
           "field_name", "value1", "value2", "value3", ...
        ]
    	# change value "field_name": "new_value"
    	condrewrite => [
    	  "field_name", "expected_value", "new_value",
    	  "field_name2", "expected_value2", "new_value2",
        ]
    	condrewriteother => [
    	  "field_name", "expected_value", "field_name_to_change", "value",
    	  "field_name2", "expected_value2", "field_name_to_change2", "value2",
        ]
    	
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    
      ### ## ### ## ### ## Date formats ## ### ## ### ## ### 
      #
      # "Apr 17 09:32:01"			MMM dd HH:mm:ss
      # 1366125117000				UNIX_MS 
      # 1326149001.132				UNIX 
      # "2011-04-19T03:44:01.103Z"	ISO8601 
      date {
    	#match [ field, formats... ]
    	#Example match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
    	match => [ "logdate", "MMM dd yyyy HH:mm:ss","MMM  d yyyy HH:mm:ss", "ISO8601" ]
    	
    	add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
    	remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
      }
     date {
           # [field, to_format, input_format]
         match => ["create_at", "yyyy-MM-dd HH:mm:ss,SSS", "UNIX"]
         target => "@timestamp"
          locale => "cn"
     }  
    
      ### ## ### ## ### ## Dissect ## ### ## ### ## ### 
      # 
      # unstructured loginfo
      dissect {
        mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
      }
      ### ## ### ## ### ## Grok ## ### ## ### ## ### 
      # 
      # HTTP
      # syntax %{SYNTAX:SEMANTIC} dataType: field_name
      # 55.3.244.1 GET /index.html 15824 0.043
      grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
      }
      
      ### ## ### ## ### ## Josn ## ### ## ### ## ### 
      #
      json {
        source => "message"		#required source_field
    	target => "kfk"			#expanded field "kfk" => {input-keys: input-values}
      }
      
      ### ## ### ## ### ## Mutate ## ### ## ### ## ### 
      # executed in this order:  
      # coerce >rename >update >replace ...>remove >split >join >merge >copy  
      # You can control the order by using separate mutate blocks: mutate-1 >mutate-2
      #  
      mutate {
    	split => ["hostname", "."]
    	add_field => { "shortHostname" => "%{hostname[0]}" }
      }
      mutate {
    	rename => ["shortHostname", "hostname" ]
      }
      
      mutate {
        #data_type: integer,float,string,boolean,
        convert => {
          "fieldname" => "integer"
          "booleanfield" => "boolean"
        }
    	
    	copy => { "source_field" => "dest_field" }
    	rename => { "HOSTORIP" => "client_ip" }
    	
    	join => { "fieldname" => "," }
    	split => { "fieldname" => "," }
    	
    	replace => { "message" => "%{source_host}: My new message" }
    	update => { "sample" => "My new message" } #If the field does not exist, then no action will be taken.
    	
      }
    }     

    logstash-output:

    output {
      # common options: codec,enable_metric,id
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "logstash_output-%{+YYYY.MM.dd}"
        #user => "elastic"
        #password => "changeme"
      }
      
      jdbc {
    
        driver_jar_path => "D:/Program Files/Maven/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar"
    
        driver_class => "com.mysql.jdbc.Driver"
    
        connection_string => "jdbc:mysql://localhost:3306/test?user=root&password=root"
    
        statement => [ "insert into logstash_stdout (TEST_TIME ,TEST_HOST,MESSAGES) values (?,?,?)","%{@timestamp}" ,"%{host}","%{message}" ]
    
      }
      stdout {}
    }
    

     mutate event sample:

    input { stdin { } }
    
    filter {
      mutate { add_field => { "show" => "This data will be in the output" } }
      # @metadata.test = "Hello"
      mutate { add_field => { "[@metadata][test]" => "Hello" } }
      mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
    }
    
    output {
      if [@metadata][test] == "Hello" {
        stdout { codec => rubydebug { metadata => true } }
      }
    }
    

      

     

  • 相关阅读:
    线性时间将两个有序链表合成一个有序链表(constant additional space)
    C++定义指针数组
    cmd运行java编译文件
    java的方法
    Java流程控制
    用户交互-Scanner
    Java的注释
    编译型语言和解释性语言
    JDK、JRE和JVM
    MarkDown的简单使用
  • 原文地址:https://www.cnblogs.com/andea/p/11183304.html
Copyright © 2011-2022 走看看