zoukankan      html  css  js  c++  java
  • Spark基础:(六)Spark SQL

    1、相关介绍

    Datasets:一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的优点(强类型化, 能够使用强大的 lambda 函数)与Spark SQL执行引擎的优点。

    DataFrame: 一个 DataFrame 是一个 Dataset 组成的指定列。、

    SparkSession: Spark SQL中所有功能的入口点是 SparkSession 类,要创建一个 SparkSession, 使用 SparkSession.builder()就可以。

    需要注意的是:

    import spark.implicits._
    //这里的spark是SparkSession的变量名

    对DataFrame和Dataset进行操作许多操作都需要这个包进行支持。DataFrames可以从多种数据源创建,例如:结构化数据文件(JSON)、Hive中的表、外部数据库或者已存在的RDDs。

    2、入门

    (1)Json
    准备json文件

    {"id":1,"age":15,"name":"zhangsan"},
    {"id":2,"age":13,"name":"zhangsi"},
    {"id":3,"age":14,"name":"zhangwu"}

    读取json文件
    java版

    public class SQLJava {
        public static void main(String[] args) {
            SparkSession session = SparkSession.builder()
                                .appName("SQLJava")
                                .config("spark.master","local")
                                .getOrCreate();
    
            Dataset<Row> df1 = session.read().json("file:///f:/spark/info.json");
            df1.show();
            df1.printSchema();
    
            //创建临时视图
            df1.createOrReplaceTempView("customers");
            //按照sql方式查询
            Dataset<Row> df2 = session.sql("select * from customers where age > 13");
            df2.show();
            System.out.println("=================");
    
            //聚合查询
            Dataset<Row> dfCount = session.sql("select count(1) from customers");
            dfCount.show();
    
            //DataFrame和RDD互操作
            JavaRDD<Row> rdd = df1.toJavaRDD();
            rdd.collect().forEach(new Consumer<Row>() {
                public void accept(Row row) {
                    long age = row.getLong(0);
                    long id = row.getLong(1);
                    String name = row.getString(2);
                    System.out.println(age + "," + id + "," + name);
                }
            });
    
            //保存处理,设置保存模式
            df2.write().mode(SaveMode.Append).json("file:///f:/spark/json/out.dat");
    }

    Scala版

    object SparkSql {
      def main(args: Array[String]): Unit = {
        val session=SparkSession.builder()
          .appName("AQLScala")
          .config("spark.master","local")
          .getOrCreate()
        val df1=session.read.json("file:///f:/spark/info.json")
        df1.show()
        df1.printSchema()
        //创建视图
        df1.createOrReplaceTempView("customers")
        val df2 = session.sql("select * from customers where age >13")
        df2.show()
    
        //聚合操作
        val df3 = session.sql("select count(*) from customers")
        df3.show()
    
        //DataFrame和RDD互操作
        df1.rdd.map(e=>{
          "id="+e(e.fieldIndex("id"))+"	name="+e(e.fieldIndex("name"))+"	age="+e(e.fieldIndex("age"))
        }).foreach(println(_))
    
        df1.write.mode(SaveMode.Append).json("file:///f:/spark/json/out.dat")
      }

    部分输出
    这里写图片描述

    还要注意的就是Save Modes : Save operations (保存操作)可以选择使用 SaveMode , 它指定如何处理现有数据如果存在的话. 重要的是要意识到, 这些 save modes (保存模式)不使用任何 locking (锁定)并且不是 atomic (原子). 另外, 当执行 Overwrite 时, 数据将在新数据写出之前被删除.
    Spark SQL中的临时视图是session级别的, 也就是会随着session的消失而消失. 如果你想让一个临时视图在所有session中相互传递并且可用, 直到Spark 应用退出, 你可以建立一个全局的临时视图.
    这里写图片描述

    (2)jdbc
    java代码

    public class SQLJDBCJava1 {
        public static void main(String[] args) {
            SparkSession session = SparkSession.builder()
                                .appName("SQLJava")
                                .config("spark.master","local")
                                .getOrCreate();
            String url = "jdbc:mysql://localhost:3306/mybatis" ;
            //要查询的数据库中的表
            String table = "users" ;
            //查询数据库
            Dataset<Row> df = session.read()
                    .format("jdbc")
                    .option("url", url)
                    .option("dbtable", table)
                    .option("user", "root")
                    .option("password", "root")
                    .option("driver", "com.mysql.jdbc.Driver")
                    .load();
            df.show();
            //投影查询
            Dataset<Row> df2 = df.select(new Column("name"),new Column("age"));
            //过滤
            df2 = df2.where("age > 13");
            df2.show();
            //去重
            df2 = df2.distinct();
            df2.show();
            //
            Properties prop = new Properties();
            prop.put("user", "root");
            prop.put("password", "root");
            prop.put("driver", "com.mysql.jdbc.Driver");
            //写入数据库中  subpersons为表
            df2.write().jdbc(url,"subpersons",prop);
            df2.show();
        }
    }

    scala代码

    object SQLJdbc {
      def main(args: Array[String]): Unit = {
        val session=SparkSession.builder()
          .config("spark.master","local")
          .appName("SQLJdbc")
          .getOrCreate()
        val url="jdbc:mysql://localhost:3306/mybatis"
        //要查询的数据库中的表
        val table="users"
        //连接属性
        val df=session.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", table)
          .option("user", "root")
          .option("password", "root")
          .option("driver", "com.mysql.jdbc.Driver")
          .load()
        df.show()
        println("===============")
        val df2=df.select(new Column("name"),new Column("age"))
        df2.where("name like 't%'")
        df2.show()
        //数据库的连接属性
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "root")
        prop.put("driver", "com.mysql.jdbc.Driver")
        //写入数据库中  subpersons为表
        df2.write.jdbc(url, "subpersons", prop)
        df2.show()
      }
    }

    (3)Hive
    Spark SQL 还支持读取和写入存储在 Apache Hive 中的数据。但是,由于 Hive 具有大量依赖关系,因此这些依赖关系不包含在默认 Spark 分发中。 如果在类路径中找到 Hive 依赖项,Spark 将自动加载它们。 请注意,这些 Hive 依赖关系也必须存在于所有工作节点上,因为它们将需要访问 Hive 序列化和反序列化库 (SerDes),以访问存储在 Hive 中的数据。

    通过将 hive-site.xml, core-site.xml(用于安全配置)和 hdfs-site.xml (用于 HDFS 配置)文件放在 conf/ 中来完成配置。

    启动hiveserver2服务器,监听端口是10000 ——-进入hive/bin下,启动hiveserver2,或者采用命令
    hive –service hiveserver2 & 来启动,(通过hive-jdbc驱动程序采用jdbc方式远程访问远程数据仓库

    但是我按照文档上的方法没有成功,采用以下的方法实现了

    java版本

    public class SQLHiveJava {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection conn = DriverManager.getConnection("jdbc:hive2://s201:10000");
            Statement st = conn.createStatement();
            ResultSet rs = st.executeQuery("select * from mydb.t1 where age > 13");
            while(rs.next()){
                int id=rs.getInt(1);
                String name=rs.getString(2);
                int age=rs.getInt(3);
                System.out.println(id+","+name+","+age);
            }
            rs.close();
        }
    }

    scala版本

    object SparkSqlHive {
      def main(args: Array[String]): Unit = {
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        val conn = DriverManager.getConnection("jdbc:hive2://s201:10000")
        //创建上下文
        val stat = conn.createStatement()
        //结果
        val resultSet = stat.executeQuery("select * from mydb.t1 where age > 13")
        while ( resultSet.next) {
          val id = resultSet.getInt(1)
          val name = resultSet.getString(2)
          val age = resultSet.getInt(3)
          println(id + "," + name + "," + age)
        }
        resultSet.close()
      }
    }

    输出结果:
    这里写图片描述

    注意:通过远程jdbc方式连接到hive数据仓库
    (1)首先启动hiveserver2服务器,监听端口10000
    (2)通过beeline命令行连接到hiveserver2 (进入beeline命令行 hive –service beeline
    通过命令 !connect jdbc:hive2://localhost:10000/mydb 连接到数据库
    如图:
    这里写图片描述

    希望在知识中书写人生的代码
  • 相关阅读:
    [算法] 堆栈
    [刷题] PTA 02-线性结构3 Reversing Linked List
    java IO流 (八) RandomAccessFile的使用
    java IO流 (七) 对象流的使用
    java IO流 (六) 其它的流的使用
    java IO流 (五) 转换流的使用 以及编码集
    java IO流 (四) 缓冲流的使用
    java IO流 (三) 节点流(或文件流)
    java IO流 (二) IO流概述
    java IO流 (一) File类的使用
  • 原文地址:https://www.cnblogs.com/tongxupeng/p/10259544.html
Copyright © 2011-2022 走看看