zoukankan      html  css  js  c++  java
  • spark复习笔记(7):sparkSQL

    一、saprkSQL模块,使用类sql的方式访问Hadoop,实现mr计算,底层使用的是rdd

      1.hive      //hadoop  mr  sql

      2.phenoix    //hbase上构建sql的交互过程

      该模块能在spark上运行sql语句

      3.DataFrame    //数据框,表

      在spark中的数据框,sparkSQL就能以完全分布式的方式来处理数据。组合数据框可以来自各种数据源来进行查询的处理

      4.SparkSQL    //SQL  |  DataFrame API

      5.RDD[Customer]===>

        $scala>df=sc.createDataFrame(rdd);

    $scala>df = sc.createDataFrame(rdd);
    //创建样例类 $scala
    >case class Customer1(id:Int,name:String,age:Int)
    //构造数据
    $scala>val arr = Array("1,tom,12","2,tomas,13","3,tomasLee,14")
    $scala>val rdd1 = sc.makeRDD(arr)
    //创建对象RDD
    $scala>val rdd2=rdd1.map(e=>{
      val arr= e.split(",");
      Customer1(arr(0).toInt,arr(1),arr(2).toInt)
    })
    //创建customer的rdd,通过rdd创建数据框
    $scala>val df = spark.createDataFrame(rdd2)
    //打印表结构
    //创建临时视图
    $scala>df.createTempView("customers")
    //打印表结构
    $scala>df.printSchema
    $scala>df.show    //等价于查询数据
    //创建临时视图
    $scala>df.createTempView("customers")
    //使用sparkSQL来进行相关的查询
    val df2 = spark.sql("select * from customers")
    //将上述结果进行相关的显示
    df2.show
    //带条件进行相关的查询
    val df2 = spark.sql("select * from customers where id<2")
    df2.show
    //或者用如下的方式直接show
    spark.sql("select * from customer").show  
    val df1 = spark.sql("select * from customer where id<2")
    
    val df2 = spark.sql("select * from customers where id>2")
    
    df1.show
    
    df2.show
    
    df.create
    
    df1.createTempView("c1")
    
    df2.createTempView("c2")
    
    val dff = spark.sql("select * from c1 union select * from c2")

    dff.show      //显示前面查询的结果


    $scala>spark.sql("select * from c1 from union select *from c2").show

    df1.union(df2).show

    spark.sql("select count(*) from customer").show

    spark.sql("select * from customer limit 1").show

    spark.sql("select *from customers where name like 't%' order by name desc").show
    //映射聚合操作
    df.map(_.getAs[Int]("age")).reduce(_ + _)

    //聚合函数
    df.agg(sum("age"),max("age"),min("age"))

      sparkQSL :使用类似SQL方式访问hadoop,实现MR计算。RDD

      df= sc.createDataFrame(rdd);

      DataSet<Row> ===DataFrame===>//类似于table操作

    保存spark的sql计算结果(json)

      JavaRDD<Row> rdd = df1.toJava();

    保存spark的sql计算结果(json)

      //保存成json文件。

      df.write().json(dir)  //这个地方写的是文件夹,就是保存文件的上级目录

      //设置保存模式

      df.mode(SaveMode.APPEND);

    json文件的读写

    ---------------------------------

      SparkSession.read().json("")  //读取json文件形成数据框

      //将数据框的数据写入json文件

      SparkSession.write().json("........")  //将数据框的数据写成json文件

    SparkDataFrame以jdbc的方式操纵表

    SparkDataFrame以jdbc的方式来操纵表

      1.引入mysql驱动

        pom.xml直接修改

    spark整合Hive

      1.hive的类库需要在spark的worker节点,他们也需要通过类库来访问hive

      2.复制core-site.xml(hdfs) + hdfs-site.xml + hive-site.xml(hive)这三个文件复制到spark/conf目录下面

      3.指定hive的home目录环境变量

      4.赋值mysql驱动序列到/soft/spark/jars目录下面 

      5.启动spark-shell,指定启动模式

        spark-shell --master local[4]

        create table tt(id int,anme string,age int)

        row format delimited fields terminated by ','

        lines terminated by ' ' stored as textfile;

      6.SparkSQL操纵Hive表

        (1)复制配置文件到resources目录下

          core-site.xml

          hdfs-site.xml

          hive-site.xml

        (2)pom.xml中增加依赖

        (3)编码

        

    package com.jd.spark.java;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    public class SQLHiveJava {
        public static void main(String[] args){
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("SQLHive");
            SparkSession sess = SparkSession.builder().appName("SQLHiveJava").config("spark.master","local").getOrCreate();
            sess.sql("use mydb2.db");
            Dataset<Row> df =  sess.sql("select * from mydb2.tt");
            df.show();
    
        }
    
    }

    分布式SQL引擎

    -------------------------------------------- 

      1.启动spark集群(完全分布式-standalone)

         $>/soft/spark/sbin/start-all.sh

         master    //s11

           worker    //s12-s14

      2.在默认库下创建hive数据表

        hive -e "create table tt(id int,name string , age int) row format delimited fields terminated by ',' lines terminated by ' ' stored as textfile"

      3.加载数据到hive表中去

        $>hive -e "load data local inpath 'file:///home/centos/data.txt' into table tt"

      4.分发三个文件到worker节点

        

      5.启动spark集群

        $>soft/spark/sbin/start-all.sh

      6.启动spark-shell

        $>spark-shell --master spark://s11:7070

      7.启动thriftserver服务器

        $>start

      8.连接beeline进行操作:

        beeline -u jdbc:hive://localhost:10000 -d org.apache.hive.jdbc.HiveDriver

  • 相关阅读:
    webpack—从零开始配置
    多媒体标签 API(video、audio)
    node 爬虫
    node 操作数据库
    es6+
    UI 组件库 引入使用的问题
    单页应用存在 的问题
    ajax 封装(集中 认证、错误、请求loading处理)
    moment.js 时间库
    文件上传大小被限制的解决方案。
  • 原文地址:https://www.cnblogs.com/bigdata-stone/p/9910692.html
Copyright © 2011-2022 走看看