  • [Translation] Flume 1.8.0 User Guide: Sinks

    Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide

    Due to its length, the translation is split into the following five parts:

    [Translation] Flume 1.8.0 User Guide

    [Translation] Flume 1.8.0 User Guide: Sources

    [Translation] Flume 1.8.0 User Guide: Sinks

    [Translation] Flume 1.8.0 User Guide: Channels

    [Translation] Flume 1.8.0 User Guide: Processors

    Flume Sinks

    1. HDFS Sink

    This sink writes events to the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files, with optional compression for both file types. Files can be rolled (the current file closed and a new one created) periodically based on elapsed time, data size, or number of events. It can also bucket/partition data by attributes such as the timestamp or the machine where the event originated. The HDFS directory path may contain formatting escape sequences that the HDFS sink replaces to generate the directory/file name used to store the events. Using this sink requires a Hadoop installation so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.

    The following escape sequences are supported:

    Alias | Description
    %{host} Substitute value of event header named “host”. Arbitrary header names are supported.
    %t Unix time in milliseconds
    %a locale’s short weekday name (Mon, Tue, ...)
    %A locale’s full weekday name (Monday, Tuesday, ...)
    %b locale’s short month name (Jan, Feb, ...)
    %B locale’s long month name (January, February, ...)
    %c locale’s date and time (Thu Mar 3 23:05:25 2005)
    %d day of month (01)
    %e day of month without padding (1)
    %D date; same as %m/%d/%y
    %H hour (00..23)
    %I hour (01..12)
    %j day of year (001..366)
    %k hour ( 0..23)
    %m month (01..12)
    %n month without padding (1..12)
    %M minute (00..59)
    %p locale’s equivalent of am or pm
    %s seconds since 1970-01-01 00:00:00 UTC
    %S second (00..59)
    %y last two digits of year (00..99)
    %Y year (2010)
    %z +hhmm numeric timezone (for example, -0400)
    %[localhost] Substitute the hostname of the host where the agent is running
    %[IP] Substitute the IP address of the host where the agent is running
    %[FQDN] Substitute the canonical hostname of the host where the agent is running

    Note: the escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's ability to obtain the hostname, which may fail in some networking environments.

    The file in use will have its name mangled to include a ".tmp" suffix at the end. Once the file is closed, this extension is removed. This allows partially completed files to be excluded from consideration in the directory. Required properties are shown in bold.

    Note that for all of the time-related escape sequences, a header with the key "timestamp" must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
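    For reference, a minimal sketch of attaching a TimestampInterceptor to a source so that the time-based escapes resolve; the source name r1 is an assumption, not part of this section:

    a1.sources = r1
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp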

    Name | Default | Description
    channel  
    type The component type name, needs to be hdfs
    hdfs.path HDFS directory path (eg hdfs://namenode/flume/webdata/)
    hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory
    hdfs.fileSuffix Suffix to append to file (eg .avro - NOTE: period is not automatically added)
    hdfs.inUsePrefix Prefix that is used for temporal files that flume actively writes into
    hdfs.inUseSuffix .tmp Suffix that is used for temporal files that flume actively writes into
    hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
    hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
    hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events)
    hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
    hdfs.batchSize 100 number of events written to file before it is flushed to HDFS
    hdfs.codeC Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy
    hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file, so please don't set codeC. (2) CompressedStream requires setting hdfs.codeC with an available codeC.

    hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
    hdfs.minBlockReplicas

    Specify minimum number of replicas per HDFS block. If not specified, it comes from the

    default Hadoop config in the classpath.

    hdfs.writeFormat Writable

    Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume,

    otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive.

    hdfs.callTimeout 10000

    Number of milliseconds allowed for HDFS operations, such as open, write, flush, close.

    This number should be increased if many HDFS timeout operations are occurring.

    hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
    hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
    hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
    hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
    hdfs.proxyUser    
    hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
    hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
    hdfs.roundUnit second The unit of the round down value - second, minute or hour.
    hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
    hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
    hdfs.closeTries 0

    Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1,

    this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure),

    and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until

    the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open

    if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.

    hdfs.retryInterval 180

    Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode,

    so setting this too low can cause a lot of load on the name node. If set to 0 or less,

    the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.

    serializer TEXT

    Other possible options include avro_event or the fully-qualified class name of an implementation of

    the EventSerializer.Builder interface.

    serializer.*    

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute

    The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
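    As a variation on the example above, a hedged sketch of writing gzip-compressed files by switching the file type to CompressedStream; the roll settings shown are illustrative choices, not defaults from the guide:

    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    a1.sinks.k1.hdfs.rollInterval = 300
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0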

    2. Hive Sink

    This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events is committed to Hive, they become immediately visible to Hive queries. Partitions to which Flume will stream can either be pre-created, or, optionally, Flume can create them if they are missing. Fields from the incoming event data are mapped to the corresponding columns in the Hive table.

    Name | Default | Description
    channel  
    type The component type name, needs to be hive
    hive.metastore Hive metastore URI (eg thrift://a.b.com:9083 )
    hive.database Hive database name
    hive.table Hive table name
    hive.partition

    Comma separate list of partition values identifying the partition to write to. May contain escape sequences.

    E.g: If the table is partitioned by (continent: string, country :string, time : string) then ‘Asia,India,2014-02-26-01-21’

    will indicate continent=Asia,country=India,time=2014-02-26-01-21

    hive.txnsPerBatchAsk 100

    Hive grants a batch of transactions instead of single transactions to streaming clients like Flume.

    This setting configures the number of desired transactions per Transaction Batch.

    Data from all transactions in a single batch end up in a single file. Flume will write a maximum of

    batchSize events in each transaction in the batch.

    This setting in conjunction with batchSize provides control over the size of each file.

    Note that eventually Hive will transparently compact these files into larger files.

    heartBeatInterval 240

    (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring.

    Set this value to 0 to disable heartbeats.

    autoCreatePartitions true Flume will automatically create the necessary Hive partitions to stream to
    batchSize 15000 Max number of events written to Hive in a single Hive transaction
    maxOpenConnections 500 Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
    callTimeout 10000 (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
    serializer  

    Serializer is responsible for parsing out field from the event and mapping them to columns in the hive table.

    Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON

    roundUnit minute The unit of the round down value - second, minute or hour.
    roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
    timeZone Local Time Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles.
    useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

    The following serializers are provided for the Hive sink:

    JSON: Handles UTF8 encoded JSON (strict syntax) events and requires no configuration. Object names in the JSON are mapped directly to columns with the same name in the Hive table. Internally uses org.apache.hive.hcatalog.data.JsonSerDe, but is independent of the Serde of the Hive table. This serializer requires HCatalog to be installed.
    DELIMITED: Handles simple delimited textual events. Internally uses LazySimpleSerde, but is independent of the Serde of the Hive table.

    Name | Default | Description
    serializer.delimiter , (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like "\t"
    serializer.fieldnames

    The mapping from input fields to columns in hive table. Specified as a comma separated list (no spaces) of

    hive table columns names, identifying the input fields in order of their occurrence. To skip fields leave the

    column name unspecified. Eg. ‘time,,ip,message’ indicates the 1st, 3rd and 4th fields in input map to time,

    ip and message columns in the hive table.

    serializer.serdeSeparator Ctrl-A

    (Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the

    fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the

    serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of

    table columns, as the fields in incoming event body do not need to be reordered to match order of table columns.

    Use single quotes for special characters like '\t'. Ensure input fields do not contain this character.

    NOTE: If serializer.delimiter is a single character, preferably set this to the same character

    The following escape sequences are supported:

    Alias | Description
    %{host} Substitute value of event header named “host”. Arbitrary header names are supported.
    %t Unix time in milliseconds
    %a locale’s short weekday name (Mon, Tue, ...)
    %A locale’s full weekday name (Monday, Tuesday, ...)
    %b locale’s short month name (Jan, Feb, ...)
    %B locale’s long month name (January, February, ...)
    %c locale’s date and time (Thu Mar 3 23:05:25 2005)
    %d day of month (01)
    %D date; same as %m/%d/%y
    %H hour (00..23)
    %I hour (01..12)
    %j day of year (001..366)
    %k hour ( 0..23)
    %m month (01..12)
    %M minute (00..59)
    %p locale’s equivalent of am or pm
    %s seconds since 1970-01-01 00:00:00 UTC
    %S second (00..59)
    %y last two digits of year (00..99)
    %Y year (2010)
    %z +hhmm numeric timezone (for example, -0400)

    Note that for all of the time-related escape sequences, a header with the key "timestamp" must exist among the headers of the event (unless useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.

    Example Hive table:

    create table weblogs ( id int , msg string )
        partitioned by (continent string, country string, time string)
        clustered by (id) into 5 buckets
        stored as orc;

    Example for agent named a1:

    a1.channels = c1
    a1.channels.c1.type = memory
    a1.sinks = k1
    a1.sinks.k1.type = hive
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
    a1.sinks.k1.hive.database = logsdb
    a1.sinks.k1.hive.table = weblogs
    a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
    a1.sinks.k1.useLocalTimeStamp = false
    a1.sinks.k1.round = true
    a1.sinks.k1.roundValue = 10
    a1.sinks.k1.roundUnit = minute
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = "\t"
    a1.sinks.k1.serializer.serdeSeparator = '\t'
    a1.sinks.k1.serializer.fieldnames =id,,msg

    The above configuration will round down the timestamp to the last 10th minute. For example, an event with its timestamp header set to 11:54:34 AM, June 12, 2012 and its "country" header set to "india" will evaluate to the partition (continent='asia', country='india', time='2012-06-12-11-50'). The serializer is configured to accept tab-separated input containing three fields and to skip the second field.
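    For comparison, a hedged sketch of the same sink using the JSON serializer instead of DELIMITED; the serializer.* properties are dropped because the JSON serializer needs no configuration, and the metastore, table and partition settings are simply reused from the example above:

    a1.sinks.k1.type = hive
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
    a1.sinks.k1.hive.database = logsdb
    a1.sinks.k1.hive.table = weblogs
    a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
    a1.sinks.k1.serializer = JSON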

     3. Logger Sink

    Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are in bold. This sink is the only exception which does not require the extra configuration explained in the Logging raw data section.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be logger
    maxBytesToLog 16 Maximum number of bytes of the Event body to log

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = logger
    a1.sinks.k1.channel = c1

    4. Avro Sink

    This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be avro.
    hostname The hostname or IP address to bind to.
    port The port # to listen on.
    batch-size 100 number of event to batch together for send.
    connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
    request-timeout 20000 Amount of time (ms) to allow for requests after the first.
    reset-connection-interval none

    Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop.

    This will allow the sink to connect to hosts behind a hardware load-balancer when new

    hosts are added without having to restart the agent.

    compression-type none This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource
    compression-level 6

    The level of compression to compress event. 0 = no compression and 1-9 is compression.

    The higher the number the more compression

    ssl false

    Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally

    set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”.

    trust-all-certs false

    If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked.

    This should NOT be used in production because it makes it easier for an attacker to execute a

    man-in-the-middle attack and “listen in” on the encrypted connection.

    truststore

    The path to a custom Java truststore file. Flume uses the certificate authority information in this file

    to determine whether the remote Avro Source’s SSL authentication credentials should be trusted.

    If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts”

    in the Oracle JRE) will be used.

    truststore-password The password for the specified truststore.
    truststore-type JKS The type of the Java truststore. This can be “JKS” or other supported Java truststore type.
    exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
    maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = avro
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = 10.10.10.10
    a1.sinks.k1.port = 4545
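    Building on the example above, a hedged sketch of enabling deflate compression and SSL on the same Avro sink; the truststore path and password are placeholders:

    a1.sinks.k1.compression-type = deflate
    a1.sinks.k1.compression-level = 6
    a1.sinks.k1.ssl = true
    a1.sinks.k1.truststore = /path/to/truststore.jks
    a1.sinks.k1.truststore-password = <password to access the truststore>
    a1.sinks.k1.truststore-type = JKS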

    5. Thrift Sink

    This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname/port pair. The events are taken from the configured Channel in batches of the configured batch size.
    The Thrift sink can be configured to start in secure mode by enabling kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the kerberos KDC. The server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be thrift.
    hostname The hostname or IP address to bind to.
    port The port # to listen on.
    batch-size 100 number of event to batch together for send.
    connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
    request-timeout 20000 Amount of time (ms) to allow for requests after the first.
    connection-reset-interval none

    Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop.

    This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.

    ssl false Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password” and “truststore-type”
    truststore

    The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote

    Thrift Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically

    “jssecacerts” or “cacerts” in the Oracle JRE) will be used.

    truststore-password The password for the specified truststore.
    truststore-type JKS The type of the Java truststore. This can be “JKS” or other supported Java truststore type.
    exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude
    kerberos false

    Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for

    successful authentication and communication to a kerberos enabled Thrift Source.

    client-principal —- The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC.
    client-keytab —- The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC.
    server-principal The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect to.

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = thrift
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = 10.10.10.10
    a1.sinks.k1.port = 4545
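    Extending the example above, a hedged sketch of the kerberos-related properties for connecting to a kerberos-enabled Thrift source; the principals and keytab path are placeholders:

    a1.sinks.k1.kerberos = true
    a1.sinks.k1.client-principal = flume-client/client.example.org@EXAMPLE.ORG
    a1.sinks.k1.client-keytab = /path/to/flume-client.keytab
    a1.sinks.k1.server-principal = thrift/server.example.org@EXAMPLE.ORG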

    6. IRC Sink

    The IRC sink takes messages from the attached channel and relays those to configured IRC destinations. Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be irc
    hostname The hostname or IP address to connect to
    port 6667 The port number of remote host to connect
    nick Nick name
    user User name
    password User password
    chan channel
    name    
    splitlines (boolean)
    splitchars n

    line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: "\n")

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = irc
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = irc.yourdomain.com
    a1.sinks.k1.nick = flume
    a1.sinks.k1.chan = #flume

    7. File Roll Sink

    Stores events on the local filesystem. Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be file_roll.
    sink.directory The directory where files will be stored
    sink.pathManager DEFAULT The PathManager implementation to use.
    sink.pathManager.extension The file extension if the default PathManager is used.
    sink.pathManager.prefix A character string to add to the beginning of the file name if the default PathManager is used
    sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
    sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
    batchSize 100  

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.channel = c1
    a1.sinks.k1.sink.directory = /var/log/flume
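    A hedged sketch extending the example with an explicit roll interval and serializer; the hourly interval is an illustrative choice, not a default from the guide:

    a1.sinks.k1.sink.rollInterval = 3600
    a1.sinks.k1.sink.serializer = TEXT
    a1.sinks.k1.batchSize = 100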

    8. Null Sink

    Discards all events it receives from the channel. Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be null.
    batchSize 100  

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = null
    a1.sinks.k1.channel = c1

    9. HBase Sinks

    9.1 HBaseSink

    This sink writes data to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of HBase failing to write certain events, the sink will replay all events in that transaction.
    The HBaseSink supports writing data to secure HBase. To write to secure HBase, the user the agent is running as must have write permissions to the table the sink is configured to write to. The principal and keytab to use to authenticate against the KDC can be specified in the configuration. The hbase-site.xml in the Flume agent's classpath must have authentication set to kerberos (for details on how to do this, please refer to the HBase documentation).
    For convenience, two serializers are provided with Flume. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in HBase. This is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into a different column.
    The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.
    Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be hbase
    table The name of the table in Hbase to write to.
    columnFamily The column family in Hbase to write to.
    zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
    znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
    batchSize 100 Number of events to be written per txn.
    coalesceIncrements false

    Should the sink coalesce multiple increments to a cell per batch.

    This might give better performance if there are multiple increments to a limited number of cells.

    serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = “iCol”, payload column = “pCol”.
    serializer.* Properties to be passed to the serializer.
    kerberosPrincipal Kerberos user principal for accessing secure HBase
    kerberosKeytab Kerberos keytab for accessing secure HBase

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = hbase
    a1.sinks.k1.table = foo_table
    a1.sinks.k1.columnFamily = bar_cf
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
    a1.sinks.k1.channel = c1
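    To illustrate the RegexHbaseEventSerializer from the example above, a hedged sketch of its serializer.* settings. The regex and column names are placeholders, and the property names serializer.regex and serializer.colNames are assumptions about this serializer rather than values quoted from the guide:

    a1.sinks.k1.serializer.regex = ([^ ]+) ([^ ]+) (.*)
    a1.sinks.k1.serializer.colNames = id,level,msg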

    9.2  AsyncHBaseSink

    This sink writes data to HBase using an asynchronous model. A class implementing AsyncHbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink uses the Asynchbase API to write to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of HBase failing to write certain events, the sink will replay all events in that transaction. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be asynchbase
    table The name of the table in Hbase to write to.
    zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
    znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
    columnFamily The column family in Hbase to write to.
    batchSize 100 Number of events to be written per txn.
    coalesceIncrements false

    Should the sink coalesce multiple increments to a cell per batch. This might give better performance

    if there are multiple increments to a limited number of cells.

    timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction.
    serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
    serializer.* Properties to be passed to the serializer.

    Note that this sink takes the Zookeeper Quorum and parent znode information in the configuration. The Zookeeper Quorum and parent node configuration may be specified in the flume configuration file. Alternatively, if these are not provided in the configuration, the sink will read this information from the first hbase-site.xml file in the classpath.
    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = asynchbase
    a1.sinks.k1.table = foo_table
    a1.sinks.k1.columnFamily = bar_cf
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
    a1.sinks.k1.channel = c1

    10. MorphlineSolrSink

    This sink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
    This sink is well suited for use cases that stream raw data into HDFS (via the HdfsSink) and simultaneously extract, transform and load the same data into Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to search applications.
    The ETL functionality can be customized using a morphline configuration file that defines a chain of transformation commands that pipe event records from one command to another.
    Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline command is a bit like a Flume interceptor. Morphlines can be embedded into Hadoop components such as Flume.
    Commands to parse and transform a set of standard data formats such as log files, Avro, CSV, text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for additional data formats can be added as morphline plugins. Any kind of data format can be indexed, any kind of Solr document for any kind of Solr schema can be generated, and any custom ETL logic can be registered and executed.
    Morphlines manipulate continuous streams of records. The data model can be described as follows: a record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java objects as values. (The implementation uses Guava's ArrayListMultimap, which is a ListMultimap.) Note that a field can have multiple values and any two records need not use common field names.
    This sink fills the body of the Flume event into the _attachment_body field of the morphline record, and copies the headers of the Flume event into record fields of the same name. The commands can then act on this data.
    Routing to a SolrCloud cluster is supported to improve scalability. Indexing load can be spread across a large number of MorphlineSolrSinks for improved scalability. Indexing load can be replicated across multiple MorphlineSolrSinks for high availability, for example using Flume features such as the Load balancing Sink Processor. MorphlineInterceptor can also help to implement dynamic routing to multiple Solr collections (e.g. for multi-tenancy).
    The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation.
    The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    morphlineFile

    The relative or absolute path on the local file system to the morphline configuration file.

    Example: /etc/flume-ng/conf/morphline.conf

    morphlineId null Optional name used to identify a morphline if there are multiple morphlines in a morphline config file
    batchSize 1000 The maximum number of events to take per flume transaction.
    batchDurationMillis 1000

    The maximum duration per flume transaction (ms). The transaction commits after this

    duration or when batchSize is exceeded, whichever comes first.

    handlerClass org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler
    isProductionMode false

    This flag should be enabled for mission critical, large-scale online production systems that

    need to make progress without downtime when unrecoverable exceptions occur.

    Corrupt or malformed parser input data, parser bugs, and errors related to unknown

    Solr schema fields produce unrecoverable exceptions.

    recoverableExceptionClasses org.apache.solr.client.solrj.SolrServerException

    Comma separated list of recoverable exceptions that tend to be transient,

    in which case the corresponding task can be retried. Examples include network connection errors,

    timeouts, etc. When the production mode flag is set to true,

    the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries.

    isIgnoringRecoverableExceptions false

    This flag should be enabled, if an unrecoverable exception is accidentally misclassified as recoverable.

    This enables the sink to make progress and avoid retrying an event forever.

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
    # a1.sinks.k1.morphlineId = morphline1
    # a1.sinks.k1.batchSize = 1000
    # a1.sinks.k1.batchDurationMillis = 1000
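    For orientation only, a hedged sketch of what the morphline configuration file referenced by morphlineFile might contain. The collection name, ZooKeeper address and command list follow common Kite Morphlines conventions and are placeholders, not values taken from the guide:

    morphlines : [
      {
        id : morphline1
        importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
        commands : [
          # parse each event body as a single line of text
          { readLine { charset : UTF-8 } }
          # load the resulting record into the Solr collection
          { loadSolr { solrLocator : { collection : collection1, zkHost : "127.0.0.1:2181/solr" } } }
        ]
      }
    ]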

    11. ElasticSearchSink

    This sink writes data to an elasticsearch cluster. By default, events will be written so that the Kibana graphical interface can display them, just as if logstash wrote them.
    The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server, and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version, first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version: a 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined, read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running, down to the minor version.
    Events will be written to a new index every day. The name will be <indexName>-yyyy-MM-dd, where <indexName> is the indexName parameter. The sink will start writing to a new index at midnight UTC.
    Events are serialized for elasticsearch by the ElasticSearchLogStashEventSerializer by default. This behaviour can be overridden with the serializer parameter. This parameter accepts implementations of org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer or org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory. Implementing ElasticSearchEventSerializer is deprecated in favour of the more powerful ElasticSearchIndexRequestBuilderFactory.
    The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
    Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink
    hostNames Comma separated list of hostname:port, if the port is not present the default port ‘9300’ will be used
    indexName flume

    The name of the index which the date will be appended to. Example ‘flume’ -> ‘flume-yyyy-MM-dd’

    Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header

    indexType logs

    The type to index the document to, defaults to ‘log’ Arbitrary header substitution is supported,

    eg. %{header} replaces with value of named event header

    clusterName elasticsearch Name of the ElasticSearch cluster to connect to
    batchSize 100 Number of events to be written per txn.
    ttl

    TTL in days, when set will cause the expired documents to be deleted automatically,

    if not set documents will never be automatically deleted. TTL is accepted both in the earlier form

    of integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute),

    h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days.

    Follow http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.

    serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

    The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use.

    Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.

    serializer.* Properties to be passed to the serializer.

    Note that it can be very convenient to use the value of an event header to dynamically decide the indexName and indexType to use when storing the event. Use this feature with caution, as the event submitter then has control of the indexName and indexType. Furthermore, if the elasticsearch REST client is used, the event submitter has control of the URL path used.

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = elasticsearch
    a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
    a1.sinks.k1.indexName = foo_index
    a1.sinks.k1.indexType = bar_type
    a1.sinks.k1.clusterName = foobar_cluster
    a1.sinks.k1.batchSize = 500
    a1.sinks.k1.ttl = 5d
    a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
    a1.sinks.k1.channel = c1

    12. Kite Dataset Sink

    An experimental sink that writes events to a Kite Dataset. This sink will deserialize the body of each incoming event and store the resulting record in a Kite Dataset. It determines the target Dataset by loading a dataset by URI.
    The only supported serialization is avro, and the record schema must be passed in the event headers, using either flume.avro.schema.literal with the JSON schema representation or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported). This is compatible with the Log4jAppender flume client and the spooling directory source's Avro deserializer using deserializer.schemaType = LITERAL.
    Note 1: The flume.avro.schema.hash header is not supported. Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is negligible.

    Property Name | Default | Description
    channel  
    type Must be org.apache.flume.sink.kite.DatasetSink
    kite.dataset.uri URI of the dataset to open
    kite.repo.uri URI of the repository to open (deprecated; use kite.dataset.uri instead)
    kite.dataset.namespace Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
    kite.dataset.name Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
    kite.batchSize 100 Number of records to process in each batch
    kite.rollInterval 30 Maximum wait time (seconds) before data files are released
    kite.flushable.commitOnBatch true

    If true, the Flume transaction will be commited and the writer will be

    flushed on each batch of kite.batchSize records. This setting only

    applies to flushable datasets. When true, it’s possible for temp files with

    commited data to be left in the dataset directory. These files need to be

    recovered by hand for the data to be visible to DatasetReaders.

    kite.syncable.syncOnBatch true

    Controls whether the sink will also sync data when committing the transaction.

    This setting only applies to syncable datasets. Syncing gaurentees that data will

    be written on stable storage on the remote system while flushing only gaurentees

    that data has left Flume’s client buffers. When the kite.flushable.commitOnBatch 

    property is set to false, this property must also be set to false.

    kite.entityParser avro

    Parser that turns Flume Events into Kite entities. Valid values are avro 

    and the fully-qualified class name of an implementation of the EntityParser.Builder interface.

    kite.failurePolicy retry

    Policy that handles non-recoverable errors such as a missing Schema in the Event header.

    The default value, retry, will fail the current batch and try again which matches the old behavior.

    Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset,

    and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface.

    kite.error.dataset.uri

    URI of the dataset where failed events are saved when kite.failurePolicy is set to save.

     Required when the kite.failurePolicy is set to save.

    auth.kerberosPrincipal Kerberos user principal for secure authentication to HDFS
    auth.kerberosKeytab Kerberos keytab location (local FS) for the principal
    auth.proxyUser The effective user for HDFS actions, if different from the kerberos principal
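    The guide gives no configuration example for this sink; the following hedged sketch shows how the required properties might be wired together for an agent named a1. The dataset URI is a placeholder:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode/data/events
    a1.sinks.k1.kite.batchSize = 100
    a1.sinks.k1.kite.rollInterval = 30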

    13. Kafka Sink

    This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objectives is to integrate Flume with Kafka so that pull-based processing systems can process the data coming through various Flume sources. This currently supports the Kafka 0.9.x series of releases.
    This version of Flume no longer supports older versions (0.8.x) of Kafka.
    Required properties are marked in bold font.

    Property Name | Default | Description
    type Must be set to org.apache.flume.sink.kafka.KafkaSink
    kafka.bootstrap.servers

    List of brokers Kafka-Sink will connect to, to get the list of topic partitions

    This can be a partial list of brokers, but we recommend at least two for HA.

    The format is comma separated list of hostname:port

    kafka.topic default-flume-topic

    The topic in Kafka to which the messages will be published.

    If this parameter is configured, messages will be published to this topic.

    If the event header contains a “topic” field, the event will be published to that

    topic overriding the topic configured here. Arbitrary header substitution is supported,

    eg. %{header} is replaced with value of event header named “header”. (If using the substitution,

    it is recommended to set “auto.create.topics.enable” property of Kafka broker to true.)

    flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency.
    kafka.producer.acks 1

    How many replicas must acknowledge a message before its considered successfully written.

    Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)

    Set this to -1 to avoid data loss in some cases of leader failure.

    useFlumeEventFormat false

    By default events are put as bytes onto the Kafka topic directly from the event body.

    Set to true to store events as the Flume Avro binary format. Used in conjunction with

    the same property on the KafkaSource or with the parseAsFlumeEvent property on the

    Kafka Channel this will preserve any Flume headers for the producing side.

    defaultPartitionId

    Specifies a Kafka partition ID (integer) for all events in this channel to be sent to,

    unless overriden by partitionIdHeader. By default, if this property is not set,

    events will be distributed by the Kafka Producer’s partitioner - including by key 

    if specified (or by a partitioner specified by kafka.partitioner.class).

    partitionIdHeader

    When set, the sink will take the value of the field named using the value of this property from

    the event header and send the message to the specified partition of the topic.

    If the value represents an invalid partition, an EventDeliveryException will be thrown.

    If the header value is present then this setting overrides defaultPartitionId.

    allowTopicOverride true

    When set, the sink will allow a message to be produced into a topic specified by the 

    topicHeader property (if provided).

    topicHeader topic

    When set in conjunction with allowTopicOverride will produce a message

    into the value of the header named using the value of this property. Care should be taken

    when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.

    kafka.producer.security.protocol PLAINTEXT

    Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security.

    See below for additional info on secure setup.

    more producer security props  

    If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties

    that need to be set on producer.

    Other Kafka Producer Properties

    These properties are used to configure the Kafka Producer. Any producer property

    supported by Kafka can be used. The only requirement is to prepend the property name with the

    prefix kafka.producer. For example: kafka.producer.linger.ms

    Note that the Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with the same key will be sent to the same partition. If the key is null, events will be sent to random partitions.
    The Kafka sink also provides defaults for the key.serializer (org.apache.kafka.common.serialization.StringSerializer) and value.serializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
    Deprecated Properties:

    Property Name | Default | Description
    brokerList Use kafka.bootstrap.servers
    topic default-flume-topic Use kafka.topic
    batchSize 100 Use kafka.flumeBatchSize
    requiredAcks 1 Use kafka.producer.acks

    An example configuration of a Kafka sink is given below. Properties starting with the prefix kafka.producer are passed through when the Kafka producer is created, and the producer properties are not limited to the ones given in this example. It is also possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument.

    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = mytopic
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy
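    To illustrate the header substitution described in the property table above, a hedged variant where the topic and partition are taken from event headers; the header names logtype and partitionId are placeholders (if topics may not pre-exist, the broker's auto.create.topics.enable should be set to true, as noted above):

    a1.sinks.k1.kafka.topic = %{logtype}
    a1.sinks.k1.partitionIdHeader = partitionId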

    Security and Kafka Sink:

     

    The communication channel between Flume and Kafka supports secure authentication as well as data encryption. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0 onwards.

    As of now, data encryption is solely provided by SSL/TLS.

    Setting kafka.producer.security.protocol to any of the following values means:

    • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
    • SASL_SSL - Kerberos or plaintext authentication with data encryption
    • SSL - TLS based encryption with optional authentication.

    Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: the Kafka security overview and the jira for tracking this issue: KAFKA-2561

    TLS and Kafka Sink:

     

    Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

    Example configuration with server side authentication and data encryption:

    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
    a1.sinks.sink1.kafka.topic = mytopic
    a1.sinks.sink1.kafka.producer.security.protocol = SSL
    a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
    a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

    Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:

    a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

    Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

    1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
    2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

    If client side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA, which in turn is trusted by the Kafka brokers.

    a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
    a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

    If the keystore and key use different password protection, then the ssl.key.password property provides the required additional secret for the producer keystore:

    a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

    Kerberos and Kafka Sink: 

    To use the Kafka sink with a Kafka cluster secured with Kerberos, set the producer.security.protocol property noted above for the producer. The Kerberos keytab and principal to be used with Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection, if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and, optionally, the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

    JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
    JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

    Example secure configuration using SASL_PLAINTEXT:

    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
    a1.sinks.sink1.kafka.topic = mytopic
    a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
    a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

    Example secure configuration using SASL_SSL:

    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
    a1.sinks.sink1.kafka.topic = mytopic
    a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
    a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
    a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
    a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
    a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

    Sample JAAS file. For reference on its contents, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Unlike the Kafka Source or Kafka Channel, a "Client" section is not required unless it is needed by other connecting components. Also make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/path/to/keytabs/flume.keytab"
      principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
    };

    14. HTTP Sink

    The behaviour of this sink is that it will take events from the channel and send those events to a remote service using an HTTP POST request. The event content is sent as the POST body.

    The error handling behaviour of this sink depends on the HTTP response returned by the target server. The sink backoff/ready status is configurable, as are the transaction commit/rollback result and whether the event contributes to the successful event drain count.

    Any malformed HTTP response returned by the server where the status code is not readable will result in a backoff signal, and the event is not consumed from the channel.

    Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be http.
    endpoint The fully qualified URL endpoint to POST to
    connectTimeout 5000 The socket connection timeout in milliseconds
    requestTimeout 5000 The maximum request processing time in milliseconds
    contentTypeHeader text/plain The HTTP Content-Type header
    acceptHeader text/plain The HTTP Accept header value
    defaultBackoff true Whether to backoff by default on receiving all HTTP status codes
    defaultRollback true Whether to rollback by default on receiving all HTTP status codes
    defaultIncrementMetrics false Whether to increment metrics by default on receiving all HTTP status codes
    backoff.CODE Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code
    rollback.CODE Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code
    incrementMetrics.CODE Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code

    Note that the most specific HTTP status code match is used for the backoff, rollback and incrementMetrics configuration options. If there are configured values for both 2XX and 200 status codes, then 200 HTTP codes will use the 200 value, and all other HTTP codes in the 201-299 range will use the 2XX value.

    Any empty or null events are consumed without any request being made to the HTTP endpoint.

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = http
    a1.sinks.k1.channel = c1
    a1.sinks.k1.endpoint = http://localhost:8080/someuri
    a1.sinks.k1.connectTimeout = 2000
    a1.sinks.k1.requestTimeout = 2000
    a1.sinks.k1.acceptHeader = application/json
    a1.sinks.k1.contentTypeHeader = application/json
    a1.sinks.k1.defaultBackoff = true
    a1.sinks.k1.defaultRollback = true
    a1.sinks.k1.defaultIncrementMetrics = false
    a1.sinks.k1.backoff.4XX = false
    a1.sinks.k1.rollback.4XX = false
    a1.sinks.k1.incrementMetrics.4XX = true
    a1.sinks.k1.backoff.200 = false
    a1.sinks.k1.rollback.200 = false
    a1.sinks.k1.incrementMetrics.200 = true

    15. Custom Sink

    A custom sink is your own implementation of the Sink interface. A custom sink's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom sink is its FQCN. Required properties are in bold.

    Property Name | Default | Description
    channel  
    type The component type name, needs to be your FQCN

    Example for agent named a1:

    a1.channels = c1
    a1.sinks = k1
    a1.sinks.k1.type = org.example.MySink
    a1.sinks.k1.channel = c1

