  • Chaining multiple Flume agents


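    Collection requirement: a business system writes logs with log4j, and the log file keeps growing. The data appended to the log file must be collected into HDFS in real time by chaining two agents.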

    Based on this requirement, first define the three core components of each agent.

    First Flume agent

    • Source: monitors the file for new content: exec 'tail -F file'

    • Sink: the data sender, which serializes the events: avro sink

    • Channel, the pipe between source and sink: either a file channel or a memory channel

    Second Flume agent

    • Source: receives the data and deserializes it: avro source

    • Sink: the HDFS file system: HDFS sink

    • Channel, the pipe between source and sink: either a file channel or a memory channel
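    If buffered events should survive an agent restart, the memory channel used in the configs below can be swapped for a file channel. A minimal sketch that works for either agent; the checkpoint and data directories are hypothetical, so point them at any local directories the Flume user can write to:

```properties
# File channel: buffers events on local disk instead of in memory
a1.channels.c1.type = file
# Hypothetical local paths; adjust to your environment
a1.channels.c1.checkpointDir = /home/hadoop/bigdatasoftware/flume-data/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/bigdatasoftware/flume-data/data
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```

    The memory channel is faster but loses whatever it holds if the agent process dies; the file channel trades some throughput for durability. If both agents run on the same host, give each one its own checkpoint and data directories.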

    Writing the configuration files:

    Flume-agent1: tail-avro-avro-logger.conf

    #tail-avro-avro-logger.conf

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

     

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /home/hadoop/bigdatasoftware/datas/access.log

     

    # Describe the sink

    ## The avro sink is the data sender

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = hadoop-001

    a1.sinks.k1.port = 41414

    a1.sinks.k1.batch-size = 10

     

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

     

    Flume-agent2: avro-hdfs.conf

    a1.sources = r1

    a1.sinks = s1

    a1.channels = c1

     

    ## The avro source is a receiving service

    a1.sources.r1.type = avro

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 41414

     

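    a1.sinks.s1.type = hdfs

    a1.sinks.s1.hdfs.path = hdfs://hadoop-001:9000/logs/flume/

    a1.sinks.s1.hdfs.filePrefix = access_log

    a1.sinks.s1.hdfs.batchSize = 100

    # DataStream writes plain, uncompressed output instead of a SequenceFile

    a1.sinks.s1.hdfs.fileType = DataStream

    a1.sinks.s1.hdfs.writeFormat = Text

    # Roll to a new file after 10240 bytes, 1000 events or 10 seconds, whichever comes first

    a1.sinks.s1.hdfs.rollSize = 10240

    a1.sinks.s1.hdfs.rollCount = 1000

    a1.sinks.s1.hdfs.rollInterval = 10

    # round/roundValue/roundUnit round the event timestamp down (here to 10 minutes) for time escape sequences in hdfs.path; this path contains none, so these three lines currently have no effect

    a1.sinks.s1.hdfs.round = true

    a1.sinks.s1.hdfs.roundValue = 10

    a1.sinks.s1.hdfs.roundUnit = minute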

     

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    a1.sources.r1.channels = c1

    a1.sinks.s1.channel = c1
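    The round settings in this config only matter when hdfs.path contains time escape sequences. A sketch of how agent2 could bucket its output into 10-minute directories, assuming a timestamp interceptor on the avro source supplies the timestamp header the escapes require (the upstream exec source does not add one). These lines would replace or extend the corresponding ones in avro-hdfs.conf:

```properties
# Add a timestamp header to each event as it arrives at the avro source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Date/time escapes in the path; with round=true, roundValue=10, roundUnit=minute,
# %H%M is rounded down to a 10-minute boundary (an event at 14:37 lands in .../1430)
a1.sinks.s1.hdfs.path = hdfs://hadoop-001:9000/logs/flume/%Y-%m-%d/%H%M
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundValue = 10
a1.sinks.s1.hdfs.roundUnit = minute
```

    Without the timestamp header, the HDFS sink cannot resolve %Y, %m, %d, %H or %M and will fail to write the event.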

     

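    Run the Flume agents:

    First terminal: start the downstream agent (avro-hdfs.conf) first, so that its avro source is already listening on port 41414 when the upstream avro sink tries to connect: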


     ./bin/flume-ng agent -c conf -f /home/hadoop/bigdatasoftware/flume-1.5.0/conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console


    Second terminal: start the upstream agent (tail-avro-avro-logger.conf):


    ./bin/flume-ng agent -c conf -f /home/hadoop/bigdatasoftware/flume-1.5.0/conf/tail-avro-avro-logger.conf -n a1 -Dflume.root.logger=INFO,console


    Third terminal: append data to /home/hadoop/bigdatasoftware/datas/access.log, for example:

    echo "hello flume" >> /home/hadoop/bigdatasoftware/datas/access.log

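    Check HDFS and cat the output files (a file that is still being written keeps a .tmp suffix until it rolls):

    hadoop fs -ls /logs/flume/

    hadoop fs -cat '/logs/flume/access_log*'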

  • Original article: https://www.cnblogs.com/Transkai/p/10628425.html