  • Spark Streaming integration with Flume

    1. Install Flume

    Installing Flume is simple: extract the archive, then edit the flume-env.sh configuration file to point JAVA_HOME at your JDK.

    Copy the HDFS jar into Flume's lib directory (without it, Flume cannot deliver data to HDFS):

    $ cp /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.6.jar \
        /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6-bin/lib/

    2. Integrate Spark Streaming with Flume

    2.1) Build Spark to obtain the Flume integration jar:

    Reference: http://www.cnblogs.com/wcwen1990/p/7688027.html

    Note: integrating Spark Streaming with Flume or Kafka requires supporting jars. These jars are generated under the external directory as part of a Spark build, so we compile Spark here to obtain them.

    For Flume integration, the key artifact is spark-streaming-flume_2.10-1.3.0.jar.
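    If you prefer not to build Spark just for this jar, the same artifact is also published to Maven Central, so a build tool can fetch it instead. A minimal sketch of an sbt build definition, assuming an sbt-based project (the build file itself is an assumption, not something used elsewhere in this article):

    // build.sbt -- a minimal sketch; versions match the Spark 1.3.0 / Scala 2.10 used here
    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      // "provided" because spark-shell / the cluster already ships spark-streaming
      "org.apache.spark" %% "spark-streaming"       % "1.3.0" % "provided",
      "org.apache.spark" %% "spark-streaming-flume" % "1.3.0"
    )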

    2.2) Collect the integration jars

    $ mkdir -p /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs

    $ cp spark-1.3.0/external/flume/target/spark-streaming-flume_2.10-1.3.0.jar \
        /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/

    $ pwd
    /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6-bin/lib

    $ cp flume-avro-source-1.5.0-cdh5.3.6.jar flume-ng-sdk-1.5.0-cdh5.3.6.jar \
        /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/

    $ cd /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/
    $ ll
    flume-avro-source-1.5.0-cdh5.3.6.jar
    flume-ng-sdk-1.5.0-cdh5.3.6.jar
    spark-streaming-flume_2.10-1.3.0.jar

    3. Write the Flume configuration file (defining the source, channel, and sink):

    $ cat flume-spark-push.conf

    # Agent "ss" with one exec source, one memory channel, and one avro sink
    ss.sources = sss
    ss.channels = ssc
    ss.sinks = ssk

    # Source: tail the log file we will append test data to
    ss.sources.sss.type = exec
    ss.sources.sss.command = tail -f /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6-bin/wctotal.log
    ss.sources.sss.shell = /bin/bash -c

    # Channel: buffer events in memory
    ss.channels.ssc.type = memory
    ss.channels.ssc.capacity = 1000
    ss.channels.ssc.transactionCapacity = 100

    # Sink: push events over Avro to the Spark Streaming receiver
    ss.sinks.ssk.type = avro
    ss.sinks.ssk.hostname = chavin.king
    ss.sinks.ssk.port = 9999

    # Wire the source and sink to the channel
    ss.sources.sss.channels = ssc
    ss.sinks.ssk.channel = ssc
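    This configuration implements the push approach: Flume's avro sink pushes events to a receiver that Spark Streaming hosts on chavin.king:9999. The Spark 1.3.0 integration guide (see step 8) also documents a pull-based alternative, in which Flume buffers events in a custom sink and Spark polls it. A minimal sketch, assuming spark-streaming-flume-sink and its dependencies have been added to Flume's lib directory:

    // Flume side (sketch): swap the avro sink for Spark's custom sink, e.g.
    //   ss.sinks.ssk.type = org.apache.spark.streaming.flume.sink.SparkSink
    //   ss.sinks.ssk.hostname = chavin.king
    //   ss.sinks.ssk.port = 9999
    // Spark side: poll that sink instead of hosting a receiver
    import org.apache.spark.streaming.flume._

    val pollingStream = FlumeUtils.createPollingStream(ssc, "chavin.king", 9999)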

    4. Write the Spark Streaming program:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.storage.StorageLevel

    // 5-second batches on top of the spark-shell's SparkContext (sc)
    val ssc = new StreamingContext(sc, Seconds(5))

    // Receive data pushed by Flume's avro sink on chavin.king:9999
    val stream = FlumeUtils.createStream(ssc, "chavin.king", 9999, StorageLevel.MEMORY_ONLY_SER_2)

    // Count the events in each batch and print the result
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
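    The snippet above is written for spark-shell, which already provides sc. For reference, a standalone-application version would look roughly like the following sketch (the object name, app name, and local[2] master are illustrative assumptions; package it with the same jars from step 2.2 on the classpath):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumeEventCount {
      def main(args: Array[String]): Unit = {
        // Build our own context instead of the shell's sc; 5-second batches as above
        val conf = new SparkConf().setMaster("local[2]").setAppName("FlumeEventCount")
        val ssc = new StreamingContext(conf, Seconds(5))

        // Receiver listens on chavin.king:9999 for events pushed by the Flume avro sink
        val stream = FlumeUtils.createStream(ssc, "chavin.king", 9999, StorageLevel.MEMORY_ONLY_SER_2)
        stream.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }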

    5. Test the Spark Streaming and Flume integration with spark-shell in local mode

    $ bin/spark-shell --master local[2] \
        --jars /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/spark-streaming-flume_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/flume-avro-source-1.5.0-cdh5.3.6.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar

    Run the program from step 4:

    scala> import org.apache.spark._
    import org.apache.spark._

    scala> import org.apache.spark.streaming._
    import org.apache.spark.streaming._

    scala> import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.StreamingContext._

    scala> import org.apache.spark.streaming.flume._
    import org.apache.spark.streaming.flume._

    scala> import org.apache.spark.storage.StorageLevel
    import org.apache.spark.storage.StorageLevel

    scala> val ssc = new StreamingContext(sc, Seconds(5))
    ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@412dea3c

    scala> val stream = FlumeUtils.createStream(ssc, "chavin.king", 9999, StorageLevel.MEMORY_ONLY_SER_2)
    stream: org.apache.spark.streaming.dstream.ReceiverInputDStream[org.apache.spark.streaming.flume.SparkFlumeEvent] = org.apache.spark.streaming.flume.FlumeInputDStream@2bf9884

    scala> stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    // Enter the following to start the streaming computation
    scala> ssc.start()

    scala> ssc.awaitTermination()

    6. Start Flume

    $ bin/flume-ng agent -c conf -n ss -f conf/flume-spark-push.conf -Dflume.root.logger=DEBUG,console

    7. Test the Spark Streaming and Flume integration:

    $ echo "hadoop oracle mysql" >> /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6-bin/wctotal.log

    After running the command above, you should see output like the following in the spark-shell console:

    -------------------------------------------
    Time: 1499976790000 ms
    -------------------------------------------
    Received 1 flume events.
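    The count above only confirms that events are arriving. Each SparkFlumeEvent wraps an Avro event whose body is a ByteBuffer, so the payload can be decoded and processed further. A small sketch that replaces the count/print line from step 4 with a word count over the event bodies:

    // Decode each Flume event body to a string, then run a word count per batch
    val lines = stream.map(event => new String(event.event.getBody.array()))
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()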

    8. Reference: http://spark.apache.org/docs/1.3.0/streaming-flume-integration.html
