  • Using Flume as a data source in Spark Streaming

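    There are two approaches. In the first, Spark Streaming starts a receiver (running on one of the Spark workers) that listens on a port, and Flume pushes data to it. In the second, Spark Streaming polls Flume at regular intervals and pulls the data.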

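    At first I thought the first approach was the only one. The problem is that there is no telling which node the receiver will start on, so every time I restarted the streaming job I had to change the Flume sinks again, which was a real pain. Only later did I discover the second approach. Below is the code for both; the differences are actually small. (The code comes from the official Spark examples on GitHub.)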

    The first approach: listening on a port (push-based):

    package org.apache.spark.examples.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.util.IntParam
    
    /**
     *  Produces a count of events received from Flume.
     *
     *  This should be used in conjunction with an AvroSink in Flume. It will start
     *  an Avro server at the requested host:port address and listen for requests.
     *  Your Flume AvroSink should be pointed to this address.
     *
     *  Usage: FlumeEventCount <host> <port>
     *    <host> is the host the Flume receiver will be started on - a receiver
     *           creates a server and listens for flume events.
     *    <port> is the port the Flume receiver will listen on.
     *
     *  To run this example:
     *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
     */
    object FlumeEventCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println(
            "Usage: FlumeEventCount <host> <port>")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        val Array(host, IntParam(port)) = args
    
        val batchInterval = Milliseconds(2000)
    
        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumeEventCount")
        val ssc = new StreamingContext(sparkConf, batchInterval)
    
        // Create a flume stream
        val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    
        // Print out the count of events received from this server in each batch
        stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
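    The example above only counts events. Each element of the stream is a SparkFlumeEvent whose `event` field is the Avro-generated AvroFlumeEvent, and its `getBody` returns a java.nio.ByteBuffer. The following is a minimal sketch of a helper that decodes such a body, assuming the events carry UTF-8 text; the object name FlumeBody is hypothetical and not part of Spark. It depends only on the JDK, so it can be tried without a cluster.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper (not part of Spark): decodes a Flume event body,
// ASSUMING the payload is UTF-8 text.
object FlumeBody {
  def asUtf8(body: ByteBuffer): String = {
    // duplicate() shares the bytes but has its own position, so the
    // caller's buffer is left untouched.
    val view = body.duplicate()
    // Copy only the readable bytes rather than the whole backing array.
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
}
```

    Inside either example it could be applied as `stream.map(e => FlumeBody.asUtf8(e.event.getBody)).print()`. Reading `remaining()` bytes from a duplicate avoids calling `array()`, which may return more than the body when the buffer wraps only part of a larger array.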
    

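    The second approach: actively polling Flume for data (pull-based). This requires the Spark Sink to be running in the Flume agent: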

    package org.apache.spark.examples.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.util.IntParam
    import java.net.InetSocketAddress
    
    /**
     *  Produces a count of events received from Flume.
     *
     *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
     *  the Spark Streaming programming guide for more details.
     *
     *  Usage: FlumePollingEventCount <host> <port>
     *    `host` is the host on which the Spark Sink is running.
     *    `port` is the port at which the Spark Sink is listening.
     *
     *  To run this example:
     *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount <host> <port> `
     */
    object FlumePollingEventCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println(
            "Usage: FlumePollingEventCount <host> <port>")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        val Array(host, IntParam(port)) = args
    
        val batchInterval = Milliseconds(2000)
    
        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
        val ssc = new StreamingContext(sparkConf, batchInterval)
    
        // Create a flume stream that polls the Spark Sink running in a Flume agent
        val stream = FlumeUtils.createPollingStream(ssc, host, port)
    
        // Print out the count of events received from this server in each batch
        stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
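    In this mode Spark connects to Flume, so the Flume side keeps a fixed address no matter which node the receiver lands on. FlumeUtils.createPollingStream also has an overload taking a Seq[InetSocketAddress] and a StorageLevel, which lets one stream poll several Spark Sinks. Below is a minimal sketch of building that address list, assuming the sinks are passed as a single comma-separated "host:port,host:port" argument; the SinkAddresses object and this argument format are hypothetical.

```scala
import java.net.InetSocketAddress

// Hypothetical helper (not part of Spark): parses "host1:port1,host2:port2"
// into the address list accepted by
// FlumeUtils.createPollingStream(ssc, addresses, storageLevel).
object SinkAddresses {
  def parse(spec: String): Seq[InetSocketAddress] =
    spec.split(",").toSeq
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { hostPort =>
        // Split on the last ':' to separate the host from the port.
        val idx = hostPort.lastIndexOf(':')
        require(idx > 0 && idx < hostPort.length - 1, s"expected host:port, got '$hostPort'")
        new InetSocketAddress(hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
      }
}
```

    In FlumePollingEventCount the stream could then be created with `FlumeUtils.createPollingStream(ssc, SinkAddresses.parse(args(0)), StorageLevel.MEMORY_AND_DISK_SER_2)`, with each Flume agent running its own Spark Sink on one of the listed addresses.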
    

      

  • Original article: https://www.cnblogs.com/hark0623/p/4499503.html