1、配置文件现状
1.1 Flume数据接收端
IP地址:54.0.95.67
功能:接收各个端口发来的数据。
启动方式:进入目录 /usr/local/flume/*bin 在终端运行 ./recei*.sh 启动agent。
该agent的配置文件在 /usr/local/flume/*bin/conf/receiveTest.conf 文件中。
1.2 Flume 数据搜集
目前只配置了16台主机,分别是:1. root@54.0.193.30:22,2. root@54.0.76.192:22,6. root@54.0.105.32:22,7. root@54.0.105.33:22,8. root@54.0.105.35:22 ,13. root@54.0.76.11:22,20. root@54.0.193.56:22 ,21. root@54.0.193.57:22,24. root@54.0.193.55:22 ,36. root@54.0.160.24:22,37. root@54.0.160.25:22 ,54. root@54.0.193.35:22 ,58. root@54.0.193.32:22,以及第9,第10和第11台主机。
功能:监听某个文件(自定),若文件有新增内容,agent.sources立即搜集,经channels传输至相应的sinks中。
启动方式:进入目录 /tmp/flume/*bin 在终端运行 ./cli*.sh 启动相应agent。
该agent的配置文件在 /tmp/flume/*bin/conf/cli*.conf 文件中。
2、所用配置文件介绍
2.1 Flume Sources
2.1.1 拦截器
a1.sources = r1
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datatype
a1.sources.r1.interceptors.i3.value = TYPE_A
(1) TimeStampInterceptor:使用时间戳拦截
type = timestamp
设定timestamp,则会在flume event中添加一个键为timestamp的头,值为当前系统的datetime。设定timestamp的主要作用是当sink为hdfs格式时,可将该时间戳用在文件名或生成的文件的所在目录中。
(2) StaticInterceptor:可以自定义事件的header的value
type = static
用于向每个处理的event插入任意单个的键/值头。默认为保留已存在的具有相同键的头。
也可以理解为自定义标签。
(3) HostInterceptor:使用IP或hostname拦截
type = host
如果未指定hostHeader,则头的键就是host。useIP默认为true,即值使用IP,当设定为false,则value为hostname。preserveExisting属性代表是否覆盖消息头中已经有的IP,默认为false。
2.1.2 选择器 Selector
有两种模式:replicating、multiPlexing。
(1)replicating (默认):复制。把所有event发送到每一个channel中,也反映了sorces与channels可以是一对多的关系。
a1.sources = r1
a1.channels = c1 c2 c3
a1.source.r1.selector.type = replicating
a1.source.r1.channels = c1 c2 c3
a1.source.r1.selector.optional = c3
(2) multiPlexing:分流。这要求指定到一个通道集合的映射事件属性。Selector会在事件头检查每一个配置属性,若匹配该value,则把event发给该channels(这mapping允许每一个value有重复的channel),若都不匹配,则可以发给默认(default)的channels。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = datetype
a1.sources.r1.selector.mapping.TYPE_A = c1
a1.sources.r1.selector.mapping.TYPE_B= c2 c3
a1.sources.r1.selector.default = c4
2.2 Flume Channels
Memory-channel效率高但可能丢失数据,file-channel安全性高但性能不高。这里用的是memory-channel,但如果把capacity设置的足够小,使内存中的数据尽可能少,在意外重启或断电时丢失的数据就会很少了。
2.3 Flume Sinks
2.3.1 文件命名
当sink的type为hdfs格式时,可以指定文件的前缀与后缀,前提是消息头设置有IP和时间戳。
a1.sinks.k1.hdfs.filePrefix = %{hostname}-%y-%m-%d
a1.sinks.k1.hdfs.fileSuffix = .log
%{hostname} :取消息头中的主机名作为文件名的一部分。
%y-%m-%d :取消息头中的时间戳作为文件名的一部分,此时间格式可根据需要修改。
fileSuffix = .log 文件后缀为.log,当文件还未关闭时,此后缀的后面还有一个.tmp尾巴,通过inUseSuffix将此.tmp(默认)消去:a1.sinks.k1.hdfs.inUseSuffix =
2.3.2 rollInterval、rollSize、rollCount
在54.0.95.67主机中,sink的type为hdfs,rollInterval、rollSize以及rollCount可以很好的控制hdfs文件的大小。
(1) rollInterval 默认为30秒,即每隔30秒建立一个新的文件,此处可以根据需要设置合理的按时间间隔生成一个新文件。
(2) rollSize 默认为1024个字节,当文件数据达到此大小,则产生一个新的文件。经验证,若某个event的传入超过了rollSize,且传入之前未满,则该event依然装入该文件,下一个event则另起一个新文件。
(3) rollCount 默认为10,即一个文件装10个event。
若三个值都设定为0,则hdfs就只有一个文件,里面是变得越来越大的文件。
2.3.3 文件类型 filetype
(1)序列文件 SequenceFile (默认)
通常文件会以换行符确定每一条记录,若数据中含有换行符,使用SequenceFile可以解决问题。
(2)数据流DataStream
只输出未压缩的值。
(3)压缩流 CompressedStream
类似DataStream,但是数据写入时会被压缩。在MapReduce中只有某些压缩格式才可能进行分割。
3、注意点
3.1 JDK版本不兼容问题
JDK用1.6或1.6以上版本。
3.2 对应关系
Sources与channnels可以是一对一或者一对多关系,不可以多对一。
Channels与sinks是一对一关系或者多对一。
3.3 收到多份相同数据
若发现新增一条数据,却可以收到多份相同数据,原因是未正常关闭tail,有多个tail进程在同时跑。
3.4 HDFS目录
当sinks的type为hdfs时,文件目录需要在主机master中建立。
4、flume各组件支持的类型
4.1 flume source 支持的类型
Source类型 |
说明 |
Avro Source |
支持Avro协议(实际上是是Avro RPC) |
Syslog Source |
支持UDP和TCP两种协议 |
HTTP Source |
基于HTTP POST或get方式的数据,支持JSON,BLOB表示形式。 |
Spooling Dir Source |
监控指定目录内数据变更 |
Netcat Source |
监控某个端口 |
Exec Source |
基于Unix的command |
…… |
…… |
4.2 flume channel 支持的类型
Channel类型 |
说明 |
Memory Channel |
Event数据存储在内存中 |
JDBC |
数据存储在持久化存储中,当前channel 内置支持Derby |
File Channel |
数据存储在磁盘文件中 |
…… |
…… |
4.3 flume sink支持的类型
Sink 类型 |
说明 |
HDFS Sink |
数据写入HDFS |
Logger Sink |
数据写入日志文件 |
Avro sink |
数据被转换成Avro Event ,在发送到配置的RPC端口上 |
File Roll Sink |
存储数据到本地文件系统 |
Null Sink |
丢弃掉所有数据 |
Hbase Sink |
数据写入Hbase数据库 |
…… |
…… |