zoukankan      html  css  js  c++  java
  • Flume实战案例

    从端口读数据读取到本地文件

    #1.给三个组件命名
    a3.sources = r1
    a3.channels = c1
    a3.sinks = k1 
    #2.给source组件属性赋值
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 6666
    #3.给channel组件属性赋值
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 10000
    a3.channels.c1.transactionCapacity = 100
    #4.给sink组件属性赋值
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/datas/fileroll
    #5.让sources、sinks连接上对应的channels
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1 
    
    

    从端口读数据读取到HDFS

    a2 ==> a2.conf
    
    
    a2.sources = r1
    a2.channels = c1
    a2.sinks = k1
    
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 5555
    
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 10000
    a2.channels.c1.transactionCapacity = 100
    
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
    a2.sinks.k1.hdfs.filePrefix = logs-
    a2.sinks.k1.hdfs.round = true
    a2.sinks.k1.hdfs.roundValue = 1
    a2.sinks.k1.hdfs.roundUnit = hour
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    a2.sinks.k1.hdfs.batchSize = 100
    a2.sinks.k1.hdfs.fileType = DataStream
    a2.sinks.k1.hdfs.rollInterval = 60
    a2.sinks.k1.hdfs.rollSize = 134217700
    a2.sinks.k1.hdfs.rollCount = 0
    
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    

    复制和多路复用

    可以将相同数据复制到多个channel中(flume默认),也可以将不同数据分发到不同channel中,sink可以选择传送到不同的目的地

    监控一个文件,然后通过两个channel搭配两个sink吧内容写出到控制台.

    a1.sources = r1
    a1.channels = c1 c2
    a1.sinks = k1 k2 
    
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /tmp/atguigu/hive.log
    a1.sources.r1.selector.type = replicating
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 10000
    a1.channels.c2.transactionCapacity = 100
    
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 5555
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 6666
    
    a1.sources.r1.channels = c1 c2 
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2 
    

    故障转移

    Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。这里的故障,指的是Sink故障

    1)通过sinkgroups里priority属性配置的权重来决定哪台的优先级高,同一时间只能有一台机器工作

    2)当当前的sink挂掉后切换为standby模式(假设优先级10),并立刻切换到另一台(假设优先级9),当sink修复好重新启动后,隔段时间会恢复使用优先级为10的sink

    1. 遇到故障时,我们要立即修复
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1 k2 
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4444
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    #将数据写到另一台Flume服务器上
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 5555
    #将数据写到另一台Flume服务器上
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop103
    a1.sinks.k2.port = 6666
    
    #使用sink processor来控制channel的数据流向
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2  
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    
    

    负载均衡

    通过将sinkprocessor里的type属性来控制processor模式,分别是(负载均衡load_balance、故障转移failover)

    1. 使用负载均衡以后,channel会轮训分配任务,减少机器负荷
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1 k2 
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4444
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 5555
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 6666
    
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    

    聚合

    聚合,指的是将多台日志服务器上的数据,汇总到一台日志服务器上,进行输出

    a3 ==> a3.conf(hadoop104)
    
    a3.sources = r1
    a3.channels = c1
    a3.sinks = k1 
    
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 6666
    
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 10000
    a3.channels.c1.transactionCapacity = 100
    
    a3.sinks.k1.type = logger
    
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1 
    
    
    a2 ==> a2.conf(hadoop103)
    
    a2.sources = r1
    a2.channels = c1
    a2.sinks = k1 
    
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = localhost
    a2.sources.r1.port = 4444
    
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 10000
    a2.channels.c1.transactionCapacity = 100
    
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop104
    a2.sinks.k1.port = 6666
    
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1 
    
    
    a1 ==> a1.conf(hadoop102)
    
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /tmp/atguigu/hive.log
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop104
    a1.sinks.k1.port = 6666
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1 
    
    

    ChannelSelector案例

    ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。

    ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。默认是Replicating

    1. Multiplexing类型的ChannelSelector会根据Event中Header中的某个属性决定分发到哪个Channel。
    2. 每个event里的header默认是没有值的,所以,multiplexing类型的ChannelSelector一般会配合自定义拦截器使用

    replicating类型例子如下

    a1.sources = r1
    a1.channels = c1 c2 # 如果有100个Event,那么c1和c2中都会有这100个事件
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    

    multiplexing类型的ChannelSelector例子如下

    a1.sources = r1
    a1.channels = c1 c2
    
    a1.sources.source1.selector.type = multiplexing
    a1.sources.source1.selector.header = title # 以header中的title对应的值作为条件
    a1.sources.source1.selector.mapping.a = c2 # 如果header中title的值为a,使用c2这个channel
    a1.sources.source1.selector.mapping.b = c1 # 如果header中title的值为FAIL,使用c1这个channel
    a1.sources.source1.selector.default = c1 # 默认使用c1这个channel
    

    SinkProcessor案例

    SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor

    DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。

    自定义Interceptor

    使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

    需求

    在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。
    

    实现代码

    1)创建maven项目,引入依赖

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
    

    2)Java代码

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     *  1. 如何自定义拦截器?
     *   flume的自定义拦截器需要实现Flume提供的Interceptor接口.
     *
     *  实现抽象方法:
     *      initialize: 完成一些初始化工作.
     *      close: 完成一些善后的工作
     *      intercept:拦截器的核心处理方法.  拦截的逻辑.
     *          intercept(Event event) : 单个event的拦截处理
     *          intercept(List<Event> events): 批次event的拦截处理
     *
     *  2. 拦截器的对象如何实例化?
     *    在拦截器中定义一个static的内部类,实现Flume提供的Builder接口
     *
     *   实现抽象方法:
     *      build : 用于构建拦截器对象
     *      configure:用于读取配置信息(xxxx.conf)
     *
     *
     *
     */
    public class LogDataInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
    
        /**
         * 需求: 判断每个Event的body中是否包含"atguigu"
         *       如果包含,给Event的header中添加一个kv:  title = at
         *       如果不包含,给Event的header中添加一个kv: title = ot
         */
        @Override
        public Event intercept(Event event) {
            //1. 获取event的 header 和 body
            Map<String, String> headers = event.getHeaders();
            String body = new String(event.getBody());  // 编码问题
    
            //2. 判断处理
            if(body.contains("atguigu")){
                headers.put("title","at");
            }else{
                headers.put("title","ot");
            }
    
            //3. 将处理好的event返回
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            for (Event event : events) {
                intercept(event);
            }
    
            return events ;
        }
    
        @Override
        public void close() {
    
        }
    
    
        public static class MyBuilder   implements  Builder{
    
            @Override
            public Interceptor build() {
                return new LogDataInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    
    }
    
    

    3)将代码打成jar包

    4)配置文件

    1.进阶案例 - channel选择器 - 多路
    a3 ==> a3.conf
    
    a3.sources = r1
    a3.channels = c1
    a3.sinks = k1 
    
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 6666
    
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 10000
    a3.channels.c1.transactionCapacity = 100
    
    a3.sinks.k1.type = logger
    
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1 
    
    
    a2 ==> a2.conf
    
    
    a2.sources = r1
    a2.channels = c1
    a2.sinks = k1
    
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 5555
    
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 10000
    a2.channels.c1.transactionCapacity = 100
    
    a2.sinks.k1.type =logger
    
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    
    a1 ==> a1.conf
    
    a1.sources = r1
    a1.channels = c1 c2
    a1.sinks = k1 k2 
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = hadoop102
    a1.sources.r1.port = 4444
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 10000
    a1.channels.c2.transactionCapacity = 100
    
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 5555
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 6666
    
    #将选择器类型改为multiplexing分发
    a1.sources.r1.selector.type = multiplexing
    #检测每个event里head的title key
    a1.sources.r1.selector.header = title
    #如果title的值为at,吧event发到channel c1里,如果为ot,发到channel c2里,如果都不匹配,默认发到c1里
    a1.sources.r1.selector.mapping.at = c1
    a1.sources.r1.selector.mapping.ot = c2
    a1.sources.r1.selector.default=c1
    #给拦截器命名i1
    a1.sources.r1.interceptors = i1
    #这里写自定义类的全类名
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogDataInterceptor$MyBuilder
    
    a1.sources.r1.channels = c1 c2 
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    

    5)分别在三台机器上启动flume并测试

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
    
    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
    
    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
    

    注:使用 jps-ml 查看Flume进程

  • 相关阅读:
    关于prototype学习
    java io 从文件的读取和输入
    java 匿名内部类
    动手做个 AI 机器人,帮我回消息!
    几个高效做事的法则,让你的一天有 25 小时
    爆肝一个月,我做了个免费的面试刷题网
    Log4j 被曝核弹级漏洞,开发者炸锅了!
    几个超火的编程网站,别错过!
    Java 处理表格,真的很爽!
    c++智能指针转化:static_pointer_cast、dynamic_pointer_cast、const_pointer_cast、reinterpret_pointer_cast
  • 原文地址:https://www.cnblogs.com/traveller-hzq/p/14099724.html
Copyright © 2011-2022 走看看