  • Big Data Introduction, Day 12: Getting Started with Flume

    I. Overview

      1. What is Flume

        The official site's introduction: http://flume.apache.org/

      Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

        A concise summary:

    • Flume is a distributed, reliable, and highly available system for collecting (with real-time support), aggregating, and transporting massive amounts of log data.

    • Flume can collect source data in many forms (files, socket packets, and so on) and deliver the collected data to many external storage systems such as HDFS, HBase, Hive, and Kafka.

    • Typical collection requirements can be met simply by configuring Flume.

    • Flume also has good custom-extension capabilities for special scenarios, so it is suitable for most day-to-day data collection needs.

       2. How Flume works

      

      

      1、 The core role in a distributed Flume system is the agent; a Flume collection system is built by connecting agents together.

      2、 Each agent acts as a data courier and contains three components (a configuration skeleton is sketched after this list):

        a) Source: the collection source, which interfaces with the data source to obtain data

        b) Sink: the sink, i.e. the destination of the collected data, used to pass data on to the next agent or to the final storage system

        c) Channel: the data transfer channel inside the agent, used to move data from the source to the sink

        Moreover, multiple agents can be chained in series:
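
        In configuration terms, each agent and its components are declared by name in a properties file. Below is a bare sketch of that pattern (the names a1/r1/c1/k1 and the placeholder types are only illustrative; the concrete examples in the later sections follow the same pattern):

    # a1 is the agent name; r1, c1, k1 are arbitrary component names
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # every component gets a type plus type-specific properties
    a1.sources.r1.type = <source type, e.g. netcat / spooldir / exec>
    a1.channels.c1.type = <channel type, e.g. memory>
    a1.sinks.k1.type = <sink type, e.g. logger / hdfs>
    # wiring: a source can feed several channels, a sink drains exactly one channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1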

        

    II. Installation and Configuration

      1. Upload the installation package
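
      For example (a sketch only; this assumes the tarball is copied from a local workstation with scp, and the path is illustrative):

    scp apache-flume-1.6.0-bin.tar.gz hadoop@mini1:/home/hadoop/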

      

      2. Extract the archive

    tar -zxvf apache-flume-1.6.0-bin.tar.gz -C apps/

      3. Configure environment variables

    [hadoop@mini1 apache-flume-1.6.0-bin]$ sudo vim /etc/profile

      Append the following:

    export FLUME_HOME=/home/hadoop/apps/apache-flume-1.6.0-bin
    export PATH=$PATH:$FLUME_HOME/bin
    [hadoop@mini1 apache-flume-1.6.0-bin]$ source /etc/profile

      // To check the values of environment variables, export or env is recommended
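
      For example:

    # confirm the variables took effect in the current shell
    echo $FLUME_HOME
    env | grep FLUME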

      4. Configure flume-env.sh

    [hadoop@mini1 ~]$ cd apps/apache-flume-1.6.0-bin/
    [hadoop@mini1 apache-flume-1.6.0-bin]$ cd conf/
    [hadoop@mini1 conf]$ ls
    flume-conf.properties.template  flume-env.sh.template
    flume-env.ps1.template          log4j.properties
    [hadoop@mini1 conf]$ cp flume-env.sh.template flume-env.sh

      Append the configuration:

    [hadoop@mini1 conf]$ vim flume-env.sh
    export JAVA_HOME=/opt/java/jdk1.8.0_151
    export HADOOP_HOME=/home/hadoop/apps/hadoop-2.6.4

      5. Verify

    [hadoop@mini1 conf]$ flume-ng version

    III. A Simple Deployment

      1. Create a collection configuration file

        flume-conf.properties.template under the conf directory is the template configuration file; create your own configuration in the same directory:

    vim netcat-logger.conf
    # Name the components on this agent
    # Give the three components names; a1 is the agent name
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    # Source type: netcat receives data from a network port; the agent runs on this machine, hence localhost (type=spooldir would instead collect whatever appears in a directory)
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    # Data is delivered to the sink batch by batch, and each batch is made up of individual events. Channel parameters:
    # capacity: the maximum number of events the channel can store
    # transactionCapacity: the maximum number of events that can be taken from the source or delivered to the sink in one transaction
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

       Of course, if you want other machines to be able to connect, just change localhost in a1.sources.r1.bind to the host name (here, mini1).
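
       For example (a sketch; mini1 is the host name used in this cluster), change the bind address and then connect from another machine:

    a1.sources.r1.bind = mini1

    # from another machine that can resolve mini1
    telnet mini1 44444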

       2. Start the agent

    bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

      Here, --conf specifies the configuration directory, --conf-file specifies the configuration file, --name specifies the agent name, and the -D option sets a JVM system property. The long options can also be abbreviated as follows:

    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console

      // Note that conf here is a relative path

      3. Connect and send data

        Open another terminal (duplicate the SSH session):

    telnet localhost 44444

       // telnet was not installed on this machine, so install it first: https://www.cnblogs.com/lixuwu/p/6102444.html

        

      The received messages now show up on the Flume agent's console. If a message is too long and gets truncated, it can be adjusted with this parameter:

    max-line-length    512    Max line length per event body (in bytes)

      For the full set of options, see the netcat source parameters in the configuration section of the official documentation.
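
      For example, to raise the limit, add the property to the netcat source (2048 here is an arbitrary value chosen for illustration):

    a1.sources.r1.max-line-length = 2048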

       All of the above is covered in full in the official user guide: http://flume.apache.org/FlumeUserGuide.html

      Configuring the source as a spooling directory

        Create spool-logger.conf under conf:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    # Watch a directory: spoolDir sets the directory to monitor; fileHeader controls whether to attach the file path as an event header
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/hadoop/flumespool
    a1.sources.r1.fileHeader = true
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

        Start:

    bin/flume-ng agent -c ./conf -f ./conf/spool-logger.conf -n a1 -Dflume.root.logger=INFO,console

        Now add files to the flumespool directory with cp (or similar) and you will see the result.

      However, duplicate file names are not allowed; re-using a name causes an error.
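
      For example (file names here are arbitrary), drop a file in and then list the directory; by default the spooldir source renames fully ingested files with a .COMPLETED suffix, so re-using a file name can trigger the error mentioned above:

    cp /etc/hosts /home/hadoop/flumespool/hosts.txt
    ls /home/hadoop/flumespool
    # hosts.txt.COMPLETED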

     IV. Collecting a File into HDFS

      1. Create tail-hdfs.conf under conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # exec means the source runs a shell command
    # Describe/configure the source
    a1.sources.r1.type = exec
    # tail -F follows by file name; tail -f follows the file's inode
    a1.sources.r1.command = tail -F /home/hadoop/log/test.log
    a1.sources.r1.channels = c1
    
    # Describe the sink
    # sink target
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    # target directory; Flume substitutes the escape sequences in the path
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
    # file name prefix
    a1.sinks.k1.hdfs.filePrefix = events-
    
    # round the directory timestamp down, switching directories every 10 minutes
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    
    # time to wait before rolling the file (seconds)
    a1.sinks.k1.hdfs.rollInterval = 3
    
    # file size limit that triggers a roll (bytes)
    a1.sinks.k1.hdfs.rollSize = 500
    
    # number of events written before rolling the file
    a1.sinks.k1.hdfs.rollCount = 20
    
    # flush to HDFS every 5 events
    a1.sinks.k1.hdfs.batchSize = 5
    
    # use the local timestamp to resolve the escape sequences in the path
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # type of the files produced by the sink; the default is SequenceFile, DataStream produces plain text
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

      Note that this switches to the exec (command) source instead of the earlier spooldir one; only the collection source differs.
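
      The small roll thresholds above (3 s / 500 bytes / 20 events) are only for seeing results quickly and will produce many tiny HDFS files. A hedged variant for larger files (setting rollSize and rollCount to 0 disables those two triggers):

    # roll roughly every 10 minutes, regardless of size or event count
    a1.sinks.k1.hdfs.rollInterval = 600
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0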

      2. Prepare test data

    [hadoop@mini1 conf]$ mkdir /home/hadoop/log
    [hadoop@mini1 conf]$ touch /home/hadoop/log/test.log
    [hadoop@mini1 ~]$ while true
    > do
    > echo 10086 >> /home/hadoop/log/test.log
    > sleep 0.5
    > done

      // Of course, putting this loop into a .sh file and running it works just as well
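
      A possible makelog.sh sketch (the script name is illustrative):

    #!/bin/bash
    # append a line to the test log every 0.5 seconds
    while true
    do
      echo 10086 >> /home/hadoop/log/test.log
      sleep 0.5
    done

      Run it with sh makelog.sh (or chmod +x makelog.sh && ./makelog.sh).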

      Open a new terminal to watch the output:

    [hadoop@mini1 ~]$ tail -F /home/hadoop/log/test.log

      3. Start HDFS

    start-dfs.sh

      4. Start Flume

    bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1

      5. Check the result

        The generated directories can be viewed directly in the web UI at mini1:50070.
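
        It can also be checked from the command line, for example:

    # list the date/time-bucketed directories and the rolled event files
    hadoop fs -ls -R /flume/events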

      For the spooldir variant, see https://www.cnblogs.com/cnmenglang/p/6543927.html

    V. Chaining Multiple Agents

      The key point is simply that one agent's sink feeds the next agent's source.

      Reference: http://blog.csdn.net/killy_uc/article/details/22916479
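
      A minimal two-agent sketch (host names mini1/mini2 and port 4545 are assumptions, not from the original post): the upstream agent forwards events with an avro sink, and the downstream agent receives them with an avro source.

    # upstream agent (runs on mini1): tail a file and forward over avro
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/hadoop/log/test.log
    a1.channels.c1.type = memory
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = mini2
    a1.sinks.k1.port = 4545
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    # downstream agent (runs on mini2): receive avro events and log them
    a2.sources = r1
    a2.channels = c1
    a2.sinks = k1
    a2.sources.r1.type = avro
    a2.sources.r1.bind = mini2
    a2.sources.r1.port = 4545
    a2.channels.c1.type = memory
    a2.sinks.k1.type = logger
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1

      Start the downstream agent first so the avro sink on mini1 has something to connect to.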
