  • Flume configuration in detail

    Flume:
    =====================
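        Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
        It has a simple and flexible architecture based on streaming data flows.
        It is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms.
        It uses a simple, extensible data model that allows for online analytic applications.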
    
        
    
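        source
            The producer, from the channel's point of view: it receives data in various formats and hands it to the channel for transport
    
        channel
            Acts as a data buffer: it receives data from the source and passes it on to the sink
    
        sink
            The consumer, from the channel's point of view: it takes data from the channel and writes it, in the configured format, to the configured destination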
    
    
    
    Event:
    ===============
        The basic unit of data that Flume transports:
            headers + body
            
        
    
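    Installing Flume:
    ================
        1. Unpack the archive
        2. Create a symbolic link
        3. Set the environment variables and make them take effect
        4. Edit the configuration files
            1) Rename flume-env.ps1.template to flume-env.ps1
            2) Rename flume-env.sh.template to flume-env.sh
            3) Edit flume-env.sh and set the JDK directory by adding
                export JAVA_HOME=/soft/jdk
    
        5. Check the Flume version
             flume-ng version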
    
            
    
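    Using Flume:
    =========================
        // Flume can also keep its configuration file in ZooKeeper
    
        // Command that runs a Flume agent (-n: agent name, -f: configuration file)
        flume-ng agent -n a1 -f xxx.conf
    
        Names used in the examples below:
        agent:      a1
        source:     r1
        channel:    c1
        sink:       k1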
    
        Steps:
            1. Write the configuration file r_nc.conf
                # Name the components of this agent
                a1.sources = r1
                a1.sinks = k1
                a1.channels = c1
    
                # Configure the source
                a1.sources.r1.type = netcat
                a1.sources.r1.bind = localhost
                a1.sources.r1.port = 8888
    
                # Configure the sink
                a1.sinks.k1.type = logger
    
                # Configure the channel
                a1.channels.c1.type = memory
                a1.channels.c1.capacity = 1000
                a1.channels.c1.transactionCapacity = 100
    
                # Bind the source and the sink to the channel
                a1.sources.r1.channels = c1
                a1.sinks.k1.channel = c1
    
            2. Start Flume with that configuration file
                flume-ng agent -n a1 -f r_nc.conf
    
            3. Open another session and test it
                nc localhost 8888
    
    
        // User guide
            http://flume.apache.org/FlumeUserGuide.html
    
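    Running a program in the background:
    =============================================
    
        Ctrl+Z: suspends the foreground program and leaves it stopped in the background =====> [1]+  Stopped                 flume-ng agent -n a1 -f r_nc.conf
    
        bg %1 resumes the stopped job in the background
    
        jobs lists the background jobs
    
        fg %1 brings the job back to the foreground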



    
    
    flume:
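        Collects, aggregates, and moves massive amounts of log data
    
    
        flume-ng agent -n a1 -f xxx.conf
    
    
        source
            The producer, relative to the channel    // e.g. netcat
        channel
            Works like a buffer                      // e.g. memory
        sink
            The consumer, relative to the channel    // e.g. logger
    
        
    Event: 
        headers + body
        k/v       data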
    
    
    source:
    ============================================
        1. Sequence (seq) source: mostly used for testing
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = seq
            # Total number of events to send
            a1.sources.r1.totalEvents = 1000
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
        
        2. Stress source: mostly used for load testing
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = org.apache.flume.source.StressSource
            # Size of a single event, in bytes
            a1.sources.r1.size = 10240
            # Total number of events
            a1.sources.r1.maxTotalEvents = 1000000
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
        3. Spooling directory (spooldir) source: watches a directory for new files and sends the contents of each new file as events
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = spooldir
            # Directory to watch
            a1.sources.r1.spoolDir = /home/centos/spooldir
    
            # Suffix appended to a file once it has been fully consumed
            #a1.sources.r1.fileSuffix = .COMPLETED
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
    
        4. Exec source    // produces data by running a Linux command
                // typical use: tail -F (follows a file and emits the lines appended to it as it grows)
                // gives no guarantee of data integrity; data can easily be lost
    
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = exec
            # Linux command to run
            a1.sources.r1.command = tail -F /home/centos/readme.txt
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
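        5. Taildir source    // watches the files in a directory
                    // the files to watch can be selected with a regular expression
                    // fault tolerant: read positions are recorded in a position file
    
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = TAILDIR
            # File groups; several can be listed
            a1.sources.r1.filegroups = f1
            # Directory and file pattern of the group; the regular expression applies to file names only
            a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*
    
            # Location of the position file (the value shown is the default)
            # a1.sources.r1.positionFile = ~/.flume/taildir_position.json
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1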
    
    
    sink:
    ====================================
        1. File roll sink    // mostly used for collecting data
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
    
            # Configure the sink
            a1.sinks.k1.type = file_roll
            # Target directory
            a1.sinks.k1.sink.directory = /home/centos/file
            # Roll interval; the default is 30 s, and 0 disables rolling so everything goes into a single file
            a1.sinks.k1.sink.rollInterval = 0
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
        2. HDFS sink        // writes in SequenceFile format by default
                    // key:   LongWritable
                    // value: BytesWritable
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
            
            # Configure the sink
            a1.sinks.k1.type = hdfs
            # Target directory
            a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
            # File name prefix
            a1.sinks.k1.hdfs.filePrefix = events-
            # Roll interval in seconds (0 disables time-based rolling)
            a1.sinks.k1.hdfs.rollInterval = 0
            # File size that triggers a roll, in bytes
            a1.sinks.k1.hdfs.rollSize = 1024
            # Use the local time, rather than a timestamp header, to fill in the path escapes
            a1.sinks.k1.hdfs.useLocalTimeStamp = true
            # Output file type; the default is SequenceFile
            # DataStream: uncompressed text; no compression codec may be set
            # CompressedStream: compressed text; a codec must be set
            a1.sinks.k1.hdfs.fileType = DataStream
    
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
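
The %y-%m-%d escapes in hdfs.path are filled in from the event time (here the local time, because useLocalTimeStamp = true). As a rough illustration only: the three escapes used above happen to mean the same thing in Python's strftime, so the resulting directory can be previewed as below. Flume has its own escape handling with more sequences than this sketch covers, and resolve_hdfs_path is a made-up helper name.

```python
from datetime import datetime


def resolve_hdfs_path(pattern: str, when: datetime) -> str:
    """Expand %y, %m and %d (two-digit year, month, day) as strftime does."""
    return when.strftime(pattern)


# Preview the directory an event written on 2018-04-25 would land in.
print(resolve_hdfs_path("/flume/events/%y-%m-%d/", datetime(2018, 4, 25)))
```

With this pattern a new directory is started each day, which is why the sink needs a timestamp from somewhere.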
    
        3. Hive sink        // help for the Hive services: hive --service help
                    // 1. Start the Hive metastore service with: hive --service metastore    (metastore address: thrift://localhost:9083)
                    // 2. Copy the HCatalog dependencies into Hive's lib directory: cp hive-hcatalog* /soft/hive/lib    (they are located in /soft/hive/hcatalog/share/hcatalog)
                    // 3. Create a transactional Hive table, with these settings applied first:
                      SET hive.support.concurrency=true;
                      SET hive.enforce.bucketing=true;
                      SET hive.exec.dynamic.partition.mode=nonstrict;
                      SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
                      SET hive.compactor.initiator.on=true;
                      SET hive.compactor.worker.threads=1;
                      
                      create table myhive.weblogs(id int, name string, age int)
                      clustered by(id) into 2 buckets
                      row format delimited
                      fields terminated by '\t'
                      stored as orc
                      tblproperties('transactional'='true');
    
    
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
    
            # Configure the sink
            a1.sinks.k1.type = hive
            a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
            a1.sinks.k1.hive.database = myhive
            a1.sinks.k1.hive.table = weblogs
            a1.sinks.k1.useLocalTimeStamp = true
            # Input format: DELIMITED or JSON
            # DELIMITED: delimited plain text
            # JSON:      JSON text
            a1.sinks.k1.serializer = DELIMITED
            # Delimiter of the incoming fields, written in double quotes
            a1.sinks.k1.serializer.delimiter = ","
            # Separator of the output fields, written in single quotes
            a1.sinks.k1.serializer.serdeSeparator = '\t'
            # Field names, comma-separated, with no spaces
            a1.sinks.k1.serializer.fieldnames = id,name,age
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
        4. HBase sink            // SimpleHbaseEventSerializer uses preset values for the row key and column, which cannot be customized
                        // RegexHbaseEventSerializer lets you choose the row key and the column names yourself
    
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
            
            # Configure the sink
            a1.sinks.k1.type = hbase
            a1.sinks.k1.table = flume_hbase
            a1.sinks.k1.columnFamily = f1
            a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
    
            
            # Column names, and the regex whose groups are mapped onto them
            # rowKeyIndex selects which group becomes the row key; indexes start at 0
            a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
            a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
            a1.sinks.k1.serializer.rowKeyIndex = 0
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
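
To see how the regex, colNames, and rowKeyIndex settings fit together, here is a small sketch of the splitting step in plain Python. It only imitates the idea (whole-body regex match, one group per column name, one group used as the row key); it is not the serializer's actual code, and split_event and the sample line are made up for illustration.

```python
import re

COL_NAMES = ["ROW_KEY", "name", "age"]    # serializer.colNames
PATTERN = re.compile(r"(.*),(.*),(.*)")   # serializer.regex
ROW_KEY_INDEX = 0                         # serializer.rowKeyIndex


def split_event(body: str):
    """Return (row_key, {column: value}), or None when the body does not match."""
    match = PATTERN.fullmatch(body)
    if match is None:
        return None
    groups = match.groups()
    row_key = groups[ROW_KEY_INDEX]
    # Every group except the row key becomes a column in the column family.
    columns = {
        name: value
        for i, (name, value) in enumerate(zip(COL_NAMES, groups))
        if i != ROW_KEY_INDEX
    }
    return row_key, columns


print(split_event("1,tom,20"))
```

Typing 1,tom,20 into the netcat session would therefore be expected to produce row key 1 with columns f1:name and f1:age.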
    
        
        5. AsyncHBase sink        // asynchronous HBase sink
                        // writes asynchronously, so it is faster
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
            
            # Configure the sink
            a1.sinks.k1.type = asynchbase
            a1.sinks.k1.table = flume_hbase
            a1.sinks.k1.columnFamily = f1
            a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
            
    
    channel: the buffer
    =====================================
        1. Memory channel
            a1.channels.c1.type = memory
            # Maximum number of events held in the channel
            a1.channels.c1.capacity = 1000
            # Maximum number of events the channel takes from a source per transaction,
            # and maximum number of events it gives to a sink per transaction
            a1.channels.c1.transactionCapacity = 100
    
        2. File channel    // when the checkpoint and data are kept in the default locations and several file channels run at once,
                    // the files conflict and the other channels crash
            
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = file
            a1.channels.c1.checkpointDir = /home/centos/flume/checkpoint
            a1.channels.c1.dataDirs = /home/centos/flume/data
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
    
        Memory channel: fast, but its data is lost if the machine loses power
        
        File channel:   slower, but its data survives a power loss
    
    
    Avro 
    ===============================================
        source
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = avro
            a1.sources.r1.bind = 0.0.0.0
            a1.sources.r1.port = 4444
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
        Start the Avro client to send data:
            flume-ng avro-client -H localhost -p 4444 -R ~/avro/header.txt -F ~/avro/user0.txt
            // -H: host    -p: port    -R: header file    -F: data file
    
    
        sink
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = TAILDIR
            a1.sources.r1.filegroups = f1
            a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*
    
            # Configure the sink (the Avro sink takes hostname, not bind)
            a1.sinks.k1.type = avro
            a1.sinks.k1.hostname = 192.168.23.101
            a1.sinks.k1.port = 4444
    
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
            
            
    
    
    
    Flume hops (multi-agent flow):
    =====================================
        1. Distribute Flume from s101 to the other nodes
            xsync.sh /soft/flume
            xsync.sh /soft/apache-flume-1.8.0-bin/
    
        2. Switch to the root user and distribute the environment variable file
            su root
            xsync.sh /etc/profile
            exit
    
        3. Configuration files
            1) On s101    // hop.conf
                source: avro
                sink:   hdfs
    
                # Name the components of this agent
                a1.sources = r1
                a1.sinks = k1
                a1.channels = c1
    
                # Configure the source
                a1.sources.r1.type = avro
                a1.sources.r1.bind = 0.0.0.0
                a1.sources.r1.port = 4444
    
                # Configure the sink
                a1.sinks.k1.type = hdfs
                a1.sinks.k1.hdfs.path = /flume/hop/%y-%m-%d/
                a1.sinks.k1.hdfs.filePrefix = events-
                a1.sinks.k1.hdfs.rollInterval = 0
                a1.sinks.k1.hdfs.rollSize = 1024
                a1.sinks.k1.hdfs.useLocalTimeStamp = true
                a1.sinks.k1.hdfs.fileType = DataStream
    
                # Configure the channel
                a1.channels.c1.type = memory
                a1.channels.c1.capacity = 1000
                a1.channels.c1.transactionCapacity = 100
    
                # Bind the source and the sink to the channel
                a1.sources.r1.channels = c1
                a1.sinks.k1.channel = c1
    
    
            2) On s102-s104        // hop2.conf
                source: taildir
                sink:   avro
    
                # Name the components of this agent
                a1.sources = r1
                a1.sinks = k1
                a1.channels = c1
    
                # Configure the source
                a1.sources.r1.type = TAILDIR
                a1.sources.r1.filegroups = f1
                a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*
    
                # Configure the sink
                a1.sinks.k1.type = avro
                a1.sinks.k1.hostname = 192.168.23.101
                a1.sinks.k1.port = 4444
    
    
                # Configure the channel
                a1.channels.c1.type = memory
                a1.channels.c1.capacity = 1000
                a1.channels.c1.transactionCapacity = 100
    
                # Bind the source and the sink to the channel
                a1.sources.r1.channels = c1
                a1.sinks.k1.channel = c1
    
        4. Create the ~/taildir directory on s102-s104
            xcall.sh "mkdir ~/taildir"
    
        
        5. Start Flume on s101
            flume-ng agent -n a1 -f /soft/flume/conf/hop.conf
    
        6. Start Flume on each of s102-s104 and put it in the background
            flume-ng agent -n a1 -f /soft/flume/conf/hop2.conf &
    
        
        7. Test: create data in taildir on each of s102-s104 and watch what arrives in HDFS
            s102]$ echo 102 > taildir/1.txt 
            s103]$ echo 103 > taildir/1.txt
            s104]$ echo 104 > taildir/1.txt
    
        
    interceptor:
    ==================================
        A source-side component that modifies or drops events
        Each source can be configured with several interceptors    ===> interceptor chain
    
        
    
        1. Timestamp Interceptor    // adds a timestamp header
    
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
            # Name the interceptor
            a1.sources.r1.interceptors = i1
            # Set the interceptor type
            a1.sources.r1.interceptors.i1.type = timestamp
    
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
            
        2. Static Interceptor    // adds a fixed key/value header
    
        3. Host Interceptor    // adds a header with the agent's host
    
        4. Setting up an interceptor chain:
            
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
    
            a1.sources.r1.interceptors = i1 i2 i3
            a1.sources.r1.interceptors.i1.type = timestamp
            a1.sources.r1.interceptors.i2.type = host
            a1.sources.r1.interceptors.i3.type = static
            a1.sources.r1.interceptors.i3.key = location
            a1.sources.r1.interceptors.i3.value = NEW_YORK
    
    
            # Configure the sink
            a1.sinks.k1.type = logger
    
            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
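
What the chain i1 i2 i3 does to an event can be pictured as three functions applied in order, each adding one header. The sketch below is a simplified model in plain Python, not Flume code: the function names are invented, the host value is passed in by hand instead of being looked up, and the header keys timestamp and host are assumed to be the interceptors' defaults.

```python
import time


def timestamp_interceptor(headers, body):
    # Stores the current time in milliseconds as a string.
    headers["timestamp"] = str(int(time.time() * 1000))
    return headers, body


def make_host_interceptor(host):
    def intercept(headers, body):
        headers["host"] = host
        return headers, body
    return intercept


def make_static_interceptor(key, value):
    def intercept(headers, body):
        headers[key] = value
        return headers, body
    return intercept


def run_chain(chain, body):
    """Pass one event body through every interceptor, in list order."""
    headers = {}
    for interceptor in chain:
        headers, body = interceptor(headers, body)
    return headers, body


chain = [
    timestamp_interceptor,                             # i1
    make_host_interceptor("192.168.23.101"),           # i2 (example address)
    make_static_interceptor("location", "NEW_YORK"),   # i3
]
headers, body = run_chain(chain, b"hello")
print(headers)
```

The body passes through unchanged; only the headers grow, which is what the logger sink shows for each event.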
        
    
    
    channel selector:
    ====================================
        A source-side component that routes each event to the chosen channel(s), similar to partitioning
            
        When a source is configured with several channels, by default it sends a copy of every event to each channel
    
        
        1. Replicating channel selector    // the default selector: the source sends a copy of each event to every channel
                        // setup: 1 source, 3 channels, 3 sinks
                        //        netcat    memory      file_roll
        
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1 k2 k3
            a1.channels = c1 c2 c3
    
            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
            a1.sources.r1.selector.type = replicating
    
            # Configure the channels
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            a1.channels.c2.type = memory
            a1.channels.c2.capacity = 1000
            a1.channels.c2.transactionCapacity = 100
    
            a1.channels.c3.type = memory
            a1.channels.c3.capacity = 1000
            a1.channels.c3.transactionCapacity = 100
    
            
            # Configure the sinks
            a1.sinks.k1.type = file_roll
            a1.sinks.k1.sink.directory = /home/centos/file1
            a1.sinks.k1.sink.rollInterval = 0
    
            a1.sinks.k2.type = file_roll
            a1.sinks.k2.sink.directory = /home/centos/file2
            a1.sinks.k2.sink.rollInterval = 0
    
            a1.sinks.k3.type = file_roll
            a1.sinks.k3.sink.directory = /home/centos/file3
            a1.sinks.k3.sink.rollInterval = 0
    
            # Bind the source and the sinks to the channels
            a1.sources.r1.channels = c1 c2 c3
            a1.sinks.k1.channel = c1
            a1.sinks.k2.channel = c2
            a1.sinks.k3.channel = c3
    
    
        
        2. Multiplexing channel selector    // uses an Avro source so that files can be sent together with headers
    
            # Name the components of this agent
            a1.sources = r1
            a1.sinks = k1 k2 k3
            a1.channels = c1 c2 c3
            
            # Configure the source
            a1.sources.r1.type = avro
            a1.sources.r1.bind = 0.0.0.0
            a1.sources.r1.port = 4444
            # Configure the channel selector
            a1.sources.r1.selector.type = multiplexing
            a1.sources.r1.selector.header = country
            a1.sources.r1.selector.mapping.CN = c1
            a1.sources.r1.selector.mapping.US = c2
            a1.sources.r1.selector.default = c3
            
            # Configure the channels
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            a1.channels.c2.type = memory
            a1.channels.c2.capacity = 1000
            a1.channels.c2.transactionCapacity = 100
    
            a1.channels.c3.type = memory
            a1.channels.c3.capacity = 1000
            a1.channels.c3.transactionCapacity = 100
    
            
            # Configure the sinks
            a1.sinks.k1.type = file_roll
            a1.sinks.k1.sink.directory = /home/centos/file1
            a1.sinks.k1.sink.rollInterval = 0
    
            a1.sinks.k2.type = file_roll
            a1.sinks.k2.sink.directory = /home/centos/file2
            a1.sinks.k2.sink.rollInterval = 0
    
            a1.sinks.k3.type = file_roll
            a1.sinks.k3.sink.directory = /home/centos/file3
            a1.sinks.k3.sink.rollInterval = 0
    
            # Bind the source and the sinks to the channels
            a1.sources.r1.channels = c1 c2 c3
            a1.sinks.k1.channel = c1
            a1.sinks.k2.channel = c2
            a1.sinks.k3.channel = c3
    
    
            1. Create the directories file1, file2, and file3 in the home directory
                mkdir file1 file2 file3
    
            2. Create the directory country and put the header files and the data in it
                Create the header files CN.txt, US.txt, and OTHER.txt
                    CN.txt ===> country CN
                    US.txt ===> country US
                    OTHER.txt ===> country OTHER
                
                Create the data file 1.txt
                    1.txt ====> helloworld
    
            3. Run Flume
                flume-ng agent -n a1 -f /soft/flume/selector_multi.conf
    
            4. Run the Avro client
                flume-ng avro-client -H localhost -p 4444 -R ~/country/US.txt -F ~/country/1.txt    ===> check file2
                flume-ng avro-client -H localhost -p 4444 -R ~/country/CN.txt -F ~/country/1.txt    ===> check file1
                flume-ng avro-client -H localhost -p 4444 -R ~/country/OTHER.txt -F ~/country/1.txt    ===> check file3
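
The routing rule behind that test is a lookup on one header with a fallback. A minimal sketch in plain Python, assuming one channel per mapping as in the configuration above (select_channel is an invented name, and this models only the decision, not Flume's selector class):

```python
HEADER = "country"                    # selector.header
MAPPING = {"CN": "c1", "US": "c2"}    # selector.mapping.*
DEFAULT_CHANNEL = "c3"                # selector.default


def select_channel(headers):
    """Return the channel an event goes to, based on its country header."""
    return MAPPING.get(headers.get(HEADER), DEFAULT_CHANNEL)


for value in ("CN", "US", "OTHER"):
    print(value, "->", select_channel({"country": value}))
```

An event with no country header at all also falls through to the default channel c3 in this model.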
    
    
            
    sinkProcessor
    =================================
        A sink runner runs a sink group
    
        A sink group consists of one or more sinks
    
        The sink runner tells the sink group to process the next batch of events
    
        The sink group contains a sink processor, which picks the sink that handles the batch
    
    
        2. failover    // every sink is given a priority
                    // the larger the number, the higher the priority
                    // incoming data is handled by the sink with the highest priority
                    // when that sink fails, the sink with the next-highest priority is activated and carries on
                    // this example binds each sink to its own channel (sinks in a group may also share one channel)
    
            a1.sources = r1
            a1.sinks = s1 s2 s3
            a1.channels = c1 c2 c3
    
            # Describe/configure the source
            a1.sources.r1.type = seq
    
            a1.sinkgroups = g1
            a1.sinkgroups.g1.sinks = s1 s2 s3
            a1.sinkgroups.g1.processor.type = failover
            a1.sinkgroups.g1.processor.priority.s1 = 5
            a1.sinkgroups.g1.processor.priority.s2 = 10
            a1.sinkgroups.g1.processor.priority.s3 = 15
            a1.sinkgroups.g1.processor.maxpenalty = 10000
    
            # Describe the sink
            a1.sinks.s1.type = file_roll
            a1.sinks.s1.sink.directory = /home/centos/file1
            a1.sinks.s2.type = file_roll
            a1.sinks.s2.sink.directory = /home/centos/file2
            a1.sinks.s3.type = file_roll
            a1.sinks.s3.sink.directory = /home/centos/file3
    
            # Use a channel which buffers events in memory
            a1.channels.c1.type = memory
            a1.channels.c2.type = memory
            a1.channels.c3.type = memory
    
            # Bind the source and sink to the channel
            a1.sources.r1.channels = c1 c2 c3
            a1.sinks.s1.channel = c1
            a1.sinks.s2.channel = c2
            a1.sinks.s3.channel = c3
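
The selection rule described in the comments (highest priority wins, next-highest takes over on failure) can be sketched in a few lines of plain Python. This is a simplified model only: pick_sink is an invented name, and the real processor also keeps a failed sink penalized for a while (bounded by maxpenalty) before trying it again, which is left out here.

```python
PRIORITIES = {"s1": 5, "s2": 10, "s3": 15}   # processor.priority.*


def pick_sink(priorities, failed):
    """Return the live sink with the highest priority, or None if all failed."""
    alive = {name: prio for name, prio in priorities.items() if name not in failed}
    if not alive:
        return None
    return max(alive, key=alive.get)


print(pick_sink(PRIORITIES, failed=set()))
print(pick_sink(PRIORITIES, failed={"s3"}))
print(pick_sink(PRIORITIES, failed={"s3", "s2"}))
```

With the priorities above, s3 handles the data first, then s2, then s1.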
    
    
    
    
    An Event is created on the source side by wrapping the byte array of the input data
        Event event = EventBuilder.withBody(body);
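
As a picture of what such an event holds, here is a toy model in plain Python: a byte-array body plus a map of string headers. The Event class and with_body helper below are stand-ins written for this note, not the Flume classes themselves.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Event:
    """Toy stand-in for a Flume event: string headers plus a byte-array body."""
    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)


def with_body(body: bytes, headers: Optional[Dict[str, str]] = None) -> Event:
    """Hypothetical helper that mirrors the idea of EventBuilder.withBody(body)."""
    return Event(body=body, headers=dict(headers or {}))


event = with_body("helloworld".encode("utf-8"), {"country": "CN"})
print(event.headers, event.body)
```

Interceptors and channel selectors work on the headers; sinks usually write out the body.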
    
    
    
    The process method of a Sink returns one of two statuses:
        1. READY      // one or more events were delivered successfully
        2. BACKOFF    // the channel had no data to give to the sink
            
    
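    Lifecycle of a Flume transaction:
        
        tx.begin()    // start the transaction, then perform the operations
        tx.commit()    // commit the transaction once the operations have completed
        tx.rollback()    // roll back the transaction if an exception occurs
        tx.close()    // close the transaction; this must always be done at the end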
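
The order of those four calls is easiest to see as a try/except/finally pattern. The sketch below uses a toy Transaction class that only records which methods were called; it is written in plain Python for illustration and is not the Flume API, and run_in_transaction is an invented helper.

```python
class Transaction:
    """Toy transaction that only records which lifecycle methods were called."""

    def __init__(self):
        self.calls = []

    def begin(self):
        self.calls.append("begin")

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


def run_in_transaction(tx, work):
    """Run work() inside tx: commit on success, roll back on error, always close."""
    tx.begin()
    try:
        work()
        tx.commit()
        return True
    except Exception:
        tx.rollback()
        return False
    finally:
        tx.close()


def failing_work():
    raise RuntimeError("simulated delivery failure")


ok_tx, bad_tx = Transaction(), Transaction()
run_in_transaction(ok_tx, lambda: None)
run_in_transaction(bad_tx, failing_work)
print(ok_tx.calls)
print(bad_tx.calls)
```

Either commit or rollback runs, never both, and close runs in every case.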
  • Original article: https://www.cnblogs.com/zyde/p/8946069.html