zoukankan      html  css  js  c++  java
  • spark sql的简单操作

    测试数据
    sparkStu.text
    zhangxs 24 chenxy
    wangYr 21 teacher
    wangx 26 teacher
    sparksql
    {
    "name":"zhangxs","age":24,"job":"chengxy",
    "name":"li","age":21,"job":"teacher",
    "name":"tao","age":14,"job":"student"
    }
     
    object CreateDataFream {
    //创建student对象
    case class Student(name:String,age:BigInt,job:String);
    
    def main(args: Array[String]){
    //初始化sparkSession 这个sparkSession要用val关键字修饰
    val spark = SparkSession
    .builder()
    .appName("Spark SQL Example")
    .master("spark://服务器ip:7077")
    .getOrCreate();
    // runDataSetCreate(spark);
    // runSarkOnFile(spark);
    // applySchema(spark);
    //loadParquet(spark);
    //jsonFile(spark);
    //销毁sparkSession
    spark.stop();
    }
    
    }
     
    //对指定的列进行查询
    private def test1(spark :SparkSession){
    //因为要使用变量,$符号,所以导入这个包
    import spark.implicits._
    //从hdfs上读取json数据文件并创建dataFream
    var dataFreamS= spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql");
    //显示dataFream所有数据
    dataFreamS.show();
    //打印dataFrame结构
    dataFreamS.printSchema();
    //显示指定列的数据
    dataFreamS.select("name").show()
    //查询指定的列,并修改数据
    dataFreamS.select($"name", $"age"+1).show();
    //查询年龄大于10的人
    dataFreamS.select($"age" > 10).show();
    //查看每个年龄段的人数
    dataFreamS.groupBy("age").count();
    //创建临时视图,如果这个视图已经存在就覆盖掉
    dataFreamS.createOrReplaceTempView("zhangxsView");
    }
     
    //创建dataFrame并运行 
    private def runDataSetCreate(spark:SparkSession){
    import spark.implicits._
    //创建DataSets对象 类型是Student
    val dataStu = Seq(Student("Andy", 32,"baiLing")).toDS();
    //显示数据集信息
    dataStu.show();
    //创建数据的dataSet
    var dataArr=Seq(1,2,3).toDS();
    //显示数据集的信息
    dataArr.show();
    //对属性进行简单操作
    print(dataArr.map (_ +1 ).collect());
    //dataFrame能够被转换成自定义对象类型的dataSet,
    val dfStu=spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql").as[Student];
    dfStu.show();
    //jsonFile支持嵌套表,读入并注册成表
    spark.read.json("hdfs://服务器ip:8020/tmp/dataTest/sparksql").registerTempTable("student");
    //根据sql查询注册的table
    val temsql=spark.sqlContext.sql("select name from student");
    //显示name的value
    print(temsql.show())
    }
    //从hdfs上读取数据文件并转为student对象进行操作
    private def runSarkOnFile(spark:SparkSession){
    import spark.implicits._
    //读取数据文件 并生成rdd
    var rdd=spark.read.textFile("hdfs://服务器ip:8020/tmp/dataTest/sparkStu.txt");
    //对获取的rdd进行解析,并生成sutdent对象
    var sturdd=rdd.map { x => x.split(" ")}.map { z => Student(z(0).toString(),z(1).toInt,z(2).toString())};
    //显示student对象
    sturdd.show();
    //将sutdent对象注册成临时表 student
    sturdd.registerTempTable("student");
    //查询临时表中的数据,并显示
    var sqlDF=spark.sql("select t.name,t.age,t.job from friend t where t.age>14 and t.age<26");
    sqlDF.show();
    }
    private def applySchema(spark:SparkSession){
    import spark.implicits._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    //确定schema名称(列的名称)
    var schemaString="name,age,job";
    //解析schemaString,并生成StructType对象数组
    var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)})
    //从hdfs上读取数据文件
    var stuDS=spark.sparkContext.textFile(path);
    //使用Row对象,创建rowRdd
    var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2)))
    //创建schemaRDD
    var rowDF=spark.createDataFrame(sDS, schemaType); // var rowDF=spark.sqlContext.applySchema(sDS, schemaType); 这种方法已经过时
     //打印schemaRDD的结构
    rowDF.printSchema();
    //注册Student table
    rowDF.createOrReplaceTempView("Student"); // rowDF.registerTempTable("Student"); 这种方法已经过时
    //rowDF.collect().foreach {print(_) }
    //var resDS=spark.sql("select * from Student where age > 24");
    var resDS=spark.sql("select name from Student");
    resDS.show();
    }
     
    //使用parquet文件的方式
    private def loadParquet(spark:SparkSession){
    import spark.implicits._
    //确定schema 列名称
    var schemaString="name,age,job";
    //解析schemaString,并生成StructType对象数组
    var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)})
    //创建rowRdd
    var stuDS=spark.sparkContext.textFile(path);
    var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2)))
    //将schemaRDD保存成parquet文件
    var rowDF=spark.sqlContext.applySchema(sDS, schemaType);
    //将文件写到hdfs://服务器ip:8020/tmp/dataTest/
    rowDF.write.parquet("hdfs://服务器ip:8020/tmp/dataTest/student.parquet");
    -------------------------------------------------------------------
    //读取parquet文件
    var redParfile=spark.read.parquet("hdfs://服务器ip:8020/tmp/dataTest/student.parquet");
    redParfile.createOrReplaceTempView("redParfilered");
    var resultRdd=spark.sql("select * from redParfilered t where t.name='zhangxs'");
    //DataFrame.rdd 可以将dataFrame转为RDD类型
    resultRdd.rdd.map { x => "name"+x(0) }.collect().foreach { print(_) }
    }
     
    /**
    * spark可以自动的识别一个json模式并加载成数据集,
    * 这种转换可以使用SparkSession.read.json() 函数
    * 这个数据集的来源可以是一个rdd,也可以是一个json文件
    *
    */
    private def jsonFile(spark:SparkSession){
    var jsonRdd=spark.read.json("hdfs://192.168.177.124:8020/tmp/dataTest/sparksql");
    jsonRdd.createOrReplaceTempView("student");
    var jfRdd= spark.sql("select * from student t where t.age >24");
    jfRdd.show();

     

    /**
    * 使用Json类型的rdd加载json
    *
    * 如果加:: Nil,返回是一个char类型的rdd,加上则返回的是String类型的rdd
    */
    var rdd=spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil);
    var rddre=spark.read.json(rdd);
    rddre.show();
    }
  • 相关阅读:
    JS生成Guid
    MVC——分页
    MVC入门——删除页
    MVC入门——编辑页
    MVC入门——详细页
    MVC入门——列表页
    MVC入门——增
    pandas使用
    简单线性回归预测实现
    flask 自定义url转换器
  • 原文地址:https://www.cnblogs.com/zhangXingSheng/p/6512599.html
Copyright © 2011-2022 走看看