  • Integrating Flume with Kafka

    Step 1: Configure the Flume conf file

    TaildirSource-kafka.conf


    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1

    agent1.sources.source1.type = TAILDIR
    agent1.sources.source1.positionFile = /home/hadoop/bigdatasoftware/datas/nginx/taildir_position.json
    agent1.sources.source1.filegroups = f1
    agent1.sources.source1.filegroups.f1 = /home/hadoop/bigdatasoftware/datas/nginx/access_*.log

    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname

    # Configure the sink as a Kafka sink
    agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.sink1.kafka.topic = third
    agent1.sinks.sink1.kafka.bootstrap.servers = hadoop-001:9092,hadoop-002:9092,hadoop-003:9092
    agent1.sinks.sink1.kafka.flumeBatchSize = 20
    agent1.sinks.sink1.kafka.producer.acks = 1


    # Use a memory channel
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 500000

    agent1.channels.channel1.transactionCapacity = 600

    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
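
    The TAILDIR source persists its read offsets in the positionFile so the agent can resume after a restart without re-reading or losing data. A sketch of what that JSON typically looks like (the inode, pos, and file values here are purely illustrative):

    ```json
    [
      {"inode": 2496275, "pos": 1024, "file": "/home/hadoop/bigdatasoftware/datas/nginx/access_example.log"}
    ]
    ```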

     


    Step 2: Start Flume


    bin/flume-ng agent --conf conf --conf-file conf/TaildirSource-kafka.conf --name agent1 -Dflume.root.logger=INFO,console


    Step 3: Create a console consumer (the --zookeeper flag works on older Kafka versions; newer releases connect through the brokers with --bootstrap-server hadoop-001:9092 instead)


    kafka-console-consumer.sh --zookeeper hadoop-001:2181 --from-beginning --topic third


    Step 4: Append lines to a log file matching the access_*.log pattern

    The new lines will appear as messages in the consumer.
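
    To generate test input by hand, append a line to a file whose name matches the access_*.log filegroup pattern. This is a minimal sketch: the nginx-style log line and the access_test.log filename are illustrative, and LOG_DIR defaults to the current directory so it can be pointed at the watched directory.

    ```shell
    # Directory watched by the f1 filegroup; defaults to the current
    # directory for a dry run (set it to the real watched path)
    LOG_DIR=${LOG_DIR:-.}

    # Append one nginx-style access log line to a file matching access_*.log
    echo "127.0.0.1 - - [$(date '+%d/%b/%Y:%H:%M:%S %z')] \"GET /index.html HTTP/1.1\" 200 612" >> "$LOG_DIR/access_test.log"
    ```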

  • Original source: https://www.cnblogs.com/Transkai/p/10877689.html