Sink

!!!1.Logger Sink
Logs events at INFO level; typically used for testing and debugging.
     
Properties:
    !channel  
    !type The component type name, needs to be logger
    maxBytesToLog 16 Maximum number of bytes of the Event body to log
     
Requires a log4j configuration file in the directory specified by the --conf argument.
Alternatively, log4j can be configured at launch time with -Dflume.root.logger=INFO,console.
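If no log4j configuration is available, a minimal log4j.properties along these lines prints the agent's logs (and logger-sink output) to the console; this is a sketch equivalent to passing -Dflume.root.logger=INFO,console, using standard log4j 1.x appender classes:

```properties
# Minimal log4j.properties: send everything at INFO and above to the console
log4j.rootLogger=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
```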
     
Example:
See the getting-started example.
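For reference, a minimal agent definition using a logger sink might look like this (a sketch; the netcat source on port 44444 is an assumption for quick manual testing, adjust as needed):

```properties
# Name the components of Agent a1
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# A netcat source for quick manual testing
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444
# Logger sink: prints each event at INFO level
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=16
# In-memory channel
a1.channels.c1.type=memory
# Wire them together
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```

Lines typed into `telnet localhost 44444` should then appear as events in the console log.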
     
!!!2.File Roll Sink
Stores events on the local filesystem.
At a configurable interval, it rolls to a new file holding the events collected during that period.
     
Properties:
!channel  
!type Must be file_roll
!sink.directory The directory where files are stored
sink.rollInterval 30 Roll to a new file every 30 seconds, i.e. the data collected in each interval is cut into its own file. Setting this to 0 disables rolling, causing all data to be written to a single file.
    sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
    batchSize 100  
     
Example:
Write the configuration file:
# Name the components of Agent a1
a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1
 
# Describe/configure the Source
a1.sources.r1.type  = http
a1.sources.r1.port  = 6666
 
# Describe the Sink
a1.sinks.k1.type  = file_roll
a1.sinks.k1.sink.directory = /home/park/work/apache-flume-1.6.0-bin/mysink
# Describe the in-memory Channel
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
 
# Bind the Source and Sink to the Channel
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1
 
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template7.conf --name a1 -Dflume.root.logger=INFO,console
!!!3.Avro Sink
The basis for multi-hop flows, fan-out flows (one to many), and fan-in flows (many to one).
     
Properties:
!channel  
!type The component type name, needs to be avro.
!hostname The hostname or IP address of the next hop to connect to.
!port The port to connect to.
batch-size 100 Number of events to batch together for sending.
    connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
    request-timeout 20000 Amount of time (ms) to allow for requests after the first.
reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
    compression-type none This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource
compression-level 6 The compression level for events. 0 = no compression; 1-9 = increasing compression. The higher the number, the more compression.
    ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”.
    trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection.
    truststore The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used.
    truststore-password The password for the specified truststore.
    truststore-type JKS The type of the Java truststore. This can be “JKS” or other supported Java truststore type.
    exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads.
     
Example 1: Multi-hop flow
h2:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
# Describe/configure the Source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988
# Describe the Sink
a1.sinks.k1.type=logger
# Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
# Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
     
     
h1:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
# Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888
# Describe the Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.138
a1.sinks.k1.port=9988
# Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
# Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
     
Send an HTTP request to h1:
    curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://192.168.242.133:8888
After a few seconds, h2 receives the message.
     
Example 2: Fan-out flow - replicating
h2, h3:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
# Describe/configure the Source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988
# Describe the Sink
a1.sinks.k1.type=logger
# Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
# Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
     
h1:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2
 
# Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888
# Describe the Sinks
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.138
a1.sinks.k1.port=9988
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.242.135
a1.sinks.k2.port=9988
# Describe the in-memory Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=1000
# Bind the Source and Sinks to the Channels
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
     
Example 3: Fan-out flow - multiplexing (routing)
h2, h3:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
# Describe/configure the Source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988
# Describe the Sink
a1.sinks.k1.type=logger
# Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
# Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
     
h1:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2
 
# Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=flag
a1.sources.r1.selector.mapping.aaa=c1
a1.sources.r1.selector.mapping.bbb=c2
a1.sources.r1.selector.default=c1
# Describe the Sinks
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.138
a1.sinks.k1.port=9988
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=192.168.242.135
a1.sinks.k2.port=9988
# Describe the in-memory Channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=1000
# Bind the Source and Sinks to the Channels
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
Send HTTP requests to test; events are routed according to the value of the flag header.
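With the agents above running, the routing can be exercised by varying the flag header; a sketch, assuming h1 is reachable at 192.168.242.133:8888 as in Example 1:

```
# flag=aaa maps to channel c1, so the event reaches k1 (h2)
curl -X POST -d '[{ "headers":{"flag":"aaa"}, "body":"to h2" }]' http://192.168.242.133:8888
# flag=bbb maps to channel c2, so the event reaches k2 (h3)
curl -X POST -d '[{ "headers":{"flag":"bbb"}, "body":"to h3" }]' http://192.168.242.133:8888
# any other value falls through to the default channel, c1
curl -X POST -d '[{ "headers":{"flag":"xxx"}, "body":"default route" }]' http://192.168.242.133:8888
```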
     
Example 4: Fan-in flow
m3:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# Describe/configure the Source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141
# Describe the Sink
a1.sinks.k1.type=logger
# Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
# Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template.conf --name a1 -Dflume.root.logger=INFO,console
     
m1, m2:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
# Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888
# Describe the Sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.242.135
a1.sinks.k1.port=4141
# Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
# Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console
Send an HTTP request from m1 with curl. Because the HTTP source uses the JSONHandler by default, the data must be in the expected JSON format:
[root@localhost conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
Send an HTTP request from m2 with curl, in the same JSON format:
[root@localhost conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
m3 receives both messages correctly.
     
     
!!!4.HDFS Sink
This sink writes events to the Hadoop Distributed File System (HDFS).
It currently supports creating text and sequence files, with compression available for both formats.
Files can be rolled based on elapsed time, data size, or number of events.
It also buckets/partitions data by attributes such as the timestamp or the machine where the event originated.
The HDFS directory path may contain escape sequences that the sink replaces to generate the directory/file names used to store events.
Using this sink requires Hadoop to be installed, so that Flume can use the Hadoop jars to communicate with HDFS.
Note that a version of Hadoop that supports the sync() call is required.
     
Properties:
!channel  
!type The component type name, must be hdfs
!hdfs.path HDFS directory path (e.g. hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Name prefix for files created by Flume in the directory
hdfs.fileSuffix Suffix to append to the file name (e.g. .avro - note: the date/time is not automatically added)
hdfs.inUsePrefix Prefix used for temporary files that Flume is actively writing
hdfs.inUseSuffix .tmp Suffix used for temporary files that Flume is actively writing
    hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
    hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
    hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events)
    hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
    hdfs.batchSize 100 number of events written to file before it is flushed to HDFS
    hdfs.codeC Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy
    hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream (1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC
    hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
    hdfs.minBlockReplicas Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
    hdfs.writeFormat Format for sequence file records. One of “Text” or “Writable” (the default).
    hdfs.callTimeout 10000 Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.
    hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
    hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
    hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
    hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
    hdfs.proxyUser    
hdfs.round false Whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t)
hdfs.roundValue 1 The value to round the timestamp down to (in the unit configured by hdfs.roundUnit)
hdfs.roundUnit second The unit of the round-down value - second, minute, or hour
    hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
    hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
    hdfs.closeTries 0 Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
    hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.
    serializer TEXT Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
     
Example:
Write the configuration file:
# Name the Agent components
a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
# Describe/configure the Source
a1.sources.r1.type=http
a1.sources.r1.port=8888
# Describe the Sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/ppp
# Describe the in-memory Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
# Bind the Source and Sink to the Channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start Flume:
./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console
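The directory path also supports time-based escape sequences, which combine with the roll settings; a sketch of a daily-partitioned variant of the sink above (the path and roll values are illustrative, not recommendations):

```properties
# Write into one directory per day, rolling files by time and size
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/ppp/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=events
# Plain text output; do not set hdfs.codeC with DataStream
a1.sinks.k1.hdfs.fileType=DataStream
# Roll every 5 minutes or at 128 MB, whichever comes first; never by event count
a1.sinks.k1.hdfs.rollInterval=300
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.rollCount=0
# Needed when events carry no timestamp header (e.g. plain http requests)
a1.sinks.k1.hdfs.useLocalTimeStamp=true
```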
     
!!!5.Hive Sink
!!!6.Custom Sink
A custom sink is an implementation of the Sink interface that you write yourself.
The custom sink's class and its dependencies must be placed on Flume's classpath before the agent is started.
 
Properties:
type Must be the fully qualified class name of your Sink implementation
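A minimal skeleton of such a sink might look like the following; this is a sketch only (it needs flume-ng-core on the classpath to compile, and the class name MySink and the println delivery target are hypothetical), following the usual AbstractSink/Configurable pattern:

```java
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

// Configured with: a1.sinks.k1.type=com.example.MySink
public class MySink extends AbstractSink implements Configurable {

    @Override
    public void configure(Context context) {
        // Read custom sink properties here if needed,
        // e.g. context.getString("someProp", "defaultValue")
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event event = channel.take();
            if (event == null) {           // channel empty: ask Flume to back off
                tx.commit();
                return Status.BACKOFF;
            }
            // Deliver the event somewhere; here we just print the body
            System.out.println(new String(event.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            tx.rollback();                 // event stays in the channel for retry
            throw new EventDeliveryException(e);
        } finally {
            tx.close();
        }
    }
}
```

Note that every take from the channel happens inside a transaction, so a failed delivery is rolled back rather than lost.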
Original source: https://www.cnblogs.com/zpb2016/p/5766918.html