zoukankan      html  css  js  c++  java
  • 学习进度笔记

    学习进度笔记25

    网络数据演示：NetworkWordCount（从 Socket 读取文本行，按逗号分词并统计每个批次的词频）

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.storage.StorageLevel

    /**
     * Streaming word count over a TCP socket.
     *
     * Reads text lines from `<hostname>:<port>` in 20-second micro-batches, splits each
     * line on commas, and prints the per-batch count of every non-empty token.
     *
     * Usage: NetworkWordCount <hostname> <port>
     */
    object NetworkWordCount {

      def main(args: Array[String]): Unit = {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException on args(0)/args(1).
        if (args.length != 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        // local[2]: the socket receiver occupies one thread, so at least one more is needed for processing.
        val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(20))

        // Receive lines from the given socket host/port; received data is stored serialized
        // in memory and spills to disk (MEMORY_AND_DISK_SER).
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

        // Split on commas and count each token within the batch. Empty tokens (from blank lines,
        // ",," or a leading comma) are not words, so they are dropped before counting.
        val words = lines.flatMap(_.split(",")).filter(_.nonEmpty)
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

    销售数据统计演示：SaleAmount（从 Socket 读取逗号分隔的 6 字段销售记录，按批次汇总第 6 个字段的金额）

    import org.apache.log4j.{Level, Logger}

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.storage.StorageLevel

    /**
     * Streaming sales total over a TCP socket.
     *
     * Reads comma-separated records from `<hostname>:<port>` in 5-second micro-batches, keeps
     * records with exactly 6 fields whose sixth field parses as a Double (presumably the sale
     * amount), and prints the per-batch sum as a single `(1, total)` pair.
     *
     * Usage: SaleAmount <hostname> <port>
     */
    object SaleAmount {

      def main(args: Array[String]): Unit = {
        import scala.util.Try

        if (args.length != 2) {
          System.err.println("Usage: SaleAmount <hostname> <port> ")
          System.exit(1)
        }

        // Quiet Spark's and Jetty's console logging so the per-batch output stays readable.
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

        val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(5))

        // Receive lines from the given socket host/port; received data is stored serialized
        // in memory and spills to disk (MEMORY_AND_DISK_SER).
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

        // Keep only well-formed records: exactly 6 fields AND a numeric sixth field.
        // A bare fields(5).toDouble would throw NumberFormatException on a single bad
        // record and fail the whole batch; unparseable amounts are dropped instead.
        val amounts = lines
          .map(_.split(","))
          .filter(_.length == 6)
          .flatMap(fields => Try(fields(5).toDouble).toOption.toList)

        // Key every amount with the constant 1 so reduceByKey collapses each batch into one (1, total) pair.
        val totals = amounts.map(amount => (1, amount)).reduceByKey(_ + _)

        totals.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

  • 相关阅读:
    mode
    文件操作
    深浅拷贝
    基础数据类型补充
    再谈编码 decode和encode
    Python练习题 015:一颗自由落地的球
    Python练习题 014:完数
    Python练习题 013:求解a+aa+aaa……
    Python练习题 012:字符统计
    Python练习题 011:成绩打分
  • 原文地址:https://www.cnblogs.com/xueqiuxiang/p/14467004.html
Copyright © 2011-2022 走看看