Flume介绍
一 flume基本介绍
Apache flume是一个分布式的、可靠的和可用的用来高效收集、同济和移动大量数据从众多不同sources到一个集中的数据存储库的系统。
Flume event:flume事件的定义为:一组具有字节有效负载和可选择的字符串属性集的数据流。
Flume agent:flume代理是一个通过事件流从外部资源流向下一个目的地(hop)管理组件的JVM进程。
Flume source:接收从外部资源(如web server)传送过来的事件转变成了flume的source。外部资源通过发送让目标flume source可以识别的特定格式的事件给flume。例如,Avro Flume source能够被用来接收从客户端或者其他flume agents从Avro sink获得然后发送的事件流。当flume 资源(source)收到一个事件(event),它负责把事件存储到一个活更多的管道(channel)中。管道(channel)是一个临时仓库存储事件直至它被flume sink消耗掉。File channel(文件管道)是一个支持本地系统的channel。Sink删除channel中的event然后把它放到外部的仓库中,如HDFS(通过HDFS sink)或者把它向前推送到下一个flume agent的flume source流。一个给定的agent中的Source和sink异步运行,同时events阶段性地存在channel里。可以用下图指示流程。
图1 Flume基本数据流模型
二 flume特性
Flume允许一个用户建立多个hop流,亦即events在到达最终目的地前可以流转多个agents。同样地,flume允许扇入流(fan-in)和扇出(fan-out)流,上下文线路和为失败的hops做备份线路(容错)
可靠性
事件阶段性地存在每个agent的channel里。在到达目的地后,事件才被删除。Flume用事务处理方法来保证事件传输的可靠性。Sources和sinks被封装成存储/检索各自的事件中或者由管道提供的事务提供的事务。这样确保事件集可靠地在流中点对点通过。
可恢复性
事件阶段性存在管道,这个管道负责管理失败的恢复。Flume支持持久的文件管道,这个管道也被本地文件系统所支持。但是,flume同样也有一个内存管道(memory channel),这个管道简单地把事件存储在内存队列,这样速度更快,但是事件都留在了内存管道。这样一来,如果agent挂掉,那么事件就不能恢复了。
三 简单的flume配置及运行
安装flume很简单只需要下载flume的tar包,目前的版本是flume-1.3.0,下载完后解压,然后配置下环境变量,环境变量其实就是配置,可以直接执行flume-ng命令,它就在bin目录底下。现在我们可以简单地对配置文件进行配置,然后运行flume。配置文件可以再conf目录底下新建一个,如test-flume.conf。
这样一个简单的配置文件就行了,下面就是运行flume。
这样就在agent端启动了flume命令,通过配置文件我们可以看到我们source配置的是netcat命令,这个命令监听客户端的telnet指令,也就是上面讲的,source获取的数据是要安装格式来,所以要想让source能获取到数据,必须在客户端执行telnet命令传输数据,然后source就能收集到数据,收集完数据通过channel(本例为c1,是memory channel存储),然后sinks一直在监听c1里的数据,一旦发现c1有数据了,那么它会马上按照自己定义的格式及需求从c1获取数据。本例中,sinks类型为logger,也就是用日志的方式记录下来。
通过上面的分析,我们接下来应该这样做,客户端执行telnet命令(本例用的是单节点,打开另一个console窗口执行以下命令)
这时候,在刚执行agent命令的console窗口就会显示如下内容。
看到这个结果我们简单的演示也就成功了,下面我们着重看flume三个“零部件”的一些重要属性。这三个部件分别为sources、channels和sinks。以下示例所用如下默认值。
A1.sources = r1
a1.sinks = s1
a1.channels = c1
Flume模块source配置
Avro Source
Avro source 监听avro端口,接收外部avro客户端流。加粗的属性是必须要指定的,配置文件如何配置参考示例即可,如source的名字为source1。
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
一定要为avro,a1.sources.r1.type = avro |
bind |
– |
ip地址或主机名,客户端会把数据发到此host的avrosources a1.sources.r1.bind = hostname/ip |
port |
– |
绑定主机的端口 a1.sources.r1.port = 9998 |
threads |
– |
指定最大线程数量 |
selector.type |
|
|
selector.* |
|
|
interceptors |
– |
Space separated list of interceptors |
interceptors.* |
|
|
示例a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Exec Source
Exec source即为可执行unix命令,这个命令定义好的agent端,它会自动监控命令的文件,不需要自己在客户端执行命令。因此要求启动这个命令后,能不断产生数据到标准的输出。如果进程任意原因退出,source也就退出了,不会再产生数据。例如,cat和tail这类命令可以持续产生输出,而date就不行。加粗为要求的属性。
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be exec |
command |
– |
The command to execute |
restartThrottle |
10000 |
下一次重新执行命令的时间(ms) |
restart |
false |
Whether the executed cmd should be restarted if it dies |
logStdErr |
false |
Whether the command’s stderr should be logged |
batchSize |
20 |
同一时间读取和发送到channel的最大行数 |
selector.type |
replicating |
replicating or multiplexing |
selector.* |
|
Depends on the selector.type value |
interceptors |
– |
Space separated list of interceptors |
interceptors.* |
|
|
a1示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure,用这个命令可以模拟0.9.0的tailSource
a1.sources.r1.channels = c1
Spooling Directory Source
This source lets you ingest data by dropping files in a spooling directory on disk. Unlike other asynchronous sources, this source avoids data loss even if Flume is restarted or fails.Flume will watch the directory for new files and read then ingest them as they appear. After a given file has been fully read into the channel, it is renamed to indicate completion. This allows a cleaner process to remove completed files periodically. Note, however, that events may be duplicated if failures occur, consistent with the semantics offered by other Flume components. The channel optionally inserts the full path of the origin file into a header field of each event. This source buffers file data in memory during reads; be sure to set thebufferMaxLineLength option to a number greater than the longest line you expect to see in your input data.
Warning
This channel expects that only immutable, uniquely named files are dropped in the spooling directory. If duplicate names are used, or files are modified while being read, the source will fail with an error message. For some use cases this may require adding unique identifiers (such as a timestamp) to log file names when they are copied into the spooling directory.
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be spooldir |
spoolDir |
– |
The directory where log files will be spooled |
fileSuffix |
.COMPLETED |
Suffix to append to completely ingested files |
fileHeader |
false |
Whether to add a header storing the filename |
fileHeaderKey |
file |
Header key to use when appending filename to header |
batchSize |
10 |
Granularity at which to batch transfer to the channel |
bufferMaxLines |
100 |
Maximum number of lines the commit buffer can hold |
bufferMaxLineLength |
5000 |
Maximum length of a line in the commit buffer |
selector.type |
replicating |
replicating or multiplexing |
selector.* |
|
Depends on the selector.type value |
interceptors |
– |
Space separated list of interceptors |
interceptors.* |
|
|
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1
NetCat Source
A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.
Required properties are in bold.
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be netcat |
bind |
– |
Host name or IP address to bind to |
port |
– |
Port # to bind to |
max-line-length |
512 |
Max line length per event body (in bytes) |
ack-every-event |
true |
Respond with an “OK” for every event received |
selector.type |
replicating |
replicating or multiplexing |
selector.* |
|
Depends on the selector.type value |
interceptors |
– |
Space separated list of interceptors |
interceptors.* |
|
|
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
Sequence Generator Source
A simple sequence generator that continuously generates events with a counter that starts from 0 and increments by 1. Useful mainly for testing. Required properties are in bold.
Property Name |
Default |
Description |
channels |
– |
|
type |
– |
The component type name, needs to be seq |
selector.type |
|
replicating or multiplexing |
selector.* |
replicating |
Depends on the selector.type value |
interceptors |
– |
Space separated list of interceptors |
interceptors.* |
|
|
batchSize |
1 |
|
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
Flume模块channel配置
Flume Channels
Channels are the repositories where the events are staged on a agent. Source adds the events and Sink removes it.
Memory Channel
The events are stored in a an in-memory queue with configurable max size. It’s ideal for flow that needs higher throughput and prepared to lose the staged data in the event of a agent failures. Required properties are in bold.
Property Name |
Default |
Description |
type |
– |
The component type name, needs to be memory |
capacity |
100 |
The max number of events stored in the channel |
transactionCapacity |
100 |
The max number of events stored in the channel per transaction |
keep-alive |
3 |
Timeout in seconds for adding or removing an event |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
JDBC Channel
The events are stored in a persistent storage that’s backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that’s ideal for the flows where recoverability is important. Required properties are in bold.
Property Name |
Default |
Description |
type |
– |
The component type name, needs to be jdbc |
db.type |
DERBY |
Database vendor, needs to be DERBY. |
driver.class |
org.apache.derby.jdbc.EmbeddedDriver |
Class for vendor’s JDBC driver |
driver.url |
(constructed from other properties) |
JDBC connection URL |
db.username |
“sa” |
User id for db connection |
db.password |
– |
password for db connection |
connection.properties.file |
– |
JDBC Connection property file path |
create.schema |
true |
If true, then creates db schema if not there |
create.index |
true |
Create indexes to speed up lookups |
create.foreignkey |
true |
|
transaction.isolation |
“READ_COMMITTED” |
Isolation level for db session READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ |
maximum.connections |
10 |
Max connections allowed to db |
maximum.capacity |
0 (unlimited) |
Max number of events in the channel |
sysprop.* |
|
DB Vendor specific properties |
sysprop.user.home |
|
Home path to store embedded Derby database |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
Recoverable Memory Channel
Warning
The Recoverable Memory Channel has been deprecated in favor of the FileChannel. FileChannel is durable channel and performs better than the Recoverable Memory Channel.
Required properties are in bold.
Property Name |
Default |
Description |
type |
– |
The component type name, needs to beorg.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel |
wal.dataDir |
${user.home}/.flume/recoverable-memory-channel |
|
wal.rollSize |
(0x04000000) |
Max size (in bytes) of a single file before we roll |
wal.minRetentionPeriod |
300000 |
Min amount of time (in millis) to keep a log |
wal.workerInterval |
60000 |
How often (in millis) the background worker checks for old logs |
wal.maxLogsSize |
(0x20000000) |
Total amt (in bytes) of logs to keep, excluding the current log |
capacity |
100 |
|
transactionCapacity |
100 |
|
keep-alive |
3 |
|
File Channel
Required properties are in bold.
Property Name Default |
Description |
|
type |
– |
The component type name, needs to be file. |
checkpointDir |
~/.flume/file-channel/checkpoint |
The directory where checkpoint file will be stored |
dataDirs |
~/.flume/file-channel/data |
The directory where log files will be stored |
transactionCapacity |
1000 |
The maximum size of transaction supported by the channel |
checkpointInterval |
30000 |
Amount of time (in millis) between checkpoints |
maxFileSize |
2146435071 |
Max size (in bytes) of a single log file |
minimumRequiredSpace |
524288000 |
Minimum Required free space (in bytes) |
capacity |
1000000 |
Maximum capacity of the channel |
keep-alive |
3 |
Amount of time (in sec) to wait for a put operation |
write-timeout |
3 |
Amount of time (in sec) to wait for a write operation |
checkpoint-timeout |
600 |
Expert: Amount of time (in sec) to wait for a checkpoint |
use-log-replay-v1 |
false |
Expert: Use old replay logic |
use-fast-replay |
false |
Expert: Replay without using queue |
encryption.activeKey |
– |
Key name used to encrypt new data |
encryption.cipherProvider |
– |
Cipher provider type, supported types: AESCTRNOPADDING |
encryption.keyProvider |
– |
Key provider type, supported types: JCEKSFILE |
encryption.keyProvider.keyStoreFile |
– |
Path to the keystore file |
encrpytion.keyProvider.keyStorePasswordFile |
– |
Path to the keystore password file |
encryption.keyProvider.keys |
– |
List of all keys (e.g. history of the activeKey setting) |
encyption.keyProvider.keys.*.passwordFile |
– |
Path to the optional key password file |
Note
By default the File Channel uses paths for checkpoint and data directories that are within the user home as specified above. As a result if you have more than one File Channel instances active within the agent, only one will be able to lock the directories and cause the other channel initialization to fail. It is therefore necessary that you provide explicit paths to all the configured channels, preferably on different disks. Furthermore, as file channel will sync to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for checkpoint and data directories.
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
Encryption
Below is a few sample configurations:
Generating a key with a password seperate from the key store password:
keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
-keysize 128 -validity 9000 -keystore test.keystore \
-storetype jceks -storepass keyStorePassword
Generating a key with the password the same as the key store password:
keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
-keystore src/test/resources/test.keystore -storetype jceks \
-storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = key-provider-0
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0
Let’s say you have aged key-0 out and new files should be encrypted with key-1:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
The same scenerio as above, however key-0 has it’s own password:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password
Flume模块sink配置
HDFS Sink
This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events. It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.
The following are the escape sequences supported:
Alias |
Description |
%{host} |
Substitute value of event header named “host”. Arbitrary header names are supported. |
%t |
Unix time in milliseconds |
%a |
locale’s short weekday name (Mon, Tue, ...) |
%A |
locale’s full weekday name (Monday, Tuesday, ...) |
%b |
locale’s short month name (Jan, Feb, ...) |
%B |
locale’s long month name (January, February, ...) |
%c |
locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d |
day of month (01) |
%D |
date; same as %m/%d/%y |
%H |
hour (00..23) |
%I |
hour (01..12) |
%j |
day of year (001..366) |
%k |
hour ( 0..23) |
%m |
month (01..12) |
%M |
minute (00..59) |
%p |
locale’s equivalent of am or pm |
%s |
seconds since 1970-01-01 00:00:00 UTC |
%S |
second (00..59) |
%y |
last two digits of year (00..99) |
%Y |
year (2010) |
%z |
+hhmm numeric timezone (for example, -0400) |
The file in use will have the name mangled to include ”.tmp” at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory. Required properties are in bold.
Note
For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event. One way to add this automatically is to use the TimestampInterceptor.
Name |
Default |
Description |
channel |
– |
|
type |
– |
The component type name, needs to be hdfs |
hdfs.path |
– |
HDFS directory path (eg hdfs://namenode/flume/webdata/) |
hdfs.filePrefix |
FlumeData |
Name prefixed to files created by Flume in hdfs directory |
hdfs.fileSuffix |
– |
Suffix to append to file (eg .avro - NOTE: period is not automatically added) |
hdfs.rollInterval |
30 |
Number of seconds to wait before rolling current file (0 = never roll based on time interval) |
hdfs.rollSize |
1024 |
File size to trigger roll, in bytes (0: never roll based on file size) |
hdfs.rollCount |
10 |
Number of events written to file before it rolled (0 = never roll based on number of events) |
hdfs.idleTimeout |
0 |
Timeout after which inactive files get closed (0 = disable automatic closing of idle files) |
hdfs.batchSize |
100 |
number of events written to file before it is flushed to HDFS |
hdfs.codeC |
– |
Compression codec. one of following : gzip, bzip2, lzo, snappy |
hdfs.fileType |
SequenceFile |
File format: currently SequenceFile, DataStream or CompressedStream(1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC |
hdfs.maxOpenFiles |
5000 |
Allow only this number of open files. If this number is exceeded, the oldest file is closed. |
hdfs.writeFormat |
– |
“Text” or “Writable” |
hdfs.callTimeout |
10000 |
Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring. |
hdfs.threadsPoolSize |
10 |
Number of threads per HDFS sink for HDFS IO ops (open, write, etc.) |
hdfs.rollTimerPoolSize |
1 |
Number of threads per HDFS sink for scheduling timed file rolling |
hdfs.kerberosPrincipal |
– |
Kerberos user principal for accessing secure HDFS |
hdfs.kerberosKeytab |
– |
Kerberos keytab for accessing secure HDFS |
hdfs.proxyUser |
|
|
hdfs.round |
false |
Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) |
hdfs.roundValue |
1 |
Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. |
hdfs.roundUnit |
second |
The unit of the round down value - second, minute or hour. |
hdfs.timeZone |
Local Time |
Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. |
serializer |
TEXT |
Other possible options include avro_event or the fully-qualified class name of an implementation of theEventSerializer.Builder interface. |
serializer.* |
|
|
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
Logger Sink
Logs event at INFO level. Typically useful for testing/debugging purpose. Required properties are in bold.
Property Name |
Default |
Description |
channel |
– |
|
type |
– |
The component type name, needs to be logger |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Avro Sink
This sink forms one half of Flume’s tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are inbold.
Property Name |
Default |
Description |
channel |
– |
|
type |
– |
The component type name, needs to be avro. |
hostname |
– |
The hostname or IP address to bind to. |
port |
– |
The port # to listen on. |
batch-size |
100 |
number of event to batch together for send. |
connect-timeout |
20000 |
Amount of time (ms) to allow for the first (handshake) request. |
request-timeout |
20000 |
Amount of time (ms) to allow for requests after the first. |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
File Roll Sink
Stores events on the local filesystem. Required properties are in bold.
Property Name |
Default |
Description |
channel |
– |
|
type |
– |
The component type name, needs to be file_roll. |
sink.directory |
– |
The directory where files will be stored |
sink.rollInterval |
30 |
Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. |
sink.serializer |
TEXT |
Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface. |
batchSize |
100 |
|
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
原文出处:http://www.cnblogs.com/caoyuanzhanlang
草原战狼淘宝小店:http://xarxf.taobao.com/ 淘宝搜小矮人鞋坊,主营精致美丽时尚女鞋,为您的白雪公主挑一双哦。
==========================================================================================================
=================================== 以上分析仅代表个人观点,欢迎指正与交流 ===================================
=================================== 尊重劳动成果,转载请注明出处,万分感谢 ===================================
==========================================================================================================