zoukankan      html  css  js  c++  java
  • Spark实例

    1、入门

    /**
     * Counts hits per URL in a tab-separated log, re-keys each per-URL count by the URL's host,
     * keeps only entries whose host is exactly "java", and writes them sorted by count
     * (descending) to a text output directory.
     *
     * Input, output and jar paths are hard-coded Windows locations.
     *
     * NOTE(review): counts are per URL, not aggregated per host, and only the literal host
     * "java" matches the filter; confirm this is the intended host/aggregation.
     */
    object HelloSpark {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
        val sc = new SparkContext(conf)
        try {
          // Backslashes must be escaped: "\U", "\a", "\I", "\H" are not valid escape sequences
          // in a Scala string literal, and "\t" would silently turn into a tab character.
          sc.addJar("C:\\Users\\asus\\IdeaProjects\\HelloSpark\\target\\HelloSpark-1.0-SNAPSHOT.jar")

          sc.textFile("D://info.log")
            .map { line =>
              // Field 1 (0-based) of each tab-separated line is the URL parsed below.
              val fields = line.split("\t")
              (fields(1), 1)
            }
            .reduceByKey(_ + _)
            .map { case (url, count) => (new URL(url).getHost, count) }
            .filter(_._1 == "java")
            .sortBy(_._2, ascending = false)
            .saveAsTextFile("D://out2")
        } finally {
          // Always release the SparkContext, even when the job throws.
          sc.stop()
        }
      }
    }

    2、Parallelize

    /**
     * Sorts a small in-memory list of (String, Int, Int) tuples in descending [[Girl]] order,
     * keyed on the two Int fields, collects the result to the driver and prints it.
     */
    object H{
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
        val sc = new SparkContext(conf)
        try {
          val records = List(("A", 90, 27), ("B", 91, 22), ("C", 90, 25))
          val sorted = sc.parallelize(records)
            .sortBy(x => Girl(x._2, x._3), ascending = false)
            .collect()
            // toBuffer so println shows the elements rather than the Array's identity hash.
            .toBuffer
          println(sorted)
        } finally {
          // Release the SparkContext even if the job fails (the original never stopped it).
          sc.stop()
        }
      }
    }
    
    /**
     * Sort key ordered primarily by `a` ascending and, when `a` ties, by `b` descending.
     *
     * `Integer.compare` is used instead of subtraction: `x - y` overflows for extreme values
     * (e.g. `Int.MinValue - 1`) and would return a result with the wrong sign.
     *
     * @param a primary key, compared ascending
     * @param b tie-breaker, compared descending
     */
    case class Girl(a: Int, b: Int) extends Ordered[Girl] with Serializable {
      override def compare(that: Girl): Int =
        if (this.a == that.a) Integer.compare(that.b, this.b)
        else Integer.compare(this.a, that.a)
    }

    3、Spark Streaming(Socket)

    /**
     * Running (cumulative) word count over space-separated lines read from a TCP socket,
     * printed every 5-second batch.
     *
     * `updateStateByKey` keeps per-word state across batches, so a checkpoint directory is set.
     */
    object SocketSparkStreaming{
      /**
       * State update for `updateStateByKey`: for each word, adds this batch's counts to the
       * previous running total (0 when the word has no prior state).
       */
      val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
        iter.map { case (word, batchCounts, runningTotal) =>
          (word, batchCounts.sum + runningTotal.getOrElse(0))
        }
      }

      def main(args: Array[String]): Unit = {
        LoggerLevels.setStreamingLogLevels()
        val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
        val sc = new SparkContext(conf)
        // Stateful operators such as updateStateByKey require a checkpoint directory.
        sc.setCheckpointDir("d://ck")
        val ssc = new StreamingContext(sc, Seconds(5))
        val lines = ssc.socketTextStream("192.168.1.101", 8888)
        val res = lines.flatMap(_.split(" ")).map((_, 1))
          .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultParallelism),
            rememberPartitioner = true)
        res.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    4、Spark Streaming(Flume)

    /**
     * Polls a Flume Spark sink at 192.168.1.101:8888, splits each event body into
     * space-separated words and prints the per-batch word counts every 5 seconds.
     */
    object FlumeSparkStreaming{
      def main(args: Array[String]): Unit = {
        import java.nio.charset.StandardCharsets

        LoggerLevels.setStreamingLogLevels()
        val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
        val sc = new SparkContext(conf)
        sc.setCheckpointDir("d://ck")
        val ssc = new StreamingContext(sc, Seconds(5))
        val address = Seq(new InetSocketAddress("192.168.1.101", 8888))
        val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK_SER)
        val words = flumeStream.flatMap { x =>
          // Copy only the readable bytes of the body. `array()` exposes the whole backing array,
          // ignoring position/limit/arrayOffset, and throws for non-array-backed buffers.
          // duplicate() leaves the event's own buffer position untouched.
          val body = x.event.getBody.duplicate()
          val bytes = new Array[Byte](body.remaining())
          body.get(bytes)
          // Decode explicitly rather than with the JVM's platform default charset.
          // NOTE(review): assumes event bodies are UTF-8 — confirm against the Flume source config.
          new String(bytes, StandardCharsets.UTF_8).split(" ")
        }.map((_, 1))
        val result = words.reduceByKey(_ + _)
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    5、Kafka WordCount

    /**
     * Running (cumulative) word count over messages consumed from Kafka through the
     * ZooKeeper-based receiver (`KafkaUtils.createStream`), printed every 5-second batch.
     *
     * Arguments: `<zkQuorum> <group> <topics> <numThreads>`. `topics` is a comma-separated
     * list, and every topic is consumed with `numThreads` consumer threads.
     */
    object KafkaWordCount{
      /**
       * State update for `updateStateByKey`: for each word, adds this batch's counts to the
       * previous running total (0 when the word has no prior state).
       */
      val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
        iter.map { case (word, batchCounts, runningTotal) =>
          (word, batchCounts.sum + runningTotal.getOrElse(0))
        }
      }

      def main(args: Array[String]): Unit = {
        // Fail with a usage message instead of a MatchError on a wrong argument count.
        if (args.length != 4) {
          System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
          sys.exit(1)
        }
        LoggerLevels.setStreamingLogLevels()
        val Array(zkQuorum, group, topics, numThreads) = args
        val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Stateful operators such as updateStateByKey require a checkpoint directory.
        ssc.checkpoint("d://ckkafka")
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val result = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
          .map(_._2) // keep only the message value of each (key, value) record
          .flatMap(_.split(" "))
          .map((_, 1))
          .updateStateByKey(updateFunc, new HashPartitioner(
            ssc.sparkContext.defaultParallelism), rememberPartitioner = true)
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    6、窗口函数

    /**
     * Sliding-window word count over a TCP socket stream: every 5 seconds, prints the count
     * of each space-separated word seen during the last 15 seconds.
     */
    object WindowFunc{
      def main(args: Array[String]): Unit = {
        LoggerLevels.setStreamingLogLevels()

        val sparkConf = new SparkConf()
          .setAppName("Spark Streaming")
          .setMaster("local[2]")
        val batchInterval = Seconds(5)
        val streamingCtx = new StreamingContext(sparkConf, batchInterval)
        streamingCtx.checkpoint("d://ck")

        val windowLength = Seconds(15)
        val slideInterval = Seconds(5)
        val sum = (left: Int, right: Int) => left + right

        val pairs = streamingCtx.socketTextStream("192.168.1.101", 8888)
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
        val windowedCounts = pairs.reduceByKeyAndWindow(sum, windowLength, slideInterval)

        windowedCounts.print()
        streamingCtx.start()
        streamingCtx.awaitTermination()
      }
    }
  • 相关阅读:
    谍战系列
    干将莫邪
    漫话安全众测
    一句话安全
    jsp一句话
    struts2的DevMode模式
    morse code
    Nessus的安装/激活/更新
    WinPcap4.13无法安装解决方法
    安全用网,你应该知道的事
  • 原文地址:https://www.cnblogs.com/wanxi/p/6476204.html
Copyright © 2011-2022 走看看