Reading data from files
Data used: https://codeload.github.com/xsankar/fdps-v3/zip/master
Reading data from a single file
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

case class Employee(EmployeeID: String, LastName: String, FirstName: String, Title: String,
                    BirthDate: String, HireDate: String, City: String, State: String,
                    Zip: String, Country: String, ReportsTo: String)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local")
  conf.set("spark.app.name", "spark demo")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  import spark.implicits._ // required, otherwise the .as[Employee] call below does not compile
  // header (default false): uses the first line as names of columns.
  val employees = spark.read.option("header", "true")
    .csv("hdfs://m3:9820/NW-Employees.csv").as[Employee]
  employees.show()
}
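Instead of reading every column as String and mapping it onto the Employee case class, the schema can also be declared up front. The sketch below is only an illustration: it assumes the same NW-Employees.csv layout, and the choice of IntegerType for EmployeeID and ReportsTo is an assumption, not something taken from the file.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ReadWithSchema {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("read with schema").getOrCreate()
    // Hypothetical schema: assumes EmployeeID and ReportsTo are numeric in the source file
    val schema = StructType(Seq(
      StructField("EmployeeID", IntegerType, nullable = true),
      StructField("LastName", StringType, nullable = true),
      StructField("FirstName", StringType, nullable = true),
      StructField("Title", StringType, nullable = true),
      StructField("BirthDate", StringType, nullable = true),
      StructField("HireDate", StringType, nullable = true),
      StructField("City", StringType, nullable = true),
      StructField("State", StringType, nullable = true),
      StructField("Zip", StringType, nullable = true),
      StructField("Country", StringType, nullable = true),
      StructField("ReportsTo", IntegerType, nullable = true)
    ))
    val employees = spark.read
      .option("header", "true")
      .schema(schema)   // skip schema inference and use the declared types
      .csv("hdfs://m3:9820/NW-Employees.csv")
    employees.printSchema()
    employees.show(5)
    spark.stop()
  }
}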
Registering the data as a view and querying it with SQL
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

case class Employee(EmployeeID: String, LastName: String, FirstName: String, Title: String,
                    BirthDate: String, HireDate: String, City: String, State: String,
                    Zip: String, Country: String, ReportsTo: String)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local")
  conf.set("spark.app.name", "spark demo")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  import spark.implicits._ // required, otherwise the .as[Employee] call below does not compile
  // header (default false): uses the first line as names of columns.
  val employees = spark.read.option("header", "true")
    .csv("hdfs://m3:9820/NW-Employees.csv").as[Employee]
  // Creates a temporary view using the given name
  employees.createOrReplaceTempView("employeesTable")
  // Query through SQL; the view name in the statement is case-insensitive
  val records = spark.sql("select * from EmployeesTable")
  records.show()
  records.head(2)
  records.explain(true)
}
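For comparison, the same kind of query can be written with the DataFrame API directly, without registering a temporary view. A minimal sketch: the selected columns come from the Employee case class above, and the Country = 'USA' filter is just a hypothetical condition to show the API.

import org.apache.spark.sql.SparkSession

object DataFrameApiQuery {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("dataframe api query").getOrCreate()
    import spark.implicits._

    val employees = spark.read.option("header", "true")
      .csv("hdfs://m3:9820/NW-Employees.csv")

    // Equivalent of: select FirstName, LastName, Title from employeesTable where Country = 'USA'
    employees
      .select($"FirstName", $"LastName", $"Title", $"Country")
      .filter($"Country" === "USA")   // hypothetical filter, just to show the API
      .show()

    spark.stop()
  }
}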
Join queries
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

case class Order(OrderID: String, CustomerID: String, EmployeeID: String, OrderDate: String, ShipCountry: String)
case class OrderDetail(OrderID: String, ProductID: String, UnitPrice: String, Qty: String, Discount: String)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local")
  conf.set("spark.app.name", "spark demo")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  import spark.implicits._ // required, otherwise the .as[...] calls below do not compile
  // header (default false): uses the first line as names of columns.
  val orders = spark.read.option("header", "true")
    .csv("hdfs://m3:9820/NW-Orders.csv").as[Order]
  val orderDetails = spark.read.option("header", "true")
    .csv("hdfs://m3:9820/NW-Order-Details.csv").as[OrderDetail]
  // Creates a temporary view using the given name
  orders.createOrReplaceTempView("orders")
  orderDetails.createOrReplaceTempView("orderDetails")
  // show() without an explicit row count prints 20 rows by default
  // orders.show()
  // orderDetails.show()
  // If a table is not given an alias, its alias is the same as the table name
  val joinResult = spark.sql(
    "select o.OrderID, orderDetails.ProductID from orders o inner join orderDetails on o.OrderID = orderDetails.OrderID")
  joinResult.show
}
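The SQL join above can also be expressed with the DataFrame API. A minimal sketch under the same file layout:

import org.apache.spark.sql.SparkSession

object DataFrameApiJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("dataframe api join").getOrCreate()

    val orders = spark.read.option("header", "true").csv("hdfs://m3:9820/NW-Orders.csv")
    val orderDetails = spark.read.option("header", "true").csv("hdfs://m3:9820/NW-Order-Details.csv")

    // Same inner join as the SQL version, written with the DataFrame API
    val joined = orders
      .join(orderDetails, orders("OrderID") === orderDetails("OrderID"), "inner")
      .select(orders("OrderID"), orderDetails("ProductID"))

    joined.show()
    spark.stop()
  }
}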
Reading and writing data
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local")
  conf.set("spark.app.name", "spark demo")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  import spark.implicits._
  // header (default false): uses the first line as names of columns.
  // inferSchema (default false): infers the input schema automatically from data.
  // It requires one extra pass over the data.
  // Read data from file
  val cars = spark.read.option("header", "true").option("inferSchema", "true")
    .csv("hdfs://m3:9820/cars.csv")
  cars.show(5)
  cars.printSchema()
  // Write data to file
  // mode("overwrite") replaces any existing data at the target path
  // Save as CSV
  cars.write.mode("overwrite").option("header", "true").csv("hdfs://m3:9820/cars_csv")
  // Store the data in Parquet format, partitioned by year
  cars.write.mode("overwrite").partitionBy("year").parquet("hdfs://m3:9820/cars_parquet")
}
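Once the Parquet output is written, it can be read back directly; the partition column (year) is reconstructed from the directory names under cars_parquet. A short sketch; the year = 1971 filter value is hypothetical and only there to show that filtering on the partition column lets Spark prune whole directories.

import org.apache.spark.sql.SparkSession

object ReadParquetBack {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("read parquet back").getOrCreate()

    // Read the partitioned Parquet data written above
    val cars = spark.read.parquet("hdfs://m3:9820/cars_parquet")
    cars.printSchema()

    // Filtering on the partition column lets Spark skip whole directories
    // (the year value here is hypothetical; adjust it to the actual data)
    cars.filter(cars("year") === 1971).explain()
    cars.filter(cars("year") === 1971).show(5)

    spark.stop()
  }
}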
Statistical methods
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local")
  conf.set("spark.app.name", "spark demo")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  import spark.implicits._
  // header (default false): uses the first line as names of columns.
  // inferSchema (default false): infers the input schema automatically from data.
  // It requires one extra pass over the data.
  // Read data from file
  val cars = spark.read.option("header", "true").option("inferSchema", "true")
    .csv("hdfs://m3:9820/cars.csv")
  cars.show(5)
  cars.printSchema()
  // describe() shows the max, min, mean, and standard deviation of the given column
  cars.describe("model").show()
  // groupBy groups the rows; avg computes the average of the given column per group
  cars.groupBy("year").avg("year").show()
  cars.show()
}
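describe() and groupBy().avg() are only two of the aggregation options; agg() can compute several aggregates in a single pass. A small sketch against the same cars.csv, using only the year column that the example above already relies on:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, max, min}

object AggExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("agg example").getOrCreate()

    val cars = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m3:9820/cars.csv")

    // Several aggregates over the whole DataFrame in one pass
    cars.agg(min("year"), max("year"), avg("year"), count("year")).show()

    // Number of rows per year (an alternative to groupBy(...).avg(...))
    cars.groupBy("year").count().orderBy("year").show()

    spark.stop()
  }
}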
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local")
  conf.set("spark.app.name", "spark demo")
  val sc = new SparkContext(conf)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  import spark.implicits._
  // header (default false): uses the first line as names of columns.
  // inferSchema (default false): infers the input schema automatically from data.
  // It requires one extra pass over the data.
  // Read data from file
  val passagers = spark.read.option("header", "true").option("inferSchema", "true")
    .csv("hdfs://m3:9820/titanic3_02.csv")
  // Columns: Pclass,Survived,Name,Gender,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Boat,Body,HomeDest
  // Select a subset of the Dataset's columns, producing a new Dataset
  val passagers1 = passagers.select(passagers("Pclass"), passagers("Survived"),
    passagers("Gender"), passagers("Age"), passagers("SibSp"), passagers("Parch"), passagers("Fare"))
  passagers1.show
  passagers1.printSchema()
  passagers1.groupBy("Gender").count.show
  passagers1.stat.crosstab("Survived", "SibSp").show
}
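Besides crosstab, DataFrameStatFunctions also offers correlation and frequent-item estimates. A small sketch on the same Titanic file; SibSp and Parch are used for the correlation simply because they are numeric columns already present in the selection above.

import org.apache.spark.sql.SparkSession

object TitanicStatExtras {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("stat extras").getOrCreate()

    val passagers = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m3:9820/titanic3_02.csv")

    // Pearson correlation between two numeric columns
    val corr = passagers.stat.corr("SibSp", "Parch")
    println("corr(SibSp, Parch) = %.4f".format(corr))

    // Approximate most frequent values of the given columns
    passagers.stat.freqItems(Array("Pclass", "SibSp")).show(truncate = false)

    spark.stop()
  }
}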
Linear regression
import org.apache.spark.SparkConf
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  // conf.set("spark.master", "spark://m2:7077")
  conf.set("spark.master", "local[4]")
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  // Create the SparkContext if needed
  // val sc = spark.sparkContext

  // inferSchema = true lets Spark infer the column types automatically;
  // the default is false, in which case every column is read as String
  // 1. Load the data
  val cars = spark.read.option("header", "true").option("inferSchema", "true")
    .csv("hdfs://m2:9820/car-milage.csv")
  /*
  cars.show(5)
  cars.printSchema()
  // mpg|displacement| hp|torque|CRatio|RARatio|CarbBarrells|NoOfSpeed|length|width|weight|automatic|
  cars.describe("mpg", "hp", "weight", "automatic").show
  val corr = cars.stat.corr("hp", "weight")
  println("correlation is %2.4f".format(corr))
  val cov = cars.stat.cov("hp", "weight") // covariance
  println("covariance is %2.4f".format(cov))
  */

  // Returns a new DataFrame that drops rows containing any null or NaN values
  val cars1 = cars.na.drop()

  // 2. Assemble the feature columns into a single vector column
  val assembler = new VectorAssembler()
  // Set the input columns
  assembler.setInputCols(Array("displacement", "hp", "torque", "CRatio", "RARatio",
    "CarbBarrells", "NoOfSpeed", "length", "width", "weight", "automatic"))
  // Set the output column
  assembler.setOutputCol("features")
  // Transform
  val cars2 = assembler.transform(cars1)
  // cars2.show()

  // 3. Split the data
  val train = cars2.filter(cars2("weight") <= 4000)
  val test = cars2.filter(cars2("weight") > 4000)
  // test.show

  // 4. Configure the linear regression
  val linearReg = new LinearRegression
  // Set the maximum number of iterations
  linearReg.setMaxIter(100)
  // Set the regularization parameter
  linearReg.setRegParam(0.3)
  // Set the ElasticNet mixing parameter:
  // 0 -> L2 (ridge regression), 1 -> L1 (Lasso), between 0 and 1 -> L2 + L1 (elastic net); default is 0
  linearReg.setElasticNetParam(0.8)
  linearReg.setLabelCol("mpg") // the label column being predicted
  // println("train count: " + train.count())

  // 5. Train the model
  val mdlLR = linearReg.fit(train)
  println("totalIterations: " + mdlLR.summary.totalIterations)

  // 6. Predict on the test data using the trained model
  val predictions = mdlLR.transform(test)
  predictions.show
  val evaluator = new RegressionEvaluator
  evaluator.setLabelCol("mpg")
  val rmse = evaluator.evaluate(predictions) // root mean squared error
  println("root mean squared error = " + "%6.3f".format(rmse))
  evaluator.setMetricName("mse")
  val mse = evaluator.evaluate(predictions) // mean squared error
  println("mean squared error = " + "%6.3f".format(mse))
}
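After fitting, the LinearRegressionModel also exposes its coefficients, intercept, and a training summary, which is often worth inspecting before looking at test-set metrics. The sketch below uses a tiny made-up in-memory dataset (the numbers are invented) purely to stay self-contained; the same calls apply to the mdlLR model trained above.

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession

object InspectLinearModel {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("inspect linear model").getOrCreate()
    import spark.implicits._

    // Tiny invented dataset: the label is roughly 2*x1 + 3*x2 + 1
    val data = Seq(
      (9.0,  Vectors.dense(1.0, 2.0)),
      (15.0, Vectors.dense(2.0, 3.0)),
      (21.0, Vectors.dense(3.0, 4.0)),
      (26.0, Vectors.dense(4.0, 5.5))
    ).toDF("mpg", "features")

    val lr = new LinearRegression().setLabelCol("mpg")
      .setMaxIter(100).setRegParam(0.3).setElasticNetParam(0.8)
    val model = lr.fit(data)

    // The fitted model exposes its parameters and training metrics
    println("coefficients: " + model.coefficients)
    println("intercept:    " + model.intercept)
    println("RMSE on training data: " + model.summary.rootMeanSquaredError)
    println("r2 on training data:   " + model.summary.r2)

    spark.stop()
  }
}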
Classification
import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "spark://m2:7077")
  // conf.set("spark.master", "local[8]")
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  // Create the SparkContext if needed
  // val sc = spark.sparkContext

  // inferSchema = true lets Spark infer the column types automatically;
  // the default is false, in which case every column is read as String
  // 1. Load the data
  val passagers = spark.read.option("header", "true").option("inferSchema", "true")
    .csv("hdfs://m2:9820/titanic3_02.csv")
  // passagers.show()
  // passagers.printSchema()

  // 2. Extract the features
  val passagers1 = passagers.select(passagers("Pclass"),
    passagers("Survived").cast(DoubleType).as("Survived"),
    passagers("Gender"), passagers("Age"), passagers("SibSp"), passagers("Parch"), passagers("Fare"))
  // VectorAssembler does not support string columns, so convert Gender to a numeric index
  val indexer = new StringIndexer
  indexer.setInputCol("Gender")
  indexer.setOutputCol("GenderCat")
  val passagers2 = indexer.fit(passagers1).transform(passagers1)
  // passagers2.show

  // Drop rows containing null or NaN values
  val passagers3 = passagers2.na.drop()
  println("total count:" + passagers2.count() + " dropped count is: " + (passagers2.count() - passagers3.count()))

  val vectorAssembler = new VectorAssembler
  vectorAssembler.setInputCols(Array("Pclass", "GenderCat", "Age", "SibSp", "Parch", "Fare"))
  vectorAssembler.setOutputCol("features")
  val passagers4 = vectorAssembler.transform(passagers3)
  // passagers4.show()

  // 3. Split the data into training and test sets
  val Array(train, test) = passagers4.randomSplit(Array(0.9, 0.1))
  // train.show()

  val algtree = new DecisionTreeClassifier
  algtree.setLabelCol("Survived")
  algtree.setImpurity("gini")
  algtree.setMaxBins(32)
  // Maximum depth of the tree
  algtree.setMaxDepth(5)
  // Train the model
  val mdlTree = algtree.fit(train)
  // println(mdlTree.toDebugString)
  // println(mdlTree.toString)
  // println(mdlTree.featureImportances)

  // 4. Predict on the test data using the model
  val predictions = mdlTree.transform(test)
  predictions.show

  // 5. Evaluate the model
  val evaluator = new MulticlassClassificationEvaluator
  evaluator.setLabelCol("Survived")
  // metric name in evaluation
  // (supports "f1" (default), "weightedPrecision", "weightedRecall", "accuracy")
  evaluator.setMetricName("accuracy")
  val accuracy = evaluator.evaluate(predictions)
  println("the accuracy is %.2f%%".format(accuracy * 100))
}
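The StringIndexer, VectorAssembler, and DecisionTreeClassifier stages above can also be chained into a single Pipeline, so that fit and transform run all stages in order. A sketch against the same Titanic file, with the same columns and parameters:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType

object TitanicPipeline {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("titanic pipeline").getOrCreate()

    val passagers = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m2:9820/titanic3_02.csv")
    val selected = passagers.select(passagers("Pclass"),
      passagers("Survived").cast(DoubleType).as("Survived"),
      passagers("Gender"), passagers("Age"), passagers("SibSp"),
      passagers("Parch"), passagers("Fare")).na.drop()

    // The three stages from the example above, chained in one Pipeline
    val indexer = new StringIndexer().setInputCol("Gender").setOutputCol("GenderCat")
    val assembler = new VectorAssembler()
      .setInputCols(Array("Pclass", "GenderCat", "Age", "SibSp", "Parch", "Fare"))
      .setOutputCol("features")
    val tree = new DecisionTreeClassifier().setLabelCol("Survived").setMaxDepth(5)
    val pipeline = new Pipeline().setStages(Array(indexer, assembler, tree))

    val Array(train, test) = selected.randomSplit(Array(0.9, 0.1))
    val model = pipeline.fit(train)          // fits all stages in order
    val predictions = model.transform(test)  // applies all stages in order

    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("Survived").setMetricName("accuracy")
    println("pipeline accuracy = %.2f%%".format(evaluator.evaluate(predictions) * 100))

    spark.stop()
  }
}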
Clustering
import org.apache.spark.SparkConf
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local[4]")
  // conf.set("spark.master", "local[8]")
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  // Create the SparkContext if needed
  // val sc = spark.sparkContext

  // inferSchema = true lets Spark infer the column types automatically;
  // the default is false, in which case every column is read as String
  // 1. Load the data
  val points = spark.read.option("header", "true").option("inferSchema", "true")
    .csv("hdfs://m2:9820/cluster-points-v2.csv")
  // points.show()
  // points.printSchema()

  // 2. Transform the data
  val vectorAssembler = new VectorAssembler
  vectorAssembler.setInputCols(Array("X", "Y"))
  vectorAssembler.setOutputCol("features")
  val points1 = vectorAssembler.transform(points)
  // points1.show()
  // points1.printSchema()

  // 3. Clustering is an unsupervised learning algorithm, so there is no need to split the data
  //    into train and test sets; k-means is used here.
  //    k (2 here) is the number of clusters
  val algKmeans = new KMeans().setK(2)
  // Train the model
  val mdlKmeans = algKmeans.fit(points1)

  // 4. Assign each point to a cluster using the model
  val predictions = mdlKmeans.transform(points1)
  // predictions.show

  // 5. Evaluate: WSSSE is the sum of squared distances from each point to its cluster center;
  //    smaller is better
  val wsse = mdlKmeans.computeCost(points1)
  println(wsse)
}
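k was fixed to 2 above; a common way to pick k is to train models for several values and compare the cost (the "elbow" method). A sketch using the same points file; computeCost matches the Spark 2.x API used above (newer Spark versions replace it with ClusteringEvaluator):

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

object ChooseK {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("choose k").getOrCreate()

    val points = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m2:9820/cluster-points-v2.csv")
    val features = new VectorAssembler().setInputCols(Array("X", "Y")).setOutputCol("features")
      .transform(points)

    // Train one model per k and print its cost; look for the "elbow"
    // where the cost stops dropping sharply
    for (k <- 2 to 6) {
      val model = new KMeans().setK(k).setSeed(1L).fit(features)
      val cost = model.computeCost(features)
      println(s"k = $k  cost = $cost")
    }

    spark.stop()
  }
}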
Recommendation
import org.apache.spark.SparkConf
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.split

def parseRating(row: Row): Rating = {
  // Each row holds the array produced by split(value, "::")
  val aList = row.getList[String](0)
  Rating(aList.get(0).toInt, aList.get(1).toInt, aList.get(2).toDouble)
}

def rowSqDiff(row: Row): Double = {
  // Squared difference between the actual rating and the prediction
  math.pow(row.getDouble(2) - row.getFloat(3).toDouble, 2)
}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local[4]")
  // conf.set("spark.master", "local[8]")
  // Create the SparkSession
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  // Create the SparkContext if needed
  // val sc = spark.sparkContext

  // inferSchema = true lets Spark infer the column types automatically;
  // the default is false, in which case every column is read as String
  val startTime = System.nanoTime()

  // 1. Load the data
  val movies = spark.read.text("hdfs://m3:9820/movies.dat")
  // movies.show()
  // movies.printSchema()
  val ratings = spark.read.text("hdfs://m3:9820/ratings.dat")
  // ratings.show()
  // ratings.printSchema()
  val users = spark.read.text("hdfs://m3:9820/users.dat")
  // users.show()
  // users.printSchema()

  val ratings1 = ratings.select(split(ratings("value"), "::")).as("values")
  // ratings1.show

  // 2. Convert each row into a Rating
  val rating2 = ratings1.rdd.map(parseRating(_))
  val rating3 = spark.createDataFrame(rating2)
  // rating3.show

  // 3. Split the data into train and test sets
  val Array(train, test) = rating3.randomSplit(Array(0.8, 0.2))

  // 4. Build the model and train it
  val algAls = new ALS
  algAls.setItemCol("product")
  algAls.setRank(12)
  algAls.setRegParam(0.1) // regularization parameter
  algAls.setMaxIter(20)
  // Train the model
  val mdlReco = algAls.fit(train)

  // 5. Predict on the test data
  val predictions = mdlReco.transform(test)
  predictions.show
  predictions.printSchema()

  // 6. Evaluate the model
  // Filter out NaN predictions (users/items unseen during training)
  val nanState = predictions.na.fill(99999.0)
  println(nanState.filter(nanState("prediction") > 99998).count())
  nanState.filter(nanState("prediction") > 99998).show(5)

  val pred = predictions.na.drop()
  println("Orig = " + predictions.count() + " Final = " + pred.count() +
    " Dropped = " + (predictions.count() - pred.count()))

  // Calculate RMSE & MSE
  val evaluator = new RegressionEvaluator()
  evaluator.setLabelCol("rating")
  var rmse = evaluator.evaluate(pred)
  println("Root Mean Squared Error = " + "%.3f".format(rmse))

  evaluator.setMetricName("mse")
  var mse = evaluator.evaluate(pred)
  println("Mean Squared Error = " + "%.3f".format(mse))
  mse = pred.rdd.map(r => rowSqDiff(r)).reduce(_ + _) / predictions.count().toDouble
  println("Mean Squared Error (Calculated) = " + "%.3f".format(mse))

  val elapsedTime = (System.nanoTime() - startTime) / 1e9
  println("Elapsed time: %.2f seconds".format(elapsedTime))
  // See also the older MLlib MatrixFactorizationModel API
}
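Besides scoring a held-out test set, the fitted ALSModel can produce top-N recommendations per user via recommendForAllUsers (available from Spark 2.2 on). The sketch below uses a tiny made-up ratings DataFrame (invented numbers) so it stays self-contained; the same calls apply to the mdlReco model above.

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

object TopNRecommendations {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("top-n recommendations").getOrCreate()
    import spark.implicits._

    // Tiny invented ratings set, using the same column names as the example above
    val ratings = Seq(
      (1, 10, 4.0), (1, 11, 3.0), (1, 12, 5.0),
      (2, 10, 5.0), (2, 12, 2.0), (2, 13, 4.0),
      (3, 11, 4.0), (3, 13, 5.0), (3, 10, 1.0)
    ).toDF("user", "product", "rating")

    val als = new ALS().setItemCol("product").setRank(4).setRegParam(0.1).setMaxIter(10)
    val model = als.fit(ratings)
    // Drop NaN predictions for users/items unseen during training
    model.setColdStartStrategy("drop")

    // Top 2 products per user
    model.recommendForAllUsers(2).show(truncate = false)

    spark.stop()
  }
}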