zoukankan      html  css  js  c++  java
  • spark streaming中使用flume数据源




    package org.apache.spark.examples.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.util.IntParam
    /**
     *  Produces a count of events received from Flume.
     *
     *  This should be used in conjunction with an AvroSink in Flume. It will start
     *  an Avro server at the requested host:port address and listen for requests.
     *  Your Flume AvroSink should be pointed to this address.
     *
     *  Usage: FlumeEventCount <host> <port>
     *    <host> is the host the Flume receiver will be started on - a receiver
     *           creates a server and listens for flume events.
     *    <port> is the port the Flume receiver will listen on.
     *
     *  To run this example:
     *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
     */
    object FlumeEventCount {
      /**
       * Entry point: counts the Flume events received in each batch and prints the count.
       *
       * @param args command-line arguments; `args(0)` is the host and `args(1)` is the
       *             port passed to `FlumeUtils.createStream`. If fewer than two
       *             arguments are given, a usage message is written to stderr and
       *             the JVM exits with status 1.
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println(
            "Usage: FlumeEventCount <host> <port>")
          System.exit(1)
        }

        // IntParam is an extractor, so the port argument is bound as an Int here.
        val Array(host, IntParam(port)) = args

        val batchInterval = Milliseconds(2000)

        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumeEventCount")
        val ssc = new StreamingContext(sparkConf, batchInterval)

        // Create a flume stream
        val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

        // Print out the count of events received from this server in each batch
        stream.count().map(cnt => s"Received $cnt flume events.").print()

        // Nothing is received or printed until the context is started; then block
        // the main thread until the streaming computation terminates.
        ssc.start()
        ssc.awaitTermination()
      }
    }


    package org.apache.spark.examples.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.util.IntParam
    import java.net.InetSocketAddress
    /**
     *  Produces a count of events received from Flume.
     *
     *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
     *  the Spark Streaming programming guide for more details.
     *
     *  Usage: FlumePollingEventCount <host> <port>
     *    `host` is the host on which the Spark Sink is running.
     *    `port` is the port at which the Spark Sink is listening.
     *
     *  To run this example:
     *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount <host> <port> `
     */
    object FlumePollingEventCount {
      /**
       * Entry point: counts the Flume events pulled in each batch and prints the count.
       *
       * @param args command-line arguments; `args(0)` is the host and `args(1)` is the
       *             port passed to `FlumeUtils.createPollingStream`. If fewer than
       *             two arguments are given, a usage message is written to stderr
       *             and the JVM exits with status 1.
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println(
            "Usage: FlumePollingEventCount <host> <port>")
          System.exit(1)
        }

        // IntParam is an extractor, so the port argument is bound as an Int here.
        val Array(host, IntParam(port)) = args

        val batchInterval = Milliseconds(2000)

        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
        val ssc = new StreamingContext(sparkConf, batchInterval)

        // Create a flume stream that polls the Spark Sink running in a Flume agent
        val stream = FlumeUtils.createPollingStream(ssc, host, port)

        // Print out the count of events received from this server in each batch
        stream.count().map(cnt => s"Received $cnt flume events.").print()

        // Nothing is polled or printed until the context is started; then block
        // the main thread until the streaming computation terminates.
        ssc.start()
        ssc.awaitTermination()
      }
    }


  • 相关阅读:
    2-6 求阶乘序列前N项和
    2-5 求平方根序列前N项和
    2-4 求交错序列前N项和
    2-3 求平方与倒数序列的部分和
    2-2 阶梯电价
    2-1 求整数均值
    2-17 生成3的乘方表
  • 原文地址:https://www.cnblogs.com/hark0623/p/4499503.html
Copyright © 2011-2022 走看看