  • Spark basic operations

    Reading data from files

    Data used: https://codeload.github.com/xsankar/fdps-v3/zip/master

    Reading data from a single file

    case class Employee(EmployeeID: String,
        LastName: String, FirstName: String, Title: String,
        BirthDate: String, HireDate: String,
        City: String, State: String, Zip: String, Country: String,
        ReportsTo: String)
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local")
        conf.set("spark.app.name", "spark demo")
        val sc = new SparkContext(conf);
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    
        import spark.implicits._ // required, otherwise the .as[Employee] conversion below does not compile
        // header (default false): uses the first line as names of columns.
        val employees = spark.read.option("header", "true")
            .csv("hdfs://m3:9820/NW-Employees.csv").as[Employee];
    
       employees.show();
    
      }
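
    Without a schema, csv() reads every column as String. As a sketch, an explicit schema can be supplied instead of relying on inference (the column names are taken from the Employee case class above; spark and Employee are assumed to be in scope as in the example):

    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    // Explicit schema; every field stays a String to match the Employee case class.
    val employeeSchema = StructType(Seq(
      "EmployeeID", "LastName", "FirstName", "Title", "BirthDate", "HireDate",
      "City", "State", "Zip", "Country", "ReportsTo"
    ).map(name => StructField(name, StringType)))

    val employeesTyped = spark.read
      .option("header", "true")
      .schema(employeeSchema)   // no inference pass is needed when a schema is given
      .csv("hdfs://m3:9820/NW-Employees.csv")
      .as[Employee]
    employeesTyped.show(5)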
    

     Registering the data as a view and querying it with SQL

    case class Employee(EmployeeID: String,
        LastName: String, FirstName: String, Title: String,
        BirthDate: String, HireDate: String,
        City: String, State: String, Zip: String, Country: String,
        ReportsTo: String)
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local")
        conf.set("spark.app.name", "spark demo")
        val sc = new SparkContext(conf);
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    
        import spark.implicits._ // required, otherwise the .as[Employee] conversion below does not compile
        // header (default false): uses the first line as names of columns.
        val employees = spark.read.option("header", "true")
            .csv("hdfs://m3:9820/NW-Employees.csv").as[Employee];
        // Creates a temporary view using the given name
        employees.createOrReplaceTempView("employeesTable");
        // query via SQL; the table name is case-insensitive
        val records = spark.sql("select * from EmployeesTable");
        records.show();
        records.head(2);
        records.explain(true);
    
      }
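
    Any SQL that Spark supports can be run against the registered view, not just select *. A small sketch, assuming the column names from the Employee case class above:

    // Group and order through the same temporary view.
    val byCountry = spark.sql(
      """SELECT Country, count(*) AS employees
        |FROM employeesTable
        |GROUP BY Country
        |ORDER BY employees DESC""".stripMargin)
    byCountry.show()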
    

     Join queries

    case class Order(OrderID: String,
        CustomerID: String, EmployeeID: String, OrderDate: String,
        ShipCountry: String)
    
      case class OrderDetail(OrderID: String,
        ProductID: String, UnitPrice: String, Qty: String,
        Discount: String)
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local")
        conf.set("spark.app.name", "spark demo")
        val sc = new SparkContext(conf);
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    
        import spark.implicits._ // required, otherwise the .as[Order]/.as[OrderDetail] conversions below do not compile
        // header (default false): uses the first line as names of columns.
        val orders = spark.read.option("header", "true")
          .csv("hdfs://m3:9820/NW-Orders.csv").as[Order];
        val orderDetails = spark.read.option("header", "true")
          .csv("hdfs://m3:9820/NW-Order-Details.csv").as[OrderDetail];
        // Creates a temporary view using the given name
        orders.createOrReplaceTempView("orders")
        orderDetails.createOrReplaceTempView("orderDetails")
        // show() prints 20 rows by default when no row count is given
        // orders.show();
        // orderDetails.show();
        // if a table is not given an alias, its alias defaults to the table name
        val joinResult = spark.sql("select o.OrderID, orderDetails.ProductID from orders o inner join orderDetails  on o.OrderID = orderDetails.OrderID")
        joinResult.show
        
      }
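
    The same join can also be written with the Dataset API instead of SQL; a sketch using the orders and orderDetails Datasets defined above (import spark.implicits._ is already in scope for the $ column syntax):

    // Equivalent inner join expressed against the Dataset API.
    val joinResult2 = orders.as("o")
      .join(orderDetails.as("d"), $"o.OrderID" === $"d.OrderID", "inner")
      .select($"o.OrderID", $"d.ProductID")
    joinResult2.show()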
    

     Reading and writing data

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local")
        conf.set("spark.app.name", "spark demo")
        val sc = new SparkContext(conf);
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    
        import spark.implicits._ // required by the implicit conversions used below
        // header (default false): uses the first line as names of columns.
        // inferSchema (default `false`): infers the input schema automatically from data. It
        // requires one extra pass over the data.
        // read data from file
        val cars = spark.read.option("header", "true").option("inferSchema", "true")
          .csv("hdfs://m3:9820/cars.csv");
        cars.show(5)
        cars.printSchema()   
        
        // write data back out
        // mode("overwrite") replaces any existing data at the output path
        // save as CSV
        cars.write.mode("overwrite").option("header", "true").csv("hdfs://m3:9820/cars_csv")
        
        // save as Parquet, partitioned by year
        cars.write.mode("overwrite").partitionBy("year").parquet("hdfs://m3:9820/cars_parquet")
      }
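
    To check the round trip, the files written above can be read back; a sketch assuming the same output paths:

    // CSV keeps no type information, so the schema must be re-inferred (or supplied).
    val carsCsv = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m3:9820/cars_csv")

    // Parquet stores the schema with the data; the partition column "year"
    // is reconstructed from the directory layout.
    val carsParquet = spark.read.parquet("hdfs://m3:9820/cars_parquet")
    carsParquet.printSchema()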
    

     Statistical methods

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local")
        conf.set("spark.app.name", "spark demo")
        val sc = new SparkContext(conf);
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    
        import spark.implicits._ // required by the implicit conversions used below
        // header (default false): uses the first line as names of columns.
        // inferSchema (default `false`): infers the input schema automatically from data. It
        // requires one extra pass over the data.
        // read data from file
        val cars = spark.read.option("header", "true").option("inferSchema", "true")
          .csv("hdfs://m3:9820/cars.csv");
        cars.show(5)
        cars.printSchema()   
        
        // describe() reports count, mean, stddev, min and max for the given column
        cars.describe("model").show()
        
        // groupBy groups the rows; avg computes the mean of the given column within each group
        cars.groupBy("year").avg("year").show()
        cars.show()
     
      }
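
    Several aggregates can also be computed per group in a single pass with agg(); a sketch that only uses the year and model columns already shown above:

    import org.apache.spark.sql.functions.{count, countDistinct, lit}

    // One job computing two aggregates for every year.
    cars.groupBy("year")
      .agg(count(lit(1)).as("cars"), countDistinct("model").as("models"))
      .orderBy("year")
      .show()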
    

     Selecting columns and crosstabs

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local")
        conf.set("spark.app.name", "spark demo")
        val sc = new SparkContext(conf);
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    
        import spark.implicits._ // required by the implicit conversions used below
        // header (default false): uses the first line as names of columns.
        // inferSchema (default `false`): infers the input schema automatically from data. It
        // requires one extra pass over the data.
        // read data from file
        val passagers = spark.read.option("header", "true").option("inferSchema", "true")
          .csv("hdfs://m3:9820/titanic3_02.csv");
         
        // Pclass,Survived,Name,Gender,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Boat,Body,HomeDest
        // select a subset of the columns to build a new Dataset
        val passagers1 = passagers.select(passagers("Pclass"), passagers("Survived"), 
               passagers("Gender"), passagers("Age"), passagers("SibSp"), 
               passagers("Parch"), passagers("Fare"))
               
        passagers1.show
        
        passagers1.printSchema()
        
        passagers1.groupBy("Gender").count.show
        
        passagers1.stat.crosstab("Survived", "SibSp").show
        
        
      }
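
    The stat accessor also exposes correlation and covariance between numeric columns; a sketch using two of the columns selected above (null Ages can distort the result, so they are dropped first in this sketch):

    // Pearson correlation and covariance between Age and Fare.
    val clean = passagers1.na.drop(Seq("Age", "Fare"))
    val corr = clean.stat.corr("Age", "Fare")
    val cov  = clean.stat.cov("Age", "Fare")
    println("corr(Age, Fare) = %.4f, cov(Age, Fare) = %.4f".format(corr, cov))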
    

     Linear regression

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        //conf.set("spark.master", "spark://m2:7077")
        conf.set("spark.master", "local[4]")
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
        // the SparkContext is available via spark.sparkContext if needed
        // val sc = spark.sparkContext
        // inferSchema=true infers the column types automatically; the default (false) leaves every column as String
        // 1. load the data
        val cars = spark.read.option("header", "true").option("inferSchema", "true").csv("hdfs://m2:9820/car-milage.csv")
        /*cars.show(5)
        cars.printSchema()
        
        //  mpg|displacement| hp|torque|CRatio|RARatio|CarbBarrells|NoOfSpeed|length|width|weight|automatic|
        cars.describe("mpg", "hp", "weight", "automatic").show
        
        val corr = cars.stat.corr("hp", "weight")
        println("correlation is %2.4f".format(corr))
        
        val cov = cars.stat.cov("hp", "weight")
        // covariance
        println("covariance is %2.4f".format(cov))*/
        
        // Returns a new [[DataFrame]] that drops rows containing any null or NaN values.
        val cars1 = cars.na.drop()
        
        // 2. build the feature vector with a VectorAssembler
        val assembler = new VectorAssembler()
        
        // input columns
        assembler.setInputCols(Array("displacement", "hp", "torque", "CRatio",
          "RARatio", "CarbBarrells" ,"NoOfSpeed" ,"length", "width" , "weight" ,"automatic"    
        ))   
        // output column
        assembler.setOutputCol("features")
        
        // transform: appends the assembled features column
        val cars2 = assembler.transform(cars1)
        // cars2.show();
        
        // 3. split the data into train and test sets
        
        val train = cars2.filter(cars2("weight") <= 4000)
        
        val test = cars2.filter(cars2("weight") > 4000)
        
        // test.show
        // 4. configure the linear regression
        val linearReg = new LinearRegression
        // Set the maximum number of iterations
        linearReg.setMaxIter(100)
        // Set the regularization parameter
        linearReg.setRegParam(0.3)
        // Set the ElasticNet mixing parameter:
        //   0 = pure L2 (ridge regression)
        //   1 = pure L1 (lasso)
        //   between 0 and 1 = a mix of L1 and L2 (elastic net); the default is 0
        linearReg.setElasticNetParam(0.8)
        linearReg.setLabelCol("mpg") // the label column to be predicted
        
        // println("train count: " + train.count())
        // 5. train the model
        val mdlLR = linearReg.fit(train)
         
        println("totalIterations: " + mdlLR.summary.totalIterations)
        
        // 6. predict on the test set with the trained model
        val predictions = mdlLR.transform(test)
        predictions.show
        val evaluator = new RegressionEvaluator
        evaluator.setLabelCol("mpg")
        val rmse = evaluator.evaluate(predictions)
        // rmse root mean squared error
        println("root mean squared error = " + "%6.3f".format(rmse))
        
        evaluator.setMetricName("mse")
        val mse = evaluator.evaluate(predictions)
        // mean squared error
        println("mean squared error = " + "%6.3f".format(mse))
      }
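
    After fit(), the learned parameters and training-set diagnostics can be read straight off the model; a short sketch continuing from mdlLR above:

    // Learned parameters of the fitted LinearRegressionModel.
    println("coefficients: " + mdlLR.coefficients)
    println("intercept: %.3f".format(mdlLR.intercept))

    // Diagnostics computed on the training data.
    println("training RMSE = %6.3f".format(mdlLR.summary.rootMeanSquaredError))
    println("training r2   = %6.3f".format(mdlLR.summary.r2))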
    

     Classification

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "spark://m2:7077")
        // conf.set("spark.master", "local[8]")
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
        // the SparkContext is available via spark.sparkContext if needed
        // val sc = spark.sparkContext
        // inferSchema=true infers the column types automatically; the default (false) leaves every column as String
        // 1. load the data
        val passagers = spark.read.option("header", "true").option("inferSchema", "true")
             .csv("hdfs://m2:9820/titanic3_02.csv")
        // passagers.show()
        // passagers.printSchema()
     
        // 2. extract the feature columns
           val passagers1 = passagers.select(passagers("Pclass"), passagers("Survived").cast(DoubleType).as("Survived"),
               passagers("Gender"), passagers("Age"), passagers("SibSp"), passagers("Parch")
               , passagers("Fare")) 
           
           // VectorAssembler does not accept string columns, so index Gender into a numeric GenderCat column
           val indexer = new StringIndexer
           indexer.setInputCol("Gender")
           indexer.setOutputCol("GenderCat")
           val passagers2 = indexer.fit(passagers1).transform(passagers1)
           // passagers2.show
           
           // drop rows containing null or NaN values
           val passagers3 = passagers2.na.drop()
           println("total count:" + passagers2.count() + "  droped count is: " + (passagers2.count() - passagers3.count()))
           
           val vectorAssembler = new VectorAssembler
           vectorAssembler.setInputCols(Array("Pclass", "GenderCat", "Age", "SibSp", "Parch", "Fare"))
           vectorAssembler.setOutputCol("features")
           val passagers4 = vectorAssembler.transform(passagers3)
           // passagers4.show()
           
           // 3. split the data into training and test sets
           
           val Array(train, test) = passagers4.randomSplit(Array(0.9, 0.1))
           // train.show()
           
           val algtree = new DecisionTreeClassifier
           algtree.setLabelCol("Survived")
           algtree.setImpurity("gini")
           algtree.setMaxBins(32)
           // Maximum depth of the tree
           algtree.setMaxDepth(5)
           
           // fit the model
           val mdlTree = algtree.fit(train)
           // println(mdlTree.toDebugString)
           // println(mdlTree.toString)
           // println(mdlTree.featureImportances)
           
           // 4. predict on the test set with the model
           val predictions = mdlTree.transform(test)
           predictions.show
           
           // 5. evaluate the model
           val evaluator = new MulticlassClassificationEvaluator
           evaluator.setLabelCol("Survived")
           // metric(度量标准) name in evaluation 
           // (supports `"f1"` (default), `"weightedPrecision"`,`"weightedRecall"`, `"accuracy"`)
           evaluator.setMetricName("accuracy")
           val accuracy = evaluator.evaluate(predictions)
           println("the accuracy is %.2f%%".format(accuracy * 100))
           
      }
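
    The indexing, assembling and classification steps above can also be chained into a single ML Pipeline, so the same preprocessing is fitted once and applied identically to new data. A minimal sketch reusing the indexer, vectorAssembler and algtree objects defined above; splitting the un-indexed passagers1 data here is an assumption of this sketch, not part of the original example:

    import org.apache.spark.ml.Pipeline

    // The pipeline runs the three stages in order on each fit()/transform() call.
    val pipeline = new Pipeline().setStages(Array(indexer, vectorAssembler, algtree))

    val Array(rawTrain, rawTest) = passagers1.na.drop().randomSplit(Array(0.9, 0.1))
    val pipelineModel = pipeline.fit(rawTrain)
    val pipelinePredictions = pipelineModel.transform(rawTest)
    pipelinePredictions.select("Survived", "prediction").show(5)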
    

      

    Clustering

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local[4]")
        // conf.set("spark.master", "local[8]")
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
        // the SparkContext is available via spark.sparkContext if needed
        // val sc = spark.sparkContext
        // inferSchema=true infers the column types automatically; the default (false) leaves every column as String
        // 1. load the data
        val points = spark.read.option("header", "true").option("inferSchema", "true")
             .csv("hdfs://m2:9820/cluster-points-v2.csv")
         // points.show()
         // points.printSchema()
      
    // 2. assemble the feature vector
        val vectorAssembler = new VectorAssembler
        vectorAssembler.setInputCols(Array("X", "Y"))
        vectorAssembler.setOutputCol("features")
        val points1 = vectorAssembler.transform(points)
        // points1.show()
        // points1.printSchema()
        
    // 3. clustering is unsupervised, so there is no train/test split; k-means is used here
    // k (2 here) is the number of clusters
        val algKmeans = new KMeans().setK(2)
    // fit the model
        val mdlKmeans = algKmeans.fit(points1)
        
    // 4. assign each point to a cluster with the model
        val predictions = mdlKmeans.transform(points1)
        // predictions.show
    // 5. evaluation: WSSSE, the sum of squared distances from each point to its cluster centre; lower is better
        val wsse = mdlKmeans.computeCost(points1)
        println(wsse)
             
      }
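
    Besides the cost, the fitted KMeansModel exposes the learned centres, and the cluster sizes can be read from the prediction column; a sketch continuing from mdlKmeans and predictions above:

    // Coordinates of the k cluster centres in the (X, Y) feature space.
    mdlKmeans.clusterCenters.foreach(println)

    // How many points fell into each cluster.
    predictions.groupBy("prediction").count().show()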
    

      Recommendation

    def parseRating(row: Row): Rating = {
        val aList = row.getList[String](0)
        Rating(aList.get(0).toInt, aList.get(1).toInt, aList.get(2).toDouble) //.getInt(0), row.getInt(1), row.getDouble(2))
      }
      
      def rowSqDiff(row:Row) : Double = {
    	  math.pow( (row.getDouble(2) - row.getFloat(3).toDouble),2)
    	}
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.set("spark.master", "local[4]")
        // conf.set("spark.master", "local[8]")
        // create the SparkSession
        val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
        // the SparkContext is available via spark.sparkContext if needed
        // val sc = spark.sparkContext
        // inferSchema=true infers the column types automatically; the default (false) leaves every column as String
    val startTime = System.nanoTime()
    // 1. load the data
        
        val movies = spark.read.text("hdfs://m3:9820/movies.dat")
        // movies.show()
        // movies.printSchema()
    
        val ratings = spark.read.text("hdfs://m3:9820/ratings.dat")
        // ratings.show()
        // ratings.printSchema()
    
        val users = spark.read.text("hdfs://m3:9820/users.dat")
        // users.show()
        // users.printSchema()
    
        val ratings1 = ratings.select(split(ratings("value"), "::")).as("values")
        // ratings1.show
    // 2. convert each row into a Rating(user, product, rating)
        val rating2 = ratings1.rdd.map(parseRating(_))
        val rating3 = spark.createDataFrame(rating2)
        // rating3.show
        
    // 3. split the data into train and test sets
        val Array(train, test) = rating3.randomSplit(Array(0.8, 0.2))
        
    // 4. configure the ALS estimator and train it
        val algAls = new ALS
        algAls.setItemCol("product")
        algAls.setRank(12)
    algAls.setRegParam(0.1) // regularization parameter
        algAls.setMaxIter(20)
    // fit the model
        val mdlReco = algAls.fit(train)
    
        // mdlReco.
    // 5. predict ratings for the test set
        val predictions = mdlReco.transform(test)
        predictions.show
        predictions.printSchema()
         
    // 6. evaluate the model
    // count the rows where ALS could not produce a prediction (NaN)
        val nanState = predictions.na.fill(99999.0)
        println(nanState.filter(nanState("prediction") > 99998).count())
        nanState.filter(nanState("prediction") > 99998).show(5)
        //
        val pred = predictions.na.drop()
        println("Orig = "+predictions.count()+" Final = "+ pred.count() + " Dropped = "+ (predictions.count() - pred.count()))
        // Calculate RMSE & MSE
        val evaluator = new RegressionEvaluator()
    evaluator.setLabelCol("rating")
    var rmse = evaluator.evaluate(pred)
    println("Root Mean Squared Error = " + "%.3f".format(rmse))
    //
    evaluator.setMetricName("mse")
    var mse = evaluator.evaluate(pred)
    println("Mean Squared Error = " + "%.3f".format(mse))
    // recompute MSE by hand over the rows that survived na.drop()
    mse = pred.rdd.map(r => rowSqDiff(r)).reduce(_+_) / pred.count().toDouble
    println("Mean Squared Error (Calculated) = " + "%.3f".format(mse))
    		//
        //
        val elapsedTime = (System.nanoTime() - startTime) / 1e9
        println("Elapsed time: %.2f seconds".format(elapsedTime))
        
        // MatrixFactorizationModel 
      }
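
    Beyond scoring the held-out ratings, the fitted ALSModel can produce top-N recommendations directly (available from Spark 2.2 onward); a hedged sketch continuing from mdlReco above:

    // Top 10 products for every user and top 10 users for every product.
    val userRecs = mdlReco.recommendForAllUsers(10)
    val itemRecs = mdlReco.recommendForAllItems(10)
    userRecs.show(5, truncate = false)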
    

      

  • Original post: https://www.cnblogs.com/heml/p/6202871.html