  • Pulling data from Flume in Spark Streaming

    I found the solution here:

    https://issues.apache.org/jira/browse/SPARK-1729

    This is just my personal understanding; if anything is wrong, please leave a comment.

    Flume itself does not support a publish/subscribe model like Kafka, so there is no built-in way for Spark to pull data from Flume. The Spark developers therefore came up with a clever workaround.

    In Flume, a sink actively pulls data from its channel. The idea is to write a custom sink that listens for incoming connections; Spark Streaming then connects to that sink and decides whether to fetch data and how often. In effect, this lets Streaming pull data from Flume.

    Admittedly a clever trick, but in my opinion, if you genuinely need publish/subscribe, just use Kafka...

    Finally, here is how to use it.

    First, compile the following code into a jar and deploy it with Flume. The code is taken from here (if it depends on helper classes or the like, look for them in the Scala files in the same source directory). A sample Flume agent configuration follows the code below.

    package org.apache.spark.streaming.flume.sink
    
    import java.net.InetSocketAddress
    import java.util.concurrent._
    
    import org.apache.avro.ipc.NettyServer
    import org.apache.avro.ipc.specific.SpecificResponder
    import org.apache.flume.Context
    import org.apache.flume.Sink.Status
    import org.apache.flume.conf.{Configurable, ConfigurationException}
    import org.apache.flume.sink.AbstractSink
    
    /**
     * A sink that uses Avro RPC to run a server that can be polled by Spark's
     * FlumePollingInputDStream. This sink has the following configuration parameters:
     *
     * hostname - The hostname to bind to. Default: 0.0.0.0
     * port - The port to bind to. (No default - mandatory)
     * timeout - Time in seconds after which a transaction is rolled back,
     * if an ACK is not received from Spark within that time
     * threads - Number of threads to use to receive requests from Spark (Default: 10)
     *
     * This sink is unlike other Flume sinks in the sense that it does not push data,
     * instead the process method in this sink simply blocks the SinkRunner the first time it is
     * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
     *
     * Each time a getEventBatch call comes in, the sink creates a transaction and reads events
     * from the channel. When enough events are read, the events are sent to the Spark receiver and
     * the thread itself is blocked and a reference to it saved off.
     *
     * When the ack for that batch is received,
     * the thread which created the transaction is retrieved and it commits the transaction with the
     * channel from the same thread it was originally created in (since Flume transactions are
     * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
     * is received within the specified timeout, the transaction is rolled back too. If an ack comes
     * after that, it is simply ignored and the events get re-sent.
     *
     */
    
    class SparkSink extends AbstractSink with Logging with Configurable {
    
      // Size of the pool to use for holding transaction processors.
      private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
    
      // Timeout for each transaction. If spark does not respond in this much time,
      // rollback the transaction
      private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
    
      // Address info to bind on
      private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
      private var port: Int = 0
    
      private var backOffInterval: Int = 200
    
      // Handle to the server
      private var serverOpt: Option[NettyServer] = None
    
      // The handler that handles the callback from Avro
      private var handler: Option[SparkAvroCallbackHandler] = None
    
      // Latch that blocks off the Flume framework from wasting 1 thread.
      private val blockingLatch = new CountDownLatch(1)
    
      override def start() {
        logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
          hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
          transactionTimeout + ".")
        handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
          backOffInterval))
        val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
        // Using the constructor that takes specific thread-pools requires bringing in netty
        // dependencies which are being excluded in the build. In practice,
        // Netty dependencies are already available on the JVM as Flume would have pulled them in.
        serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
        serverOpt.foreach(server => {
          logInfo("Starting Avro server for sink: " + getName)
          server.start()
        })
        super.start()
      }
    
      override def stop() {
        logInfo("Stopping Spark Sink: " + getName)
        handler.foreach(callbackHandler => {
          callbackHandler.shutdown()
        })
        serverOpt.foreach(server => {
          logInfo("Stopping Avro Server for sink: " + getName)
          server.close()
          server.join()
        })
        blockingLatch.countDown()
        super.stop()
      }
    
      override def configure(ctx: Context) {
        import SparkSinkConfig._
        hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
        port = Option(ctx.getInteger(CONF_PORT)).
          getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
        poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
        transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
        backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
        logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
          "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
          "backoffInterval: " + backOffInterval)
      }
    
      override def process(): Status = {
        // This method is called in a loop by the Flume framework - block it until the sink is
        // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
        // being shut down.
        logInfo("Blocking Sink Runner, sink will continue to run..")
        blockingLatch.await()
        Status.BACKOFF
      }
    
      private[flume] def getPort(): Int = {
        serverOpt
          .map(_.getPort)
          .getOrElse(
            throw new RuntimeException("Server was not started!")
          )
      }
    
      /**
       * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down when each
       * batch is received. The test can simply call await on this latch till the expected number of
       * batches are received.
       * @param latch
       */
      private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
        handler.foreach(_.countDownWhenBatchAcked(latch))
      }
    }
    
    /**
     * Configuration parameters and their defaults.
     */
    private[flume]
    object SparkSinkConfig {
      val THREADS = "threads"
      val DEFAULT_THREADS = 10
    
      val CONF_TRANSACTION_TIMEOUT = "timeout"
      val DEFAULT_TRANSACTION_TIMEOUT = 60
    
      val CONF_HOSTNAME = "hostname"
      val DEFAULT_HOSTNAME = "0.0.0.0"
    
      val CONF_PORT = "port"
    
      val CONF_BACKOFF_INTERVAL = "backoffInterval"
      val DEFAULT_BACKOFF_INTERVAL = 200
    }
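
    As a reference, here is a minimal sketch of a Flume agent configuration using this sink. The agent, channel, and sink names, the port, and the channel capacity are placeholders, the jar built from the code above (plus the Scala runtime library) is assumed to be on Flume's classpath, and sources are omitted:

    # Hypothetical agent "agent1" with a memory channel feeding the custom Spark sink
    agent1.channels = memoryChannel
    agent1.sinks = sparkSink

    agent1.channels.memoryChannel.type = memory
    agent1.channels.memoryChannel.capacity = 10000

    # The sink class compiled above; hostname/port are what Spark Streaming will poll
    agent1.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
    agent1.sinks.sparkSink.hostname = 0.0.0.0
    agent1.sinks.sparkSink.port = 9999
    agent1.sinks.sparkSink.channel = memoryChannel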
    

      

    Then use code like the following in your Spark Streaming application:

    package org.apache.spark.examples.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.util.IntParam
    import java.net.InetSocketAddress
    
    /**
     *  Produces a count of events received from Flume.
     *
     *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
     *  the Spark Streaming programming guide for more details.
     *
     *  Usage: FlumePollingEventCount <host> <port>
     *    `host` is the host on which the Spark Sink is running.
     *    `port` is the port at which the Spark Sink is listening.
     *
     *  To run this example:
     *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
     */
    object FlumePollingEventCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println(
            "Usage: FlumePollingEventCount <host> <port>")
          System.exit(1)
        }
    
        StreamingExamples.setStreamingLogLevels()
    
        val Array(host, IntParam(port)) = args
    
        val batchInterval = Milliseconds(2000)
    
        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
        val ssc = new StreamingContext(sparkConf, batchInterval)
    
        // Create a flume stream that polls the Spark Sink running in a Flume agent
        val stream = FlumeUtils.createPollingStream(ssc, host, port)
    
        // Print out the count of events received from this server in each batch
        stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
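
    FlumeUtils also provides a createPollingStream overload that takes a sequence of sink addresses, which helps when several Flume agents each run a Spark Sink. A minimal sketch reusing the imports from the example above (the host names and port are placeholders):

    // Assumption: two Flume agents, each running the SparkSink above on port 9999.
    val addresses = Seq(
      new InetSocketAddress("flume-host-1", 9999),
      new InetSocketAddress("flume-host-2", 9999))
    val multiStream = FlumeUtils.createPollingStream(
      ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)

    Note that the driver application also needs the spark-streaming-flume artifact on its classpath (for Spark 1.x, org.apache.spark:spark-streaming-flume_2.10).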
    

      

  • Original article: https://www.cnblogs.com/hark0623/p/4500439.html