
    Flume Advanced

    1. Flume Transactions

    On the data-input side, sources fall into two categories: those that actively pull data (Spooling Directory Source, Taildir Source) and those that passively receive pushed data (Exec Source, Netcat Source).
    Put transaction phase:
    The source accumulates a batch of events before running the put transaction; doPut places the batch into putList.
    If doCommit succeeds, the batch is committed to the channel and putList is cleared.
    If the batch exceeds the space remaining in the channel, doCommit fails and doRollback runs, clearing putList. A pull-based source can simply pull the data again, so nothing is lost;
    a push-based source cannot re-fetch the data, so it is lost. Size the channel's capacity accordingly.
    Take transaction phase:
    Events taken from the channel go into takeList first, while the channel still holds them until the transaction completes. If doCommit succeeds, the events in takeList and their copies in the channel are discarded.
    If doCommit fails, doRollback runs: takeList is cleared, the events remain in the channel, and the sink simply takes them again.
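
    The same commit/rollback semantics are exposed through Flume's public Transaction API. The sketch below shows how a custom sink could drive one take transaction; the class name, helper method, and error handling are illustrative only, not part of Flume:

    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.Transaction;

    public class TakeTransactionSketch {
        // Hypothetical helper: drain up to batchSize events from the channel in one transaction.
        public static void drainBatch(Channel channel, int batchSize) {
            Transaction tx = channel.getTransaction();
            tx.begin();
            try {
                for (int i = 0; i < batchSize; i++) {
                    Event event = channel.take(); // moves one event into the transaction's takeList
                    if (event == null) {          // channel is empty
                        break;
                    }
                    // ... deliver the event to the destination here ...
                }
                tx.commit();   // success: the taken events leave the channel for good
            } catch (Exception e) {
                tx.rollback(); // failure: takeList is returned to the channel, nothing is lost
                throw new RuntimeException("delivery failed; events returned to the channel", e);
            } finally {
                tx.close();
            }
        }
    }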
    

    2. How the Flume Agent Works Internally

    1) ChannelSelector
      A ChannelSelector decides which channel(s) an event will be sent to. There are two types: Replicating and Multiplexing.
       The replicating selector sends every event to all of the configured channels, while the multiplexing selector routes different events to different channels based on a header value (see the a1.conf examples in 3.2 and 3.5).
     2) SinkProcessor
       There are three SinkProcessor types: DefaultSinkProcessor, LoadBalancingSinkProcessor and FailoverSinkProcessor.
       DefaultSinkProcessor drives a single sink, whereas LoadBalancingSinkProcessor and FailoverSinkProcessor operate on a sink group: the former spreads events across the sinks in the group,
       and the latter provides failover, switching to another sink in the group when the active one fails (see the a1.conf in 3.3).
       Note that a sink can consume from only one channel, while several sinks may share one channel (many sinks to one channel).
    

    3. Flume Topologies

    3.1 Simple Chain

       In this mode several Flume agents are chained in sequence, from the first source through to the storage system behind the final sink. Avoid chaining too many agents: every extra hop reduces throughput, and if any agent in the chain goes down the whole pipeline stalls.
    

    3.2 Replicating and Multiplexing

      Flume can send a flow of events to one or more destinations. In this mode the same data can be replicated into multiple channels, or different data can be routed to different channels, and each sink can then deliver to a different destination.
       1) Requirements
         Flume-1 monitors a file for changes. It passes every change to Flume-2, which stores it in HDFS, and at the same time to Flume-3, which writes it to the local file system.
    


    2) Under /opt/module/flume/job create a group1 directory, and inside it create a1.conf, a2.conf and a3.conf with the contents below.
    a1.conf

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    a1.sources.r1.selector.type = replicating
    # Describe/configure the source
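    # TAILDIR tails the files in filegroups and records read offsets in positionFile,
    # so collection resumes where it left off after an agent restart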
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/hive/logs/hive.log
    # Describe the sink avro
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4545
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4545
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    
    
         a2.conf (make sure HDFS is up before starting this agent)
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source; bind = 0.0.0.0 listens on all local interfaces
    
    a2.sources.r1.type = avro
    a2.sources.r1.bind = 0.0.0.0
    a2.sources.r1.port = 4545
    # Describe the sink
    
    a2.sinks.k1.type = hdfs
    
    a2.sinks.k1.hdfs.path = /flume/group1/%y-%m-%d/%H%M
    a2.sinks.k1.hdfs.filePrefix = events-%[localhost]
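    # Roll a new HDFS file every 60 s or at 128 MB, whichever comes first; rollCount = 0 disables rolling by event count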
    a2.sinks.k1.hdfs.rollInterval = 60
    a2.sinks.k1.hdfs.rollSize = 134217728
    a2.sinks.k1.hdfs.rollCount = 0
    a2.sinks.k1.hdfs.fileType = DataStream
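    # Round the time escapes in the path down to 10-minute buckets, using the agent's local time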
    a2.sinks.k1.hdfs.round = true
    a2.sinks.k1.hdfs.roundValue = 10
    a2.sinks.k1.hdfs.roundUnit = minute
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    
           a3.conf
    
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
     
    # Describe/configure the source; bind = 0.0.0.0 listens on all local interfaces
    
    a3.sources.r1.type = avro
    a3.sources.r1.bind = 0.0.0.0
    a3.sources.r1.port = 4545
    # Describe the sink
    a3.sinks.k1.type = file_roll
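    # file_roll writes events to local files; sink.directory must already exist (Flume will not create it)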
    a3.sinks.k1.sink.directory = /opt/module/flume/a3
    # Use a channel which buffers events in memory
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
    
        3) Start the agents, downstream agents first and the upstream agent last:
          bin/flume-ng agent -n a2 -c conf -f job/group1/a2.conf
          bin/flume-ng agent -n a3 -c conf -f job/group1/a3.conf
          bin/flume-ng agent -n a1 -c conf -f job/group1/a1.conf
        4) Check the data in HDFS and in the local directory /opt/module/flume/a3.
    

    3.3 Load Balancing and Failover

       Multiple sinks can be grouped into a sink group. A sink-group processor can then either load-balance events across the sinks in the group, or fail over to another sink in the group when the active one fails.
        Remember that the sink group itself must be configured for either mode to work.
    
       1) Requirements
         Flume-1 monitors a port. The sinks in its sink group feed Flume-2 and Flume-3; use a FailoverSinkProcessor to implement failover.
    

       2) Under /opt/module/flume/job create a group2 directory, and inside it create the agent configuration files a1.conf, a2.conf and a3.conf.
       a1.conf:
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1
    
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
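    # Higher priority wins: events go to k2 (priority 10) while its downstream agent is healthy, falling back to k1 (priority 5)
    # maxpenalty caps the backoff (in ms) applied to a failed sink before it is retried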
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    
    # Describe/configure the source (0.0.0.0 listens on all interfaces; binding to a hostname accepts only connections addressed to that name)
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0      
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4545
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4545
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    
    
         a2.conf:
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source; bind = 0.0.0.0 listens on all local interfaces
    
    a2.sources.r1.type = avro
    a2.sources.r1.bind = 0.0.0.0
    a2.sources.r1.port = 4545
    # Describe the sink
    
    a2.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    
        a3.conf:
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source; bind = 0.0.0.0 listens on all local interfaces
    
    a3.sources.r1.type = avro
    a3.sources.r1.bind = 0.0.0.0
    a3.sources.r1.port = 4545
    # Describe the sink
    
    a3.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
    
           3) Distribute the configuration files to the cluster nodes.
           4) Start the agents, downstream first, upstream last (the -Dflume.root.logger flag makes the logger sinks print to the console):
            bin/flume-ng agent -n a2 -c conf -f job/group2/a2.conf -Dflume.root.logger=INFO,console
            bin/flume-ng agent -n a3 -c conf -f job/group2/a3.conf -Dflume.root.logger=INFO,console
            bin/flume-ng agent -n a1 -c conf -f job/group2/a1.conf
           5) Run nc localhost 44444 and send some data.
           6) Watch the output on hadoop103 and hadoop104.
             Because a1.sinkgroups.g1.processor.priority.k1 = 5
                     a1.sinkgroups.g1.processor.priority.k2 = 10
             k2 has the higher priority, so as long as the agent behind k2 is alive all data goes to it; once it goes down, the data fails over to the agent behind k1.
    

    3.4 Aggregation


    This is the most common and most practical topology. A typical web application is spread across hundreds of servers, sometimes thousands or tens of thousands, and the logs they produce are painful to process host by host. This Flume pattern solves that: each server runs an agent that collects its logs and forwards them to one central aggregation agent, which then uploads to HDFS, Hive, HBase and so on for analysis.
    1) Requirements
    Flume-1 on hadoop102 monitors the file /opt/module/hive/logs/hive.log.
    Flume-2 on hadoop103 monitors the data stream on a port.
    Flume-1 and Flume-2 both send their data to Flume-3 on hadoop104, which prints the merged stream to the console.
    2) Under /opt/module/flume/job create a group3 directory, and inside it create the agent configuration files a1.conf, a2.conf and a3.conf.
    a1.conf

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
    a1.sources.r1.filegroups = f1 
    a1.sources.r1.filegroups.f1 = /opt/module/hive/logs/hive.log
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop104
    a1.sinks.k1.port = 4545
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    
          a2.conf
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source (0.0.0.0 listens on all interfaces; binding to a hostname accepts only connections addressed to that name)
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = 0.0.0.0      
    a2.sources.r1.port = 44444
    
    # Describe the sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop104
    a2.sinks.k1.port = 4545
    
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    
          a3.conf
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source; bind = 0.0.0.0 listens on all local interfaces
    
    a3.sources.r1.type = avro
    a3.sources.r1.bind = 0.0.0.0
    a3.sources.r1.port = 4545
    
    # Describe the sink
    
    a3.sinks.k1.type = logger
    # Use a channel which buffers events in memory
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
    
          3) Distribute the configuration files to the cluster nodes (and remember to redistribute after any later change).
           4) Start the agents, downstream first, upstream last:
            bin/flume-ng agent -n a3 -c conf -f job/group3/a3.conf -Dflume.root.logger=INFO,console
            bin/flume-ng agent -n a2 -c conf -f job/group3/a2.conf
            bin/flume-ng agent -n a1 -c conf -f job/group3/a1.conf
           5) On hadoop103, run nc localhost 44444 and send some data; append to hive.log on hadoop102 as well.
           6) Watch the merged output on hadoop104's console.
    

    3.5 Custom Interceptor

       1) Requirements: we use port data to simulate logs, with single digits and single letters standing for different log types. A custom interceptor must tell digits and letters apart and route them to different channels, i.e. to different analysis systems.
       2) Steps
        (1) Create a Maven project and add the following dependency.
          <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
          </dependency>
        (2) Define a MyInterceptor class implementing the Interceptor interface, then package the project and put the jar under $FLUME_HOME/lib (a build note follows the class).
    
    package com.atguigu.flume;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class MyInterceptor implements Interceptor {
        public void initialize() {
    
        }
    
        public Event intercept(Event event) {
            byte[] body = event.getBody();
            Map<String, String> headers = event.getHeaders();
            // decode the body into a String
            String line = new String(body, StandardCharsets.UTF_8);
            // look at the first character
            char c = line.charAt(0);

            if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')) {
                // starts with a letter
                headers.put("leixing", "zimu");
            } else if (c >= '0' && c <= '9') {
                // starts with a digit
                headers.put("leixing", "shuzi");
            } else {
                // anything else: drop this event
                return null;
            }
            return event;
        }
    
        public List<Event> intercept(List<Event> list) {
            // keep only the events that intercept() did not drop
            List<Event> results = new ArrayList<>(list.size());
            for (Event event : list) {
                if (intercept(event) != null) {
                    results.add(event);
                }
            }
            return results;
        }
    
        public void close() {
    
        }
        public static class Builder implements Interceptor.Builder{
    
            public Interceptor build() {
                return new MyInterceptor();
            }
    
            public void configure(Context context) {
    
            }
        }
    }
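
    To get the interceptor onto Flume's classpath, package the project and copy the jar into Flume's lib directory. A minimal sketch, assuming a standard Maven layout and the install path used elsewhere in this post:

        mvn clean package
        cp target/*.jar /opt/module/flume/lib/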
    
    
        (3) Edit the Flume configuration files a1.conf, a2.conf and a3.conf under /opt/module/flume/job/group4.
           a1.conf   Note that a1.sources.r1.interceptors.i1.type = com.atguigu.flume.MyInterceptor$Builder ends in $Builder: the $ is required because Builder is a static inner class.
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.MyInterceptor$Builder
    
    a1.sources.r1.selector.type = multiplexing
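    # Route on the "leixing" header set by MyInterceptor: letters ("zimu") -> c1, digits ("shuzi") -> c2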
    a1.sources.r1.selector.header = leixing
    a1.sources.r1.selector.mapping.zimu = c1
    a1.sources.r1.selector.mapping.shuzi = c2
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4242
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Use a channel which buffers events in memory
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    
    
          a2.conf
    
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop103
    a2.sources.r1.port = 4141
    
    a2.sinks.k1.type = logger
    
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    a2.sinks.k1.channel = c1
    a2.sources.r1.channels = c1
    
    
          a3.conf
    
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4242
    
    a3.sinks.k1.type = logger
    
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    a3.sinks.k1.channel = c1
    a3.sources.r1.channels = c1
    
    
           (4) Distribute the files to each machine, then start the agents, downstream first:
                bin/flume-ng agent -n a3 -c conf -f job/group4/a3.conf -Dflume.root.logger=INFO,console
                bin/flume-ng agent -n a2 -c conf -f job/group4/a2.conf -Dflume.root.logger=INFO,console
                bin/flume-ng agent -n a1 -c conf -f job/group4/a1.conf
            (5) Send single letters and single digits with nc localhost 44444 and watch where each line ends up: letters on hadoop103's console, digits on hadoop104's.
    
    4. Kafka Sink (sending data to multiple topics)

    To distribute different kinds of data to different topics, use a built-in feature of the Kafka sink: a1.sinks.k1.kafka.topic = other sets the default topic, and any event whose headers contain a "topic" key is routed to that topic instead. An interceptor therefore only needs to set the topic header; everything else falls through to other.
     
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1 
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 6666
    
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.EventHeaderInterceptor$Builder
    
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
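    # Default topic; any event whose headers contain a "topic" key is routed to that topic instead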
    a1.sinks.k1.kafka.topic = other
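    # acks = -1 (all): the producer waits for the full set of in-sync replicas to acknowledge each batch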
    a1.sinks.k1.kafka.producer.acks = -1 
    a1.sinks.k1.useFlumeEventFormat = false
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasink-topics.conf -n a1 -Dflume.root.logger=INFO,console
    
    
    package com.atguigu.flume;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    
    public class EventHeaderInterceptor implements Interceptor {
        public void initialize() {
    
        }
    
        public Event intercept(Event event) {
            byte[] body = event.getBody();
            Map<String, String> headers = event.getHeaders();
            // decode the body into a String
            String line = new String(body, StandardCharsets.UTF_8);
            // set the "topic" header when the body contains "atguigu" or "shangguigu"
            if(line.contains("atguigu")){
                headers.put("topic","atguigu");
            }else if(line.contains("shangguigu")){
                headers.put("topic","shangguigu");
            }
            return event;
        }
    
        public List<Event> intercept(List<Event> list) {
            for (Event event : list) {
                intercept(event);
            }
            return list;
        }
    
        public void close() {
    
        }
        public static class Builder implements Interceptor.Builder{
    
            public Interceptor build() {
                return new EventHeaderInterceptor();
            }
    
            public void configure(Context context) {
    
            }
        }
    }
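
    To verify the routing, send lines through the netcat source and consume the topics; the consumer command assumes a standard Kafka installation on the brokers configured above:

        nc localhost 6666
        kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic atguigu --from-beginning

    Lines containing "atguigu" should land in topic atguigu, lines containing "shangguigu" in topic shangguigu, and everything else in the default topic other.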
    
    