zoukankan      html  css  js  c++  java
  • Spark SQL读写方法

    一、DataFrame:有列名的RDD

    首先,我们知道SparkSQL的目的是用sql语句去操作RDD,和Hive类似。SparkSQL的核心结构是DataFrame,如果我们知道RDD里面的字段,也知道里面的数据类型,就好比关系型数据库里面的一张表。那么我们就可以写SQL,所以其实这儿我们是不能用面向对象的思维去编程的。我们最好的方式就是把抽象成为一张表,然后去用SQL语句去操作它。

    DataFrame的存储方式:它采用的存储是类似于数据库的表的形式进行存储的。一个数据表有几部分组成:1、数据,这个数据是一行一行进行存储的,一条记录就是一行,2、数据表的数据字典,包括表的名称,表的字段和字段的类型等元数据信息。那么DataFrame也是按照行进行存储的,这个类是Row,一行一行的进行数据存储。一般情况下处理粒度是行粒度的,不需要对其行内数据进行操作。

    二、SparkSQL的程序入口:

    在Spark2.0之前,是有sqlContext和hiveContext的概念的,因为这两个概念难以区分,Spark2.0之后统一称为SparkSession,除此之外SparkSession还封装了SparkConf和SparkContext。

    值得注意的一点是:Hive有很多依赖包,所以这些依赖包没有包含在默认的Spark包里面。如果Hive依赖的包能在classpath找到,Spark将会自动加载它们。这些Hive依赖包必须复制到所有的工作节点上,因为它们为了能够访问存储在Hive的数据,会调用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xml、core-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目录下面。 
    当使用Hive时,必须初始化一个支持Hive的SparkSession,用户即使没有部署一个Hive的环境仍然可以使用Hive。当没有配置hive-site.xml时,Spark会自动在当前应用目录创建metastore_db和创建由spark.sql.warehouse.dir配置的目录,如果没有配置,默认是当前应用目录下的spark-warehouse目录。 

    注意:从Spark 2.0.0版本开始,hive-site.xml里面的hive.metastore.warehouse.dir属性已经被spark.sql.warehouse.dir替代,用于指定warehouse的默认数据路径(必须有写权限)。

    于是SparkSQL在与Hive有交互的情况下,需要指定支持Hive:

    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}")
    val spark = SparkSession.builder().config(conf).config("spark.sql.warehouse.dir",
    "hdfs://hadoop1:9000/user/hive/warehouse").enableHiveSupport().getOrCreate()

    回到正题,程序入口:

    1.6版本:

    val conf=new SparkConf()
    conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc=new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    2.0版本:

    SparkSQL的程序入口缩减为一句

     val sparkSession=SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local").getOrCreate()

    两个版本一个获得sqlContext(或者hiveContext),一个获得sparkSession。

    三、算了,还是放在一起写吧。。

    case  class Person(var name:String,var age:Int)
    object Test {
      def main(args: Array[String]): Unit = {
        //1.6版本入口
        val conf=new SparkConf()
        conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
        val sc=new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        //第一种创建DataFrame的方式:直接读取列式存储的格式,可以直接形成DataFrame(后续怎么操作呢?)
        val df: DataFrame = sqlContext.read.json("")
        //第二种创建DataFrame的方式:因为rdd没有toDF()方法,需要进行隐式转化,通过map后形成一个数组
        import sqlContext.implicits._
        val df: DataFrame = sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
        //第二种方法的另一种形态,用sqlContext或者sparkSession的createDataFrame(),其实和toDF()方法是雷同的
        val rdd: RDD[Person] = sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt")
          .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
        val df: DataFrame = sqlContext.createDataFrame(rdd)
        //第三种创建DataFrame:生成一个RowRDD,然后给出构造的描述
        val rdd=sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt")
        val rowRDD: RDD[Row] = rdd.map(_.split(",")).map(p=>Row(p(0),p(1).trim.toInt))
        val schame=StructType(
          StructField("name",StringType,true)::
          StructField("age",IntegerType,true)::Nil
        )
        val df: DataFrame = sqlContext.createDataFrame(rowRDD,schame)
     
        //后续代码,可以创建临时视图作为查询,与mysql互操作要创建临时视图才能做查询
        //用hiveContext则直接在hive中创建表,然后将数据load到hive表中,可以直接进行条件查询,无需创建临时视图,后面与hive集成会有说明
        df.registerTempTable("person")
        sqlContext.sql("select * from person where age>21").show()
        //将处理后的数据用jdbc保存到mysql数据库中成为一张表,注意这里要使用user而不能使用username,因为系统也有一个username,会覆盖你的用户名
        val properties=new Properties()
        properties.put("user","root")
        properties.put("password","root")
        df.write.mode(SaveMode.Overwrite)jdbc("jdbc:mysql://localhost:3306/test","test",properties)
      }
    }

    四、load和save操作。

    object saveAndLoadTest {
      def main(args: Array[String]): Unit = {
        val conf =new SparkConf().setAppName("").setMaster("local")
        val sc=new SparkContext(conf)
        val sqlContext=new SQLContext(sc)
     
        //read,load:读取
        sqlContext.read.json("")
    //  sqlContext.read.jdbc("url","table",properties)
        sqlContext.read.load("parquet路径")
        sqlContext.read.format("json").load("路径")
        val df: DataFrame = sqlContext.read.format("parquet").load("路径")
     
        //write,save保存
        df.write.parquet("路径.parquet")
        df.write.json("路径.json")
    //  df.write.jdbc("url","table",properties)
        df.write.format("parquet").save("路径.parquet")
        df.write.format(("json")).save("路径.json")
        //保存模式可选择覆盖,追加等
        df.write.mode(SaveMode.Overwrite).save("")
      }
    }

    个人理解是read和load都是读取的作用,write和save都是保存的作用,通过上述的代码,我们可以完成文件格式转换的工作,将效率低的一些格式转化成parquet这种sparksql原生支持的文件类型

  • 相关阅读:
    最小生成树 kruskal算法&prim算法
    Floyd算法解决多源最短路问题
    dijkstra算法解决单源最短路问题
    改进后的作业排序
    第一篇 基本结构
    循环轮转算法
    在线工具生成接入信息mqtt.fx快速接入阿里云
    NodeMCU使用ArduinoJson判断指定键值对存在与否
    NodeMCU获取并解析心知天气信息
    快速导出jekyll博客文件进行上传部署
  • 原文地址:https://www.cnblogs.com/itboys/p/9603675.html
Copyright © 2011-2022 走看看