一 logstash收集日志写入kafka
1.1.1 编写logstash配置文件
[root@localhost ~]# cat /etc/logstash/conf.d/nginx-kafka.conf
input {
  file {
    path => "/opt/vhosts/fatai/logs/access_json.log"
    start_position => "beginning"
    type => "nginx-accesslog"
    codec => "json"
    stat_interval => "2"
  }
}
output {
  kafka {
    bootstrap_servers => ""
    topic_id => 'nginx-access-kafkaceshi'
    codec => "json"
  }
}
1.1.2 验证并重启logstash
[root@localhost ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx-kafka.conf -t
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
Configuration OK
[root@localhost ~]# systemctl restart logstash.service
1.1.3 kafka端验证主题
[root@DNS-Server tools]# /tools/kafka/bin/kafka-topics.sh --list --zookeeper,,
nginx-access-kafkaceshi
二 logstash收集kafka日志并写入elk
1.1.1 编写logstash配置文件
[root@Docker ~]# cat /etc/logstash/conf.d/nginx_kafka.conf
input {
  kafka {
    bootstrap_servers => ""  #kafka地址
    topics => "nginx-access-kafkaceshi"  #定义主题
    group_id => "nginx-access-kafkaceshi"  #自定义
    codec => "json"  #指定编码
    consumer_threads => 1  #消费者线程
    decorate_events => true  #要不要加kafka标记
  }
}
output {
  if [type] == "nginx-accesslog"{  #type 是收集时候logstash定义的
    elasticsearch {
      hosts => [""]
      index=> "nginx-accesslog-kafka-test-%{+YYYY.MM.dd}"
    }
  }
}
1.1.2 检测并重启
[root@Docker ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_kafka.conf -t
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
Configuration OK
[root@Docker ~]# systemctl restart logstash.service
1.1.3 elasticsearch验证