  • Integrating Flume with Kafka


    Step 1: Configure the Flume conf file

    TaildirSource-kafka.conf


    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1

    agent1.sources.source1.type = TAILDIR
    agent1.sources.source1.positionFile = /home/hadoop/bigdatasoftware/datas/nginx/taildir_position.json
    agent1.sources.source1.filegroups = f1
    agent1.sources.source1.filegroups.f1 = /home/hadoop/bigdatasoftware/datas/nginx/access_*.log

    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname

    # Configure the sink as a Kafka sink
    agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.sink1.kafka.topic = third
    agent1.sinks.sink1.kafka.bootstrap.servers = hadoop-001:9092,hadoop-002:9092,hadoop-003:9092
    agent1.sinks.sink1.kafka.flumeBatchSize = 20
    agent1.sinks.sink1.kafka.producer.acks = 1


    # Use a memory channel
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600

    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
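
    Before starting Flume, make sure the topic "third" exists (unless the brokers are configured to auto-create topics). A minimal sketch, assuming the Kafka bin directory is on the PATH and ZooKeeper runs at hadoop-001:2181 as in the consumer command below:

    kafka-topics.sh --create --zookeeper hadoop-001:2181 --replication-factor 3 --partitions 3 --topic third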

     


    Step 2: Start Flume


    bin/flume-ng agent --conf conf --conf-file conf/TaildirSource-kafka.conf --name agent1 -Dflume.root.logger=INFO,console
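
    Here --conf points to Flume's configuration directory, --conf-file to the agent definition above, and --name must match the agent name used in the file (agent1); -Dflume.root.logger=INFO,console sends log output to the console for easier debugging.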


    Step 3: Create a console consumer


    kafka-console-consumer.sh --zookeeper hadoop-001:2181 --from-beginning --topic third
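
    Note that the --zookeeper flag belongs to the legacy consumer and was removed in Kafka 2.0; on newer versions, connect to the brokers directly instead:

    kafka-console-consumer.sh --bootstrap-server hadoop-001:9092 --from-beginning --topic third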


    Step 4: Append content to one of the watched log files, as shown below.
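
    A minimal sketch (the file name here is made up; any file matching the access_*.log pattern in the watched directory works):

    echo '127.0.0.1 - - "GET / HTTP/1.1" 200' >> /home/hadoop/bigdatasoftware/datas/nginx/access_test.log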

    The appended lines will show up as messages in the consumer.
