zoukankan      html  css  js  c++  java
  • spark-sql基础

    一:Spark SQL的特点
    1、支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。
    2、多种性能优化技术:in-memory columnar storage、byte-code generation、cost model动态评估等。
    3、组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以自己重新开发,并且动态扩展。

    二:Spark SQL的性能优化技术简介
    1、内存列存储(in-memory columnar storage)
      内存列存储意味着,Spark SQL的数据,不是使用Java对象的方式来进行存储,而是使用面向列的内存存储的方式来进行存储。也就是说,每一列,作为一个数据存储的单位。从而大大优化了内存使用的效率。采用了内存列存储之后,减少了对内存的消耗,也就避免了gc大量数据的性能开销。
    2、字节码生成技术(byte-code generation)
      Spark SQL在其catalyst模块的expressions中增加了codegen模块,对于SQL语句中的计算表达式,比如select num + num from t这种的sql,就可以使用动态字节码生成技术来优化其性能。
    3、Scala代码编写的优化
      对于Scala代码编写中,可能会造成较大性能开销的地方,自己重写,使用更加复杂的方式,来获取更好的性能。比如Option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改成了用null、while循环等来实现,并且重用可变的对象。

    三:spark Sql的一般用法

    可参考官方文档:https://spark.apache.org/docs/latest/sql-programming-guide.html

    1- 创建DataFrame

    JavaSparkContext sc = ...; 
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
    df.show();
    

     2- DataFrame常用用法

    load:主要用于加载数据,创建出DataFrame;                       DataFrame df = sqlContext.read().load("users.parquet");
    save: 主要用于将DataFrame中的数据保存到文件中。
    df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
    SaveMode:ErrorIfExists--抛异常,Append--追加,Overwrite--重写,Ignore--忽略,不做操作
    format:手动指定数据类型:parquet也是一种数据类型
    DataFrame df = sqlContext.read().format("json").load("people.json");
    df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
    show:  显示数据
    collect: 获取所有数据到数组
    collectAsList:获取所有数据到List
    describe(cols: String*):获取指定字段的统计信息,比如count, mean, stddev, min, max等
    first, head, take, takeAsList:获取若干行记录
    where筛选条件
    filter:根据字段进行筛选
    select:获取指定字段值
    selectExpr:可以对指定字段进行特殊处理
    col/apply:获取指定字段
    drop:去除指定字段,保留其他字段
    limit:限制行数
    orderBy和sort:排序
    group by数据分组
    distinct数据去重
    dropDuplicates:根据指定字段去重
    agg方法实现聚合操作
    withColumn添加新的一列
    join链接
    union
    intersect方法可以计算出两个DataFrame中相同的记录,
    except获取一个DataFrame中有另一个DataFrame中没有的记录
    withColumnRenamed:重命名DataFrame中的指定字段名
    explode根据某个字段内容进行分割,然后生成多行,这时可以使用explode方法

     示例:

    DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
    df.show();   //打印所有数据
    df.printSchema();    //打印元数据
    df.select("name").show();  //查询某列的数据
    df.select(df.col("name"), df.col("age").plus(1)).show();  //查询某列的数据,并对其进行计算
    df.filter(df.col("age").gt(21)).show();    //对某列的值进行过滤
    df.groupBy("age").count().show();       //排序
    

    3- spark Sql所有的内置函数

    种类 函数
    聚合函数 approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
    集合函数 array_contains, explode, size, sort_array
    日期/时间函数

    日期时间转换
    unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
    从日期时间中提取字段
    year, month, dayofmonth, hour, minute, second

    日期/时间计算
    datediff, date_add, date_sub, add_months, last_day, next_day, months_between
    获取当前时间等
    current_date, current_timestamp, trunc, date_format

    数学函数 abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex
    混合函数 array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
    字符串函数 ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
    窗口函数 cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

    4- Spark SQL支持两种方式来将RDD转换为DataFrame。
    第一种方式,是使用反射来推断包含了特定数据类型的RDD的元数据。这种基于反射的方式,代码比较简洁,当你已经知道你的RDD的元数据时,是一种非常不错的方式。

    JavaRDD<String> lines = sc.textFile("C:\Users\zhang\Desktop\students.txt");
    ​JavaRDD<Student> students = lines.map(new Function<String, Student>() {
      ​​private static final long serialVersionUID = 1L;
      ​​@Override
      public Student call(String line) throws Exception {
        ​​​String[] lineSplited = line.split(",");  
    ​​​    Student stu = new Student();
        ​​​stu.setId(Integer.valueOf(lineSplited[0].trim()));  
        ​​​stu.setName(lineSplited[1]);  
        ​​​stu.setAge(Integer.valueOf(lineSplited[2].trim()));
    ​​​    return stu;
    ​​  }
    ​});
    DataFrame studentDF = sqlContext.createDataFrame(students, Student.class); //Stundent类必须实现Serializable接口

    第二种方式,是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。

    ​​import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    // 第一步,创建一个普通的RDD
    JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//students.txt");
    JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
      ​​​private static final long serialVersionUID = 1L;
       @Override
    ​​​  public Row call(String line) throws Exception {
    ​​​​    String[] lineSplited = line.split(",");
        return RowFactory.create(
          ​​​​​​Integer.valueOf(lineSplited[0]),
          ​​​​​​lineSplited[1],
          ​​​​​​Integer.valueOf(lineSplited[2]));      
       ​​​}
    ​​});
    ​​// 第二步,动态构造元数据
    List<StructField> structFields = new ArrayList<StructField>(); ​​structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType, true)); ​​structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); ​​structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); ​​StructType structType = DataTypes.createStructType(structFields); ​​// 第三步,使用动态构造的元数据,将RDD转换为DataFrame ​​DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);

     可参考:https://www.jianshu.com/p/4df4aa54ad15

    四:数据源Parquet

    1--列式存储和行式存储相比有哪些优势呢?

    可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。

    压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。

    只读取需要的列,支持向量运算,能够获取更好的扫描性能

     

    2--为什么要使用parque:相对于txt文件而言,parquet查询性能的提升在某些情况下可能达到 30 倍或更高,存储的节省可高达 75%。

    用法:

            // 读取Parquet文件中的数据,创建一个DataFrame
            DataFrame userDf = sqlContext.read().parquet("D:\文档\users.parquet");
    

    3--自动分区推断:Spark SQL就会自动根据目录结构,推断出分区信息。此外,分区列的数据类型,也是自动被推断出来的。目前,Spark SQL仅支持自动推断出数字类型和字符串类型。有时,用户也许不希望Spark SQL自动推断分区列的数据类型。此时只要设置一个配置即可, spark.sql.sources.partitionColumnTypeInference.enabled,默认为true,即自动推断分区列的类型,设置为false,即不会自动推断类型。禁止自动推断分区列的类型时,所有分区列的类型,就统一默认都是String。

    可参考:spark从入门到精通--中华石杉,第78讲

     

    4-- 合并元数据:用户可以在一开始就定义一个简单的元数据,然后随着业务需要,逐渐往元数据中添加更多的列。在这种情况下,用户可能会创建多个Parquet文件,有着多个不同的但是却互相兼容的元数据。Parquet数据源支持自动推断出这种情况,并且进行多个Parquet文件的元数据的合并。

    默认情况下是不进行数据元数据的合并:

    1、读取Parquet文件时,将数据源的选项,mergeSchema,设置为true
    2、使用SQLContext.setConf()方法,将spark.sql.parquet.mergeSchema参数设置为true
    

    五:数据源

    1- json:注意的是,这里使用的JSON文件与传统意义上的JSON文件是不一样的。每行都必须,也只能包含一个,单独的,自包含的,有效的JSON对象。不能让一个JSON对象分散在多行。否则会报错。

    2- hive

    使用hive中数据:
    JavaSparkContext sparkContext = new JavaSparkContext(conf); // 创建HiveContext,注意,这里,它接收的是SparkContext作为参数,不是JavaSparkContext HiveContext hiveContext = new HiveContext(sparkContext.sc()); // 判断是否存在student_infos表,如果存在则删除 hiveContext.sql("drop table if exists student_infos");
    将DataFrame数据保存到hive表中:
    goodStudentInfoDF.saveAsTable("good_student_info");

      2- jdbc

    public class JDBCDataSource {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("JDBCDataSourceJava").setMaster("local");
            JavaSparkContext sparkContext = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sparkContext);
    
            // 分别将mysql中两张表的数据加载为DataFrame
            Map<String, String> options = new HashMap<String, String>();
            options.put("url", "jdbc:mysql://hadoop-100:3306/mytest");
            options.put("dbtable", "student_infos");
            options.put("user", "root");
            options.put("password", "zhaojun2436");
    
            DataFrame infoDF = sqlContext.read().options(options).format("jdbc").load();
    
            options.put("dbtable", "student_scores");
            DataFrame scoreDF = sqlContext.read().options(options).format("jdbc").load();
    
            // 将两个DataFrame转换为JavaPairRDD,执行join操作
            JavaPairRDD<String, Integer> infoRDD = infoDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Row row) throws Exception {
                    return new Tuple2<>(row.getString(0), row.getInt(1));
                }
            });
    
            JavaPairRDD<String, Integer> scoreRDD = scoreDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Row row) throws Exception {
                    return new Tuple2<>(row.getString(0), row.getInt(1));
                }
            });
    
            JavaPairRDD<String, Tuple2<Integer, Integer>> infoJoinScore = infoRDD.join(scoreRDD);
    
            // 将JavaPairRDD转换为JavaRDD<Row>
            JavaRDD<Row> infoJoinScoreRDD = infoJoinScore.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
                @Override
                public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception {
                    return RowFactory.create(v1._1, v1._2._1, v1._2._2);
                }
            });
    
            // 过滤出分数大于80分的数据
            JavaRDD<Row> goodStudent = infoJoinScoreRDD.filter(new Function<Row, Boolean>() {
                @Override
                public Boolean call(Row v1) throws Exception {
                    if (v1.getInt(2) > 80) {
                        return true;
                    }
                    return false;
                }
            });
    
            // 转换为DataFrame
            List<StructField> fieldList = new ArrayList<>();
            fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
            fieldList.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
    
            StructType structType = DataTypes.createStructType(fieldList);
    
            DataFrame df = sqlContext.createDataFrame(goodStudent, structType);
    
            Row[] collect = df.collect();
            for(Row row : collect) {
                System.out.println(row);
            }
    
            // 将DataFrame中的数据保存到mysql表中
            // 这种方式是在企业里很常用的,有可能是插入mysql、有可能是插入hbase,还有可能是插入redis缓存
            goodStudent.foreach(new VoidFunction<Row>() {
                @Override
                public void call(Row row) throws Exception {
                    String sql = "insert into good_student_infos values("
                            + "'" + String.valueOf(row.getString(0)) + "',"
                            + Integer.valueOf(String.valueOf(row.get(1))) + ","
                            + Integer.valueOf(String.valueOf(row.get(2))) + ")";
    
                    Class.forName("com.mysql.jdbc.Driver");
    
                    Connection conn = null;
                    Statement stmt = null;
                    try {
                        conn = DriverManager.getConnection(
                                "jdbc:mysql://hadoop-100:3306/mytest", "root", "zhaojun2436");
                        stmt = conn.createStatement();
                        stmt.executeUpdate(sql);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        if(stmt != null) {
                            stmt.close();
                        }
                        if(conn != null) {
                            conn.close();
                        }
                    }
                }
            });
        }
    }
    

     扩展阅读:udf自定义函数,https://zhangslob.github.io/2018/10/29/Spark%E5%AE%9E%E6%88%98%EF%BC%88%E4%BA%8C%EF%BC%89%E5%AD%A6%E4%B9%A0UDF/

  • 相关阅读:
    linux设置tomcat开机自启动
    搭建本地的git仓库
    Linux安装配置JDK
    关于HTTP GET请求的url中文参数编码
    [dataTables.js error] Uncaught TypeError: myTable.row is not a function
    DataTables warning : Requested unknown parameter '5' from the data source for row 0
    取消mod_sofia的呼叫鉴权
    TCP服务器不回复SYN的问题
    volley7--NetworkDispatcher从网络中获取数据
    volley4--RequestQueue
  • 原文地址:https://www.cnblogs.com/parent-absent-son/p/11802043.html
Copyright © 2011-2022 走看看