zoukankan      html  css  js  c++  java
  • 学习进度笔记

    学习进度笔记25

    网络数据演示

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.storage.StorageLevel

    /**
     * Spark Streaming demo: counts comma-separated tokens read from a text socket.
     *
     * Usage: NetworkWordCount <hostname> <port>
     *
     * The job runs with master `local[2]` and a 20-second batch interval; the
     * per-batch counts are written with `DStream.print()`.
     */
    object NetworkWordCount {

      /**
       * Entry point.
       *
       * @param args `args(0)` is the socket host name, `args(1)` is the socket port
       */
      def main(args: Array[String]): Unit = {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException
        // when the host/port arguments are missing (same convention as SaleAmount).
        if (args.length != 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(20))

        // Read lines from the socket; the host name and port come from the command
        // line. Received data is stored with the MEMORY_AND_DISK_SER storage level.
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

        // Split every line on commas and count the occurrences of each token per batch.
        val words = lines.flatMap(_.split(","))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }

    }

    销售数据统计演示

    import org.apache.log4j.{Level, Logger}

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.storage.StorageLevel

    /**
     * Spark Streaming demo: sums the amount field of comma-separated sale records
     * read from a text socket.
     *
     * Usage: SaleAmount <hostname> <port>
     *
     * A record is used only when it has exactly six comma-separated fields and its
     * sixth field parses as a Double; every other line is ignored. The job runs
     * with master `local[2]` and a 5-second batch interval.
     */
    object SaleAmount {

      /** Number of comma-separated fields a record must have to be counted. */
      private val FieldCount = 6

      /** Zero-based index of the amount field inside a record. */
      private val AmountIndex = 5

      /**
       * Extracts the amount from one input line.
       *
       * @param line raw text line received from the socket
       * @return the parsed amount, or `None` when the line does not have exactly
       *         [[FieldCount]] fields or the amount field is not a valid number
       */
      private def parseAmount(line: String): Option[Double] = {
        val fields = line.split(",")
        if (fields.length == FieldCount) {
          // A non-numeric amount must not abort the streaming job; skip the record.
          scala.util.Try(fields(AmountIndex).toDouble).toOption
        } else {
          None
        }
      }

      /**
       * Entry point.
       *
       * @param args `args(0)` is the socket host name, `args(1)` is the socket port
       */
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: SaleAmount <hostname> <port> ")
          System.exit(1)
        }

        // Reduce log noise so the per-batch totals are readable on the console.
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

        val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(5))

        // Read lines from the socket; the host name and port come from the command
        // line. Received data is stored with the MEMORY_AND_DISK_SER storage level.
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

        // Keep only well-formed records, reduced to their amount.
        val amounts = lines.flatMap(line => parseAmount(line).toList)

        // Every amount shares the constant key 1, so reduceByKey yields a single
        // (1, total) pair per batch.
        val totalAmount = amounts.map(amount => (1, amount)).reduceByKey(_ + _)
        totalAmount.print()

        ssc.start()
        ssc.awaitTermination()
      }

    }

  • 相关阅读:
    UVa 481
    ZOJ 1108 & HDU 1160
    UVa 11450
    UVa 11242
    UVa 750
    UVa 725
    UVa 483
    UVa 10258
    UVa 793
    The Little Girl who Picks Mushrooms HDU 4422 水题类似模拟的一种感觉
  • 原文地址:https://www.cnblogs.com/xueqiuxiang/p/14467004.html
Copyright © 2011-2022 走看看