zoukankan      html  css  js  c++  java
  • 12.Flume的安装

    先把flume包上传并解压

    给flume创建一个软链接

     给flume配置环境变量

     

     

    #flume
    export FLUME_HOME=/opt/modules/flume
    export PATH=$PATH:$FLUME_HOME/bin

    使环境变量生效

     验证flume版本信息

    flume-ng  version

    然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

     

     

     

    先用一个最简单的例子来测试一下程序环境是否正常

     先在flume的conf目录下新建一个文件

    vim   netcat-logger.conf

    # 定义这个agent中各组件的名字

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # 描述和配置source组件:r1

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444

    # 描述和配置sink组件:k1

    a1.sinks.k1.type = logger

    # 描述和配置channel组件,此处使用是内存缓存的方式

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000 内存里面存放1000个事件

    a1.channels.c1.transactionCapacity = 100

    # 描述和配置source  channel   sink之间的连接关系

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000 
    a1.channels.c1.transactionCapacity = 100
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    启动agent去采集数据

    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console

    -c conf   指定flume自身的配置文件所在目录

    -f conf/netcat-logger.conf  指定我们所描述的采集方案

    -n a1  指定我们这个agent的名字

     

    启动nc的客户端

     $>nc localhost 44444

     

    可以看到flume接收到

    采集本地目录的数据文件到HDFS上

    采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

    根据需求,首先定义以下3大要素

    1.采集源,即source——监控文件目录 :  spooldir

    2.下沉目标,即sink——HDFS文件系统  :  hdfs sink

    3.source和sink之间的传递通道——channel,可用file channel 也可以用内存channel

     

    新建配置文件

    #定义三大组件的名称
    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # 配置source组件
    agent1.sources.source1.type = spooldir(监听的文件不能重复)
    agent1.sources.source1.spoolDir =/home/hadoop/logs/
    agent1.sources.source1.fileHeader = false
    
    #配置拦截器
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # 配置sink组件
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path =hdfs://node1/weblog/flume-collection/%y-%m-%d/
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize= 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat =Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    #agent1.sinks.sink1.hdfs.round = true
    #agent1.sinks.sink1.hdfs.roundValue = 10
    #agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1

     

     在本地创建目录

     

    运行flume

    bin/flume-ng agent -c conf -f conf/spooldir.conf -n agent1  -Dflume.root.logger=INFO,console

     

     把a.txt数据文件拷贝到被监听的本地目录下

     HDFS上多了个目录

     我们再上传一个数据文件到本地的监听目录下

     

     

     

     

    采集文件到HDFS

    采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs

    根据需求,首先定义以下3大要素

    1.  采集源,即source——监控文件内容更新 :  exec  ‘tail -F file’

    即时输出文件变化后追加的数据。
    tail -f file 动态跟踪文件file的增长情况,tail会每隔一秒去检查一下文件是否增加新的内容。如果增加就追加在原来的输出后面显示。但这种情况,必须保证在执行tail命令时,文件已经存在。

    2.下沉目标,即sink——HDFS文件系统  :  hdfs sink

    3. Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel

     

     

    首先创建一个配置文件

    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # Describe/configure tail -F source1
    agent1.sources.source1.type = exec
    agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
    agent1.sources.source1.channels = channel1
    
    #configure host for source
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # Describe sink1
    agent1.sinks.sink1.type = hdfs
    #a1.sinks.k1.channel = c1
    agent1.sinks.sink1.hdfs.path =hdfs://node1/weblog/flume/%y-%m-%d/
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize= 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat =Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    #agent1.sinks.sink1.hdfs.round = true
    #agent1.sinks.sink1.hdfs.roundValue = 10
    #agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1

    配置完后就启动flume

    bin/flume-ng agent -c conf -f conf/exec.conf -n agent1  -Dflume.root.logger=INFO,console

     先在本地的被监听目录下创建log日志文件,并往改文件写入内容

     

     

    可以看到HDFS的目录上产生了对于的日志文件

     

     我们给监听文件继续追加内容

    HDFS同时会更新日志文件

     

     

     

     

  • 相关阅读:
    array_map()与array_shift()搭配使用 PK array_column()函数
    Educational Codeforces Round 8 D. Magic Numbers
    hdu 1171 Big Event in HDU
    hdu 2844 poj 1742 Coins
    hdu 3591 The trouble of Xiaoqian
    hdu 2079 选课时间
    hdu 2191 珍惜现在,感恩生活 多重背包入门题
    hdu 5429 Geometric Progression 高精度浮点数(java版本)
    【BZOJ】1002: [FJOI2007]轮状病毒 递推+高精度
    hdu::1002 A + B Problem II
  • 原文地址:https://www.cnblogs.com/braveym/p/10874747.html
Copyright © 2011-2022 走看看