Fluentd+Kafka
Fluentd
What is Fluentd?
Fluentd is an open source data collector for a unified logging layer.
Unified Logging Layer
Fluentd decouples data sources from backend systems by providing a unified logging layer in between.
Simple yet Flexible
Fluentd's 300+ plugins connect it to many data sources and data outputs while keeping its core small and fast.
List of Data Outputs
INSTALL
#ubuntu 16.04 (the install script below is the trusty/14.04 build of td-agent 2)
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh
/etc/init.d/td-agent status
/var/log/td-agent/td-agent.log
CONFIGURE
Fluentd provides a web management UI, which has to be started manually:
nohup td-agent-ui start &
http://127.0.0.1:9292 (user: admin, password: changeme)
INSTALL OUTPUT PLUGINS
apt install gem2deb
td-agent-gem install fluent-plugin-kafka
td-agent-gem install fluent-plugin-webhdfs
td-agent-gem install fluent-plugin-influxdb
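To confirm the plugins registered against td-agent's embedded Ruby (a quick check, not required):
td-agent-gem list | grep fluent-plugin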
Kafka
INSTALL
Install the JDK
wget http://120.52.72.23/download.oracle.com/c3pr90ntc0td/otn-pub/java/jdk/8u91-b14/jdk-8u91-linux-x64.tar.gz
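Unpack the archive; JAVA_HOME below must point at the directory the JDK was extracted to (the path used in this doc is site-specific):
tar -xzf jdk-8u91-linux-x64.tar.gz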
#/etc/profile
export JAVA_HOME=/247/ad
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
source /etc/profile
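Verify the JDK is picked up from the new PATH:
java -version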
Binary Download
wget http://apache.fayea.com/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
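Unpack and switch into the Kafka directory; all bin/ commands below are run from there:
tar -xzf kafka_2.10-0.10.0.0.tgz
cd kafka_2.10-0.10.0.0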
CONFIGURE
#server.properties
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/ads247admin/kafka_2.9.2-0.8.2.2/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
SETUP
Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka
bin/kafka-server-start.sh config/server.properties
Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syslog-topic
List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
Check whether data is arriving in Kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic syslog-topic
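To sanity-check the broker independently of fluentd, produce a test message from the console (standard Kafka tooling) and watch it appear in the consumer above:
echo "hello kafka" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic syslog-topic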
Four setups were tested:
Option 1: fluentd + influxdb
Option 2: fluentd + mongodb
Option 3: fluentd + webhdfs
Option 4: fluentd + kafka
A script writes JSON-format logs to /var/log/20160608.log (chmod 645); fluentd then tails the file and ships the records to Kafka.
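For reference, a minimal sketch of such a writer (hypothetical; any generator that appends one JSON object per line works, and the time format may need a matching time_format in the <source> below):
#!/bin/bash
# append one JSON record per second to the tailed file
while true; do
  echo "{\"time\":\"$(date '+%Y-%m-%d %H:%M:%S')\",\"level\":\"info\",\"msg\":\"test\"}" >> /var/log/20160608.log
  sleep 1
done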
<source>
type tail
path /var/log/20160608.log
tag phplog.kafka
format json
time_key time
pos_file /tmp/fluentd--1465375453.pos
</source>
<match phplog.kafka>
@type kafka
brokers localhost:9092 ## newer versions of fluent-plugin-kafka may not need this when zookeeper is set?
zookeeper localhost:2181
default_topic syslog-topic
</match>
Match and write to MongoDB:
<match mongo.nginx>
type mongo
host 127.0.0.1
port 27017
database zhw
collection mongo.nginx
capped
capped_size 100m
user root
password abc123456
</match>
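To spot-check that records are landing in the capped collection (a sketch; depending on how MongoDB auth is set up, the authentication database may differ):
mongo 127.0.0.1:27017/zhw -u root -p abc123456 --eval 'db.getCollection("mongo.nginx").findOne()'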
Match-and-forward: records tagged login.log are forwarded to 10.4.0.6:24225, with a local file fallback on failure:
<match login.log>
type forward
heartbeat_type udp
<server>
name n-app247-te-04
host 10.4.0.6
port 24225
</server>
<secondary>
type file
path /var/log/td-agent/error
</secondary>
</match>
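The receiving node (10.4.0.6) needs a matching forward input on port 24225; a minimal sketch:
<source>
type forward
bind 0.0.0.0
port 24225
</source>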
loger.php generates logs and sends them directly to the fluentd server on port 24224, which stores them in Hadoop HDFS:
<source>
type forward
bind 0.0.0.0
port 24224
linger_timeout 0
log_level info
</source>
<match login.log>
@type webhdfs
host 172.31.22.245
port 50070
path /ad/login.log
flush_interval 10s
</match>
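To exercise this path without loger.php, fluentd's bundled fluent-cat utility can emit a single record with the login.log tag (under td-agent it may live in /opt/td-agent/embedded/bin), and the HDFS side can then be checked with the hdfs CLI:
echo '{"user":"test","action":"login"}' | fluent-cat login.log --host 127.0.0.1 --port 24224
hdfs dfs -tail /ad/login.log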
Match Nginx access logs and write them to InfluxDB:
<source>
type tail
path /var/log/nginx/access.log
tag access.nginx
format nginx
time_format %d/%b/%Y:%H:%M:%S
pos_file /tmp/fluentd--1464852502.pos
</source>
<match access.nginx>
type influxdb
dbname zhw
flush_interval 10s
host 127.0.0.1
port 8086
user root
password abc123456
</match>
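To confirm points are arriving (assuming fluent-plugin-influxdb's default of using the fluentd tag as the measurement name):
curl -G 'http://127.0.0.1:8086/query' --data-urlencode 'u=root' --data-urlencode 'p=abc123456' --data-urlencode 'db=zhw' --data-urlencode 'q=SELECT * FROM "access.nginx" LIMIT 5'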
Created by ZhangWei @2016-07-08
Internal document only