zoukankan      html  css  js  c++  java
  • 大数据-sparkSQL

    SparkSQL采用Spark on Hive模式,hive只负责数据存储,Spark负责对sql命令解析执行。

    SparkSQL基于Dataset实现,Dataset是一个分布式数据容器,Dataset中同时存储原始数据和元数据(schema)

    Dataset的底层封装了RDD,Row类型的RDD就是Dataset< Row >,DataFrame

    Dataset数据源包括:json,JDBC,hive,parquet,hdfs,hbase,avro...

    API

    自带API

    Dataset自带了一套api能够对数据进行操作,处理逻辑与sql处理逻辑相同。

    //ds代表了一个Dataset<Row>,包括字段:age,name
    //select name from table
    ds.select(ds.col("name")).show();
    //select name ,age+10 as addage from table
    ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show();
    //select name ,age from table where age>19
    ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show();
    //select age,count(*) from table group by age
    ds.groupBy(ds.col("age")).count().show();

    SQL的API

    将Dataset转为临时表,在通过session对象执行sql,sql结果为Dataset类型

    //将Dataset数据临时注册为临时表,指定表名。可以使用两种方法创建
    ds.registerTempTable("table1");
    ds.createOrReplaceTempView("table2");
    //执行sql返回结果Dataset,sql可以同时使用多张临时表
    Dataset<Row> result = sparkSession.sql("select * from table1");

    Dataset方法

    //打印元数据,以树形结构呈现
    ds.printSchema();

    //展示dataset的数据,默认显示二十条,可以指定显示条数。显示时数据列以ascii排序显示
    ds.show();

    //将dataset转为RDD<Row>
    JavaRDD<Row> rowRDD = ds.javaRDD();
    //将JavaRDD<Row>解析为普通RDD
    JavaRDD<String> map = rowRDD.map(new Function<Row, String>() {
    private static final long serialVersionUID = 1L;
       @Override
       public String call(Row row) throws Exception {
           //方式1 指定字段名
           String id = row.getAs("id");
           String name = row.getAs("name");
           Integer age = row.getAs("age");
           //方式2 指定字段顺序。不常用。在json新构建的dataset会对字段按ASCII排序
           id = row.getString(1);
           name = row.getString(2);
           age = row.getInt(0);
           //返回结果
           return id + name + age;
      }
    });

    输出Dataset

    Dataset的保存策略(SaveMode)包括:Overwrite(覆盖),Append(追加),ErrorIfExists(如果存在就报错),Ignore(如果存在就忽略)

    jdbc输出

    Dataset<Row> dataset=sparkSession.sql("...");
    //构建参数,存入用户名和用户密码
    Properties userinfo = new Properties();
    properties.setProperty("user", "root");
    properties.setProperty("password", "root");

    //指定写出模式,数据库连接,用户信息
    result.writer().
       mode(SaveMode.Overwrite).
       jdbc( "jdbc:mysql://127.0.0.1:3306/spark",  "table", userinfo);

    json输出

    df.write().mode(SaveMode.Overwrite).format("json").save("data/parquet");
    df.write().mode(SaveMode.Overwrite).json("data/parquet");

    parquet输出

    df.write().mode(SaveMode.Overwrite).format("parquet").save("data/parquet");
    df.write().mode(SaveMode.Overwrite).parquet("data/parquet");

    hive输出

    在session中预先开启hive支持,在项目文件中导入hive,hdfs的3个xml

    //对session执行hive语句,注意:需要指定表空间
    sparkSession.sql("use database");
    //移除原hive表
    sparkSession.sql("DROP TABLE IF EXISTS table01");
    //将dataset存入hive中
    df.write().mode(SaveMode.Overwrite).saveAsTable("table01");

    Dataset创建

    通过session创建和停止

    //创建session对象
    SparkSession sparkSession = SparkSession
                  .builder()
                  .appName("jsonrdd")
                  .master("local")
                  .getOrCreate();
    /* 逻辑代码 */

    //关闭session
    sparkSession.stop();

    基于json文件

    • 基于json的构建,在json中已经存在了元数据,可以直接构造dataset

    • 不支持嵌套json

    //通过session对象读取json文件构建Dataset
    Dataset<Row> ds = sparkSession.read().format("json").load("data/json");
    //简写方式:Dataset<Row> ds =sparkSession.read.json("data/json");
    
    //将Dataset转为临时表
    ds.createOrReplaceTempView("table")
    

    基于RDD< json>

    RDD的每条数据都是一个json字符串,一个单行数据,自带了元数据信息

    //通过上下文构建RDD
    SparkContext sc = sparkSession.sparkContext();
    JavaSparkContext jsc = new JavaSparkContext(sc);
    JavaRDD<String> RDD = jsc.parallelize(Arrays.asList(
        "{"name":"zs","score":"100"}",
        "{"name":"sl","score":"200"}",
        "{"name":"ww","score":"300"}"
    ));
    //通过RDD构建dataset
    Dataset<Row> ds = sparkSession.read().json(RDD);
    

    对象list创建

    //构建存储对象的容器
    Person person = new Person();
    person.setId("1");
    person.setAge(18);
    person.setName("zs");
    Person person2 = new Person();
    person2.setId("2");
    person2.setAge(20);
    person2.setName("ls");
    List<Person> people = Arrays.asList(person, person2);
    
    //对bean类进行转码
    Encoder<Person> personEncoder = Encoders.bean(Person.class);
    
    //获取person类型的dataset,内部自带了
    Dataset<Person> dataset = sparkSession.createDataset(people, personEncoder);
    
    dataset.registerTempTable("person");
    

    反射创建

    不建议使用,修改字段较为麻烦

    • 需要预先创建数据的javabean对象,实现序列化

    • 读取数据构建RDD< String>,通过map算子转为RDD< Bean对象 >

    • createDataFrame传入RDD与bean.class

    //通过上下文构建RDD
    SparkContext sc = sparkSession.sparkContext();
    JavaSparkContext jsc = new JavaSparkContext(sc);
    JavaRDD<String> lineRDD = jsc.textFile("/person.txt");
    //将RDD的范型处理为bean类型
    JavaRDD<Person> RDD = lineRDD.map(new Function<String, Person>() {
        //指定序列化版本
        private static final long serialVersionUID = 1L;
        @Override
        public Person call(String line) throws Exception {
            Person p = new Person();
            p.setId(line.split(",")[0]);
            p.setName(line.split(",")[1]);
            p.setAge(Integer.valueOf(line.split(",")[2]));
            return p;
        }
    });
    //将RDD转为Dataset,传入RDD和反射类
    Dataset<Row> dataFrame = sparkSession.createDataFrame(RDD,Person.class);
    

    bean对象

    public class Person implements Serializable {
    	/*指定序列化版本*/
    	private static final long serialVersionUID = 1L;
    	private String id ;
    	private String name;
    	private Integer age;
        //get+set
    	public String getId() { return id; } public void setId(String id) {this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; }
    }
    

    动态创建

    • 读取数据构建RDD< String>

    • 通过map算子将String转为Row,使用RowFactory的create方法存入数据

    • 构建元数据对象schema,构建方法中传入元数据封装容器,内部存了每个字段的属性

    • 通过RDD与schema构建出Dataset

    • 对象中传入每个字段的属性

    //通过上下文构建RDD
    SparkContext sc = sparkSession.sparkContext();
    JavaSparkContext jsc = new JavaSparkContext(sc);
    JavaRDD<String> lineRDD = jsc.textFile("/person.txt");
    
    //将一行数据转为Row类型,使用RowFactory的create方法
    JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Row call(String line) throws Exception {
            //使用RowFactory.create构建row对象
            return RowFactory.create(
                    line.split(",")[0],
                    line.split(",")[1],
                    Integer.valueOf(line.split(",")[2])
                );
        }
    });
    
    //构建字段属性容器,可以通过数据库查询后填充容器,实现动态调整
    List<StructField> list = Arrays.asList(
        //分别构建每个字段,参数:字段名,数据类型,是否允许空值
        DataTypes.createStructField("id", DataTypes.StringType, true),
        DataTypes.createStructField("name", DataTypes.StringType, true),
        DataTypes.createStructField("age", DataTypes.IntegerType, true)
    );
    //构建schema对象
    StructType schema = sparkSession.createStructType(list);
    
    //构建Dataset
    sparkSession.createDataFrame(rowRDD,schema)
    

    JDBC创建

    /* 方式1:创建参数容器传入 */
    Map<String, String> options = new HashMap<String, String>();
    options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
    options.put("driver", "com.mysql.jdbc.Driver");
    options.put("user", "root");
    options.put("password", "root");
    options.put("dbtable", "person");
    //加载参数,传入参数
    Dataset<Row> result = sparkSession.read().format("jdbc").options(options).load();
    
    /* 方式2:依次传入参数 */
    //创建读取器
    DataFrameReader reader = sparkSession.read().format("jdbc");
    //依次载入数据
    reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");
    reader.option("driver", "com.mysql.jdbc.Driver");
    reader.option("user", "root");
    reader.option("password", "root");
    reader.option("dbtable", "score");
    //加载数据,构建dataset
    Dataset<Row> result = reader.load();
    
    

    parquet创建

    parque格式的文件中,存储了数据结构,因此可以直接构建dataset

    //数据载入
    Dataset load =sparkSession.read().format(parquet).load("data/parquet");
    Dataset load = sparkSession.read().parquet("data/parquet");
    

    Hive创建

    • 在session中启动hive支持

    • 项目中导入hive-site.xml,hdfs-site.xml,core-site.xml配置文件

    • 需要hive中提前启动metastore服务

    //开启hive支持,在项目文件中导入hive,hdfs的3个xml
    SparkSession sparkSession = SparkSession
    	.builder()
        .appName("hive")
        .enableHiveSupport()
        .getOrCreate();
    //对session执行hive语句,注意:需要指定表空间
    sparkSession.sql("use database");
        
    //将hive_sql结果保存在为dataset
    Dataset<Row> goodStudentsDF = sparkSession.sql("xxx");
    
    
    //Dataset<Row> df = hiveContext.table("student_infos");
    

    hive-site.xml的示例:

    <configuration>  
    <property>  
      <name>hive.metastore.uris</name>  
      <value>thrift://192.168.78.103:9083</value>  
    </property>
    </configuration>  
    

    序列化

    • 序列化与反序列化版本需要一致 private static final long serialVersionUID = 1L

    • 子类实现序列化接口,父类未实现序列化接口,则子类中的父类字段无法序列化

      若父类实现了序列化,子类所有字段也能序列化

    • 只序列化成员变量,不序列化静态变量。反序列化时,静态变量值返回原先值

    • transuent修改的变量不序列化

    结论:Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体(类)的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常。当实现java.io.Serializable接口的实体(类)没有显式地定义一个名为serialVersionUID,类型为long的变量时,Java序列化机制会根据编译的class自动生成一个serialVersionUID作序列化版本比较用,这种情况下,只有同一次编译生成的class才会生成相同的serialVersionUID 。如果我们不希望通过编译来强制划分软件版本,即实现序列化接口的实体能够兼容先前版本,未作更改的类,就需要显式地定义一个名为serialVersionUID,类型为long的变量,不修改这个变量值的序列化实体都可以相互进行串行化和反串行化。

    自定义函数

    UDF

    • 通过sparkSession注册UDF,实现UDF函数(UDF0-UDF22)

    • 注册后的UDF在当前的session中有效

    //获取session
    SparkSession sparkSession = SparkSession
            .builder()
            .appName("udf")
            .master("local")
            .enableHiveSupport()
            .getOrCreate();
    //通过session创建UDF,指定函数名,函数逻辑,返回值类型
    sparkSession.udf().register(
            "add",
            new UDF2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            }, DataTypes.IntegerType);
    sparkSession.sql("select add(a,b) from table")
    

    UDAF

    可以理解为自定义组函数

    通过sparkSession注册函数,实现UserDefinedAggregateFunction类重写以下方法

    • initialize:初始化中间结果缓存,每个分区的每个组初始化一次,总分区的每个组初始化一次

    • upadte:处理数据,合并一个分区中各组的数据,入参为每个组的缓存和当前记录

    • merge:将各分区的中间结果进行合并

    • evaluate:指定最终结果,从最后一个中间结果中取出,数据类型自己指定

    • dataType:指定结果字段的数据类型

    • inputSchema:指定每个输入字段的的列名,数据类型,是否为空。可以指定多个字段

    • bufferSchema:定义buffer中每个字段的属性(列名,数据类型,是否为空),可以指定多个字段

    • deterministic:确保一致性,一般使用true

    //通过session创建UDF
    sparkSession.udf().register(
            "add",
            new UDF2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            }, DataTypes.IntegerType);
    
    
    sparkSession.udf().register(
            "udaf", new UserDefinedAggregateFunction() {
    
                //初始化中间结果缓存,每个分区的每个组初始化一次,总分区的每个组初始化一次
                @Override
                public void initialize(MutableAggregationBuffer buffer) {
                    //内部可以存储多个数据
                    buffer.update(0,0);
                }
    
                //处理数据,入参为每个组的缓存和当前记录
                @Override
                public void update(MutableAggregationBuffer buffer, Row input) {
    
                    //数据封装在row中,从row中取值字段的值进行逻辑处理
                    Object fieldName = input.getAs("fieldName");
    
                    //对缓存中的数据更新
                    buffer.update(0,buffer.getInt(0)+1);
                }
    
                //将各分区的中间结果进行合并
                //参数1:已经合并分区的中间结果或初始的中间结果,其初始化也使用initialize
                //参数2:当前分区的中间结果
                @Override
                public void merge(MutableAggregationBuffer buffer,  Row newBuffer) {
                    //获取数据值
                    Integer newData = newBuffer.getAs(0);
                    Integer beforeData = buffer.getAs(0);
                    //更新合并的中间结果
                    buffer.update(0,beforeData+newData);
                }
    
                //指定最终结果,从最后一个中间结果中取出,数据类型自己指定
                @Override
                public Integer evaluate(Row buffer) {
                    return buffer.getAs(0);
                }
    
                //指定每个输入字段的的列名,数据类型,是否为空。可以指定多个字段。
                @Override
                public StructType inputSchema() {
                    return DataTypes.createStructType(
                            Arrays.asList(
                                    DataTypes.createStructField(
                                            "name",
                                            DataTypes.StringType,
                                            true)
                            )
                    );
                }
    
                //定义buffer中每个字段的属性(列名,数据类型,是否为空),可以指定多个字段
                @Override
                public StructType bufferSchema() {
                    return DataTypes.createStructType(
                            Arrays.asList(
                                    DataTypes.createStructField(
                                            "count",
                                            DataTypes.IntegerType,
                                            false),
                                    DataTypes.createStructField(
                                            "name",
                                            DataTypes.StringType,
                                            true)
                            )
                    );
                }
    
                //指定结果字段的数据类型
                @Override
                public DataType dataType(){return DataTypes.StringType;}
    
                //确保一致性,一般使用true
                @Override
                public boolean deterministic() { return true; }
            }
    );
    
    //上述UDAF模拟了count的功能
    sparkSession.sql("select add(name) from table group by name ");
    

     

    开窗函数

    用于分组排序取最高值,适用于hive,mysql,oracle

    row_number() over (partitin by xxx order by yyy) as sss

    数据根据xxx列分组,再根据yyy排序,排序结果作为sss列的数据,通过对sss列行过滤,取出最大或最小的若干个值

    SELECT
    	t.id,
    	t.uname,
    	t.money
    FROM
    	(
    		SELECT
    			id,
    			uname,
    			money,
    			row_number () over (
    				PARTITION BY uname
    				ORDER BY
    					money DESC
    			) rank
    		FROM
    			table
    	) t
    WHERE
    	t.rank <= 3
    

     

  • 相关阅读:
    Leetcode Reverse Words in a String
    topcoder SRM 619 DIV2 GoodCompanyDivTwo
    topcoder SRM 618 DIV2 MovingRooksDiv2
    topcoder SRM 618 DIV2 WritingWords
    topcoder SRM 618 DIV2 LongWordsDiv2
    Zepto Code Rush 2014 A. Feed with Candy
    Zepto Code Rush 2014 B
    Codeforces Round #245 (Div. 2) B
    Codeforces Round #245 (Div. 2) A
    Codeforces Round #247 (Div. 2) B
  • 原文地址:https://www.cnblogs.com/javaxiaobu/p/11775071.html
Copyright © 2011-2022 走看看