zoukankan      html  css  js  c++  java
  • sparksql_dataframe操作

    1.dataframe简介:

     sparksql中的dataframe等效于关系型数据表。对表的查询等操作,都可以使用dataframe的API接口实现

    参考文档:http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.DataFrame

    2.dataframe创建:

     2.1 parquet文件

    // Build a Hive-enabled SparkSession and load a DataFrame from a Parquet file.
    val sparkConf = new SparkConf().setAppName("Test")
    val session = SparkSession
      .builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()
    val parquetDF = session.read.parquet(filepath)

     2.2 json文件

      // Build a Hive-enabled SparkSession and load a DataFrame from a JSON file.
      val sparkConf = new SparkConf().setAppName("Test")
      val session = SparkSession
        .builder()
        .config(sparkConf)
        .enableHiveSupport()
        .getOrCreate()
      val jsonDF = session.read.json(filepath)

     2.3 RDD

     2.3.1 反射方式  -- 样例类

    package spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    
    /**
     * Demonstrates creating a DataFrame from an RDD via the reflection
     * (case-class) approach: each query Row is mapped to a `Person`, and
     * `toDF()` derives the DataFrame schema from the case-class fields.
     */
    object Execute {
    
      // Schema carrier: field names and types become the DataFrame columns.
      final case class Person(id: String, cust_num: String)
    
      // Explicit main instead of `extends App`: the App trait uses DelayedInit
      // and has well-known initialization-order pitfalls on Spark drivers.
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("Read")
        val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        try {
          val resultRDD = session.sql("select id,cust_num from sospdm.tmp_yinfei_test").rdd
    
          // Brings toDF() and other RDD -> DataFrame conversions into scope.
          import session.implicits._
    
          val result: DataFrame = resultRDD.map(row =>
            Person(row.getAs[String]("id"), row.getAs[String]("cust_num"))
          ).toDF()
          result.show()
        } finally {
          session.stop() // release cluster resources even if the job fails
        }
      }
    }

     2.3.2 编程接口方式  动态创建元数据

    package spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    /**
     * Demonstrates creating a DataFrame from an RDD via the programmatic
     * interface: rows are built as generic `Row` objects and the schema is
     * declared dynamically with a `StructType`.
     */
    object Execute {
    
      // Kept for parity with the reflection-based example; not used below.
      final case class Person(id: String, cust_num: String)
    
      // Explicit main instead of `extends App`: the App trait uses DelayedInit
      // and has well-known initialization-order pitfalls on Spark drivers.
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("Read")
        val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        try {
          val resultRDD = session.sql("select id,cust_num from sospdm.tmp_yinfei_test").rdd
    
          // Re-pack each source Row so its field order matches the schema below.
          val result = resultRDD.map(row =>
            Row(row.getAs[String]("id"), row.getAs[String]("cust_num"))
          )
    
          // Build the schema dynamically: column name, type, nullability.
          // NOTE(review): cust_num is declared non-nullable (false) while the
          // source Hive table may contain nulls — confirm against the table DDL.
          val structType = StructType(Array(
            StructField("id", StringType, true),
            StructField("cust_num", StringType, false)
          ))
    
          // Combine the Row RDD with the declared schema into a DataFrame.
          val resultDF = session.createDataFrame(result, structType)
          resultDF.show()
        } finally {
          session.stop() // release cluster resources even if the job fails
        }
      }
    }

     2.4 其他:mysql等

    3.dataframe操作:

    package spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
     * Demonstrates joining two DataFrames on a key column.
     * Supported join types include: inner, outer, left_outer, right_outer, leftsemi.
     */
    object Test {
    
      // Explicit main instead of `extends App`: the App trait uses DelayedInit
      // and has well-known initialization-order pitfalls on Spark drivers.
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("Test")
        val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
        try {
          val resultDF1 = session.sql("select id1,cust_num1 from sospdm.tmp_yinfei_test_1")
          val resultDF2 = session.sql("select id2,cust_num2 from sospdm.tmp_yinfei_test_2")
    
          // Join types: inner, outer, left_outer, right_outer, leftsemi
          val result: DataFrame = resultDF1.join(resultDF2, resultDF1("id1") === resultDF2("id2"), "inner")
    
          // Spark transformations are lazy: without an action the join above
          // would never execute, so materialize it here.
          result.show()
        } finally {
          session.stop() // release cluster resources even if the job fails
        }
      }
    }
  • 相关阅读:
    三范式最简单最易记的解释
    Mysql添加用户错误:ERROR 1364 (HY000): Field 'ssl_cipher' doesn't have a default value解决方法
    mysql体系结构管理
    mysql的简单操作
    flush privileges刷新MySQL的系统权限相关表
    二进制安装mysql
    扩展一台mysql-5.6.40
    mysql5.6.40部署过程
    三剑客-awk
    三剑客-sed
  • 原文地址:https://www.cnblogs.com/yin-fei/p/10899777.html
Copyright © 2011-2022 走看看