  DataFrame

    I. Creating DataFrames

    1. Creating from a case class

    people.txt
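
    The contents of the file are not reproduced in the original post; since the code below splits each line on a comma and parses field 0 as the name and field 1 as an Int age, a minimal sample (values made up here) would look like:

    Tom,25
    Jerry,30
    Kate,28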

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Define a case class; it plays the role of the table schema
    case class People(var name: String, var age: Int)

    object TestDataFrame1 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
        val sc = new SparkContext(conf)
        val context = new SQLContext(sc)
        // Read the local file into an RDD and map each line onto the case class
        val peopleRDD = sc.textFile("E:\\666\\people.txt")
          .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
        import context.implicits._
        // Convert the RDD to a DataFrame
        val df = peopleRDD.toDF
        // Register the DataFrame as a temporary view
        df.createOrReplaceTempView("people")
        // Query it with SQL
        context.sql("select * from people").show()
      }
    }
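
    SQLContext still works here but is deprecated in Spark 2.x. A minimal sketch of the same example through the SparkSession entry point (object name chosen for illustration; same people.txt path and People case class as above assumed):

    import org.apache.spark.sql.SparkSession

    object TestDataFrame1WithSession {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("RDDToDataFrame")
          .master("local")
          .getOrCreate()
        import spark.implicits._
        // Same parsing as above, driven through the unified SparkSession entry point
        val df = spark.sparkContext.textFile("E:\\666\\people.txt")
          .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
          .toDF()
        df.createOrReplaceTempView("people")
        spark.sql("select * from people").show()
      }
    }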

    2. Creating with StructType

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object TestDataFrame2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val fileRDD = sc.textFile("E:\\666\\people.txt")
        // Map the RDD data to Rows; requires import org.apache.spark.sql.Row
        val rowRDD: RDD[Row] = fileRDD.map(line => {
          val fields = line.split(",")
          Row(fields(0), fields(1).trim.toInt)
        })
        // Create a StructType to define the schema
        val structType: StructType = StructType(
          // field name, field type, nullable
          StructField("name", StringType, true) ::
          StructField("age", IntegerType, true) :: Nil
        )
        /**
          * rowRDD: RDD[Row],
          * schema: StructType
          * */
        val df: DataFrame = sqlContext.createDataFrame(rowRDD, structType)
        df.createOrReplaceTempView("people")
        sqlContext.sql("select * from people").show()
      }
    }
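
    The main reason to reach for StructType instead of a case class is that the schema can be assembled at runtime. A small sketch, reusing the types imported above (the column list here is made up for illustration):

    // Hypothetical: build the schema from (name, type) pairs known only at runtime
    val columns = Seq(("name", StringType), ("age", IntegerType))
    val dynamicSchema = StructType(columns.map { case (n, t) => StructField(n, t, nullable = true) })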

    3. Creating a DataFrame from a JSON file

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SQLContext}

    object TestDataFrame3 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val df: DataFrame = sqlContext.read.json("E:\\666\\people.")
        df.createOrReplaceTempView("people")
        sqlContext.sql("select * from people").show()
      }
    }
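
    The Spark JSON reader expects one JSON object per line, so a people.json for this example would look roughly like this (values are illustrative):

    {"name":"Tom","age":25}
    {"name":"Jerry","age":30}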

    II. Reading Data into a DataFrame

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object TestRead {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        // Option 1
        val df1 = sqlContext.read.json("E:\\666\\people.json")
        val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")
        // Option 2
        val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")
        val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")
        // Option 3: without an explicit format, parquet is assumed
        val df5 = sqlContext.read.load("E:\\666\\users.parquet")
      }
    }
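
    The "parquet by default" behaviour of option 3 comes from the spark.sql.sources.default setting, which can be overridden; a small sketch under that assumption:

    // Make a plain load() read JSON instead of parquet
    sqlContext.setConf("spark.sql.sources.default", "json")
    val df6 = sqlContext.read.load("E:\\666\\people.json")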

    III. Saving Data

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object TestSave {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val df1 = sqlContext.read.json("E:\\666\\people.json")
        // Option 1
        df1.write.json("E:\\111")
        df1.write.parquet("E:\\222")
        // Option 2
        df1.write.format("json").save("E:\\333")
        df1.write.format("parquet").save("E:\\444")
        // Option 3: saves in the default format (parquet)
        df1.write.save("E:\\555")
      }
    }

    Save modes:

    df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")
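
    SaveMode controls what happens when the target path already exists: ErrorIfExists (the default), Append, Overwrite, and Ignore, which silently skips the write. A short sketch of the other modes (requires import org.apache.spark.sql.SaveMode):

    df1.write.format("parquet").mode(SaveMode.Overwrite).save("E:\\444")     // replace existing data
    df1.write.format("parquet").mode(SaveMode.Append).save("E:\\444")        // add to existing data
    df1.write.format("parquet").mode(SaveMode.ErrorIfExists).save("E:\\444") // fail if the path exists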

    IV. DataFrame Data Sources

    1. json

    See above.

    2. parquet

    See above.

    3. mysql

    import java.util.Properties

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object TestMysql {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TestMysql").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
        val table = "dbs"
        val properties = new Properties()
        properties.setProperty("user", "root")
        properties.setProperty("password", "root")
        // Pass in the MySQL URL, the table name, and the connection properties (username and password)
        val df = sqlContext.read.jdbc(url, table, properties)
        df.createOrReplaceTempView("dbs")
        sqlContext.sql("select * from dbs").show()
      }
    }
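
    Writing a DataFrame back to MySQL goes through the same JDBC API. A minimal sketch, assuming a target table named dbs_copy (made up here), reusing the url and properties above, and importing org.apache.spark.sql.SaveMode:

    // Appends the DataFrame's rows to the hypothetical dbs_copy table
    df.write.mode(SaveMode.Append).jdbc(url, "dbs_copy", properties)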

    4. hive

    Add the dependency to pom.xml:

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.3.0</version>
            </dependency>

    For a development environment, put a hive-site.xml file under the resources folder; on a cluster, copy Hive's configuration file into the $SPARK_HOME/conf directory.

    <configuration>
            <property>
                    <name>javax.jdo.option.ConnectionURL</name>
                    <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value>
                    <description>JDBC connect string for a JDBC metastore</description>
                    <!-- If MySQL and Hive are on the same server node, change hadoop02 to localhost -->
            </property>
            <property>
                    <name>javax.jdo.option.ConnectionDriverName</name>
                    <value>com.mysql.jdbc.Driver</value>
                    <description>Driver class name for a JDBC metastore</description>
            </property>
            <property>
                    <name>javax.jdo.option.ConnectionUserName</name>
                    <value>root</value>
                    <description>username to use against metastore database</description>
            </property>
            <property>
                    <name>javax.jdo.option.ConnectionPassword</name>
                    <value>root</value>
            <description>password to use against metastore database</description>
            </property>
        <property>
                    <name>hive.metastore.warehouse.dir</name>
                    <value>/hive/warehouse</value>
                    <description>hive default warehouse; change it if necessary</description>
            </property>  
    </configuration>

    Code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object TestHive {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val sqlContext = new HiveContext(sc)
        sqlContext.sql("select * from myhive.student").show()
      }
    }
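
    HiveContext is likewise deprecated in Spark 2.x; the SparkSession equivalent, assuming the same hive-site.xml is on the classpath, looks roughly like this:

    import org.apache.spark.sql.SparkSession

    object TestHiveWithSession {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("TestHive")
          .master("local")
          .enableHiveSupport() // picks up hive-site.xml from the classpath
          .getOrCreate()
        spark.sql("select * from myhive.student").show()
      }
    }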

    Summary:

      The ways of creating a DataFrame are best grouped by data source: plain txt files, json/parquet files, a MySQL database, and a Hive data warehouse.

    1. Plain txt file:

    (1) create via a case class

    (2) create via StructType

    2. json/parquet file:

      read directly

    3. MySQL database: connect and read

    4. Hive warehouse: connect and read

    V. Common DataFrame Operations

    Reference: https://blog.csdn.net/dabokele/article/details/52802150

    Reposted from: https://www.cnblogs.com/frankdeng/p/9301743.html
