Grok: parse arbitrary text and structure it

Grok is currently the best way in Logstash to parse messy, unstructured log data into something structured and queryable. This tool is perfect for syslog logs, apache and other webserver logs, mysql logs, and in general any log format that is written for humans rather than computers.

Logstash ships with about 120 patterns by default. You can find them at: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

Grok Basics:

Grok works by combining text patterns into something that matches your logs. The syntax for a grok pattern is %{SYNTAX:SEMANTIC}.

The SYNTAX is the name of the pattern that will match your text; it is how you match. For example, 3.44 is matched by the NUMBER pattern, and 55.3.244.1 is matched by the IP pattern.

The SEMANTIC is the identifier you give to the piece of text being matched. For example, 3.44 could be the duration of an event, so you could simply call it duration. Further, the string 55.3.244.1 might identify the client making a request.

For the above example, your grok filter would look something like this:

%{NUMBER:duration} %{IP:client}

You can add a data type conversion to your grok pattern. By default all semantics are saved as strings. If you wish to convert a semantic's data type, for example change a string to an integer, suffix it with the target data type. For example, %{NUMBER:num:int} converts the num semantic from a string to an integer. Currently the only supported conversions are int and float. (A runnable sketch of this conversion follows the shell transcript at the end of this section.)

Example:

With this idea of a syntax and semantic, we can pull out useful fields from a sample log like this fictional http request log:

55.3.244.1 GET /index.html 15824 0.043

The pattern for this:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

which produces:

{
    "client": [ "55.3.244.1" ],
    "method": [ "GET" ],
    "request": [ "/index.html" ],
    "bytes": [ "15824" ],
    "duration": [ "0.043" ]
}

Regular Expressions:

Grok sits on top of regular expressions, so any regular expressions are valid in grok as well. The regular expression library is Oniguruma; you can see the full supported regexp syntax on the Oniguruma site.

Custom Patterns:

Sometimes logstash doesn't have a pattern you need. For this, you have a few options.

First, you can use the Oniguruma syntax for named capture, which lets you match a piece of text and save it as a field:

(?<field_name>the pattern here)

/********
Against the sample line 55.3.244.1 GET /index.html 15824 0.043:

(?<field_name>\S+) outputs:

{
    "field_name": [ "55.3.244.1" ]
}

(?<field_name>\S+\s+) outputs (note the trailing space captured by \s+):

{
    "field_name": [ "55.3.244.1 " ]
}

(?<field_name>\S+\s+\S+) outputs:

{
    "field_name": [ "55.3.244.1 GET" ]
}

For example, postfix logs have a queue id that is a 10 or 11 character hexadecimal value. You can capture that like this:

(?<queue_id>[0-9A-F]{10,11})

Against the input d4111111112, the expression (?<queue_id>[0-9A-F]{10,11}) outputs:

{
    "queue_id": [ "4111111112" ]
}

(The leading lowercase d is not in [0-9A-F], so only the ten digits that follow are captured.)

Alternately, you can create a custom patterns file:

1. Create a directory called patterns with a file in it called extra (the file name doesn't matter, but name it something meaningful to yourself).
2. In that file, write the pattern you need as the pattern name, a space, then the regexp for that pattern.

For example, doing the postfix queue id example as above:

# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}

Then, given a log line like:

Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

use the patterns_dir setting to tell grok where your custom patterns directory is:

filter {
    grok {
        patterns_dir => ["./patterns"]
        match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
    }
}

The above will match and result in the following fields:

timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

The timestamp, logsource, program, and pid fields come from the SYSLOGBASE pattern, which is itself defined by other patterns.

/*******************
zjtest7-frontend:/usr/local/logstash-2.3.4/config# pwd
/usr/local/logstash-2.3.4/config
zjtest7-frontend:/usr/local/logstash-2.3.4/config# ls -lr patterns/
total 4
-rw-r--r-- 1 root root 32 Aug 30 13:33 postfix
zjtest7-frontend:/usr/local/logstash-2.3.4/config/patterns# cat postfix
POSTFIX_QUEUEID [0-9A-F]{10,11}
zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat stdin.conf
input {
    stdin {
    }
}
filter {
    grok {
        patterns_dir => ["./patterns"]
        match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
    }
}
output {
    stdout {
        codec => rubydebug {}
    }
}
zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f stdin.conf
Settings: Default pipeline workers: 1
Pipeline main started
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
{
           "message" => "Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>",
          "@version" => "1",
        "@timestamp" => "2016-08-30T05:34:11.849Z",
              "host" => "0.0.0.0",
         "timestamp" => "Jan 1 06:25:43",
         "logsource" => "mailserver14",
           "program" => "postfix/cleanup",
               "pid" => "21403",
          "queue_id" => "BEF25A72965",
    "syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>"
}
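As a minimal sketch of the type conversion described above (the field names bytes and duration come from the fictional http log example; the stdin/stdout pipeline mirrors the transcript above and is an assumption, not part of the original test run):

input {
    stdin {
    }
}
filter {
    grok {
        # bytes is stored as an integer and duration as a float;
        # without the :int / :float suffixes both would stay strings
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:float}" }
    }
}
output {
    stdout {
        codec => rubydebug {}
    }
}

Feeding it the line 55.3.244.1 GET /index.html 15824 0.043 should produce "bytes" => 15824 and "duration" => 0.043 as numbers rather than quoted strings.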
06:25:43", "logsource" => "mailserver14", "program" => "postfix/cleanup", "pid" => "21403", "queue_id" => "BEF25A72965", "syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>" } 简介: 插件支持下面的配置选项: 需要的配置选项: grok { } 细节: add_field 1.值类型是hash 2. 默认值是{} 如果 filter 是成功的,增加任何属性字段到这个事件,Field名字可以动态的和包含event部分使用%{field}. filter { grok { add_field => { "foo_%{somefield}" => "Hello world, from %{host}" } patterns_dir => ["./patterns"] match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" } } } 输出; zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f stdin.conf Settings: Default pipeline workers: 1 Pipeline main started Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com> { "message" => "Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>", "@version" => "1", "@timestamp" => "2016-08-30T05:44:35.071Z", "host" => "0.0.0.0", "timestamp" => "Jan 1 06:25:43", "logsource" => "mailserver14", "program" => "postfix/cleanup", "pid" => "21403", "queue_id" => "BEF25A72965", "syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>", "foo_%{somefield}" => "Hello world, from 0.0.0.0" } ##你可以一次增加多个字段: filter { grok { add_field => { "foo_%{somefield}" => "Hello world, from %{host}" "new_field" => "new_static_value" } patterns_dir => ["./patterns"] match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" } } } 输出; zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f stdin.conf Settings: Default pipeline workers: 1 Pipeline main started Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com> { "message" => "Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>", "@version" => "1", "@timestamp" => "2016-08-30T05:46:37.029Z", "host" => "0.0.0.0", "timestamp" => "Jan 1 06:25:43", "logsource" => "mailserver14", "program" => "postfix/cleanup", "pid" => "21403", "queue_id" => "BEF25A72965", "syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>", "foo_%{somefield}" => "Hello world, from 0.0.0.0", "new_field" => "new_static_value" add_tag 1.值类型是array 2.默认是[] 如果filter 成功,增加任意的tags 到这个事件。Tags 可以动态的包含事件的部分使用%{field} syntax. 
add_tag
1. Value type is array
2. Default value is []

If this filter is successful, add arbitrary tags to the event. Tags can be dynamic and include parts of the event using the %{field} syntax.

filter {
    grok {
        add_tag => [ "foo_%{somefield}" ]
    }
}

# You can also add multiple tags at once:
filter {
    grok {
        add_tag => [ "foo_%{somefield}", "taggedy_tag" ]
    }
}

zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f stdin.conf
Settings: Default pipeline workers: 1
Pipeline main started
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
{
             "message" => "Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>",
            "@version" => "1",
          "@timestamp" => "2016-08-30T05:50:18.451Z",
                "host" => "0.0.0.0",
           "timestamp" => "Jan 1 06:25:43",
           "logsource" => "mailserver14",
             "program" => "postfix/cleanup",
                 "pid" => "21403",
            "queue_id" => "BEF25A72965",
      "syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>",
    "foo_%{somefield}" => "Hello world, from 0.0.0.0",
           "new_field" => "new_static_value",
                "tags" => [
        [0] "foo_%{somefield}"
    ]
}

break_on_match
1. Value type is boolean
2. Default value is true

Break on first match. The first successful match by grok will result in the filter being finished. If you want grok to try all patterns (maybe you are parsing different things), then set this to false. (See the sketch at the end of this section.)

match
1. Value type is hash
2. Default value is {}

A hash that maps field names to the grok patterns you expect to find in them. For example:

filter {
    grok {
        match => { "message" => "Duration: %{NUMBER:duration}" }
    }
}
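As a minimal sketch of break_on_match (the two patterns and the sample line are illustrative, not from the original text): with break_on_match => false, grok keeps applying the remaining patterns instead of stopping at the first successful one:

filter {
    grok {
        # with break_on_match => false, a line such as
        # "Duration: 3.44 client: 55.3.244.1" should yield both the
        # duration and client fields; with the default (true), grok
        # would stop after the first pattern that matches
        break_on_match => false
        match => {
            "message" => [
                "Duration: %{NUMBER:duration}",
                "client: %{IP:client}"
            ]
        }
    }
}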