zoukankan      html  css  js  c++  java
  • Source 介绍

    !!!1.Avro Source
    监听AVRO端口来接受来自外部AVRO客户端的事件流。
    利用Avro Source可以实现多级流动、扇出流、扇入流等效果。
    另外也可以接受通过flume提供的Avro客户端发送的日志信息。
     
     
    支持的属性:
    !channels  
         !type 类型名称,"AVRO"    
    !bind 需要监听的主机名或IP
    !port 要监听的端口
    threads 工作线程最大线程数
    selector.type    
    selector.*    
    interceptors 空格分隔的拦截器列表
    interceptors.*    
    compression-type none 压缩类型,可以是“none”或“default”,这个值必须和AvroSource的压缩格式匹配
    ssl false 是否启用ssl加密,如果启用还需要配置一个“keystore”和一个“keystore-password”.
    keystore 为SSL提供的 java密钥文件 所在路径
    keystore-password 为SSL提供的 java密钥文件 密码
    keystore-type JKS 密钥库类型可以是 “JKS” 或 “PKCS12”.
    exclude-protocols SSLv3 空格分隔开的列表,用来指定在SSL / TLS协议中排除。SSLv3将总是被排除除了所指定的协议。
    ipFilter false 如果需要为netty开启ip过滤,将此项设置为true
    ipFilterRules 陪netty的ip过滤设置表达式规则
     
    案例:
    编写配置文件:
    #命名Agent a1的组件
    a1.sources  =  r1
    a1.sinks  =  k1
    a1.channels  =  c1
     
    #描述/配置Source
    a1.sources.r1.type  =  avro
    a1.sources.r1.bind  =  0.0.0.0
    a1.sources.r1.port  =  33333
     
    #描述Sink
    a1.sinks.k1.type  =  logger
    #描述内存Channel
    a1.channels.c1.type  =  memory
    a1.channels.c1.capacity  =  1000
    a1.channels.c1.transactionCapacity  =  100
     
    #为Channle绑定Source和Sink
    a1.sources.r1.channels  =  c1
    a1.sinks.k1.channel  =  c1
    启动flume:
    ./flume-ng agent --conf ../conf --conf-file ../conf/template2.conf --name a1 -Dflume.root.logger=INFO,console
    通过flume提供的avro客户端向指定机器指定端口发送日志信息:
    ./flume-ng avro-client --conf ../conf --host 0.0.0.0 --port 33333 --filename ../mydata/log1.txt
    发现确实收集到了日志
     
    2.Exec Source
    可以将命令产生的输出作为源
     
    属性说明:
    !channels  
    !type 类型名称,需要是"exec"
    !command 要执行的命令
    shell A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
    restartThrottle 10000 毫秒为单位的时间,用来声明等待多久后尝试重试命令
    restart false 如果cmd挂了,是否重启cmd
    logStdErr false 无论是否是标准错误都该被记录
    batchSize 20 同时发送到通道中的最大行数
    batchTimeout 3000 如果缓冲区没有满,经过多长时间发送数据
    selector.type 复制还是多路复用
    selector.*   Depends on the selector.type value
    interceptors 空格分隔的拦截器列表
    interceptors.*    
    案例:
    编写配置文件:
    #命名Agent a1的组件
    a1.sources  =  r1
    a1.sinks  =  k1
    a1.channels  =  c1
     
    #描述/配置Source
    a1.sources.r1.type  =  avro
    a1.sources.r1.bind  =  0.0.0.0
    a1.sources.r1.port  =  33333
     
    #描述Sink
    a1.sinks.k1.type  =  logger
    #描述内存Channel
    a1.channels.c1.type  =  memory
    a1.channels.c1.capacity  =  1000
    a1.channels.c1.transactionCapacity  =  100
     
    #为Channle绑定Source和Sink
    a1.sources.r1.channels  =  c1
    a1.sinks.k1.channel  =  c1
     
    启动flume:
    ./flume-ng agent --conf ../conf --conf-file ../conf/template2.conf --name a1 -Dflume.root.logger=INFO,console
     
    **可以通过tail命令,收集日志文件中后续追加的日志
     
    !!!3.Spooling Directory Source
    这个Source允许你将文件将要收集的数据放置到"自动搜集"目录中。这个Source将监视该目录,并将解析新文件的出现。事件处理逻辑是可插拔的,当一个文件被完全读入信道,它会被重命名或可选的直接删除。
    要注意的是,放置到自动搜集目录下的文件不能修改,如果修改,则flume会报错。
    另外,也不能产生重名的文件,如果有重名的文件被放置进来,则flume会报错。
     
     
    属性说明:
    !channels  
    !type 类型,需要指定为"spooldir"
    !spoolDir 读取文件的路径,即"搜集目录"
    fileSuffix .COMPLETED 对处理完成的文件追加的后缀
    deletePolicy never 处理完成后是否删除文件,需是"never"或"immediate"
    fileHeader false Whether to add a header storing the absolute path filename.
    fileHeaderKey file Header key to use when appending absolute path filename to event header.
    basenameHeader false Whether to add a header storing the basename of the file.
    basenameHeaderKey basename Header Key to use when appending basename of file to event header.
    ignorePattern ^$ 正则表达式指定哪些文件需要忽略
    trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
    consumeOrder 处理文件的策略,oldest, youngest 或 random。
    maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.
    batchSize 100 Granularity at which to batch transfer to the channel
    inputCharset UTF-8 读取文件时使用的编码。
    decodeErrorPolicy FAIL 当在输入文件中发现无法处理的字符编码时如何处理。FAIL:抛出一个异常而无法 解析该文件。REPLACE:用“替换字符”字符,通常是Unicode的U + FFFD更换不可解析角色。 忽略:掉落的不可解析的字符序列。
    deserializer LINE 声明用来将文件解析为事件的解析器。默认一行为一个事件。处理类必须实现EventDeserializer.Builder接口。
    deserializer.*   Varies per event deserializer.
    bufferMaxLines (Obselete) This option is now ignored.
    bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
    selector.type replicating replicating or multiplexing
    selector.*   Depends on the selector.type value
    interceptors Space-separated list of interceptors
    interceptors.*    
     
    案例:
    编写配置文件:
    #命名Agent a1的组件
    a1.sources  =  r1
    a1.sinks  =  k1
    a1.channels  =  c1
     
    #描述/配置Source
    a1.sources.r1.type  = spooldir
    a1.sources.r1.spoolDir  = /home/park/work/apache-flume-1.6.0-bin/mydata
     
    #描述Sink
    a1.sinks.k1.type  =  logger
    #描述内存Channel
    a1.channels.c1.type  =  memory
    a1.channels.c1.capacity  =  1000
    a1.channels.c1.transactionCapacity  =  100
     
    #为Channle绑定Source和Sink
    a1.sources.r1.channels  =  c1
    a1.sinks.k1.channel  =  c1
     
     
    启动flume:
    ./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console
     
    向指定目录中传输文件,发现flume收集到了该文件,将文件中的每一行都作为日志来处理。
     
     
    !!!4.NetCat Source
    一个NetCat Source用来监听一个指定端口,并将接收到的数据的每一行转换为一个事件。
     
    属性说明:
    !channels  
    !type 类型名称,需要被设置为"netcat"
    !bind 指定要绑定到的ip或主机名。
    !port 指定要绑定到的端口号
    max-line-length 512 单行最大字节数
    ack-every-event true 对于收到的每一个Event是否响应"OK"
    selector.type
    selector.*  
    interceptors
    interceptors.*
     
    案例:
    参见快速入门案例
     
    5.Sequence Generator Source -- 序列发生器源
    一个简单的序列发生器,不断的产生事件,值是从0开始每次递增1。
    主要用来进行测试。
     
    参数说明:
    !channels  
    !type 类型名称,必须为"seq"
    selector.type  
    selector.*
    interceptors
    interceptors.*    
    batchSize 1
     
    案例:
    编写配置文件:
    #命名Agent a1的组件
    a1.sources  =  r1
    a1.sinks  =  k1
    a1.channels  =  c1
     
    #描述/配置Source
    a1.sources.r1.type  = seq
     
    #描述Sink
    a1.sinks.k1.type  =  logger
    #描述内存Channel
    a1.channels.c1.type  =  memory
    a1.channels.c1.capacity  =  1000
    a1.channels.c1.transactionCapacity  =  100
     
    #为Channle绑定Source和Sink
    a1.sources.r1.channels  =  c1
    a1.sinks.k1.channel  =  c1
    启动flume:
    ./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console
    发现打印了日志
     
    6.HTTP Source
    此Source接受HTTP的GET和POST请求作为Flume的事件。
    其中GET方式应该只用于试验。
    需要提供一个可插拔的"处理器"来将请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口
    这个处理器接受一个 HttpServletRequest对象,并返回一个Flume Envent对象集合。
    从一个HTTP请求中得到的事件将在一个事务中提交到通道中。thus allowing for increased efficiency on channels like the file channel。
    如果处理器抛出一个异常,Source将会返回一个400的HTTP状态码。
    如果通道已满,无法再将Event加入Channel,则Source返回503的HTTP状态码,表示暂时不可用。
     
    参数说明:
    !type   类型,必须为"HTTP"
    !port 监听的端口
    bind 0.0.0.0 监听的主机名或ip
    handler org.apache.flume.source.http.JSONHandler 处理器类,需要实现HTTPSourceHandler接口
    handler.* 处理器的配置参数
    selector.type
    selector.*  
    interceptors
    interceptors.*    
    enableSSL false 是否开启SSL,如果需要设置为true。注意,HTTP不支持SSLv3。
    excludeProtocols SSLv3 空格分隔的要排除的SSL/TLS协议。SSLv3总是被排除的。
    keystore   密钥库文件所在位置。
    keystorePassword Keystore 密钥库密码
     
    案例:
    编写配置文件:
    #命名Agent a1的组件
    a1.sources  =  r1
    a1.sinks  =  k1
    a1.channels  =  c1
     
    #描述/配置Source
    a1.sources.r1.type  = http
    a1.sources.r1.port  = 66666
     
    #描述Sink
    a1.sinks.k1.type  =  logger
    #描述内存Channel
    a1.channels.c1.type  =  memory
    a1.channels.c1.capacity  =  1000
    a1.channels.c1.transactionCapacity  =  100
     
    #为Channle绑定Source和Sink
    a1.sources.r1.channels  =  c1
    a1.sinks.k1.channel  =  c1
     
    启动flume:
    ./flume-ng agent --conf ../conf --conf-file ../conf/template6.conf --name a1 -Dflume.root.logger=INFO,console
    通过命令发送HTTP请求到指定端口:
    curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://0.0.0.0:6666
    发现flume收集到了日志
     
    常见的Handler:
    JSONHandler
    可以处理JSON格式的数据,并支持UTF-8 UTF-16 UTF-32字符集
    该handler接受Evnet数组,并根据请求头中指定的编码将其转换为Flume Event
    如果没有指定编码,默认编码为UTF-8.
    JSON格式如下:
    --
    [{
      "headers" : {
                 "timestamp" : "434324343",
                 "host" : "random_host.example.com"
                 },
      "body" : "random_body"
      },
      {
      "headers" : {
                 "namenode" : "namenode.example.com",
                 "datanode" : "random_datanode.example.com"
                 },
      "body" : "really_random_body"
      }]
    --
    To set the charset, the request must have content type specified as application/json;charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).
    One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method.
    Typetype=newTypeToken<List<JSONEvent>>(){}.getType();
     
    BlobHandler
    BlobHandler是一种将请求中上传文件信息转化为event的处理器。
    参数说明,加!为必须属性:
    !handler The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
    handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
     
    7.Custom source -- 自定义源
    自定义源是自己实现源接口得到的。
    自定义源的类和其依赖包必须在开始时就放置到Flume的类加载目录下。
     
    参数说明,加!为必须属性:
    !channels  
    !type 类型,必须设置为自己的自定义处理类的全路径名
    selector.type  
    selector.*
    interceptors
    interceptors.*
  • 相关阅读:
    ios学习笔记之block在ios开发中的应用
    ios学习笔记之block在ios开发中的应用
    20款优秀的移动产品原型和线框图设计工具
    字段约束,索引,主外键
    建表,建库
    MYSQL索引类型。MYSQLc储存引擎
    数据库服务概述,构建MYSQL服务器,数据库基本管理,mysql数据类型,表结构的调整
    Zabbix报警机制,Zabbix进阶操作,监控案例
    parted分区和挂载及非交互式操作
    《总结2》项目实验课
  • 原文地址:https://www.cnblogs.com/zpb2016/p/5766915.html
Copyright © 2011-2022 走看看