zoukankan      html  css  js  c++  java
  • Logstash:Data转换,分析,提取,丰富及核心操作

    Logstash:Data转换,分析,提取,丰富及核心操作

    Logstash plugins

    Logstash是一个非常容易进行扩张的框架。它可以对各种的数据进行分析处理。这依赖于目前提供的超过200多个plugin。
    首先,我们来查看一下目前有哪些plugin:

    Input plugins

    首先进入到Logstash的安装目录下的bin子目录,并在命令行中打入如下的命令:

    $ ./logstash-plugin list --group input

    显示:

    logstash-input-azure_event_hubs
    logstash-input-beats
    logstash-input-couchdb_changes
    logstash-input-elasticsearch
    logstash-input-exec
    logstash-input-file
    logstash-input-ganglia
    logstash-input-gelf
    logstash-input-generator
    logstash-input-graphite
    logstash-input-heartbeat
    logstash-input-http
    logstash-input-http_poller
    logstash-input-imap
    logstash-input-jdbc
    logstash-input-jms
    logstash-input-kafka
    logstash-input-pipe
    logstash-input-rabbitmq
    logstash-input-redis
    logstash-input-s3
    logstash-input-snmp
    logstash-input-snmptrap
    logstash-input-sqs
    logstash-input-stdin
    logstash-input-syslog
    logstash-input-tcp
    logstash-input-twitter
    logstash-input-udp
    logstash-input-unix
    

    Filter plugs

    在命令行打入如下的命令:

    $ ./logstash-plugin list --group filter
    logstash-filter-aggregate
    logstash-filter-anonymize
    logstash-filter-cidr
    logstash-filter-clone
    logstash-filter-csv
    logstash-filter-date
    logstash-filter-de_dot
    logstash-filter-dissect
    logstash-filter-dns
    logstash-filter-drop
    logstash-filter-elasticsearch
    logstash-filter-fingerprint
    logstash-filter-geoip
    logstash-filter-grok
    logstash-filter-http
    logstash-filter-jdbc_static
    logstash-filter-jdbc_streaming
    logstash-filter-json
    logstash-filter-kv
    logstash-filter-memcached
    logstash-filter-metrics
    logstash-filter-mutate
    logstash-filter-prune
    logstash-filter-ruby
    logstash-filter-sleep
    logstash-filter-split
    logstash-filter-syslog_pri
    logstash-filter-throttle
    logstash-filter-translate
    logstash-filter-truncate
    logstash-filter-urldecode
    logstash-filter-useragent
    logstash-filter-uuid
    logstash-filter-xml
    

    Output plugins

    在命令行打入如下的命令:

    $ ./logstash-plugin list --group output
    logstash-output-cloudwatch
    logstash-output-csv
    logstash-output-elastic_app_search
    logstash-output-elasticsearch
    logstash-output-email
    logstash-output-file
    logstash-output-graphite
    logstash-output-http
    logstash-output-lumberjack
    logstash-output-nagios
    logstash-output-null
    logstash-output-pipe
    logstash-output-rabbitmq
    logstash-output-redis
    logstash-output-s3
    logstash-output-sns
    logstash-output-sqs
    logstash-output-stdout
    logstash-output-tcp
    logstash-output-udp
    logstash-output-webhdfs
    

    Codec plugins:

    在命令行打入如下的命令:

    $ ./logstash-plugin list codec
    logstash-codec-avro
    logstash-codec-cef
    logstash-codec-collectd
    logstash-codec-dots
    logstash-codec-edn
    logstash-codec-edn_lines
    logstash-codec-es_bulk
    logstash-codec-fluent
    logstash-codec-graphite
    logstash-codec-json
    logstash-codec-json_lines
    logstash-codec-line
    logstash-codec-msgpack
    logstash-codec-multiline
    logstash-codec-netflow
    logstash-codec-plain
    logstash-codec-rubydebug
    

    在这上面显示都是我们在安装Logstash后,已经给我们配置好的plugin。我们可以自己开发自己的plugin,并安装它。我们也可以安装一个别人已经开发好的plugin。

    从上面我们可以看出来,因为file都在input及output之中,我们甚至可以做如下的配置:

    input {
       file {
          path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/*access*"
          type => "apache"
       }
    } 
    output {
       file {
          path => "C:/tpwork/logstash/bin/log/output.log"
       }
    }
    

    这样我们把input文件读入到Logstash,经过它的处理后,就会变成下面的这种输出:

    0:0:0:0:0:0:0:1 - - [
       25/Dec/2016:18:37:00 +0800] "GET / HTTP/1.1" 200 11418
    
    {
       "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
       localhost_access_log.2016-12-25.txt",
       "@timestamp":"2016-12-25T10:37:00.363Z","@version":"1","host":"Dell-PC",
       "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] "GET /
       HTTP/1.1" 200 11418
    ","type":"apache","tags":[]
    }
    

    安装plugin

    在标准的logstash中,有很多的plugin已经被安装了,但是在有些场合,我们需要手动来安装一些我们所需要的plugin,比如Exec output plugin。我们可以在bin目录先打人如下的命令:

    ./bin/logstash-plugin install logstash-output-exec

    这样我们用如下的命令来检查上面的plugin是否已经被成功安装了:

    ./bin/logstash-plugin list --group output | grep exec
    
    $ ./bin/logstash-plugin list --group output | grep exec
    Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.bouncycastle.jcajce.provider.drbg.DRBG (file:/Users/liuxg/elastic/logstash-7.4.0/vendor/jruby/lib/ruby/stdlib/org/bouncycastle/bcprov-jdk15on/1.61/bcprov-jdk15on-1.61.jar) to constructor sun.security.provider.Sun()
    WARNING: Please consider reporting this to the maintainers of org.bouncycastle.jcajce.provider.drbg.DRBG
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    logstash-output-exec
    

    读取log文件

    Logstash很容易设置来读取一个log文件。比如,我们可以通过如下的方式来读取一个Apache的log文件:

    input {
      file { 
      	type => "apache"
      	path => "/Users/liuxg/data/apache_logs"
     	  start_position => "beginning"
    	  sincedb_path => "null"
      }
    }
     
    output {
    	stdout { 
    		codec => rubydebug 
    	}
    }
    

    我们甚至可以读取多个文件:

    # Pull in application-log data. They emit data in JSON form.
    input {
      file {
        path => [
          "/var/log/app/worker_info.log",
          "/var/log/app/broker_info.log",
          "/var/log/app/supervisor.log"
        ]
        exclude => "*.gz"
        type    => "applog"
        codec   => "json"
      }
    }
    

    数据的系列化

    我们可以使用已经提供的Codec来把我们的数据进行系列化,比如:

    input {
      // Deserialize newline separated JSON
      file  { path => “/some/sample.log”, codec => json }
    }
     
    output {
      // Serialize to the msgpack format
      redis { codec => msgpack }
      stdout {
        codec => rubydebug
      }
    }
    

    在我们的longstash运行起来后,我们可以通过如下的命令在一个terminal中向文件sample.json添加内容:
    $ echo '{"name2", "liuxg2"}' >> ./sample.log

    我们可以看到如下的输出:

    {
          "@version" => "1",
           "message" => "{"name2", "liuxg2"}",
        "@timestamp" => 2019-09-12T07:37:56.639Z,
              "host" => "localhost",
              "tags" => [
            [0] "_jsonparsefailure"
        ],
              "path" => "/Users/liuxg/data/sample.log"
    }
    

    最常用的codec

    1. line 使用“message”中的数据将每行转换为logstash事件。 也可以将输出格式化为自定义行 。

    2. multiline: 允许您为“message”构成任意边界。 经常用于stacktraces等。也可以在filebeat中完成。

    3. json_lines: 解析换行符分隔的JSON数据

    4. json: 解析所有JSON。 仅适用于面向消息的输入/输出,如Redis / Kafka / HTTP等

    还有很多其它的Codec。

    解析及提取

    Grok Filter

    filter {
    	grok {
    		match => [
    			"message", "%{TIMESTAMP_ISO8601:timestamp_string}%{SPACE}%{GREEDYDATA:line}"
    		]
    	}
    }
    

    上面的例子可以帮我们很方便地把如下的log信息变成一个机构化的数据:

    2019-09-09T13:00:00Z Whose woods these are I think I know.
    

    更多grok的pattern可以在地址grok pattern找到。

    Date filter

    filter {
      date {
        match => ["timestamp_string", "ISO8601"]
      }
    }
    

    Date filter可以帮我们把一个字符串,变成一个我们想要的格式的时间,并且把这个值赋予给@timestamp字段。

    Dissect filter

    是一个更快,轻量级的更小的grok:

    filter {
      dissect {
        mapping => {“message” => “%{id} %{function->} %{server}”}
      }
    }
    

    字段和分隔符模式的格式类似于Grok。

    例子:

        input {
          generator {
            message => "<1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
            count => 1
          }
        }
         
        filter {
          if [message] =~ "THREAT," {
            dissect {
              mapping => {
                message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
              }
            }
          }
        }
         
         
        output {
        	stdout { 
        		codec => rubydebug 
        	}
        }
    

    运行后:

        {
                     "@timestamp" => 2019-09-12T09:20:46.514Z,
                     "pan_dst_ip" => "9",
                 "pan_nat_src_ip" => "10",
                       "sequence" => 0,
                      "logsource" => "www1",
                 "pan_session_id" => "23",
                       "pan_vsys" => "16",
                        "pan_cat" => "34",
                  "pan_rule_name" => "12",
                   "pan_gen_time" => "2016/10/16 20:21:20",
                 "pan_seq_number" => "37",
                    "pan_subject" => "50",
           
                        ....
           
                        "message" => "<1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54",
                 "pan_fut_use_02" => "6",
                      "pan_flags" => "29",
               "syslog_timestamp" => "Oct 16 20:21:22",
                    "pan_anymore" => "53,54"
        }
    

    KV filter

    解析键/值对中数据的简便方法

        filter {
          kv {
            source => “message”
            target => “parsed”
            value_split => “:”
          }
        }
    

    我们运行这样的conf文件:

        input {
          generator {
            message => "pin=12345~0&d=123&e=foo@bar.com&oq=bobo&ss=12345"
            count => 1
          }
        }
         
        filter {
        	kv {
        		source => "message"
        		target => "parsed"
        		field_split => "&?"
        	}
        }
         
        output {
        	stdout { 
        		codec => rubydebug 
        	}
        }
    

    显示的结果是:

        {
            "@timestamp" => 2019-09-12T09:46:04.944Z,
                  "host" => "localhost",
                "parsed" => {
                 "ss" => "12345",
                  "e" => "foo@bar.com",
                "pin" => "12345~0",
                 "oq" => "bobo",
                  "d" => "123"
            },
               "message" => "pin=12345~0&d=123&e=foo@bar.com&oq=bobo&ss=12345",
              "sequence" => 0,
              "@version" => "1"
        }
    

    对于kv flter来说,我们也可以使用一个target来把信息组织到一个object里,比如:

        filter {
          kv {
            source => “message”
            target => “parsed”
            value_split => “:”
          }
        }
    

    核心操作Mutate filter

    这个filter提供很多功能:

    • 转换字段类型(从字符串到整数等)
    • 添加/重命名/替换/复制字段
    • 大-小写转换
    • 将数组连接在一起(对于Array => String操作很有用)
    • 合并哈希
    • 将字段拆分为数组
    • 剥去空白
        input {
          generator {
            message => "pin=12345~0&d=123&e=foo@bar.com&oq=bobo&ss=12345"
            count => 1
          }
        }
         
        filter {
        	kv {
        		source => "message"
        		field_split => "&?"
        	}
         
        	if [pin] == "12345~0" {
            	mutate { add_tag => [ 'metrics' ]
            }
         
            mutate {
            	split => ["message", "&"]
            	add_field => {"foo" => "bar-%{pin}"}
          	}
          }
        }
         
        output {
        	stdout { 
        		codec => rubydebug 
        	}
         
        	if "metrics" in [tags] {
              stdout {
                 codec => line { format => "custom format: %{message}" }
              }
           }
        }
    

    显示的结果是:

        {
             "foo" => "bar-12345~0",
              "e" => "foo@bar.com",
              "sequence" => 0,
               "message" => [
                [0] "pin=12345~0",
                [1] "d=123",
                [2] "e=foo@bar.com",
                [3] "oq=bobo",
                [4] "ss=12345"
            ],
                   "pin" => "12345~0",
                     "d" => "123",
                  "host" => "localhost",
                    "ss" => "12345",
            "@timestamp" => 2019-09-14T15:03:15.141Z,
                    "oq" => "bobo",
              "@version" => "1",
                  "tags" => [
                [0] "metrics"
            ]
        }
        custom format: pin=12345~0,d=123,e=foo@bar.com,oq=bobo,ss=12345
    

    最核心的转化filters

    • Mute - 修改/添加每个项
    • Split - 把一个事件转化为多个事件
    • Drop - 丢掉一个事件

    条件逻辑

    if/else

    • 可以用 =~来使用regexps(正则)
    • 可以在一个数组里检查一个会员
        filter {
          mutate { lowercase => “account” }
          if [type] == “batch” {
            split { 
                field => actions 
               target => action 
            }
          }
         
          if { “action” =~ /special/ } {
            drop {}
          }
        }
    

    GeoIP

    丰富IP地址信息:

    filter { geoip { fields => “my_geoip_field” }}

    运行如下的配置:

        input {
          generator {
            message => "83.149.9.216"
            count => 1
          }
        }
         
        filter {
        	grok {
            	match => {
              		"message" => '%{IPORHOST:clientip}'
            	}
            }
         
            geoip {
            	source => "clientip"
          	}
        }
         
        output {
        	stdout {
        		codec => rubydebug
        	}
        }
    

    显示的结果如下:

        {
                  "host" => "localhost",
              "@version" => "1",
              "clientip" => "83.149.9.216",
               "message" => "83.149.9.216",
            "@timestamp" => 2019-09-15T06:54:46.695Z,
              "sequence" => 0,
                 "geoip" => {
                      "timezone" => "Europe/Moscow",
                   "region_code" => "MOW",
                      "latitude" => 55.7527,
                 "country_code3" => "RU",
                "continent_code" => "EU",
                     "longitude" => 37.6172,
                  "country_name" => "Russia",
                      "location" => {
                    "lat" => 55.7527,
                    "lon" => 37.6172
                },
                            "ip" => "83.149.9.216",
                   "postal_code" => "102325",
                 "country_code2" => "RU",
                   "region_name" => "Moscow",
                     "city_name" => "Moscow"
            }
        }
    

    我们可以看到在geoip之下,有很多具体的信息。

    DNS filter

    用DNS信息丰富主机名的更多信息

    filter { dns { fields => “my_dns_field” }}

    我们定义如下的一个Logstash配置文件:

        input {
          generator {
            message => "www.google.com"
            count => 1
          }
        }
         
        filter {
         	mutate {
            	add_field => { "hostname" => "172.217.160.110"}
        	}
         
         
        	dns {
        		reverse => ["hostname"]
        		action => "replace"	 
        	}   
         
        }
         
        output {
        	stdout {
        		codec => rubydebug
        	}
        }
    

    上面是谷歌的地址,那么它的输出结果是:

        {
                  "host" => "localhost",
              "sequence" => 0,
               "message" => "www.google.com",
            "@timestamp" => 2019-09-15T11:35:43.791Z,
              "hostname" => "tsa03s06-in-f14.1e100.net",
              "@version" => "1"
        }
    

    在这里我们可以看到hostname的值。

    Useragent filer

    让浏览器的useragent信息更加丰富。我们使用如下的Logstash配置:

        input {
          generator {
            message => '83.149.9.216 - - [17/May/2015:10:05:50 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1" 200 321631 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"'
            count => 1
          }
        }
         
        filter {
        	grok {
        	    match => {
        	      "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
        	    }
        	  }
         
        	useragent {
            	source => "agent"
            	target => "useragent"
          	}
        }
         
        output {
        	stdout {
        		codec => rubydebug
        	}
        }
    

    运行出来的结果是:

        {
                "request" => "/presentations/logstash-monitorama-2013/images/kibana-dashboard.png",
              "useragent" => {
                    "name" => "Chrome",
                   "build" => "",
                  "device" => "Other",
                "os_major" => "10",
                      "os" => "Mac OS X",
                   "minor" => "0",
                   "major" => "32",
                 "os_name" => "Mac OS X",
                   "patch" => "1700",
                "os_minor" => "9"
            },
               "sequence" => 0,
                "message" => "83.149.9.216 - - [17/May/2015:10:05:50 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1" 200 321631 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"",
              "timestamp" => "17/May/2015:10:05:50 +0000",
               "referrer" => ""http://semicomplete.com/presentations/logstash-monitorama-2013/"",
               "clientip" => "83.149.9.216",
                  "ident" => "-",
                   "auth" => "-",
               "response" => 200,
               "@version" => "1",
                   "verb" => "GET",
                   "host" => "localhost",
             "@timestamp" => 2019-09-15T12:03:34.650Z,
            "httpversion" => "1.1",
                  "bytes" => 321631,
                  "agent" => ""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36""
        }
    

    我们在useragent里可以看到更加详细的信息啊。

    Translate Filter

    使用本地的数据来使得数据更加丰富。我们使用如下的Logstash配置文件:

        input {
          generator {
            message => '83.149.9.216 - - [17/May/2015:10:05:50 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1" 200 321631 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"'
            count => 1
          }
        }
         
        filter {
        	grok {
        	    match => {
        	      "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
        	    }
        	 }
         
        	translate {
        		field => "[response]"
        		destination => "[http_status_description]"
        		dictionary => {
                 	"100" => "Continue"
                  	"101" => "Switching Protocols"
                  	"200" => "OK"
                  	"500" => "Server Error"
        		}
        		
        		fallback => "I'm a teapot"
        	}
        	
        }
         
         
        output {
        	stdout {
        		codec => rubydebug
        	}
        }
    

    运行显示的结果是:

        {
                               "auth" => "-",
                               "host" => "localhost",
                          "timestamp" => "17/May/2015:10:05:50 +0000",
                            "message" => "83.149.9.216 - - [17/May/2015:10:05:50 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1" 200 321631 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"",
                        "httpversion" => "1.1",
                           "@version" => "1",
                           "response" => 200,
                           "clientip" => "83.149.9.216",
                               "verb" => "GET",
                           "sequence" => 0,
                           "referrer" => ""http://semicomplete.com/presentations/logstash-monitorama-2013/"",
                              "agent" => ""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"",
            "http_status_description" => "OK",
                              "ident" => "-",
                         "@timestamp" => 2019-09-15T12:30:09.575Z,
                              "bytes" => 321631,
                            "request" => "/presentations/logstash-monitorama-2013/images/kibana-dashboard.png"
        }
    

    我们可以看到一项http_status_description,它的值变为“OK”。

    Elasticsearch Filter

    从Elasticsearch中的index得到数据,并丰富事件。为了做这个测试,我们先建立一个叫做elasticsearch_filter的index:

        PUT ç/_doc/1
        {
          "name":"liuxg",
          "age": 20,
          "@timestamp": "2019-09-15"
        }
    

    在这里,我必须指出来的是:我们必须有一个叫做@timestamp的项,否则会有错误。这个是用来做sort用的。

    我们采用如下的Logstash配置:

        input {
          generator {
            message => "liuxg"
            count => 1
          }
        }
         
        filter {
        	elasticsearch {
        		hosts => ["http://localhost:9200"]
        		index => ["elasticsearch_filter"]
        		query => "name.keyword:%{[message]}"
        		result_size => 1
        		fields => {"age" => "user_age"}
        	}
        }
         
        output {
        	stdout {
        		codec => rubydebug
        	}
        }
    

    运行上面的例子显示的结果是:

        {
              "user_age" => 20,
                  "host" => "localhost",
               "message" => "liuxg",
              "@version" => "1",
            "@timestamp" => 2019-09-15T13:21:29.742Z,
              "sequence" => 0
        }
    

    我们可以看到user_age是20。这个是通过搜索name:liuxg来得到的。

    参考:https://opensource.com/article/17/10/logstash-fundamentals

  • 相关阅读:
    Android自动化框架学习中遇到的方法
    Python中使用adb命令行
    monkeyrunner无法运行的问题解决方案总结
    TCP与UDP的区别
    KVM虚拟机的认知
    HTTP状态码分类
    FTP主动模式(Port)和被动模式(Passive)的区别
    Linux df -h 显示磁盘空间满,但实际未占用满——问题分析
    浅谈AD域
    zabbix连接Mysql提示Can’t connect to local MySQL server through socket的解决方法
  • 原文地址:https://www.cnblogs.com/sanduzxcvbnm/p/12076477.html
Copyright © 2011-2022 走看看