zoukankan      html  css  js  c++  java
  • Flume基础学习

    Flume是一款非常优秀的日志采集工具。支持多种形式的日志采集,作为apache的顶级开源项目,Flume再大数据方面具有广泛的应用

    首先需要在Flume的解压目录中conf文件夹中将flume-env.sh.templete更改未flume.env.sh

    并修改jdk的位置

    Source

    我们可以从Avro,NetCat。Http,TailDir。我们在Java开发中通常都是使用的log4j等日志工具进行日志按天存储,所以我们重点关注下tailDir Source

    Taildir Source

    在Flume1.7之前如果想要监控一个文件新增的内容,我们一般采用的source 为 exec tail,但是这会有一个弊端,就是当你的服务器宕机重启后,此时数据读取还是从头开始,这显然不是我们想看到的! 在Flume1.7 没有出来之前我们一般的解决思路为:当读取一条记录后,就把当前的记录的行号记录到一个文件中,宕机重启时,我们可以先从文件中获取到最后一次读取文件的行数,然后继续监控读取下去。保证数据不丢失、不重复。

    在Flume1.7时新增了一个source 的类型为taildir,它可以监控一个目录下的多个文件,并且实现了实时读取记录保存的断点续传功能。

    但是Flume1.7中如果文件重命名,那么会被当成新文件而被重新采集。

    Channel

    Memory Channel

    Memory Channel把Event保存在内存队列中,该队列能保存的Event数量有最大值上限。由于Event数据都保存在内存中,Memory Channel有最好的性能,不过也有数据可能会丢失的风险,如果Flume崩溃或者重启,那么保存在Channel中的Event都会丢失。同时由于内存容量有限,当Event数量达到最大值或者内存达到容量上限,Memory Channel会有数据丢失。

    File Channel

    File Channel把Event保存在本地硬盘中,比Memory Channel提供更好的可靠性和可恢复性,不过要操作本地文件,性能要差一些。

    Kafka Channel

    Kafka Channel把Event保存在Kafka集群中,能提供比File Channel更好的性能和比Memory Channel更高的可靠性。

    sink

    Avro Sink

    Avro Sink是Flume的分层收集机制的重要组成部分。 发送到此接收器的Flume事件变为Avro事件,并发送到配置指定的主机名/端口对。事件将从配置的通道中按照批量配置的批量大小取出。

    Kafka Sink

    Kafka Sink将会使用FlumeEvent header中的topic和key属性来将event发送给Kafka。如果FlumeEvent的header中有topic属性,那么此event将会发送到header的topic属性指定的topic中。如果FlumeEvent的header中有key属性,此属性将会被用来对此event中的数据指定分区,具有相同key的event将会被划分到相同的分区中,如果key属性null,那么event将会被发送到随机的分区中。

    可以通过自定义拦截器来设置某个event的header中的key或者topic属性。

    Flume拦截器

    主要用于,过滤时间戳不合法和json数据不完整的日志,将错误日志、启动日志和事件日志区分开来,方便发往kafka的不同topic。配置参考后符例

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.List;
    
    public class LogETLInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            String body = new String(event.getBody(), Charset.forName("UTF-8"));
    
            // body为原始数据,newBody为处理后的数据,判断是否为display的数据类型
            if (LogUtils.validateReportLog(body)) {
                return event;
            }
    
            return null;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
    
            ArrayList<Event> intercepts = new ArrayList<>();
    
            // 遍历所有Event,将拦截器校验不合格的过滤掉
            for (Event event : events) {
                
                Event interceptEvent = intercept(event);
    
                if (interceptEvent != null){
                    intercepts.add(interceptEvent);
                }
            }
    
            return intercepts;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder {
    
            public Interceptor build() {
                return new LogETLInterceptor();
            }
    
    
            @Override
            public void configure(Context context) {
    
            }
        }
    }
    
    

    启动命令

    flume-ng agent

    --conf-file /opt/module/flume/conf/file-flume-kafka.conf

    --name a1
    -Dflume.root.logger=INFO,console

    第一个参数为自己编写的配置文件路径

    第二个参数为flume agent的名称。即配置文件中定义的名称

    第三个参数为在flume中打印Info级别日志,并打印到控制台

    大数据中的数据处理流程例子

    file

    从上例图可以看出。我们从日志到转化未HDFS中可以消费的数据一般还要经历两个Flume阶段

    • 日志文件-->Flume-->Kafka
    • kafka-->Flume-->HDFS

    两个阶段的处理配置

    第一阶段的配置参考

    a1.sources=r1
    a1.channels=c1 c2 
    a1.sinks=k1 k2 
    
    # configure source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /opt/module/flume/log_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
    a1.sources.r1.fileHeader = true
    a1.sources.r1.channels = c1 c2
    
    #interceptor
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.flume.interceptor.LogETLInterceptor$Builder
    
    # selector
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = logType
    a1.sources.r1.selector.mapping.start = c1
    a1.sources.r1.selector.mapping.event = c2
    
    # configure channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.byteCapacityBufferPercentage=20
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity=10000
    a1.channels.c2.byteCapacityBufferPercentage=20
    
    # configure sink
    # start-sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = topic_start
    a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sinks.k1.kafka.flumeBatchSize = 2000
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.channel = c1
    
    # event-sink
    a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k2.kafka.topic = topic_event
    a1.sinks.k2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sinks.k2.kafka.flumeBatchSize = 2000
    a1.sinks.k2.kafka.producer.acks = 1
    a1.sinks.k2.channel = c2
    
    

    第二阶段的配置参考

    ## 组件
    a1.sources=r1 r2
    a1.channels=c1 c2
    a1.sinks=k1 k2
    
    ## source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r1.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
    a1.sources.r1.kafka.topics=topic_start
    
    ## source2
    a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r2.batchSize = 5000
    a1.sources.r2.batchDurationMillis = 2000
    a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r2.kafka.zookeeperConnect = hadoop102:2181,hadoop103:2181,hadoop104:2181
    a1.sources.r2.kafka.topics=topic_event
    
    ## channel1
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=100000
    a1.channels.c1.transactionCapacity=10000
    
    ## channel2
    a1.channels.c2.type=memory
    a1.channels.c2.capacity=100000
    a1.channels.c2.transactionCapacity=10000
    
    ## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = logstart-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 30
    a1.sinks.k1.hdfs.roundUnit = second
    
    ##sink2
    a1.sinks.k2.type = hdfs
    a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
    a1.sinks.k2.hdfs.filePrefix = logevent-
    a1.sinks.k2.hdfs.round = true
    a1.sinks.k2.hdfs.roundValue = 30
    a1.sinks.k2.hdfs.roundUnit = second
    
    ## 不要产生大量小文件
    a1.sinks.k1.hdfs.rollInterval = 30
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    
    a1.sinks.k2.hdfs.rollInterval = 30
    a1.sinks.k2.hdfs.rollSize = 0
    a1.sinks.k2.hdfs.rollCount = 0
    
    ## 控制输出文件是原生文件。
    a1.sinks.k1.hdfs.fileType = CompressedStream 
    a1.sinks.k2.hdfs.fileType = CompressedStream 
    
    a1.sinks.k1.hdfs.codeC = lzop
    a1.sinks.k2.hdfs.codeC = lzop
    
    ## 拼装
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    
    a1.sources.r2.channels = c2
    a1.sinks.k2.channel= c2
    
    

    本文由博客一文多发平台 OpenWrite 发布!

  • 相关阅读:
    ABAPHow to use TEXTEDIT(SAP 的样例摘抄)
    ABAPwhat is the difference between V1 ,V2&V3 update?
    BASIS关于Netweaver 2005的架构平台说明
    ABAP如何在SELECT语句中指定索引(example)
    ABAPHow to use Toolbar control(SAP样例摘抄)
    ABAP如何读取地址信息
    ABAP如何使用REUSE_ALV_GRID_DISPLAY函数删除内表数据(样例代码,感谢依风提供)
    ABAP一个极好的调用外部java程序的Search Help Exit的实例(RFC好例子)
    ABAP如何在ALV Grid打印页上加入页号
    ABAP一个实现Search Help Exits的完整样例
  • 原文地址:https://www.cnblogs.com/zhendiao/p/12302309.html
Copyright © 2011-2022 走看看