zoukankan      html  css  js  c++  java
  • 学习进度笔记

    学习进度笔记25

    网络数据演示

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.storage.StorageLevel

    object NetworkWordCount {

      def main(args: Array[String]) {

        val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")

        val sc = new SparkContext(conf)

        val ssc = new StreamingContext(sc, Seconds(20))

        // 通过Socket获取数据,该处需要提供Socket的主机名和端口号,数据保存在内存和硬盘中

        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

        // 对读入的数据进行分割、计数

        val words = lines.flatMap(_.split(","))

        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

        wordCounts.print()

        ssc.start()

        ssc.awaitTermination()

      }

    }

    销售数据统计演示

    import org.apache.log4j.{Level, Logger}

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.storage.StorageLevel

    object SaleAmount {

      def main(args: Array[String]) {

        if (args.length != 2) {

          System.err.println("Usage: SaleAmount <hostname> <port> ")

          System.exit(1)

        }

        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

        val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")

        val sc = new SparkContext(conf)

        val ssc = new StreamingContext(sc, Seconds(5))

       // 通过Socket获取数据,该处需要提供Socket的主机名和端口号,数据保存在内存和硬盘中

        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

        val words = lines.map(_.split(",")).filter(_.length == 6)

        val wordCounts = words.map(x=>(1, x(5).toDouble)).reduceByKey(_ + _)

        wordCounts.print()

        ssc.start()

        ssc.awaitTermination()

      }

    }

  • 相关阅读:
    Sublime 官方安装方法
    Notepad2、Sublime_text带图标的右键快捷打开方式
    创业公司如何实施敏捷开发
    如果有人让你推荐编程技术书,请叫他看这个列表
    Spring cron表达式详解
    Spring定时任务的几种实现
    spring注解方式 idea报could not autowire,eclipse却没有问题
    mysql处理海量数据时的一些优化查询速度方法
    Hexo重装小结
    修改JAVA代码,需要重启Tomcat的原因
  • 原文地址:https://www.cnblogs.com/xueqiuxiang/p/14467004.html
Copyright © 2011-2022 走看看