zoukankan      html  css  js  c++  java
  • 37、数据源之通用的load和save操作

    一、通用的load和save操作

    1、概述

    对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。
    load操作主要用于加载数据,创建出DataFrame;save操作,主要用于将DataFrame中的数据保存到文件中。
    
    
    Java版本
    DataFrame df = sqlContext.read().load("users.parquet");
    df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
    
    Scala版本
    val df = sqlContext.read.load("users.parquet")
    df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")


    2、java实现

    package cn.spark.study.sql;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    
    /**
     * 通用的load和save操作
     * @author Administrator
     *
     */
    
    public class GenericLoadSave {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf() 
                    .setAppName("GenericLoadSave");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
        
            DataFrame usersDF = sqlContext.read().load(
                    "hdfs://spark1:9000/users.parquet");
            usersDF.select("name", "favorite_color").write()
                    .save("hdfs://spark1:9000/namesAndFavColors.parquet");   
        }
        
    }


    3、scala实现

    package cn.spark.study.sql
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.DataFrame
    
    
    /**
     * @author Administrator
     */
    
    object GenericLoadSave {
      
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("GenericLoadSave")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
      
        val usersDF = sqlContext.read.load("hdfs://spark1:9000/users.parquet")
        usersDF.write.save("hdfs://spark1:9000/namesAndFavColors_scala")  
      }
      
    }


    二、手动指定数据源类型

    1、java实现

    package cn.spark.study.sql;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    
    /**
     * 手动指定数据源类型
     * @author Administrator
     *
     */
     
    public class ManuallySpecifyOptions {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()   
                    .setAppName("ManuallySpecifyOptions");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            
            DataFrame peopleDF = sqlContext.read().format("json")
                    .load("hdfs://spark1:9000/people.json");
            peopleDF.select("name").write().format("parquet")  
                    .save("hdfs://spark1:9000/peopleName_java");     
        }
        
    }


    2、scala实现

    package cn.spark.study.sql
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    
    /**
     * @author Administrator
     */
    object ManuallySpecifyOptions {
      
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("ManuallySpecifyOptions")  
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
      
        val peopleDF = sqlContext.read.format("json").load("hdfs://spark1:9000/people.json")
        peopleDF.select("name").write.format("parquet").save("hdfs://spark1:9000/peopleName_scala")   
      }
      
    }


    三、Save Mode

    1、概述

    Spark SQL对于save操作,提供了不同的save mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,
    因此是有一定风险出现脏数据的。


          save mode

                               意义

    SaveMode.ErrorIfExists (默认)

    如果目标位置已经存在数据,那么抛出一个异常

    SaveMode.Append

    如果目标位置已经存在数据,那么将数据追加进去

    SaveMode.Overwrite

    如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖

    SaveMode.Ignore

    如果目标位置已经存在数据,那么就忽略,不做任何操作。


    2、java实现

    package cn.spark.study.sql;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;
    
    /**
     * SaveModel示例
     * @author Administrator
     *
     */
    public class SaveModeTest {
    
        @SuppressWarnings("deprecation")
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()   
                    .setAppName("SaveModeTest");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            
            DataFrame peopleDF = sqlContext.read().format("json")
                    .load("hdfs://spark1:9000/people.json"); 
            peopleDF.save("hdfs://spark1:9000/people_savemode_test", "json", SaveMode.Append);
        }
        
    }
  • 相关阅读:
    本地http://localhost打不开怎么办
    C#中lock死锁实例教程
    结对-四则运算答题器-项目进度
    Forward团队-爬虫豆瓣top250项目-代码设计规范
    Forward团队-爬虫豆瓣top250项目-设计文档
    学习使用github
    Forward团队-爬虫豆瓣top250项目-团队编程项目开发环境搭建过程
    课后作业-阅读任务-阅读提问-1
    20170915-构建之法:现代软件工程-阅读笔记
    结对-四则运算答题器-设计文档
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11274767.html
Copyright © 2011-2022 走看看