DataFrame

I. Creating DataFrames

1. Creating via a case class

people.txt (each line is a comma-separated `name,age` record)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Define a case class; it serves as the table schema
case class People(var name: String, var age: Int)

object TestDataFrame1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val context = new SQLContext(sc)
    // Read the local file into an RDD and bind each line to the case class
    val peopleRDD = sc.textFile("E:\\666\\people.txt")
      .map { line =>
        val fields = line.split(",")
        People(fields(0), fields(1).trim.toInt)
      }
    import context.implicits._
    // Convert the RDD to a DataFrame
    val df = peopleRDD.toDF
    // Register the DataFrame as a temporary view
    df.createOrReplaceTempView("people")
    // Query it with SQL
    context.sql("select * from people").show()
  }
}
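
For comparison, the same query can be written with the DataFrame DSL instead of SQL; a minimal sketch, reusing `df` and the implicits import from the example above:

// Column references written as $"..." come from context.implicits._
df.select($"name", $"age").filter($"age" > 18).show()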

2. Creating via a StructType

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object TestDataFrame2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val fileRDD = sc.textFile("E:\\666\\people.txt")
    // Map each line of the RDD to a Row (requires org.apache.spark.sql.Row)
    val rowRDD: RDD[Row] = fileRDD.map { line =>
      val fields = line.split(",")
      Row(fields(0), fields(1).trim.toInt)
    }
    // Define the schema with a StructType
    val structType: StructType = StructType(
      // field name, field type, nullable
      StructField("name", StringType, true) ::
      StructField("age", IntegerType, true) :: Nil
    )
    // createDataFrame(rowRDD: RDD[Row], schema: StructType)
    val df: DataFrame = sqlContext.createDataFrame(rowRDD, structType)
    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()
  }
}
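
The StructType route is useful when the schema is not known until runtime. Calling df.printSchema() confirms the structure declared above:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)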

3. Creating a DataFrame from a JSON file

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object TestDataFrame3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame3").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df: DataFrame = sqlContext.read.json("E:\\666\\people.json")
    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()
  }
}
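
Note that read.json expects JSON Lines input: one complete JSON object per line, not a pretty-printed array. A valid people.json would look like (illustrative records):

{"name":"Michael","age":29}
{"name":"Andy","age":30}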

II. Reading data into a DataFrame

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TestRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestRead").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Method 1: format-specific shortcuts
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")
    // Method 2: the generic reader with an explicit format
    val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")
    val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")
    // Method 3: generic load; the default format is parquet
    // (controlled by spark.sql.sources.default)
    val df5 = sqlContext.read.load("E:\\666\\users.parquet")
  }
}
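
The generic reader also accepts per-format options. A minimal sketch, assuming a pretty-printed (multi-line) JSON file; the multiLine option is available since Spark 2.2:

// Parse JSON records that span multiple lines instead of one object per line
val df6 = sqlContext.read
  .format("json")
  .option("multiLine", "true")
  .load("E:\\666\\people.json")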

III. Saving data

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TestSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestSave").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    // Method 1: format-specific shortcuts
    df1.write.json("E:\\111")
    df1.write.parquet("E:\\222")
    // Method 2: the generic writer with an explicit format
    df1.write.format("json").save("E:\\333")
    df1.write.format("parquet").save("E:\\444")
    // Method 3: generic save; the default format is parquet
    df1.write.save("E:\\555")
  }
}

Save modes:

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")
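
SaveMode (import org.apache.spark.sql.SaveMode) controls what happens when the target path already exists:

- SaveMode.ErrorIfExists (the default): fail with an exception
- SaveMode.Append: append the new data to the existing data
- SaveMode.Overwrite: replace the existing data
- SaveMode.Ignore: skip the write silently, leaving the existing data intact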

IV. DataFrame data sources

1. JSON

See above.

2. Parquet

See above.

3. MySQL

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TestMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestMysql").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
    val table = "dbs"
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    // Pass in the MySQL URL, the table name, and the connection
    // properties (user name and password)
    val df = sqlContext.read.jdbc(url, table, properties)
    df.createOrReplaceTempView("dbs")
    sqlContext.sql("select * from dbs").show()
  }
}
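
This requires the MySQL JDBC driver on the classpath (e.g. the mysql:mysql-connector-java artifact). Writing back through JDBC is symmetric; a minimal sketch, assuming a hypothetical target table dbs_backup and reusing url and properties from above:

import org.apache.spark.sql.SaveMode

// Append the rows of df into a (hypothetical) MySQL table
df.write.mode(SaveMode.Append).jdbc(url, "dbs_backup", properties)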

4. Hive

Add the spark-hive dependency to pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.3.0</version>
</dependency>

In a development environment, add a hive-site.xml file under the resources folder; in a cluster environment, copy Hive's configuration files into the $SPARK_HOME/conf directory.

<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
        <!-- If MySQL and Hive are on the same server node, change hadoop02 to localhost -->
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>username to use against metastore database</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
        <description>password to use against metastore database</description>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/hive/warehouse</value>
        <description>hive default warehouse; change it if necessary</description>
    </property>
</configuration>

Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object TestHive {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("select * from myhive.student").show()
  }
}
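
Since Spark 2.0, HiveContext is deprecated in favor of SparkSession with Hive support enabled; a minimal equivalent sketch:

import org.apache.spark.sql.SparkSession

object TestHiveSession {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("TestHiveSession")
      .enableHiveSupport() // picks up hive-site.xml from the classpath
      .getOrCreate()
    spark.sql("select * from myhive.student").show()
  }
}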

Summary:

DataFrame creation is best grouped by data source: plain text files, JSON/Parquet files, a MySQL database, and a Hive warehouse.

1. Plain text files:

(1) create via a case class

(2) create via a StructType

2. JSON/Parquet files: read directly

3. MySQL database: read through a JDBC connection

4. Hive warehouse: read through a Hive-enabled context

V. Common DataFrame operations

Reference: https://blog.csdn.net/dabokele/article/details/52802150

Reposted from: https://www.cnblogs.com/frankdeng/p/9301743.html
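
A few representative operations, as a minimal sketch assuming the people DataFrame (columns name and age) from section I and its implicits import:

df.printSchema()                  // inspect the schema
df.select($"name").show()         // project a column
df.filter($"age" > 18).show()     // filter rows
df.groupBy("age").count().show()  // group and count
df.orderBy($"age".desc).show()    // sort descending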

Original post: https://www.cnblogs.com/guoyu1/p/12092353.html