  • Scala: from basics to advanced

    This post is a short summary of what I have learned about Scala and Spark programming recently, partly to review and consolidate, and partly to make it easier to look things up later.
    A small introductory Spark/Scala example
    The text file UserPurchaseHistory.csv:
    John,iPhone Cover,9.99
    John,Headphones,5.49
    Jack,iPhone Cover,9.99
    Jill,Samsung Galaxy Cover,8.95
    Bob,iPad Cover,5.49
    
    
    Now let's read this file and analyze it:
    
    package com.ghc.bigdata
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by Yu Li on 12/5/2017.
      */
    object ScalaApp {
        def main(args:Array[String]):Unit={
          val sparkConf=new SparkConf()
          sparkConf.setMaster("local[2]")
          sparkConf.setAppName("Scala App")
          val sc=new SparkContext(sparkConf)
          val textFromFile=sc.textFile("./src/main/scala/UserPurchaseHistory.csv")
          val data=textFromFile.map(x=>x.split(",")).map(p=>(p(0),p(1),p(2)))
          //number of purchases
          val countOfPurchase=data.count()
          //how many distinct customers made a purchase
          val distinctCountOfPurchase=data.map(x=>x._1).distinct.count()
          println(distinctCountOfPurchase+" distinct customers made a purchase")
          println(data.map{case(user,product,price)=>user}.distinct.count()+" distinct customers made a purchase")
          val totalRevenue=data.map{case(user,product,price)=>price.toDouble}.sum() //sum() needs Double values
          println("totalRevenue: "+totalRevenue)
          //best-selling product: group by product and sum the counts, i.e. reduceByKey
          val mostPopularProduct=data.map{case(user,product,price)=>(product,1)}.reduceByKey(_+_).collect().sortBy(-_._2) // the minus sign sorts in descending order
          println(mostPopularProduct.mkString(","))
          println("mostPopularProduct: "+mostPopularProduct(0))

          println("purchases in total: "+countOfPurchase+" items")
          println(data.map{case(user,product,price)=>user}.distinct.count()+" distinct customers made a purchase")
          println("total revenue: "+totalRevenue)
          println("the best-selling product is %s, purchased %d times".format(mostPopularProduct.take(1)(0)._1,mostPopularProduct.take(1)(0)._2))
        }
    }
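
    The examples in this post assume a plain sbt project. Below is a minimal build.sbt sketch of my own; the Scala and Spark version numbers are assumptions (the post dates from late 2017) and should be adjusted to whatever you actually run.

    // build.sbt -- a minimal sketch, not part of the original post; the version
    // numbers are assumptions and should match your local Spark installation.
    name := "spark-scala-demo"

    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"  % "2.2.0",
      "org.apache.spark" %% "spark-sql"   % "2.2.0",  // DataFrame / SQLContext for the regression example below
      "org.apache.spark" %% "spark-mllib" % "2.2.0"   // LinearRegression / VectorAssembler
    )

    With the sample UserPurchaseHistory.csv above, running ScalaApp should report 5 purchases, 4 distinct customers, a total revenue of 39.91, and iPhone Cover as the best-selling product (2 sold).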
    
    
    
    package com.ghc.bigdata
    import org.apache.spark.{SparkConf,SparkContext}
    /**
      * Created by Yu Li on 12/6/2017.
      */
    object SparkScalaApp {
      def main(args:Array[String]):Unit={
        val sparkConf:SparkConf=new SparkConf()
        sparkConf.setAppName("Spark Scala App")
        sparkConf.setMaster("local[2]")
        val sc:SparkContext=new SparkContext(sparkConf)
        val fileName:String="./src/main/scala/UserPurchaseHistory.csv"
        printResult(sc,fileName)
      }
      def printResult(sc:SparkContext,fileName:String):Unit={
          // how many items were purchased in total
          val rawRDD=sc.textFile(fileName).map(x=>x.split(",")).map(purchase=>(purchase(0),purchase(1),purchase(2)))
          val countOfPurchase=rawRDD.count()
          // how many distinct customers made a purchase
          val countOfDistinctPurchase=rawRDD.map{case(user,item,price)=>user}.distinct.count()
          // total revenue
          val sumRevenue=rawRDD.map{case(user,item,price)=>price.toDouble}.sum()
          // revenue per product
          val sumGroupByItemRevenue=rawRDD.map{case(user,item,price)=>(item,price.toDouble)}.reduceByKey(_+_).sortBy(-_._2).collect()(0)  // sort per-product revenue in descending order and take the top entry
          // best-selling product
          val mostPopularItem=rawRDD.map{case(user,item,price)=>(item,1)}.reduceByKey(_+_).sortBy(-_._2).collect()(0)

          println(countOfPurchase+" items were purchased in total")
          println(countOfDistinctPurchase+" distinct customers made a purchase")
          println("total revenue: "+sumRevenue)
          println("the highest-grossing product is %s with %.2f in revenue".format(sumGroupByItemRevenue._1,sumGroupByItemRevenue._2))
          println("the best-selling product is %s with %d sold".format(mostPopularItem._1,mostPopularItem._2))
      }
    }
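
    The same summary can also be written against the DataFrame API. The sketch below is my own addition, not from the original post; the column names user/item/price are chosen here purely for illustration (the CSV has no header).

    package com.ghc.bigdata

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{count, countDistinct, desc, sum}

    object SparkSqlPurchaseSummary {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("Purchase Summary").master("local[2]").getOrCreate()
        import spark.implicits._

        // read the headerless CSV and name the columns
        val df = spark.read.csv("./src/main/scala/UserPurchaseHistory.csv").toDF("user", "item", "price")

        // total purchases, distinct customers, and total revenue in a single aggregation
        df.agg(count("*"), countDistinct("user"), sum($"price".cast("double"))).show()

        // best-selling item: group by item and order by the count, descending
        df.groupBy("item").count().orderBy(desc("count")).show(1)

        spark.stop()
      }
    }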
     
    package com.ghc.bigdata
    import org.apache.spark.{SparkConf,SparkContext}
    /**
      * Created by Yu Li on 12/6/2017.
      */
    object SparkScalaApp {
      def main(args:Array[String]):Unit={
        val sparkConf:SparkConf=new SparkConf()
        sparkConf.setAppName("Spark Scala App")
        sparkConf.setMaster("local[2]")
        val sc:SparkContext=new SparkContext(sparkConf)
        println("2 在List中的位置: "+getPosition(List(1,2,3),2))
        for(i<- map[Int,String](List(1,2,3),{num:Int=>num+"2"})){
          println(i)
        }
      }
      def getPosition[A](l:List[A],v:A): Int ={
        l.indexOf(v)
      }
      def map[A,B](list:List[A],func: A=>B)=list.map(func)
    
    }
    A simple Scala generics example
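
    The same generic helpers also work without spelling out the type parameters, since the compiler infers them from the arguments. A small sketch of my own (not from the original post):

    object GenericsSketch {
      // same shape as the map/getPosition helpers above
      def map[A, B](list: List[A], func: A => B): List[B] = list.map(func)
      def getPosition[A](l: List[A], v: A): Int = l.indexOf(v)

      def main(args: Array[String]): Unit = {
        // A and B are inferred as String and Int here
        println(map(List("a", "bb", "ccc"), (s: String) => s.length)) // List(1, 2, 3)
        println(getPosition(List("a", "bb", "ccc"), "bb"))            // 1
      }
    }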

    package com.ghc.spark.sql
    
    import java.io.{BufferedReader, File, IOException}
    
    import scala.collection.mutable.ArrayBuffer
    import scala.io.{BufferedSource, Source}
    import scala.util.Random
    
    abstract class Animal{
      def move()
      val name:String
    }
    
    class Dog extends Animal{
      override val name="trump"
      override def move(): Unit = {
        println(this.name+" move...")
      }
    }
    
    object Dog{
    
      def apply(): Dog = {
        new Dog()
      }
    }
    
    class SuperDog(){
      def fly(): Unit ={
        println("fly...")
      }
    }
    
    class RichFile(file:File){
      def read(): String = {
        // read and return the first line of the file, closing the source afterwards
        val source: BufferedSource = Source.fromFile(file.getPath)
        try source.bufferedReader().readLine() finally source.close()
      }
    }
    
    object DeepScala {
      def main(args:Array[String]): Unit ={
          /*println(sum(1,2,3))
        val dog:Dog = Dog()
        dog.move()
        match1()
        println(match1PartialFunc("frank"))
    
        match2(1)
        match2("abc")
        match2(Map(1->"a", 2->"b"))
        match2(Array(1,2,3))
        matchException()
    
        matchCaseClass()
        println(s"add result: ${add(3,4)}")
        println(s"sum result ${sum(3)(4)}")
        */
        implicit def Dog2SuperDog(dog:Dog):SuperDog={
          new SuperDog()
        }
        val dog = Dog()
        dog.fly()
    
        implicit def file2RichFile(file:File): RichFile ={new RichFile(file)}
        val file = new File("C:\\Users\\Administrator\\IdeaProjects\\sparkapi\\input\\words.txt")
        println(file.read())
        readLine("C:\\Users\\Administrator\\IdeaProjects\\sparkapi\\input\\words.txt")
        readNet("http://www.baidu.com")
        high_func()
      }
    
      def sum(numbers:Int*):Int = {
        var result = 0
        for(number <- numbers){
          result += number
        }
        result
      }
    
      // simple value matching on basic data types
      def match1()={
        val arr = Array("frank", "stacy", "unknown")
        val name = arr(Random.nextInt(arr.length))
        name match{
          case "frank" => println("me")
          case "stacy" => println("lover")
          case _ => println("frank love stacy")
        }
      }
    
      def match1PartialFunc:PartialFunction[String, String]={
          case "frank" => "me"
          case "stacy" => "she"
          case _ => "unknown"
      }
    
      // type matching
      def match2(obj:Any):Unit={
        obj match{
          case x:Int => println("obj is Int")
          case y:String => println("obj is String")
          case m:Map[_,_] => m.foreach(println)
          case _ => println("other type...")
        }
      }
    
      // IO Exception
      var source:BufferedSource = null;
      def matchException()= {
        try {
          source = Source.fromFile("input/words.txt")
          val reader: BufferedReader = source.bufferedReader()
          var line: String = reader.readLine()
          var content: ArrayBuffer[String] = ArrayBuffer[String]() // mutable array
          while (line != null) {
            content += line
            line = reader.readLine()
          }
          content.flatMap(_.split("\\s+"))
            .map((_, 1))
            .groupBy(_._1)
            .toList
            .map(t => (t._1,
              (for (i <- t._2) yield i._2).reduce(_ + _)
            ))
            .sortBy(_._2)
            .foreach(println)
        } catch {
          case ex: IOException => println(ex.toString)
          case ex: Exception => println(ex.toString)
          case _: Any => println("whatever...")
        } finally {
          if (source != null) source.close()
        }
      }
      // case class match
      def matchCaseClass(){
        trait Person
        case class CTO(name: String, age: Int) extends Person
        case class Manager(name: String, age: Int) extends Person
        case class Clerk(age: Int, desc: String) extends Person
    
        val arr = CTO("CTO No.007 Linux", 18) :: Manager("Manager No.001 Frank", 20) :: Clerk(25, "no desc...") :: Nil
        val x = arr(Random.nextInt(arr.length))
        x match{
          case CTO(name, age) => printf("CTO<%s , %d>", name, age)
          case Manager(name, age) => printf("Manager<%s , %d>", name, age)
          case Clerk(age, desc) => println(s"Clerk<$age, $desc>")
          case _ => println(s"default match")
         }
        val multi_lines =
          """
            |first line
            |second line
            |third line
          """.stripMargin
        println(multi_lines)
      }
    
      // anonymous function
      def add = (a:Int, b:Int) => a+b
    
      //  currying
      def sum(a:Int)(b:Int) = {a+b}  // aggregate(0) (_+_, _+_)
    
      // higher-order functions
      def high_func(): Unit ={
        val numbers = List(List(1,2,3), List(4,5,6), List(7,8,9))
        val res1 = numbers.map(_.map(_*2)).flatten
        val res2 = numbers.flatMap(_.map(_*2))
        val res3 = numbers.flatMap(t=> (for(i<-t) yield i*2))
        println(s"res1: ${res1} 
    res2: ${res2}
    res3: ${res3}")
      }
    
    
      // IO
      def readLine(filePath:String):Unit={
        val file = Source.fromFile(filePath)
        for(line <- file.getLines()){
          println(line)
        }
      }
    
      def readNet(url:String="http://www.baidu.com"): Unit ={
        val netFile = Source.fromURL(url)
        for(file <- netFile.getLines()){
          println(file)
        }
      }
    }
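
    One more note on the PartialFunction above: besides being called directly, a partial function can be reused with collect, which applies it only to the elements it is defined for. A small sketch of my own, not from the original code:

    object PartialFunctionSketch {
      // same shape as match1PartialFunc, but without the catch-all case
      val describe: PartialFunction[String, String] = {
        case "frank" => "me"
        case "stacy" => "she"
      }

      def main(args: Array[String]): Unit = {
        println(describe.isDefinedAt("frank"))                      // true
        println(describe.isDefinedAt("other"))                      // false
        println(List("frank", "stacy", "other").collect(describe))  // List(me, she)
      }
    }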

    Predicting the murder rate with Spark's linear regression algorithm

    package com.ghc.bigdata
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.ml.regression.LinearRegression
    import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
    import org.apache.log4j.{Level, Logger}
    /**
      * Created by Yu Li on 12/7/2017.
      */
    // spark.ml recommends the DataFrame API, so everything below uses DataFrames
    object LRDemo {
      def main(args:Array[String]):Unit={
      // suppress verbose logging
          Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
          // set up a local Spark environment
          val sparkConf=new SparkConf().setAppName("Regression Demo").setMaster("local[2]")
          val sc=new SparkContext(sparkConf)
    
          // convert the real text data into a vector DataFrame: the training data set
          val realFile="./src/main/scala/com/ghc/bigdata/LR_data"
          val delimeter=","
          val vecDF_real=transform2VectorDFFromFile(sc,realFile,delimeter)
          println("start print df from real: ")
          println(vecDF_real.collect.mkString("\t"))

          // convert the prediction text data into a vector DataFrame: the data set to predict on, which can be selected from
          val predictFile="./src/main/scala/com/ghc/bigdata/LR_data_for_predict"
          val vecDF_predict=transform2VectorDFFromFile(sc,predictFile,delimeter)
          println("start print df from predict: ")
          println(vecDF_predict.collect.mkString("\t"))
          // result: 50 rows; each row holds the 8 original columns followed by the assembled "features" vector, e.g.
          //  [3615.0,3624.0,2.1,69.05,15.1,41.3,20.0,50708.0,[3615.0,3624.0,2.1,69.05,41.3,20.0,50708.0]]
    
          // build a model to predict the murder rate
    
          // configure the linear regression parameters
          val lr=new LinearRegression()
          lr.setFeaturesCol("features") // feature column
          lr.setLabelCol("Murder")  // the label column to predict
          lr.setFitIntercept(true)  // fit an intercept term
          lr.setMaxIter(20) // maximum number of iterations
          lr.setRegParam(0.4) // regularization parameter
          lr.setElasticNetParam(0.8) // ElasticNet mixing parameter
    
          // fit the model on the real (training) data
          val lrModel=lr.fit(vecDF_real)  // train on the real data
          println(lrModel.extractParamMap())   // print the full parameter map of the model
          println(s"coefficients:  ${lrModel.coefficients},intercept: ${lrModel.intercept}")
    
          // evaluate the training results
          val trainingSummary=lrModel.summary
          println(s"numIterations: ${trainingSummary.totalIterations}") // total number of iterations
          println(s"objectiveHistory: ${trainingSummary.objectiveHistory.toList}")  // objective (loss) per iteration, decreasing
          trainingSummary.residuals.show() // residuals
          println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")   // root mean squared error
          println(s"r2: ${trainingSummary.r2}") // coefficient of determination
    
          val predictions: DataFrame = lrModel.transform(vecDF_predict)  // apply the trained model to the prediction data
          println("prediction results: ")
          val predict_result: DataFrame =predictions.selectExpr("features","Murder", "round(prediction,1) as prediction")
          predict_result.foreach(println(_))
          sc.stop()
      }
    
      // helper that builds a vector DataFrame from a file; both the real data and the prediction data need it, so it is factored out for reuse
      def transform2VectorDFFromFile(sc:SparkContext,fileName:String,delimeter:String):DataFrame={
          val sqc=new SQLContext(sc) //  SQLContext is deprecated; SparkSession has replaced it
          val raw_data=sc.textFile(fileName)
          val map_data=raw_data.map{x=>
                                      val split_list=x.split(delimeter)
            (split_list(0).toDouble,split_list(1).toDouble,split_list(2).toDouble,split_list(3).toDouble,split_list(4).toDouble,split_list(5).toDouble,split_list(6).toDouble,split_list(7).toDouble)}
          val df=sqc.createDataFrame(map_data)
          val data = df.toDF("Population", "Income", "Illiteracy", "LifeExp", "Murder", "HSGrad", "Frost", "Area")
          val colArray = Array("Population", "Income", "Illiteracy", "LifeExp", "HSGrad", "Frost", "Area")
          val assembler = new VectorAssembler().setInputCols(colArray).setOutputCol("features")
          val vecDF: DataFrame = assembler.transform(data)
          vecDF
      }
    }
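
    Since the comment in transform2VectorDFFromFile notes that SQLContext has been replaced by SparkSession, here is a sketch of the same loading step written against SparkSession. This is my own variant, not part of the original post; the column names are the ones used above.

    package com.ghc.bigdata

    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object LRDataWithSparkSession {
      def transform2VectorDF(spark: SparkSession, fileName: String, delimiter: String): DataFrame = {
        import spark.implicits._
        // parse each line into a tuple of 8 doubles, then name the columns
        val data = spark.sparkContext.textFile(fileName)
          .map { line =>
            val f = line.split(delimiter).map(_.toDouble)
            (f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7))
          }
          .toDF("Population", "Income", "Illiteracy", "LifeExp", "Murder", "HSGrad", "Frost", "Area")
        // assemble all columns except the label into a single features vector
        val assembler = new VectorAssembler()
          .setInputCols(Array("Population", "Income", "Illiteracy", "LifeExp", "HSGrad", "Frost", "Area"))
          .setOutputCol("features")
        assembler.transform(data)
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("Regression Demo").master("local[2]").getOrCreate()
        val vecDF = transform2VectorDF(spark, "./src/main/scala/com/ghc/bigdata/LR_data", ",")
        vecDF.show(5)
        spark.stop()
      }
    }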