  • [Flume][Kafka] Flume and Kafka integration example (Kafka as a Flume sink, writing to a Kafka topic)




    Preparation: create the spooling directory and make it writable:

    $ sudo mkdir -p /flume/web_spooldir
    $ sudo chmod a+w -R /flume

    Edit the Flume configuration file:

    $ cat /home/tester/flafka/spooldir_kafka.conf

    # Name the components on this agent
    agent1.sources = weblogsrc
    agent1.sinks = kafka-sink
    agent1.channels = memchannel

    # Configure the source
    agent1.sources.weblogsrc.type = spooldir
    agent1.sources.weblogsrc.spoolDir = /flume/web_spooldir
    agent1.sources.weblogsrc.channels = memchannel

    # Configure the sink
    agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.kafka-sink.topic = weblogs
    agent1.sinks.kafka-sink.brokerList = localhost:9092
    agent1.sinks.kafka-sink.batchSize = 20
    agent1.sinks.kafka-sink.channel = memchannel

    # Use a channel which buffers events in memory
    agent1.channels.memchannel.type = memory
    agent1.channels.memchannel.capacity = 100000
    agent1.channels.memchannel.transactionCapacity = 1000
    $
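
    Note that in the configuration above the source is bound with the plural key "channels" while the sink uses the singular key "channel". Mixing the two up, or misspelling the channel name, is an easy mistake. The following is a minimal sketch of a sanity check, assuming a simple one-property-per-line file like the one above; the function name check_channel_wiring is made up for this example and is not part of Flume:

```shell
# Hypothetical helper (not part of Flume): verify that every channel named by
# a source ("channels") or a sink ("channel") is declared in "<agent>.channels".
check_channel_wiring() {
    awk -F'=' '
        # Strip leading and trailing blanks from a string.
        function trim(s) { gsub(/^[ \t]+|[ \t]+$/, "", s); return s }

        # Skip comment lines and lines without a key = value pair.
        /^[ \t]*#/ || NF < 2 { next }

        {
            key = trim($1); val = trim($2)
            n = split(key, part, ".")
            if (n == 2 && part[2] == "channels") {
                # e.g. agent1.channels = memchannel
                m = split(val, names, /[ \t]+/)
                for (i = 1; i <= m; i++) declared[names[i]] = 1
            } else if (n == 4 && ((part[2] == "sources" && part[4] == "channels") ||
                                  (part[2] == "sinks"   && part[4] == "channel"))) {
                # e.g. agent1.sinks.kafka-sink.channel = memchannel
                refs[key] = val
            }
        }

        END {
            bad = 0
            for (k in refs) {
                m = split(refs[k], names, /[ \t]+/)
                for (i = 1; i <= m; i++)
                    if (!(names[i] in declared)) {
                        print "undeclared channel in " k ": " names[i]
                        bad = 1
                    }
            }
            exit bad
        }
    ' "$1"
}
```

    Calling check_channel_wiring /home/tester/flafka/spooldir_kafka.conf returns 0 when every reference resolves and prints the offending keys otherwise. It only inspects the channel wiring and does not validate any other property.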

    Run flume-ng (from /home/tester/flafka, because the configuration file is given as a relative path):

    $ flume-ng agent --conf /etc/flume-ng/conf \
    > --conf-file spooldir_kafka.conf \
    > --name agent1 -Dflume.root.logger=INFO,console

    The output looks similar to the following:

    Info: Sourcing environment configuration script /etc/flume-ng/conf/flume-env.sh
    Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
    Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
    Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
    Info: Including HBASE libraries found via (/usr/bin/hbase) for HBASE access
    Info: Excluding /usr/lib/hbase/bin/../lib/slf4j-api-1.7.5.jar from classpath
    Info: Excluding /usr/lib/hbase/bin/../lib/slf4j-log4j12.jar from classpath
    Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
    Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
    Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
    Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
    Info: Excluding /usr/lib/zookeeper/lib/slf4j-api-1.7.5.jar from classpath
    Info: Excluding /usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar from classpath
    Info: Excluding /usr/lib/zookeeper/lib/slf4j-log4j12.jar from classpath
    Info: Including Hive libraries found via () for Hive access
    + exec /usr/java/default/bin/java -Xmx500m -Dflume.root.logger=INFO,console -cp '/etc/flume-ng/conf:/usr/lib/flume-ng/lib/*:/etc/hadoop/conf:/usr/lib/hadoop/lib/activation-1.1.jar:/usr/lib/hadoop/lib/apacheds-i18n-2.0.0-M15.jar

    ...

    -Djava.library.path=:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native:/usr/lib/hbase/bin/../lib/native/Linux-amd64-64 
    org.apache.flume.node.Application --conf-file spooldir_kafka.conf --name agent1
    2017-10-23 01:15:11,209 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start 
    (PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
    2017-10-23 01:15:11,223 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider 
    $FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:spooldir_kafka.conf
    2017-10-23 01:15:11,256 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty 
    (FlumeConfiguration.java:1017)] Processing:kafka-sink

    ...

    2017-10-23 01:15:11,933 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start 
    (MonitoredCounterGroup.java:96)] Component type: SOURCE, name: weblogsrc started
    2017-10-23 01:15:13,003 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
    2017-10-23 01:15:13,271 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property 
    key.serializer.class is overridden to kafka.serializer.StringEncoder
    2017-10-23 01:15:13,271 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property 
    metadata.broker.list is overridden to localhost:9092
    2017-10-23 01:15:13,277 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property 
    request.required.acks is overridden to 1
    2017-10-23 01:15:13,277 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property serializer.class 
    is overridden to kafka.serializer.DefaultEncoder
    2017-10-23 01:15:13,718 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register 
    (MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: kafka-sink: Successfully registered new MBean.
    2017-10-23 01:15:13,719 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start 
    (MonitoredCounterGroup.java:96)] Component type: SINK, name: kafka-sink started
    ...

    2017-10-23 01:15:13,720 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents 
    (ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
    2017-10-23 01:15:13,720 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile 
    (ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-01-13.log to 
    /flume/web_spooldir/2014-01-13.log.COMPLETED

    ...

    2017-10-23 01:16:11,441 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents 
    (ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
    2017-10-23 01:16:11,451 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile 
    (ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-01-24.log to 
    /flume/web_spooldir/2014-01-24.log.COMPLETED
    2017-10-23 01:16:11,818 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents 
    (ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
    2017-10-23 01:16:11,819 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile 
    (ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-02-15.log to 
    /flume/web_spooldir/2014-02-15.log.COMPLETED

    Start the Kafka console consumer:

    $ kafka-console-consumer --zookeeper localhost:2181 --topic weblogs

    In another terminal window, place the web logs into the /flume/web_spooldir directory:

    cp -rf /home/tester/weblogs /tmp/tmp_weblogs
    mv /tmp/tmp_weblogs/* /flume/web_spooldir
    rm -rf /tmp/tmp_weblogs
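
    The copy-then-move sequence above matters because the spooling directory source expects a file to be complete and unchanged once it appears in the directory, and it does not accept a file name that has been used before. A move is only an atomic rename when both directories are on the same filesystem, which is not guaranteed for /tmp and /flume. The following is a minimal sketch of a safer drop, assuming a staging directory next to the spool directory; the function name spool_drop and the staging path are made up for this example:

```shell
# Hypothetical helper: copy each file from a source directory into a staging
# directory, then move it into the spool directory, so the Flume source only
# ever sees complete files. Staging and spool directories should be on the
# same filesystem so that the final mv is a rename.
spool_drop() {
    src_dir=$1
    staging_dir=$2
    spool_dir=$3

    mkdir -p "$staging_dir" || return 1

    for f in "$src_dir"/*; do
        [ -f "$f" ] || continue
        name=$(basename "$f")

        # Do not reuse a name the source has already seen, whether the file
        # is still pending or has been renamed with the .COMPLETED suffix.
        if [ -e "$spool_dir/$name" ] || [ -e "$spool_dir/$name.COMPLETED" ]; then
            echo "skipping $name: name already used in $spool_dir" >&2
            continue
        fi

        cp "$f" "$staging_dir/$name" && mv "$staging_dir/$name" "$spool_dir/$name"
    done
}
```

    For the directories used in this post the call could look like spool_drop /home/tester/weblogs /flume/staging /flume/web_spooldir, where /flume/staging is an assumed location and not something the original setup creates.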

    The flume-ng window shows the following (the log files are being transferred to the Kafka topic weblogs):

    2017-10-23 01:36:28,436 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents 
    (ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
    2017-10-23 01:36:28,449 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile 
    (ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2013-09-22.log to 
    /flume/web_spooldir/2013-09-22.log.COMPLETED
    2017-10-23 01:36:28,971 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents 
    (ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
    ...

    2017-10-23 01:37:39,011 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile 
    (ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-02-19.log to 
    /flume/web_spooldir/2014-02-19.log.COMPLETED
    2017-10-23 01:37:39,386 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents 
    (ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
    2017-10-23 01:37:39,386 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile 
    (ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-03-09.log to 
    /flume/web_spooldir/2014-03-09.log.COMPLETED
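
    As the log shows, each file is renamed with a .COMPLETED suffix once it has been fully read. Counting the renamed files is therefore a quick way to see how far the source has got. A minimal sketch, with a made-up function name spool_status:

```shell
# Hypothetical helper: report how many files in a spool directory have been
# fully read (renamed to *.COMPLETED) and how many are still waiting.
spool_status() {
    dir=$1
    completed=0
    pending=0

    for f in "$dir"/*; do
        [ -f "$f" ] || continue
        case $f in
            *.COMPLETED) completed=$((completed + 1)) ;;
            *)           pending=$((pending + 1)) ;;
        esac
    done

    echo "completed=$completed pending=$pending"
}
```

    Running spool_status /flume/web_spooldir while the agent is working should show the pending count falling and the completed count rising.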


    The consumer window prints the contents of all the web log files (it reads the topic weblogs and receives every web log line):

    ...

    213.125.211.10 - 66543 [09/Mar/2014:00:00:14 +0100] "GET /KBDOC-00131.html HTTP/1.0" 200 9807 "http://www.tester.com" "tester test 001"
    213.125.211.10 - 66543 [09/Mar/2014:00:00:14 +0100] "GET /theme.css HTTP/1.0" 200 6448 "http://www.tester.com" "tester test 002"
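
    Each line of a log file arrives in the consumer as one message, so the number of lines in the processed files gives a rough figure for how many messages the topic should hold. This assumes the source keeps its default line-oriented deserializer and that no line is long enough to be split. A minimal sketch, with a made-up function name count_completed_lines:

```shell
# Hypothetical helper: total number of lines across all *.COMPLETED files in
# a spool directory. wc -l counts newline characters, so a final line with no
# trailing newline is not included.
count_completed_lines() {
    total=0
    for f in "$1"/*.COMPLETED; do
        [ -f "$f" ] || continue
        n=$(wc -l < "$f")
        total=$((total + n))
    done
    echo "$total"
}
```

    The result of count_completed_lines /flume/web_spooldir can then be compared with the number of lines the consumer has printed.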


  • Original article: https://www.cnblogs.com/gaojian/p/7718832.html