  • Flume (3): Advanced Topics

    I. Flume Data Transfer Flow

    Key components:

    1) Channel Selector (ChannelSelector)

    The ChannelSelector decides which Channel(s) an Event will be sent to. There are two built-in types: Replicating and Multiplexing.

    The ReplicatingSelector sends every Event to all of the configured Channels, while the Multiplexing selector routes different Events to different Channels according to the configured mapping rules.
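
    As a minimal sketch (assuming an agent named a1 with one source r1 and two channels c1 and c2, as in the cases later in this post), the two selector types are configured like this:

    # Replicating (the default): every event goes to both c1 and c2
    a1.sources.r1.selector.type = replicating
    a1.sources.r1.channels = c1 c2
    
    # Multiplexing: route events by the value of a header key (here "type")
    #a1.sources.r1.selector.type = multiplexing
    #a1.sources.r1.selector.header = type
    #a1.sources.r1.selector.mapping.hello = c1
    #a1.sources.r1.selector.default = c2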

    2) SinkProcessor

    There are three SinkProcessor types: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor.

    DefaultSinkProcessor works with a single Sink, whereas LoadBalancingSinkProcessor and FailoverSinkProcessor work with a Sink Group: LoadBalancingSinkProcessor spreads events across the sinks in the group, and FailoverSinkProcessor provides failover (error recovery).
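
    For reference, a sink group is declared and bound to a processor like this (a short sketch assuming an agent a1 with sinks k1 and k2; a complete failover example appears in section III.3):

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    # processor.type can be default, load_balance, or failover
    a1.sinkgroups.g1.processor.type = load_balance
    # selection policy: round_robin (default) or random
    a1.sinkgroups.g1.processor.selector = round_robin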

    II. Flume Transactions

    1. Put Transaction Flow

    The source side first writes a batch of events into the temporary buffer putList, then checks whether the channel's memory queue has enough free space to take the batch in; if the memory queue does not have enough space, the transaction is rolled back.

    2. Take Transaction Flow

    The sink side takes events into the temporary buffer takeList and sends them to the destination (HDFS in this example). If all of the data is sent successfully, the takeList is cleared and the transaction commits; if an exception occurs while sending, the transaction rolls back and the events in the takeList are returned to the channel's memory queue.
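
    The take flow maps directly onto the Channel/Transaction API that every sink uses. Below is a simplified, illustrative sketch of a custom sink skeleton (SketchSink is an invented name, and it is not the actual HDFS sink code) showing where commit and rollback happen:

    package com.bigdata.sink;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.sink.AbstractSink;
    
    // Sketch only: shows the take-transaction boundaries of a sink's process() loop.
    public class SketchSink extends AbstractSink {
        @Override
        public Status process() throws EventDeliveryException {
            Channel channel = getChannel();
            Transaction tx = channel.getTransaction();
            tx.begin();
            try {
                Event event = channel.take();   // the channel moves the event into its takeList
                if (event != null) {
                    // send the event to the destination here (e.g. write it to HDFS)
                }
                tx.commit();                    // success: the takeList is cleared
                return Status.READY;
            } catch (Throwable t) {
                tx.rollback();                  // failure: takeList events go back to the channel queue
                throw new EventDeliveryException(t);
            } finally {
                tx.close();
            }
        }
    }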

    III. Flume Topologies and Hands-on Cases

    1. Simple Chain

    In this mode multiple Flume agents are connected in sequence, from the first source all the way to the storage system behind the final sink. Do not chain too many agents: each extra hop reduces throughput, and if any agent in the chain goes down the whole pipeline is affected.

    2. Replicating and Multiplexing

    Flume can send an event flow to one or more destinations. In this mode the same data can be replicated into multiple channels, or different data can be dispatched to different channels, and each sink can then deliver to a different destination.

    Case

    Requirement: Flume-1 monitors changes to a file and forwards the new content to Flume-2, which stores it in HDFS. At the same time, Flume-1 forwards the same content to Flume-3, which writes it to the local file system.

    Requirement analysis

    Steps

    1) Preparation

    Create a group1 directory under /opt/module/flume/job:

    [atguigu@hadoop102 job]$ mkdir group1
    [atguigu@hadoop102 job]$ cd group1/
    

    Create a flume3 directory under /opt/module/datas/:

    [atguigu@hadoop102 datas]$ mkdir flume3
    

    2) Write the Flume agent configuration files

    Flume1(hadoop102)

    On hadoop102, create the configuration file flume-exec-avro.conf under /opt/module/flume/job/group1:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/testdata/test_file1.txt
    a1.sources.r1.shell = /bin/bash -c
    
    # Replicate the data flow to all channels
    a1.sources.r1.selector.type = replicating
    
    
    # Describe the sink
    # An avro sink is a data sender
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4142
    
    # Use a channel which buffers events in memory
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    

    Flume2(hadoop103)

    On hadoop103, create the configuration file flume-avro-hdfs.conf under /opt/module/flume/job/group1.

    Add the following content:

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source
    # An avro source is a data-receiving service
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop103
    a2.sources.r1.port = 4141
    
    # Describe the sink
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
    # Prefix for uploaded files
    a2.sinks.k1.hdfs.filePrefix = flume2-
    # Whether to round the timestamp (roll directories by time)
    a2.sinks.k1.hdfs.round = true
    # How many time units per new directory
    a2.sinks.k1.hdfs.roundValue = 1
    # The time unit used for rounding
    a2.sinks.k1.hdfs.roundUnit = hour
    # Whether to use the local timestamp
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    # Number of events to accumulate before flushing to HDFS
    a2.sinks.k1.hdfs.batchSize = 100
    # File type; compressed output is also supported
    a2.sinks.k1.hdfs.fileType = DataStream
    # Roll to a new file after this many seconds
    a2.sinks.k1.hdfs.rollInterval = 600
    # Roll the file at roughly 128 MB
    a2.sinks.k1.hdfs.rollSize = 134217700
    # Do not roll based on the number of events
    a2.sinks.k1.hdfs.rollCount = 0
    
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    

    Flume3 (hadoop104)

    On hadoop104, create the configuration file flume-avro-fileroll.conf under /opt/module/flume/job/group1.

    Configure an avro source that receives the output of the upstream Flume agent, and a sink that writes to a local directory.

    Note: the local output directory must already exist; Flume will not create it if it is missing.

    Add the following content:

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source
    # An avro source is a data-receiving service
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4142
    
    # Describe the sink
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /opt/module/datas/flume3
    
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    

    3) Start the Flume agents

    Start the agents in order, downstream first, then upstream:

    flume-avro-fileroll --> flume-avro-hdfs --> flume-exec-avro
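
    Assuming the agent names (a3, a2, a1) and the configuration file names used above, the startup commands would look roughly like this:

    #flume3 (hadoop104)
    bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-avro-fileroll.conf
    #flume2 (hadoop103)
    bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-avro-hdfs.conf
    #flume1 (hadoop102)
    bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-exec-avro.conf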

    4) Check the data in HDFS and in the local directory

    3. Load Balancing and Failover

    Flume can group multiple sinks into one logical sink group; combined with the appropriate SinkProcessor, a sink group provides either load balancing or failover (error recovery).

    Case

    Requirement: Flume1 (hadoop102) collects data from a single source, and the sinks in its sink group feed Flume2 (hadoop103) and Flume3 (hadoop104) respectively. A LoadBalancingSinkProcessor gives load balancing and a FailoverSinkProcessor gives failover; the configuration below uses failover, and a load-balancing variant is sketched immediately after the Flume1 configuration.

    Requirement analysis

    Steps

    1) Flume1 (hadoop102)

    Create a group2 folder under the job directory, and in it create flume-netstat-avro.conf:

    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/testdata/3.txt
    a1.sources.r1.shell = /bin/bash -c
    ########################################### Sink processor (failover)
    # Failover processor: k1 has priority 5, k2 has priority 10,
    # so k2 starts as the active sink and k1 is the standby
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    
    # Describe the sink
    # An avro sink is a data sender
    a1.sinks.k1.type = avro
    # Destination host
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    # Destination host
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4141
    
    # Describe the channel
    # Channel type: memory (or file)
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    # one source bound to one channel
    a1.sources.r1.channels = c1
    # Sink group membership
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    
    2) Flume2 (hadoop103)

    Under job/group2, create flume-avro-logger.conf:

    # Name the components on this agent
    a2.sources = r1
    a2.channels = c1
    a2.sinks = k1
    
    # Describe/configure the source
    # An avro source is a data-receiving service
    a2.sources.r1.type = avro
    # Host to bind to
    a2.sources.r1.bind = hadoop103
    # Must match the port of the upstream avro sink
    a2.sources.r1.port = 4141
    
    # Describe the sink
    a2.sinks.k1.type = logger
    
    # Describe the channel
    # Channel type: memory (or file)
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    # one source bound to one channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    3) Flume3 (hadoop104)

    Under job/group2, create flume-avro-logger.conf (same content as on hadoop103 except for the agent name and bind host):

    # Name the components on this agent
    a3.sources = r1
    a3.channels = c1
    a3.sinks = k1
    
    # Describe/configure the source
    # An avro source is a data-receiving service
    a3.sources.r1.type = avro
    # Host to bind to
    a3.sources.r1.bind = hadoop104
    # Must match the port of the upstream avro sink
    a3.sources.r1.port = 4141
    
    # Describe the sink
    a3.sinks.k1.type = logger
    
    # Describe the channel
    # Channel type: memory (or file)
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    # one source bound to one channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    

    4) Start Flume2 and Flume3 first, then Flume1

    #flume2
    bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-avro-logger.conf -Dflume.root.logger=INFO,console
    #flume3
    bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-avro-logger.conf -Dflume.root.logger=INFO,console
    #flume1
    bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netstat-avro.conf
    

    5) Observe the behavior: when data is sent to Flume1, it only arrives at Flume3 (the higher-priority sink); Flume2 receives nothing.

    Then kill Flume3 and keep sending data to Flume1; Flume2 now starts receiving the data.

    4. Aggregation

    Case

    Requirement: Flume1 (hadoop102) and Flume2 (hadoop103) both send data to Flume3 (hadoop104), and Flume3 prints the aggregated data to the console.

    Requirement analysis

    1. Single source

    1) Flume1 (hadoop102)

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/testdata/3.txt
    a1.sources.r1.shell = /bin/bash -c
    
    
    # Describe the sink
    # An avro sink is a data sender
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop104
    a1.sinks.k1.port = 4141
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    2) Flume2 (hadoop103)

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source
    a2.sources.r1.type = exec
    a2.sources.r1.command = tail -F /opt/module/testdata/3.txt
    a2.sources.r1.shell = /bin/bash -c
    
    
    # Describe the sink
    # An avro sink is a data sender
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop104
    a2.sinks.k1.port = 4141
    
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    

    3) Flume3 (hadoop104)

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4141
    
    # Describe the sink
    a3.sinks.k1.type = logger
    
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
    2. Multiple sources

    Only Flume3 needs to change: add a second source. Note that the two sources must listen on different ports, and Flume1 and Flume2 each point their avro sink at one of these ports. Startup commands are sketched after the configuration below.

    # Name the components on this agent
    a3.sources = r1 r2
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4141
    a3.sources.r2.type = avro
    a3.sources.r2.bind = hadoop104
    a3.sources.r2.port = 4142
    
    # Describe the sink
    a3.sinks.k1.type = logger
    
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sources.r2.channels = c1
    a3.sinks.k1.channel = c1
    
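    Start Flume3 (the downstream aggregator) first, then Flume1 and Flume2. The text does not name the configuration files for this case, so the paths below (job/group3/flume1.conf and so on) are only placeholders:

    # flume3 (hadoop104) -- start the aggregator first
    bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3.conf -Dflume.root.logger=INFO,console
    # flume1 (hadoop102)
    bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1.conf
    # flume2 (hadoop103)
    bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2.conf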

    IV. Custom Interceptor

    In real projects a single server may produce many types of logs, and different log types often need to go to different analysis systems. This calls for the Multiplexing topology: a channel selector routes each event to a channel based on the value of a key in the event's header. To make that work we need a custom Interceptor that writes a different header value for each type of event.

    Case

    Requirement: check whether the event body contains "hello". If it does, send the event to hadoop103 and print it there; otherwise send it to hadoop104.

    Requirement analysis

    1) Create a module

    2) Add the dependency

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
    

    3) Write a custom interceptor that implements the Interceptor interface

    package com.bigdata.interceptor;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class TypeInterceptor implements Interceptor {
    
        public List<Event> addEventList;
    
        @Override
        public void initialize() {
            addEventList = new ArrayList<>();
        }
    
        @Override
        // Process a single event
        public Event intercept(Event event) {
            // 1. Get the event headers
            Map<String, String> headers = event.getHeaders();
            // 2. Get the event body as a string
            String body = new String(event.getBody());
            // 3. Check whether the body contains "hello" to decide which header to add
            if (body.contains("hello")){
                // 4. If it does, add a type=hello key-value pair to the headers
                headers.put("type","hello");
            }else {
                // 5. Otherwise add a type=nohello key-value pair
                headers.put("type","nohello");
            }
            }
            return event;
        }
    
        @Override
        // Process a batch of events
        public List<Event> intercept(List<Event> events) {
            // Clear the reusable event buffer
            addEventList.clear();
            // Run every event through the single-event intercept() and collect the results
            for (Event event : events) {
                addEventList.add(intercept(event));
            }
            return addEventList;
        }
    
        @Override
        public void close() {
    
        }
        public static class Builder implements Interceptor.Builder{
            @Override
            public Interceptor build() {
                return new TypeInterceptor();
            }
    
            @Override
            // Read interceptor configuration (nothing needed here)
            public void configure(Context context) {
    
            }
        }
    }

    4) Build the jar and copy it into the /opt/module/flume/lib directory
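
    For example (the jar name here is hypothetical and depends on the module's artifactId and version):

    cp target/flume-interceptor-1.0-SNAPSHOT.jar /opt/module/flume/lib/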

    5) Flume1 configuration file

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # Name of the custom interceptor
    a1.sources.r1.interceptors = i1
    # Fully-qualified class name + $Builder (must match the package of TypeInterceptor above)
    a1.sources.r1.interceptors.i1.type = com.bigdata.interceptor.TypeInterceptor$Builder
    a1.sources.r1.selector.type = multiplexing
    # Header key used by the multiplexing selector
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.hello = c1
    a1.sources.r1.selector.mapping.nohello = c2
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4242
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Use a channel which buffers events in memory
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    

    6) Flume2 configuration file

    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop103
    a2.sources.r1.port = 4141
    
    a2.sinks.k1.type = logger
    
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    a2.sinks.k1.channel = c1
    a2.sources.r1.channels = c1
    

    7) Flume3 configuration file

    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4242
    
    a3.sinks.k1.type = logger
    
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    a3.sinks.k1.channel = c1
    a3.sources.r1.channels = c1
    

    8) Start the Flume processes on hadoop103, hadoop104, and then hadoop102, in that order (downstream agents first).

    9) On hadoop102, use netcat to send lines to localhost:44444, some containing "hello" and some not (see the sketch after step 10).

    10) Observe the logs printed on hadoop103 and hadoop104.
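
    For example, once all three agents are running (the exact input lines are just samples):

    [atguigu@hadoop102 ~]$ nc localhost 44444
    hello flume
    12345

    The line containing "hello" should show up in the Flume2 (hadoop103) console, and the other line in the Flume3 (hadoop104) console.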
