翻译自官网flume1.8用户指南,原文地址:Flume 1.8.0 User Guide
篇幅限制,分为以下5篇:
【翻译】Flume 1.8.0 User Guide(用户指南)
【翻译】Flume 1.8.0 User Guide(用户指南) source
【翻译】Flume 1.8.0 User Guide(用户指南) Sink
【翻译】Flume 1.8.0 User Guide(用户指南) Channel
【翻译】Flume 1.8.0 User Guide(用户指南) Processors
Flume Sinks
1、HDFS Sink
这个sink将事件写入Hadoop分布式文件系统(HDFS)。它目前支持创建文本文件和序列文件,这两种文件类型都支持压缩。可以根据运行时间、数据大小或事件数量定期滚动文件(关闭当前文件并创建一个新文件)。它还可以按时间戳、事件来源机器等属性对数据进行分桶/分区。HDFS目录路径可以包含格式化转义序列,HDFS sink会替换这些转义序列,生成用于存储事件的目录/文件名。使用这个sink需要安装hadoop,这样Flume才能使用hadoop jar与HDFS集群通信。请注意,需要一个支持sync()调用的Hadoop版本。
以下是支持的转义序列:
Alias | Description |
---|---|
%{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, ...) |
%A | locale’s full weekday name (Monday, Tuesday, ...) |
%b | locale’s short month name (Jan, Feb, ...) |
%B | locale’s long month name (January, February, ...) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%e | day of month without padding (1) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%n | month without padding (1..12) |
%M | minute (00..59) |
%p | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
%[localhost] | Substitute the hostname of the host where the agent is running |
%[IP] | Substitute the IP address of the host where the agent is running |
%[FQDN] | Substitute the canonical hostname of the host where the agent is running |
注意:转义字符串%[localhost]、%[IP]和%[FQDN]都依赖于Java获取主机名的能力,这在某些网络环境中可能会失败。
正在写入的文件名末尾会带有".tmp"后缀。文件一旦关闭,该后缀就会被去掉,这样就可以在目录中排除尚未写完的文件。必须属性以粗体显示。
注意,对于所有与时间相关的转义序列,事件的消息头中必须存在键为"timestamp"的消息头(除非hdfs.useLocalTimeStamp设置为true)。自动添加该消息头的一种方法是使用TimestampInterceptor。
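例如,可以在向该sink供数的source上配置TimestampInterceptor来自动添加timestamp消息头。下面的片段只是一个简要示意,假设agent a1上有一个名为r1的source:
a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp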
Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hdfs |
hdfs.path | – | HDFS directory path (eg hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory |
hdfs.fileSuffix | – | Suffix to append to file (eg .avro - NOTE: period is not automatically added) |
hdfs.inUsePrefix | – | Prefix that is used for temporal files that flume actively writes into |
hdfs.inUseSuffix | .tmp | Suffix that is used for temporal files that flume actively writes into |
hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file (0 = never roll based on time interval) |
hdfs.rollSize | 1024 | File size to trigger roll, in bytes (0: never roll based on file size) |
hdfs.rollCount | 10 | Number of events written to file before it rolled (0 = never roll based on number of events) |
hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files) |
hdfs.batchSize | 100 | number of events written to file before it is flushed to HDFS |
hdfs.codeC | – | Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy |
hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream (1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC |
hdfs.maxOpenFiles | 5000 | Allow only this number of open files. If this number is exceeded, the oldest file is closed. |
hdfs.minBlockReplicas | – | Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. |
hdfs.writeFormat | Writable | Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive. |
hdfs.callTimeout | 10000 | Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring. |
hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.) |
hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling |
hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS |
hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS |
hdfs.proxyUser | ||
hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) |
hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. |
hdfs.roundUnit | second | The unit of the round down value - second, minute or hour. |
hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. |
hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
hdfs.closeTries | 0 | Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart. |
hdfs.retryInterval | 180 | Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ".tmp" extension. |
serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface. |
serializer.* | – | |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
上面的配置会将时间戳向下舍入到最近的10分钟整点。例如,时间戳为2012年6月12日上午11:54:34的事件,其HDFS路径将变为/flume/events/2012-06-12/1150/00。
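另外,如果希望HDFS sink输出压缩文件,需要同时设置hdfs.fileType与hdfs.codeC(参见上表:DataStream不会压缩输出且不应设置codeC,CompressedStream则要求指定可用的codeC)。下面是在上例基础上的一个简要片段,取值仅为示意:
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
# 仅按文件大小滚动(约128 MB),关闭基于时间和事件数的滚动
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0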
2. hive sink
此sink将包含分隔文本或JSON数据的事件直接写入Hive表或分区。事件使用Hive事务写入,一旦一组事件提交给Hive,它们就会立即对Hive查询可见。Flume要写入的分区既可以预先创建,也可以(可选地)在分区不存在时由Flume自动创建。传入事件数据中的字段会映射到Hive表中的相应列。
Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hive |
hive.metastore | – | Hive metastore URI (eg thrift://a.b.com:9083 ) |
hive.database | – | Hive database name |
hive.table | – | Hive table name |
hive.partition | – | Comma separate list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country :string, time : string) then ‘Asia,India,2014-02-26-01-21’ will indicate continent=Asia,country=India,time=2014-02-26-01-21 |
hive.txnsPerBatchAsk | 100 | Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files. |
heartBeatInterval | 240 | (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats. |
autoCreatePartitions | true | Flume will automatically create the necessary Hive partitions to stream to |
batchSize | 15000 | Max number of events written to Hive in a single Hive transaction |
maxOpenConnections | 500 | Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed. |
callTimeout | 10000 | (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. |
serializer | – | Serializer is responsible for parsing out field from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON |
roundUnit | minute | The unit of the round down value - second, minute or hour. |
roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time |
timeZone | Local Time | Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles. |
useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
Hive sink提供以下序列化器:
JSON:处理UTF8编码的JSON(严格语法)事件,不需要配置。JSON中的对象名称直接映射到Hive表中同名的列。内部使用org.apache.hive.hcatalog.data.JsonSerDe,但独立于Hive表的SerDe。这个序列化器需要安装HCatalog。
DELIMITED:处理简单的分隔文本事件。内部使用LazySimpleSerde,但独立于Hive表的Serde。
Name | Default | Description |
---|---|---|
serializer.delimiter | , | (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like "\t" |
serializer.fieldnames | – | The mapping from input fields to columns in hive table. Specified as a comma separated list (no spaces) of hive table columns names, identifying the input fields in order of their occurrence. To skip fields leave the column name unspecified. Eg. ‘time,,ip,message’ indicates the 1st, 3rd and 4th fields in input map to time, ip and message columns in the hive table. |
serializer.serdeSeparator | Ctrl-A | (Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of table columns, as the fields in incoming event body do not need to be reordered to match order of table columns. Use single quotes for special characters like '\t'. Ensure input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character |
以下是支持的转义序列:
Alias | Description |
---|---|
%{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, ...) |
%A | locale’s full weekday name (Monday, Tuesday, ...) |
%b | locale’s short month name (Jan, Feb, ...) |
%B | locale’s long month name (January, February, ...) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%M | minute (00..59) |
%p | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
注意,对于所有与时间相关的转义序列,带有键“timestamp”的消息头必须存在于事件的消息头中(除非useLocalTimeStamp设置为true)。自动添加的一种方法是使用TimestampInterceptor。
hive table 示例:
create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;
agent a1 示例:
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg
上面的配置会将时间戳向下舍入到最近的10分钟整点。例如,时间戳头为2012年6月12日上午11:54:34、"country"头为"india"的事件,将被写入分区(continent='asia', country='india', time='2012-06-12-11-50')。序列化器被配置为接受制表符分隔、包含三个字段的输入,并跳过第二个字段。
3. Logger Sink
在INFO级别记录事件,通常用于测试/调试目的。必须属性以粗体显示。此sink是唯一的例外,它不需要"Logging raw data(记录原始数据)"一节中说明的额外配置。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be logger |
maxBytesToLog | 16 | Maximum number of bytes of the Event body to log |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4. Avro Sink
这个sink构成了Flume分层收集支持的一半。发送到此sink的Flume事件被转换为Avro事件并发送到配置的主机名/端口对。事件以配置的批大小的批次从配置的Channel中获取。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be avro. |
hostname | – | The hostname or IP address to bind to. |
port | – | The port # to listen on. |
batch-size | 100 | number of event to batch together for send. |
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request. |
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first. |
reset-connection-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent. |
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource |
compression-level | 6 | The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression |
ssl | false | Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”. |
trust-all-certs | false | If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection. |
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used. |
truststore-password | – | The password for the specified truststore. |
truststore-type | JKS | The type of the Java truststore. This can be “JKS” or other supported Java truststore type. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
maxIoWorkers | 2 * the number of available processors in the machine | The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory. |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
5. Thrift Sink
这个接收器构成了Flume分层收集支持的一半。发送到此接收器的Flume事件被转换为Thrift事件并发送到配置的主机名/端口对。事件以配置的批大小的批次从配置的Channel中获取。
通过启用kerberos身份验证,可以将Thrift sink配置为以安全模式启动。要与以安全模式启动的Thrift source通信,Thrift sink也应该以安全模式运行。client-principal和client-keytab是Thrift sink用于向kerberos KDC进行身份验证的属性,server-principal表示此sink要以安全模式连接的Thrift source的principal。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be thrift. |
hostname | – | The hostname or IP address to bind to. |
port | – | The port # to listen on. |
batch-size | 100 | number of event to batch together for send. |
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request. |
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first. |
connection-reset-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent. |
ssl | false | Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password” and “truststore-type” |
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used. |
truststore-password | – | The password for the specified truststore. |
truststore-type | JKS | The type of the Java truststore. This can be “JKS” or other supported Java truststore type. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude |
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source. |
client-principal | —- | The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC. |
client-keytab | —- | The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC. |
server-principal | – | The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect to. |
agent a1示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
6. IRC Sink
IRC sink接收来自附加channel的消息,并将这些消息转发到配置的IRC目的地。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be irc |
hostname | – | The hostname or IP address to connect to |
port | 6667 | The port number of remote host to connect |
nick | – | Nick name |
user | – | User name |
password | – | User password |
chan | – | channel |
name | ||
splitlines | – | (boolean) |
splitchars | \n | line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: "\\n") |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume
7. File Roll Sink
在本地文件系统上存储事件。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be file_roll. |
sink.directory | – | The directory where files will be stored |
sink.pathManager | DEFAULT | The PathManager implementation to use. |
sink.pathManager.extension | – | The file extension if the default PathManager is used. |
sink.pathManager.prefix | – | A character string to add to the beginning of the file name if the default PathManager is used |
sink.rollInterval | 30 | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. |
sink.serializer | TEXT | Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface. |
batchSize | 100 |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
8. Null Sink
丢弃从channel接收的所有事件。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be null. |
batchSize | 100 |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1
9. Hbase Sink
9.1 Hbase Sink
这个sink将数据写入HBase。HBase配置取自类路径中遇到的第一个hbase-site.xml。由配置指定的、实现了HbaseEventSerializer的类用于将事件转换为HBase的put和/或increment,然后写入HBase。这个sink提供与HBase相同的一致性保证,即目前的行级原子性。如果HBase未能写入某些事件,sink将重放该事务中的所有事件。
HBaseSink支持将数据写入安全(secure)的HBase。要写入安全HBase,agent运行所使用的用户必须对sink所配置写入的表具有写权限。可以在配置中指定用于向KDC进行身份验证的principal和keytab。Flume agent类路径中的hbase-site.xml必须将身份验证设置为kerberos(具体做法请参阅HBase文档)。
为了方便,Flume自带了两个序列化器。SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)将事件体原样写入HBase,并可选地递增HBase中的一列,主要作为示例实现。RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer)根据给定的正则表达式拆分事件体,并将每个部分写入不同的列。
类型是FQCN: org.apache.flume.sink.hbase.HBaseSink。
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be hbase |
table | – | The name of the table in Hbase to write to. |
columnFamily | – | The column family in Hbase to write to. |
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml |
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
batchSize | 100 | Number of events to be written per txn. |
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells. |
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = “iCol”, payload column = “pCol”. |
serializer.* | – | Properties to be passed to the serializer. |
kerberosPrincipal | – | Kerberos user principal for accessing secure HBase |
kerberosKeytab | – | Kerberos keytab for accessing secure HBase |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
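在上面示例的基础上,还可以通过serializer.*前缀向RegexHbaseEventSerializer传递正则表达式和列名。下面的serializer.regex与serializer.colNames取值仅为示意,可用参数请以所用Flume版本中该序列化器的实现为准:
# 假设事件体形如 "2018-01-01 INFO something happened",按空格拆为三列
a1.sinks.k1.serializer.regex = ^([^ ]+) ([^ ]+) (.*)$
a1.sinks.k1.serializer.colNames = date,level,msg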
9.2 AsyncHBaseSink
这个sink使用异步模型将数据写入HBase。配置中指定的、实现了AsyncHbaseEventSerializer的类用于将事件转换为HBase的put和/或increment,然后写入HBase。这个sink使用Asynchbase API写入HBase,提供与HBase相同的一致性保证,即目前的行级原子性。如果HBase未能写入某些事件,sink将重放该事务中的所有事件。类型是FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be asynchbase |
table | – | The name of the table in Hbase to write to. |
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml |
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml |
columnFamily | – | The column family in Hbase to write to. |
batchSize | 100 | Number of events to be written per txn. |
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells. |
timeout | 60000 | The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction. |
serializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | |
serializer.* | – | Properties to be passed to the serializer. |
请注意,此sink接受配置中的Zookeeper Quorum和父znode信息。Zookeeper Quorum和父znode可以在flume配置文件中指定;如果配置中没有提供,sink将从类路径中的第一个hbase-site.xml读取这些信息。
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1
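如果希望直接在flume配置中指定Zookeeper信息而不依赖hbase-site.xml,可以在上面示例的基础上增加如下两行(主机名与端口仅为示意):
a1.sinks.k1.zookeeperQuorum = zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
a1.sinks.k1.znodeParent = /hbase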
10 MorphlineSolrSink
这个sink从Flume事件中提取数据,对其进行转换,并将其近乎实时地加载到Apache Solr服务器中,然后由Apache Solr服务器向最终用户或搜索应用程序提供查询。
这个sink非常适合将原始数据流到HDFS(通过HdfsSink)并同时提取、转换和加载相同数据到Solr(通过MorphlineSolrSink)的用例。特别是,这个sink可以处理来自不同数据源的任意异构原始数据,并将其转换为对搜索应用程序有用的数据模型。
ETL功能可以使用一个形态线配置文件进行定制,该文件定义了一系列转换命令,将事件记录从一个命令传输到另一个命令。
形态线可以看作是Unix管道的演化,其中数据模型被一般化以处理通用记录流,包括任意二进制有效负载。形态线命令有点像flume拦截器。形态线可以嵌入到Hadoop组件中,比如Flume。
提供了开箱即用的命令来解析和转换一组标准数据格式,如日志文件、Avro、CSV、文本、HTML、XML、PDF、Word、Excel等,还可以作为形态线插件添加其他数据格式的定制命令和解析器。任何类型的数据格式都可以建立索引,任何类型Solr模式的任何Solr文档都可以生成,任何定制的ETL逻辑都可以注册和执行。
形态线操作连续的记录流。数据模型可以这样描述:记录是一组命名字段,其中每个字段都有一个由一个或多个值组成的有序列表,值可以是任何Java对象。也就是说,记录本质上是一个哈希表,其中每个哈希表条目都包含一个字符串键和一个作为值的Java对象列表。(实现使用了Guava的ArrayListMultimap,它是一个ListMultimap。)注意,一个字段可以有多个值,任何两个记录都不需要使用相同的字段名。
这个sink将Flume事件的主体填充到morphline记录的_attachment_body字段中,并将Flume事件的头部复制到同名的记录字段中。然后命令可以对这些数据进行操作。
支持路由到SolrCloud集群,以提高可伸缩性。索引负载可以分散在大量的morphlinesolrsink上,以提高可伸缩性。索引负载可以跨多个morphlinesolrsink复制以获得高可用性,例如使用Flume特性(如负载平衡接收器处理器)。MorphlineInterceptor还可以帮助实现到多个Solr集合的动态路由(例如,对于多租户)。
您的环境所需的形态线和solr jar必须放在Apache Flume安装的lib目录中。
类型是FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink |
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf |
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file |
batchSize | 1000 | The maximum number of events to take per flume transaction. |
batchDurationMillis | 1000 | The maximum duration per flume transaction (ms). The transaction commits after this duration or when batchSize is exceeded, whichever comes first. |
handlerClass | org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl | The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler |
isProductionMode | false | This flag should be enabled for mission critical, large-scale online production systems that need to make progress without downtime when unrecoverable exceptions occur. Corrupt or malformed parser input data, parser bugs, and errors related to unknown Solr schema fields produce unrecoverable exceptions. |
recoverableExceptionClasses | org.apache.solr.client.solrj.SolrServerException | Comma separated list of recoverable exceptions that tend to be transient, in which case the corresponding task can be retried. Examples include network connection errors, timeouts, etc. When the production mode flag is set to true, the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries. |
isIgnoringRecoverableExceptions | false | This flag should be enabled, if an unrecoverable exception is accidentally misclassified as recoverable. This enables the sink to make progress and avoid retrying an event forever. |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000
11. ElasticSearchSink
这个sink将数据写入一个elasticsearch集群。默认情况下,事件将被写入,以便Kibana图形界面能够显示它们——就像logstash编写它们一样。
环境所需的elasticsearch和lucene-core jar必须放在Apache Flume安装的lib目录中。Elasticsearch要求客户端JAR的主版本与服务器的主版本一致,并且两者运行相同次版本的JVM,否则会出现SerializationException。要选择正确的版本,首先确定目标集群运行的elasticsearch版本和JVM版本,然后选择与主版本匹配的elasticsearch客户端库:0.19.x的客户端可以与0.19.x的集群通信,0.20.x可以与0.20.x通信,0.90.x可以与0.90.x通信。确定elasticsearch版本后,再阅读其pom.xml文件,确定应使用的lucene-core JAR版本。运行ElasticSearchSink的Flume agent所使用的JVM也应与目标集群匹配到次版本。
每天事件都会写入一个新的索引,索引名称为<indexName>-yyyy-MM-dd,其中<indexName>是indexName参数的值。sink将在UTC午夜开始写入新索引。
默认情况下,事件由ElasticSearchLogStashEventSerializer序列化后写入elasticsearch。可以通过serializer参数覆盖此行为。该参数接受org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer或org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory的实现。ElasticSearchEventSerializer已被弃用,推荐使用功能更强大的ElasticSearchIndexRequestBuilderFactory。
类型是FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink |
hostNames | – | Comma separated list of hostname:port, if the port is not present the default port ‘9300’ will be used |
indexName | flume | The name of the index which the date will be appended to. Example ‘flume’ -> ‘flume-yyyy-MM-dd’ Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header |
indexType | logs | The type to index the document to, defaults to ‘log’ Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header |
clusterName | elasticsearch | Name of the ElasticSearch cluster to connect to |
batchSize | 100 | Number of events to be written per txn. |
ttl | – | TTL in days, when set will cause the expired documents to be deleted automatically, if not set documents will never be automatically deleted. TTL is accepted both in the earlier form of integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute), h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days. Follow http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information. |
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred. |
serializer.* | – | Properties to be passed to the serializer. |
注意,使用事件头的值来动态决定存储事件时使用的索引名和索引类型非常方便。使用此功能时要小心,因为事件的提交者可以借此控制indexName和indexType。此外,如果使用elasticsearch REST客户端,事件提交者还能控制所使用的URL路径。
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
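在上面示例的基础上,也可以利用消息头替换(见上表indexName/indexType的说明)动态决定索引名和索引类型。下面的片段仅为示意,假设事件带有一个名为logType的消息头:
# 假设事件头中带有 logType,例如 logType=nginx,则写入索引 flume-nginx-yyyy-MM-dd
a1.sinks.k1.indexName = flume-%{logType}
a1.sinks.k1.indexType = %{logType}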
12 Kite Dataset Sink
将事件写入Kite数据集的实验性接收器。这个接收器将反序列化每个传入事件的主体,并将结果记录存储在Kite数据集中。它通过按URI加载数据集来确定目标数据集。
唯一受支持的序列化方式是avro,并且记录的schema必须在事件头中传递:要么通过flume.avro.schema.literal头携带JSON表示的schema,要么通过flume.avro.schema.url头给出可以获取schema的URL(支持hdfs:/...形式的URI)。这与Log4jAppender flume客户端,以及使用deserializer.schemaType = LITERAL的Spooling Directory Source的Avro反序列化器兼容。
注1:不支持flume.avro.schema.hash头。注2:在某些情况下,文件滚动可能会在超过滚动间隔后稍有延迟,但延迟不会超过5秒;在大多数情况下,这个延迟可以忽略不计。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | Must be org.apache.flume.sink.kite.DatasetSink |
kite.dataset.uri | – | URI of the dataset to open |
kite.repo.uri | – | URI of the repository to open (deprecated; use kite.dataset.uri instead) |
kite.dataset.namespace | – | Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead) |
kite.dataset.name | – | Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead) |
kite.batchSize | 100 | Number of records to process in each batch |
kite.rollInterval | 30 | Maximum wait time (seconds) before data files are released |
kite.flushable.commitOnBatch | true | If true, the Flume transaction will be committed and the writer will be flushed on each batch of kite.batchSize records. This setting only applies to flushable datasets. When true, it’s possible for temp files with committed data to be left in the dataset directory. These files need to be recovered by hand for the data to be visible to DatasetReaders. |
kite.syncable.syncOnBatch | true | Controls whether the sink will also sync data when committing the transaction. This setting only applies to syncable datasets. Syncing guarantees that data will be written on stable storage on the remote system while flushing only guarantees that data has left Flume’s client buffers. When the kite.flushable.commitOnBatch property is set to false, this property must also be set to false. |
kite.entityParser | avro | Parser that turns Flume Events into Kite entities. Valid values are avro and the fully-qualified class name of an implementation of the EntityParser.Builder interface. |
kite.failurePolicy | retry | Policy that handles non-recoverable errors such as a missing Schema in the Event header. The default value, retry, will fail the current batch and try again which matches the old behavior. Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset, and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface. |
kite.error.dataset.uri | – | URI of the dataset where failed events are saved when kite.failurePolicy is set to save. Required when the kite.failurePolicy is set to save. |
auth.kerberosPrincipal | – | Kerberos user principal for secure authentication to HDFS |
auth.kerberosKeytab | – | Kerberos keytab location (local FS) for the principal |
auth.proxyUser | – | The effective user for HDFS actions, if different from the kerberos principal |
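官方指南没有为该sink给出agent配置示例。下面是一个最小化的示意配置,其中的数据集URI仅为假设值,需替换为实际环境中的Kite数据集:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.k1.channel = c1
# 假设的数据集URI,请替换为实际的Kite数据集
a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode/flume/weblogs
a1.sinks.k1.kite.batchSize = 100
a1.sinks.k1.kite.rollInterval = 30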
13. Kafka Sink
这是一个Flume Sink实现,可以将数据发布到Kafka主题。目标之一是将Flume与Kafka集成,这样基于pull的处理系统就可以处理来自各种Flume源的数据。目前支持Kafka 0.9.x系列发行版。
这个版本的Flume不再支持Kafka的旧版本(0.8.x)。
必需的属性用粗体标记。
Property Name | Default | Description |
---|---|---|
type | – | Must be set to org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | – | List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port |
kafka.topic | default-flume-topic | The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here. Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named “header”. (If using the substitution, it is recommended to set “auto.create.topics.enable” property of Kafka broker to true.) |
flumeBatchSize | 100 | How many messages to process in one batch. Larger batches improve throughput while adding latency. |
kafka.producer.acks | 1 | How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure. |
useFlumeEventFormat | false | By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side. |
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class). |
partitionIdHeader | – | When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId. |
allowTopicOverride | true | When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided). |
topicHeader | topic | When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback. |
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
more producer security props | | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer. |
Other Kafka Producer Properties | – | These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms |
注意,Kafka Sink会使用FlumeEvent头部中的topic和key属性将事件发送到Kafka。如果头部中存在topic,事件将被发送到该主题,并覆盖为sink配置的主题。如果头部中存在key,Kafka将使用该key在主题分区之间对数据进行分区:具有相同key的事件将被发送到同一个分区;如果key为null,事件将被发送到随机分区。
Kafka sink还为key.serializer(org.apache.kafka.common.serialization.StringSerializer)和value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer)提供了默认值,不建议修改这些参数。
弃用属性:
Property Name | Default | Description |
---|---|---|
brokerList | – | Use kafka.bootstrap.servers |
topic | default-flume-topic | Use kafka.topic |
batchSize | 100 | Use kafka.flumeBatchSize |
requiredAcks | 1 | Use kafka.producer.acks |
下面给出了Kafka sink的一个配置示例。以前缀kafka.producer开头的属性会在创建Kafka producer时传递给它,并且不限于本例中给出的这些属性;还可以在这里包含自定义属性,并通过作为方法参数传入的Flume Context对象在预处理器中访问它们。
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
Security and Kafka Sink:
Flume和Kafka之间的通信通道支持安全认证和数据加密。对于安全身份验证,可以使用Kafka版本0.9.0中的SASL/GSSAPI (Kerberos V5)或SSL(尽管参数名为SSL,但实际的协议是TLS实现)。
到目前为止,数据加密仅由SSL/TLS提供。
将kafka.producer.security.protocol设置为下列任一值时,其含义分别为:
- SASL_PLAINTEXT - 没有数据加密的Kerberos或明文身份验证
- SASL_SSL - 带有数据加密的Kerberos或明文身份验证
- SSL - 基于TLS加密,具有可选的身份验证.
警告:启用SSL时会导致性能下降,其程度取决于CPU类型和JVM实现。参考资料:Kafka安全概述,以及跟踪该问题的JIRA:KAFKA-2561
TLS and Kafka Sink:
请阅读配置Kafka客户端SSL中描述的步骤,以了解用于微调的其他配置设置,例如以下任何一种:安全提供程序、密码套件、启用的协议、信任存储或密钥存储类型。
使用服务器端身份验证和数据加密的示例配置。
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
注意:默认情况下属性ssl.endpoint.identification.algorithm没有定义,因此没有执行主机名验证。为了启用主机名验证,请设置以下属性:
a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
一旦启用,客户端将针对以下两个字段之一验证服务器的完全限定域名(FQDN):
- Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
- Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
如果还需要客户端身份验证,应在Flume agent配置中添加以下内容。每个Flume agent必须拥有自己的客户端证书,这些证书需要被Kafka broker单独信任,或通过其签名链被信任。常见的做法是用一个根CA为每个客户端证书签名,而该根CA被Kafka broker信任。
a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>
如果密钥库(keystore)和密钥使用不同的密码保护,则需要通过ssl.key.password属性为producer的密钥库提供额外的密钥密码:
a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>
Kerberos and Kafka Sink:
要将Kafka sink与使用Kerberos保护的Kafka集群一起使用,请按上文所述为producer设置kafka.producer.security.protocol属性。与Kafka broker一起使用的Kerberos keytab和principal在JAAS文件的"KafkaClient"部分中指定;"Client"部分描述了需要时的Zookeeper连接。有关JAAS文件内容的信息,请参见Kafka文档。可以通过flume-env.sh中的JAVA_OPTS指定这个JAAS文件的位置,也可以选择指定系统范围的kerberos配置:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf" JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
使用SASL_PLAINTEXT的安全配置示例:
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
使用SASL_SSL的安全配置示例:
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
JAAS文件示例。关于其内容,请参阅Kafka文档SASL配置部分中所需身份验证机制(GSSAPI/PLAIN)的客户端配置说明。与Kafka Source或Kafka Channel不同,"Client"部分不是必需的,除非其他连接组件需要它。另外,请确保Flume进程的操作系统用户对jaas文件和keytab文件具有读权限。
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
14 HTTP Sink
此sink从channel获取事件,并通过HTTP POST请求将这些事件发送到远程服务,事件内容作为POST请求体发送。
此sink的错误处理行为取决于目标服务器返回的HTTP响应:sink的退避(backoff)/就绪状态、事务是提交还是回滚,以及该事件是否计入成功投递的事件数,都可以按状态码进行配置。
如果服务器返回的HTTP响应格式错误、无法读取状态码,则会产生退避信号,并且事件不会从channel中消费。
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be http. |
endpoint | – | The fully qualified URL endpoint to POST to |
connectTimeout | 5000 | The socket connection timeout in milliseconds |
requestTimeout | 5000 | The maximum request processing time in milliseconds |
contentTypeHeader | text/plain | The HTTP Content-Type header |
acceptHeader | text/plain | The HTTP Accept header value |
defaultBackoff | true | Whether to backoff by default on receiving all HTTP status codes |
defaultRollback | true | Whether to rollback by default on receiving all HTTP status codes |
defaultIncrementMetrics | false | Whether to increment metrics by default on receiving all HTTP status codes |
backoff.CODE | – | Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code |
rollback.CODE | – | Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code |
incrementMetrics.CODE | – | Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code |
请注意,backoff、rollback和incrementMetrics配置项会按最具体的HTTP状态码进行匹配。如果同时为2XX和200配置了值,则HTTP 200响应使用200的配置值,而201-299范围内的其他响应使用2XX的配置值。
任何空(empty)或null事件都会被直接消费,而不会向HTTP端点发出任何请求。
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true
15 Custom Sink
自定义sink是你自己对Sink接口的实现。启动Flume agent时,自定义sink的类及其依赖必须包含在agent的类路径中。自定义sink的type是它的FQCN。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channel | – | |
type | – | The component type name, needs to be your FQCN |
agent a1 示例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1