  • flume(2)

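    Continued from the previous post: https://www.cnblogs.com/metianzing/p/9511852.html

    As before, this post mainly records the configuration files.

    It builds on Case 5 from the previous post. Because the log server and the servers that collect the logs are usually different machines, this post uses a multi-agent setup.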

    agent1:  source: TAILDIR
             sink:   avro

    agent2:  source: avro
             sink:   kafka

    1> agent1 configuration file

    cd /usr/local/flume/conf
    vim agent1_abtd.conf
    

    Write agent1_abtd.conf with the following content:

    # Name the components on this agent
    agent1.sources = s1
    agent1.sinks = k1
    agent1.channels = ch1
    
    # Describe/configure the source
    agent1.sources.s1.type = TAILDIR
    agent1.sources.s1.positionFile = /opt/classiclaw/nginx/logs/abtd_magent/taildir_position.json
    agent1.sources.s1.filegroups = f1
    agent1.sources.s1.filegroups.f1 = /opt/classiclaw/nginx/logs/access.log.*
    agent1.sources.s1.headers.f1.headerKey1 = value1
    agent1.sources.s1.fileHeader = true
    
    # Describe the sink
    agent1.sinks.k1.type = avro
    agent1.sinks.k1.hostname = ubuntu
    agent1.sinks.k1.port = 44444
    
    # Set channel
    agent1.channels.ch1.type = file
    agent1.channels.ch1.checkpointDir = /opt/classiclaw/nginx/logs/flume_data/checkpoint
    agent1.channels.ch1.dataDirs = /opt/classiclaw/nginx/logs/flume_data/data
    
    # bind
    agent1.sources.s1.channels = ch1
    agent1.sinks.k1.channel = ch1
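
    The TAILDIR file group above is a regular expression on the file name, not a shell glob. As far as I know, Flume matches the whole file name against it, so access.log.* also matches the live access.log, because the trailing .* may match nothing. The sketch below approximates that match with an anchored grep -E; the candidate file names are made up for illustration.

```shell
# Candidate file names in /opt/classiclaw/nginx/logs (illustrative only).
candidates='access.log
access.log.1
access.log.2018-08-22
error.log
taildir_position.json'

# Anchor the pattern to emulate a whole-name match.
matched=$(printf '%s\n' "$candidates" | grep -E '^access.log.*$')
printf '%s\n' "$matched"
```

    If only rotated files should be tailed, a stricter pattern such as access\.log\..+ would exclude the live access.log.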
    

    2> agent2 configuration file

    cd /usr/local/flume/conf
    vim agent2_abtd.conf

    Write agent2_abtd.conf with the following content:

    # Name the components on this agent
    agent2.sources = s2
    agent2.sinks = k2
    agent2.channels = ch2
    
    # Describe/configure the source
    agent2.sources.s2.type = avro
    agent2.sources.s2.bind = 0.0.0.0
    agent2.sources.s2.port = 44444
    
    # Describe the sink
    agent2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    agent2.sinks.k2.brokerList = ubuntu:9092
    agent2.sinks.k2.topic = abtd_magent
    agent2.sinks.k2.kafka.flumeBatchSize = 20
    agent2.sinks.k2.kafka.producer.acks = 1
    agent2.sinks.k2.kafka.producer.linger.ms = 1
    agent2.sinks.k2.kafka.producer.compression.type = snappy
    
    # set channel
    agent2.channels.ch2.type = memory
    agent2.channels.ch2.capacity = 1000000
    agent2.channels.ch2.transactionCapacity = 1000000
    
    # Bind the source and sink to the channel
    agent2.sources.s2.channels = ch2
    agent2.sinks.k2.channel = ch2
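
    The two files only work together if agent1's Avro sink port equals agent2's Avro source port (44444 in both files here). A minimal sketch of a sanity check follows; get_prop and check_avro_ports are hypothetical helper names, not part of Flume.

```shell
# Print the value of a "key = value" line in a Flume properties file.
get_prop() {
  # $1 = file, $2 = exact property key
  awk -v key="$2" '
    {
      k = $0
      sub(/=.*/, "", k)                      # keep the text before the first "="
      gsub(/^[ \t]+|[ \t]+$/, "", k)         # trim the key
    }
    k == key {
      v = substr($0, index($0, "=") + 1)     # text after the first "="
      gsub(/^[ \t]+|[ \t]+$/, "", v)         # trim the value
      print v
      exit
    }
  ' "$1"
}

# Succeed only if agent1's Avro sink port equals agent2's Avro source port.
check_avro_ports() {
  # $1 = agent1 conf file, $2 = agent2 conf file
  sink_port=$(get_prop "$1" agent1.sinks.k1.port)
  source_port=$(get_prop "$2" agent2.sources.s2.port)
  [ -n "$sink_port" ] && [ "$sink_port" = "$source_port" ]
}
```

    Usage, from /usr/local/flume on a machine that has both files: check_avro_ports conf/agent1_abtd.conf conf/agent2_abtd.conf && echo "ports match"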
    

    3> Create the topic: abtd_magent

    This assumes ZooKeeper and Kafka are already running.

    cd /usr/local/kafka
    ./bin/kafka-topics.sh --zookeeper ubuntu:2181 --create --topic abtd_magent --replication-factor 1 --partitions 3 
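
    To confirm the topic was created, it can be described with the same tool. This is a sketch, not part of the original walkthrough: KAFKA_HOME and describe_topic are names assumed here, and the --zookeeper flag only applies to Kafka releases that still accept it (newer releases use --bootstrap-server instead).

```shell
# KAFKA_HOME is an assumed variable; the post installs Kafka in /usr/local/kafka.
KAFKA_HOME="${KAFKA_HOME:-/usr/local/kafka}"

# Print partition count, replication factor and leader assignment for a topic.
describe_topic() {
  # $1 = topic name, $2 = ZooKeeper address (defaults to the one used in the post)
  "$KAFKA_HOME/bin/kafka-topics.sh" --zookeeper "${2:-ubuntu:2181}" --describe --topic "$1"
}
```

    Calling describe_topic abtd_magent should report three partitions and a replication factor of 1, matching the create command.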
    

    4> Start agent2

    cd /usr/local/flume
    ./bin/flume-ng agent --name agent2 --conf conf/ --conf-file conf/agent2_abtd.conf -Dflume.root.logger=INFO,console
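
    agent2 is started first so that its Avro source is already listening on port 44444 when agent1's Avro sink connects. A small sketch for checking this from the agent1 machine follows; port_open and wait_for_port are hypothetical helpers, and they assume bash is installed because they rely on its /dev/tcp redirection.

```shell
# Return 0 if a TCP connection to host:port succeeds, non-zero otherwise.
port_open() {
  # $1 = host, $2 = port; relies on bash's /dev/tcp redirection
  bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}

# Poll until the port opens or the retry budget is used up.
wait_for_port() {
  # $1 = host, $2 = port, $3 = number of one-second retries (default 30)
  tries="${3:-30}"
  while [ "$tries" -gt 0 ]; do
    port_open "$1" "$2" && return 0
    tries=$((tries - 1))
    sleep 1
  done
  return 1
}
```

    Usage: wait_for_port ubuntu 44444 && echo "agent2 is listening"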
    

    5> Start agent1

    cd /usr/local/flume
    ./bin/flume-ng agent --name agent1 --conf conf/ --conf-file conf/agent1_abtd.conf -Dflume.root.logger=INFO,console

    6> On agent1: add a file to the directory watched by the Flume source

    cd /opt/classiclaw/nginx/logs
    echo -e "this is a test file! 
    http://www.aboutyun.com20170820">access.log.1
    

    7> Check the Kafka consumer
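
    One way to check is Kafka's console consumer. This is a sketch under assumptions: KAFKA_HOME and consume_topic are names introduced here, and the Kafka release is assumed to have a console consumer that accepts --bootstrap-server (older releases took --zookeeper instead).

```shell
# KAFKA_HOME is an assumed variable; the post installs Kafka in /usr/local/kafka.
KAFKA_HOME="${KAFKA_HOME:-/usr/local/kafka}"

# Read a topic from its earliest offset and print each message body.
consume_topic() {
  # $1 = topic name, $2 = broker address (defaults to the broker in agent2_abtd.conf)
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "${2:-ubuntu:9092}" \
    --topic "$1" --from-beginning
}
```

    If the pipeline works, consume_topic abtd_magent should print the lines written to access.log.1 in step 6. With three partitions, the order of the lines is not guaranteed.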


  • Original post: https://www.cnblogs.com/metianzing/p/9515873.html