zoukankan      html  css  js  c++  java
  • Hadoop生态圈---flume

    一、Flume基本介绍

    1.1 什么是flume

          说白了flume就是一个采集数据的软件,是cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件;

          flume的核心就是把数据从数据源(source)收集过来,为了保证传输的成功性,会先缓存数据(channel),待数据到达目的地(sink)的时候,再删除自己缓存的数据;

          flume支持定制各类数据发送方,用于手机各类型的数据,同时也可以支持定制各种数据的接收方,用于最终存储数据

    1.2 运行机制

                 

    每一个agent相当于一个数据邮递员,将一些数据从一个地方运输到另外一个地方。

    source:采集源,用于跟数据对接,以获取数据

    sink:下层地,将数据发送到下一个agent或者是发往最终的目的地

    channel:agent内部的一个传输数据的通道 用于连接source和sink,将数据从source传递到sink;

    在整个数据的传输过程中,流动的是event,将数据进行封装,并携带头信息。

    一个完整的event包括:event headers、event body、event信息、其中event信息就是flume手机到的日记记录。

    1.3 组件

    二、 flume安装部署

            2.1安装

    上传安装包到数据源所在节点上
    然后解压  tar -zxvf apache-flume-1.8.0-bin.tar.gz -C 安装的目录
    然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

            2.2测试环境是否正常

    执行: vi   netcat-logger.conf
    
    文件内容如下:netcat-logger.conf
    
    # 定义这个agent中各组件的名字
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 描述和配置source组件:r1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # 描述和配置sink组件:k1
    a1.sinks.k1.type = logger
    
    # 描述和配置channel组件,此处使用是内存缓存的方式
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # 描述和配置source  channel   sink之间的连接关系
    
    
    
    a1.sources.r1.channels = c1 
    a1.sinks.k1.channel = c1
    
    2) 启动agent去采集数据
    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
    
    
        -c conf   指定flume自身的配置文件所在目录
    
    ​	-f conf/netcat-logger.con  指定我们所描述的采集方案
    
    ​	-n a1  指定我们这个agent的名字
    
    
    3) 测试
    
    先要往agent采集监听的端口上发送数据,让agent有数据可采。
    随便在一个能跟agent节点联网的机器上:
    telnet anget-hostname  port   (telnet localhost 44444)
    

    三、flume采集案例

    3.1 采集目录到hdfs

                  采集需求:服务器的某特定目录下,会不断产生新的文件,每当有新的文件产生时,就需要把文件采集到的hdfs

                  采集三大要素

                  source:监控目录--spooldir

                  sink:hdfs sink

                  channel:可以用file channel也可以用内存channel

                  步骤:

                     1)配置文件在flume中的conf中

                     2)开启命令    bin/flume-ng agent -c conf -f conf/spooldir-hdfs.conf -n a1  -Dflume.root.logger=INFO,console

                     3)往监测的目录中加入数据  此时就会读取加入的数据发送到hdfs上

              注意: 
                    1) 在收集文件夹的时候, 不允许在文件夹下面传递相同文件名的文件, 否则flume将会直接罢工不干活
                    2) 启动之前, 目录是要存在的
                    解决:将同名称删除, 然后重启agent才可以使用

    3.2 采集文件到hdfs

                    采集需求: 比如业务系统使用log4j生成的日志, 日志内容不断增加, 需要把追加到日志文件中的数据实时采集到hdfs中

                    采集的三大要素

                    source:控制文件内容更新 --  exec :  通过linux的命令获取对应数据 (常用于监控文件)

                    sink:hdfs sink

                    channel:内存channel 

                    步骤:

                    1)配置监测文件到conf目录下

                    2)启动命令   bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1  -Dflume.root.logger=INFO,console

                    3)模拟数据的生成 向被检测的文件中写入动态的数据 这样就会被检测到然后按照指定的要求把数据写入到hdfs中   while true; do echo "test test....." >> /export/flumedata/hhh.log;sleep 0.5;done

                    Q: 如何知道这个文件产生了新的数据????
                          tail -f  文件路径

    3.3 参数解析

    **参数解析:**
    
    - **rollInterval**
    
      默认值:30
    
      hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;
    
      如果设置成0,则表示不根据时间来滚动文件;
    
      注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
    
    - **rollSize**
    
      默认值:1024
    
      当临时文件达到该大小(单位:bytes)时,滚动成目标文件;
    
      如果设置成0,则表示不根据临时文件大小来滚动文件;
    
    - **rollCount**
    
      默认值:10
    
      当events数据达到该数量时候,将临时文件滚动成目标文件;
    
      如果设置成0,则表示不根据events数据来滚动文件;
    
    - **round**
    
      默认值:false
    
      是否启用时间上的“舍弃”,这里的“舍弃”,类似于“四舍五入”。
    
    - **roundValue**
    
      默认值:1
    
      时间上进行“舍弃”的值;
    
    - **roundUnit**
    
      默认值:seconds
    
      时间上进行“舍弃”的单位,包含:second,minute,hour

    四、负载均衡

               负载均衡(load-balance)是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上

            当一个节点出现了大量的数据, 导致这个节点执行效率会很低,将这个节点扩展多份 来共同承担处理的任务 
            注意: 如果有多个agent, 在启动的时候:  先启动那些远离数据源的agent

    我这里有三台服务器
    
    -- 第一台的配置:
    #agent1 name
    agent1.channels = c1
    agent1.sources = r1
    agent1.sinks = k1 k2
    
    #set gruop
    agent1.sinkgroups = g1
    
    #set channel
    agent1.channels.c1.type = memory
    agent1.channels.c1.capacity = 1000
    agent1.channels.c1.transactionCapacity = 100
    
    agent1.sources.r1.channels = c1
    agent1.sources.r1.type = exec
    agent1.sources.r1.command = tail -F /export/flumedata/123.log
    
    # set sink1
    agent1.sinks.k1.channel = c1
    agent1.sinks.k1.type = avro
    agent1.sinks.k1.hostname = hadoop02
    
    agent1.sinks.k1.port = 52020
    
    # set sink2
    agent1.sinks.k2.channel = c1
    agent1.sinks.k2.type = avro
    agent1.sinks.k2.hostname = hadoop03
    agent1.sinks.k2.port = 52020
    
    #set sink group
    agent1.sinkgroups.g1.sinks = k1 k2
    
    #负载均衡的配置
    agent1.sinkgroups.g1.processor.type = load_balance
    # 是否开启黑名单机制
    agent1.sinkgroups.g1.processor.backoff = true
    agent1.sinkgroups.g1.processor.selector = round_robin
    agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
    
    -- 第二台
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = hadoop02
    a1.sources.r1.port = 52020
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    -- 第三台
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = hadoop03
    a1.sources.r1.port = 52020
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    
    
    
    bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

    五、容错机制

    当一个节点出现了宕机的问题后, 将备份节点更改为active状态, 提供使用即可保证服务器集群不会出现宕机的风险通过配置sink的优先级, 来决定sink的执行的顺序

    -- 第一台
    #agent1 name
    agent1.channels = c1
    agent1.sources = r1
    agent1.sinks = k1 k2
    
    #set gruop
    agent1.sinkgroups = g1
    
    #set channel
    agent1.channels.c1.type = memory
    agent1.channels.c1.capacity = 1000
    agent1.channels.c1.transactionCapacity = 100
    
    agent1.sources.r1.channels = c1
    agent1.sources.r1.type = exec
    agent1.sources.r1.command = tail -F /export/flumedata/456.log
    
    # set sink1
    agent1.sinks.k1.channel = c1
    agent1.sinks.k1.type = avro
    agent1.sinks.k1.hostname = node02
    agent1.sinks.k1.port = 52020
    
    # set sink2
    agent1.sinks.k2.channel = c1
    agent1.sinks.k2.type = avro
    agent1.sinks.k2.hostname = node03
    agent1.sinks.k2.port = 52020
    
    #set sink group
    agent1.sinkgroups.g1.sinks = k1 k2
    
    #set failover
    agent1.sinkgroups.g1.processor.type = failover
    agent1.sinkgroups.g1.processor.priority.k1 = 10
    agent1.sinkgroups.g1.processor.priority.k2 = 1
    agent1.sinkgroups.g1.processor.maxpenalty = 10000
    
    
    
  • 相关阅读:
    【4N魔方阵】
    【2(2N+1)魔方阵 】
    【二分查找法(折半查找法)】
    【循环搜寻法(使用卫兵)】
    【合并排序法】
    【快速排序法一】
    【快速排序二】
    【快速排序三】
    【数据结构】之 线性表详解
    【计算机网络基础】
  • 原文地址:https://www.cnblogs.com/haojia/p/12386224.html
Copyright © 2011-2022 走看看