title: Flume Notes
Notes on basic Flume setup and configuration
Flume installation
- Extract the downloaded Flume package into the /home/xxx directory.
- In the conf directory, rename the template files (or copy them under new names) to drop the .template suffix, then edit flume-env.sh; the main step is setting the JAVA_HOME variable.
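A minimal sketch of the steps above; the install location and the JDK path are both assumptions to adjust for your machine:

```shell
# Assumed install location; change this to wherever the package was unpacked.
FLUME_HOME=${FLUME_HOME:-$HOME/apache-flume-1.9.0-bin}
CONF="$FLUME_HOME/conf"
mkdir -p "$CONF"   # no-op on a real install
# Drop the .template suffix by copying, keeping the original file around.
[ -f "$CONF/flume-env.sh.template" ] && cp "$CONF/flume-env.sh.template" "$CONF/flume-env.sh"
# Point JAVA_HOME at the local JDK (the path below is an example).
grep -q 'JAVA_HOME' "$CONF/flume-env.sh" 2>/dev/null || \
  echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64' >> "$CONF/flume-env.sh"
```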
Example 1: monitor a file, collecting newly appended data in real time and printing it to the console
Agent selection: exec source (tail -F) + memory channel + logger sink
- Configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/briup/log/test.log
# The command is read from the string following -c
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Run and test
Options: -c/--conf is the configuration file directory, -f/--conf-file is this agent's config file, and -n/--name is the agent name.
flume-ng agent -c apache-flume-1.9.0-bin/conf/ -f apache-flume-1.9.0-bin/conf/log.flm -n a1 -Dflume.root.logger=INFO,console
-Dflume.root.logger=INFO,console prints execution logs to the console
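To feed the agent above, append lines to the monitored file; the path comes from the config ($HOME/log/test.log is the same as /home/briup/log/test.log when running as the briup user):

```shell
# Create the log directory if needed and append a line; the tail -F
# command in the exec source picks up the new data immediately.
mkdir -p "$HOME/log"
echo "hello flume" >> "$HOME/log/test.log"
```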
Example 2: Spool
The spooling directory source watches a configured directory for new files and reads the data out of them. Two points to note:
- Files placed in the spool directory must not be opened or edited afterwards.
- The spool directory must not contain subdirectories.
- Create the agent configuration file
Create the configuration file under the flume directory: conf/spool.flm (matching the run command below)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir =/home/briup/flume_test
a1.sources.r1.fileHeader = true
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent -c ~/apache-flume-1.9.0-bin/conf/ -f ~/apache-flume-1.9.0-bin/conf/spool.flm -n a1 -Dflume.root.logger=INFO,console
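With the agent running, test it by dropping a file into the spool directory. Writing to a temporary name first and then renaming avoids the "no editing after copy" problem noted above ($HOME/flume_test stands in for /home/briup/flume_test):

```shell
SPOOL_DIR="$HOME/flume_test"
mkdir -p "$SPOOL_DIR"
# Write outside the spool directory, then move in; mv within the same
# filesystem is atomic, so the agent never sees a half-written file.
TMP="$HOME/.spool_staging.$$"
echo "spool test data" > "$TMP"
mv "$TMP" "$SPOOL_DIR/data_$$.log"
```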
Example 3: collect single lines of data from a given network port and print them to the console
Agent selection: netcat source + memory channel + logger sink
The netcat source listens on a given port and turns each line of text it receives into an event.
Configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent (save the configuration above to a file first; conf/netcat.flm below is just an example name)
$ bin/flume-ng agent -n a1 -c conf -f conf/netcat.flm -Dflume.root.logger=INFO,console
Test with telnet
telnet localhost 44444
Example 4: Syslogtcp
The syslogtcp source listens on a TCP port as its data source.
The UDP syslog source treats each whole message as a single event; the TCP syslog source splits the stream on newlines ("\n"), each line becoming a new event.
a. Create the agent configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b. Start Flume agent a1
flume-ng agent -c . -f ./syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
c. Generate a test syslog message
echo "hello briup.com" | nc localhost 5140
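Since the TCP syslog source treats each newline-terminated line as a separate event, a single connection can carry several events:

```shell
# Two lines -> two events at the syslogtcp source
printf 'first event\nsecond event\n' | nc localhost 5140
```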
Example 5: collect logs on server A and ship them to server B in real time
Technology selection
exec source + memory channel + avro sink
avro source + memory channel + logger sink
Configuration
Server A
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/briup/log/test.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
Server B
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = localhost
avro-memory-logger.sources.avro-source.port = 44444
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel
Start the B-side agent first, so the avro sink on A has a server to connect to:
$ flume-ng agent -c ~/apache-flume-1.9.0-bin/conf/ -f ~/apache-flume-1.9.0-bin/conf/b.flm -n avro-memory-logger -Dflume.root.logger=INFO,console
$ flume-ng agent -c ~/apache-flume-1.9.0-bin/conf/ -f ~/apache-flume-1.9.0-bin/conf/a.flm -n exec-memory-avro -Dflume.root.logger=INFO,console
Example 6: HDFS sink
Note: before this step, copy the Hadoop dependency jars (htrace-core-3.0.4.jar, commons-configuration-1.6.jar, hadoop-hdfs-2.6.0.jar, etc.; the exact set can be judged from the ClassNotFound exceptions raised) into Flume's lib directory.
a. Create the agent configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://172.16.0.4:9000/user/zhaojing/syslogtcp-%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
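One caveat about the hdfs.path above: the %y-%m-%d escapes need a timestamp in each event's headers, and the sink fails if none is present. Two additions to the config worth knowing (using the agent's local clock is an assumption that may not suit every deployment):

```properties
# Use the agent's local clock when events carry no timestamp header
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Write plain text instead of the default SequenceFile format
a1.sinks.k1.hdfs.fileType = DataStream
```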
b. Start Flume agent a1
flume-ng agent -c . -f ./hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
c. Generate a test syslog message
echo "hello briupData flume -> hadoop testing one" | nc localhost 5140
d. Open another window on server1 and check on HDFS whether the files were generated (the directory name carries a date suffix, so list the parent directory)
hadoop fs -ls /user/zhaojing/
Example 7: JSONHandler
a. Create the agent configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b. Start Flume agent a1
flume-ng agent -c . -f ./json.flm -n a1 -Dflume.root.logger=INFO,console
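c. Unlike the earlier cases, no test step is shown here. The HTTP source's default handler (JSONHandler) accepts a JSON array of events, each with optional headers and a body, so with the agent running a request like this should appear at the logger sink (header and body values below are placeholders):

```shell
curl -X POST -H 'Content-Type: application/json' \
     -d '[{"headers" : {"a" : "b"}, "body" : "hello briup"}]' \
     http://localhost:8888
```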