zoukankan      html  css  js  c++  java
  • 学习进度笔记

    学习进度笔记25

    网络数据演示（NetworkWordCount：通过 Socket 接收文本，按逗号分割后统计每批次中各单词的出现次数）

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.storage.StorageLevel

    /**
     * Network word-count demo: reads comma-separated text from a TCP socket with
     * Spark Streaming and prints per-token counts for every 20-second batch.
     *
     * Usage: NetworkWordCount <hostname> <port>
     */
    object NetworkWordCount {

      def main(args: Array[String]): Unit = {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException
        // from args(0)/args(1) below (same contract as the SaleAmount demo).
        if (args.length != 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        // local[2]: the socket receiver occupies one local thread, so at least one
        // more is required for the batches to actually be processed.
        val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(20))

        // Receive text from the given socket host/port; received blocks are stored
        // serialized in memory, spilling to disk (MEMORY_AND_DISK_SER).
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

        // Split each line on commas. Trim surrounding whitespace and drop empty tokens
        // (produced by "a,,b" or a trailing comma) so they are not counted as words.
        val words = lines.flatMap(_.split(",")).map(_.trim).filter(_.nonEmpty)
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

    销售数据统计演示（SaleAmount：通过 Socket 接收逗号分隔的销售记录，对每批次中第 6 个字段的金额求和）

    import org.apache.log4j.{Level, Logger}

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

    import org.apache.spark.streaming.StreamingContext._

    import org.apache.spark.storage.StorageLevel

    /**
     * Sales-amount demo: reads comma-separated sales records from a TCP socket and
     * prints the sum of the amount field (the sixth field) for every 5-second batch.
     *
     * Usage: SaleAmount <hostname> <port>
     */
    object SaleAmount {

      /** Number of comma-separated fields a record must have to be counted. */
      private val FieldCount = 6

      /** Zero-based index of the field holding the sale amount. */
      private val AmountIndex = 5

      /**
       * Extracts the sale amount from one received line.
       *
       * @param line a raw line received from the socket
       * @return the amount, or None when the line does not split into exactly
       *         `FieldCount` comma-separated fields or the amount is not a number
       */
      private def parseAmount(line: String): Option[Double] = {
        val fields = line.split(",")
        if (fields.length == FieldCount) scala.util.Try(fields(AmountIndex).toDouble).toOption
        else None
      }

      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: SaleAmount <hostname> <port> ")
          System.exit(1)
        }

        // Reduce Spark / Jetty log noise so the printed batch totals are readable.
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

        // local[2]: one local thread is taken by the socket receiver, one processes batches.
        val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(5))

        // Receive records from the given socket host/port; received blocks are stored
        // serialized in memory, spilling to disk (MEMORY_AND_DISK_SER).
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

        // Malformed records (wrong field count or a non-numeric amount) are skipped
        // instead of throwing NumberFormatException and failing the batch.
        val amounts = lines.flatMap(line => parseAmount(line).toList)

        // Every amount gets the same constant key, so reduceByKey yields a single
        // grand total per batch.
        val totals = amounts.map(amount => (1, amount)).reduceByKey(_ + _)

        totals.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

  • 相关阅读:
    16进制与10进制
    npm模块管理器
    Vue2+VueRouter2+webpack 构建项目实战(四)接通api,先渲染个列表
    cross-env使用笔记
    webpack 运行提示“The ‘mode’ option has not been set”的原因和解决方法
    cnpm install -S 与cnpm install -D (dependencies和devDependencies的区别)
    Webpack基础学习
    webpack入门——webpack的安装与使用
    npm init 之package.json
    入门 Webpack,看这篇就够了
  • 原文地址:https://www.cnblogs.com/xueqiuxiang/p/14467004.html
Copyright © 2011-2022 走看看