在实际应用中,Logstash进程会被氛围两个不同的角色。
运行在应用服务器上的尽量减轻运行压力,只做读取和转发,这个角色叫做shipper
运行在独立的服务器上完成数据解析处理,负责写入到Elasticsearch的角色,叫做Indexer
5.1.1 读取redis 数据:
zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f redis.comf
Settings: Default pipeline workers: 1
Pipeline main started
{
"message" => "Hello world",
"tags" => [
[0] "_jsonparsefailure"
],
"@version" => "1",
"@timestamp" => "2016-08-19T06:26:12.854Z"
}
zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat redis.comf
input {
redis {
data_type =>"pattern_channel"
key =>"logstash-*"
host=>"192.168.32.67"
port=>6379
password => "1234567"
}
}
output {
stdout {
codec=>rubydebug{}
}
}
采用list类型扩展Logstash:
127.0.0.1:6379> PUBLISH logstash-list "Hello xxxxxx"
(integer) 2
127.0.0.1:6379> PUBLISH logstash-list "Hello yyyyy"
zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f redis.comf
Settings: Default pipeline workers: 1
Pipeline main started
{
"message" => "Hello xxxxxx",
"tags" => [
[0] "_jsonparsefailure"
],
"@version" => "1",
"@timestamp" => "2016-08-19T07:46:27.031Z"
}
{
"message" => "Hello yyyyy",
"tags" => [
[0] "_jsonparsefailure"
],
"@version" => "1",
"@timestamp" => "2016-08-19T07:46:37.365Z"
zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f redis.comf
Settings: Default pipeline workers: 1
Pipeline main started
{
"message" => "Hello xxxxxx",
"tags" => [
[0] "_jsonparsefailure"
],
"@version" => "1",
"@timestamp" => "2016-08-19T07:46:26.964Z"
}
{
"message" => "Hello yyyyy",
"tags" => [
[0] "_jsonparsefailure"
],
"@version" => "1",
"@timestamp" => "2016-08-19T07:46:37.362Z"
}
zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat redis.comf
input {
redis {
data_type =>"pattern_channel"
key =>"logstash-list"
host=>"192.168.32.67"
port=>6379
password => "1234567"
}
}
output {
stdout {
codec=>rubydebug{}
}
}
两个终端同时启动logstash -f redis.conf 进程,结果会是两个终端都输出消息。
这个时候,就需要用list 类型,在这种类型中,数据输入到Redis 服务器上暂存,Logstash 则连上Redis 服务器取走(BLPOP命令,所以只要logstash不堵塞,redis 服务器上也不会
有数据堆积占用空间)数据。
1.配置示例:
zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat redis.comf
input {
redis {
data_type =>"list"
key =>"logstash-list"
host=>"192.168.32.67"
port=>6379
password => "1234567"
}
}
output {
stdout {
codec=>rubydebug{}
}
}
这时候可以看到, 只有一个终端输出了结果
连续RPUSH几次, 可以看到两个终端近乎各自输出一半条目。
3.批量推送:
RPUSH 支持batch 方式,修改Logstash 配置中的batch_count值,
5.1.3 输出到Redis:
1.配置示例
zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat inputredis.conf
input {stdin {} }
output {
redis {
data_type =>"channel"
key=>"logstash-chan-%{+yyyy.MM.dd}"
host=>"192.168.32.67"
port=>6379
password => "1234567"
}
}
127.0.0.1:6379> SUBSCRIBE logstash-chan-2016.08.19
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "logstash-chan-2016.08.19"
3) (integer) 1
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"","@version":"1","@timestamp":"2016-08-19T08:27:07.190Z","host":"0.0.0.0"}"
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"","@version":"1","@timestamp":"2016-08-19T08:27:07.736Z","host":"0.0.0.0"}"
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"","@version":"1","@timestamp":"2016-08-19T08:27:07.772Z","host":"0.0.0.0"}"
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"","@version":"1","@timestamp":"2016-08-19T08:27:07.808Z","host":"0.0.0.0"}"
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"","@version":"1","@timestamp":"2016-08-19T08:27:07.844Z","host":"0.0.0.0"}"
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"","@version":"1","@timestamp":"2016-08-19T08:27:07.880Z","host":"0.0.0.0"}"
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"","@version":"1","@timestamp":"2016-08-19T08:27:07.916Z","host":"0.0.0.0"}"
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"hello world","@version":"1","@timestamp":"2016-08-19T08:27:10.486Z","host":"0.0.0.0"}"
1) "message"
2) "logstash-chan-2016.08.19"
3) "{"message":"what'sup scab","@version":"1","@timestamp":"2016-08-19T08:27:37.919Z","host":"0.0.0.0"}"