zoukankan      html  css  js  c++  java
  • Java Spark SQL

    1. Spark SQL 基础

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Arrays;
    import java.util.Collections;
    import java.io.Serializable;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import static org.apache.spark.sql.functions.col;
    
    /**
     * Spark SQL basics: untyped DataFrame operations, typed Datasets built with encoders, and
     * the two ways of turning an RDD of text lines into a DataFrame (JavaBean reflection and a
     * programmatically built schema).
     *
     * <p>Runs on a local master. Input files ("jsonsong.json", "people.json", "people.txt") are
     * opened through relative paths, so they are resolved against the working directory.
     */
    public class SparkSQL {

        /**
         * Keeps only lines that contain something other than whitespace, so that blank lines
         * (e.g. an empty line in people.txt) never reach the split-and-index parsing code.
         */
        private static final Function<String, Boolean> NON_BLANK_LINE = new Function<String, Boolean>() {
            @Override
            public Boolean call(String line) {
                return !line.trim().isEmpty();
            }
        };

        /**
         * Creates a local SparkSession, runs the selected example(s) and always stops the session.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            SparkSession spark = SparkSession
                    .builder()
                    .appName("Java Spark SQL basic example")
                    .config("spark.some.config.option", "some-value")
                    .master("local")
                    .getOrCreate();

            // Stop the session even when an example throws, so the local context is released.
            try {
                // Enable whichever examples you want to run.
                // runBaseFrameShow(spark);
                // runDatasetCreationExample(spark);
                // runInferSchemaExample(spark);
                runProgrammaticSchemaExample(spark);
            } finally {
                spark.stop();
            }
        }

        /**
         * Basic DataFrame operations on "jsonsong.json": show, printSchema, select, filter,
         * groupBy/count, and the same filter expressed in SQL over a temporary view.
         *
         * @param spark active session
         */
        private static void runBaseFrameShow(SparkSession spark) {
            Dataset<Row> df = spark.read().json("jsonsong.json");

            df.show();
            // +--------+----+-------------------+------+
            // |   album|name|               path|singer|
            // +--------+----+-------------------+------+
            // |香港电视剧主题歌| 上海滩|mp3/shanghaitan.mp3|   叶丽仪|
            // |香港电视剧主题歌|一生何求|mp3/shanghaitan.mp3|   陈百强|
            // |    怀旧专辑|  红日|mp3/shanghaitan.mp3|   李克勤|
            // |    怀旧专辑|爱如潮水|mp3/airucaoshun.mp3|   张信哲|
            // |    怀旧专辑| 红茶馆|  mp3/redteabar.mp3|   陈惠嫻|
            // +--------+----+-------------------+------+

            // Print the schema inferred from the JSON records.
            df.printSchema();
            // root
            //  |-- album: string (nullable = true)
            //  |-- name: string (nullable = true)
            //  |-- path: string (nullable = true)
            //  |-- singer: string (nullable = true)

            // Select only the "name" column.
            df.select("name").show();
            // +----+
            // |name|
            // +----+
            // | 上海滩|
            // |一生何求|
            // |  红日|
            // |爱如潮水|
            // | 红茶馆|
            // +----+

            // Select two columns: "album" and "path".
            df.select(col("album"), col("path")).show();
            // +--------+-------------------+
            // |   album|               path|
            // +--------+-------------------+
            // |香港电视剧主题歌|mp3/shanghaitan.mp3|
            // |香港电视剧主题歌|mp3/shanghaitan.mp3|
            // |    怀旧专辑|mp3/shanghaitan.mp3|
            // |    怀旧专辑|mp3/airucaoshun.mp3|
            // |    怀旧专辑|  mp3/redteabar.mp3|
            // +--------+-------------------+

            // Keep only the rows of the "怀旧专辑" (nostalgia) album.
            df.filter(col("album").equalTo("怀旧专辑")).show();
            // +-----+----+-------------------+------+
            // |album|name|               path|singer|
            // +-----+----+-------------------+------+
            // | 怀旧专辑|  红日|mp3/shanghaitan.mp3|   李克勤|
            // | 怀旧专辑|爱如潮水|mp3/airucaoshun.mp3|   张信哲|
            // | 怀旧专辑| 红茶馆|  mp3/redteabar.mp3|   陈惠嫻|
            // +-----+----+-------------------+------+

            // Count the rows per album.
            df.groupBy("album").count().show();
            // +--------+-----+
            // |   album|count|
            // +--------+-----+
            // |香港电视剧主题歌|    2|
            // |    怀旧专辑|    3|
            // +--------+-----+

            // Register the DataFrame as a temporary view so it can be queried with SQL.
            df.createOrReplaceTempView("song");
            Dataset<Row> sqlDF = spark.sql("SELECT * FROM song where album='怀旧专辑'");
            sqlDF.show();
            // +-----+----+-------------------+------+
            // |album|name|               path|singer|
            // +-----+----+-------------------+------+
            // | 怀旧专辑|  红日|mp3/shanghaitan.mp3|   李克勤|
            // | 怀旧专辑|爱如潮水|mp3/airucaoshun.mp3|   张信哲|
            // | 怀旧专辑| 红茶馆|  mp3/redteabar.mp3|   陈惠嫻|
            // +-----+----+-------------------+------+
        }

        /**
         * Creates typed Datasets three ways: from a JavaBean via a bean encoder, from boxed
         * integers via a primitive encoder (followed by a typed map), and by reading JSON into
         * {@link Person} objects.
         *
         * @param spark active session
         */
        private static void runDatasetCreationExample(SparkSession spark) {
            Person person = new Person();
            person.setName("Andy");
            person.setAge(32);

            // Bean encoder: the columns come from Person's getter/setter pairs.
            Encoder<Person> personEncoder = Encoders.bean(Person.class);
            Dataset<Person> javaBeanDS = spark.createDataset(
                    Collections.singletonList(person),
                    personEncoder);
            javaBeanDS.show();
            // +---+----+
            // |age|name|
            // +---+----+
            // | 32|Andy|
            // +---+----+

            Encoder<Integer> integerEncoder = Encoders.INT();
            Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
            Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer call(Integer value) throws Exception {
                    return value + 1;
                }
            }, integerEncoder);
            // collectAsList() gives a typed java.util.List; print it so the result is visible.
            System.out.println(transformedDS.collectAsList()); // [2, 3, 4]

            // JSON inference types whole numbers as bigint, but Person.age is an int and
            // as(personEncoder) refuses that narrowing up-cast, so cast the column explicitly.
            String path = "people.json";
            Dataset<Person> peopleDS = spark.read().json(path)
                    .withColumn("age", col("age").cast(DataTypes.IntegerType))
                    .as(personEncoder);
            peopleDS.show();
            // +----+-------+
            // | age|   name|
            // +----+-------+
            // |null|Michael|
            // |  30|   Andy|
            // |  19| Justin|
            // +----+-------+
        }

        /**
         * Builds a DataFrame whose schema is inferred by reflection from the {@link Person}
         * bean, then queries it with SQL and reads result columns by index and by name.
         *
         * <p>Each line of "people.txt" is expected to look like {@code name,age}, possibly with
         * whitespace after the comma.
         *
         * @param spark active session
         */
        private static void runInferSchemaExample(SparkSession spark) {
            // Read the text file as a Dataset<String>, switch to the RDD API and parse each line.
            JavaRDD<Person> peopleRDD = spark.read()
                    .textFile("people.txt")
                    .javaRDD()
                    .filter(NON_BLANK_LINE)
                    .map(new Function<String, Person>() {
                        @Override
                        public Person call(String line) throws Exception {
                            String[] parts = line.split(",");
                            Person person = new Person();
                            person.setName(parts[0]);
                            // Strip surrounding whitespace (e.g. the space after the comma).
                            person.setAge(Integer.parseInt(parts[1].trim()));
                            return person;
                        }
                    });

            // Turn the RDD into a DataFrame; the schema is derived from Person's properties.
            Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);

            peopleDF.createOrReplaceTempView("people");
            Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

            Encoder<String> stringEncoder = Encoders.STRING();

            // Access a result column by its position ...
            Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
                @Override
                public String call(Row row) throws Exception {
                    return "Name: " + row.getString(0);
                }
            }, stringEncoder);
            teenagerNamesByIndexDF.show();
            // +------------+
            // |       value|
            // +------------+
            // |Name: Justin|
            // +------------+

            // ... or by its field name.
            Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
                @Override
                public String call(Row row) throws Exception {
                    return "Name: " + row.<String>getAs("name");
                }
            }, stringEncoder);
            teenagerNamesByFieldDF.show();
            // +------------+
            // |       value|
            // +------------+
            // |Name: Justin|
            // +------------+
        }

        /**
         * Builds a DataFrame from an RDD of {@link Row}s and an explicitly constructed
         * {@link StructType}, for cases where no bean class describes the data.
         *
         * @param spark active session
         */
        private static void runProgrammaticSchemaExample(SparkSession spark) {
            // Go through the underlying SparkContext to read the file as an RDD of lines.
            JavaRDD<String> peopleRDD = spark.sparkContext()
                    .textFile("people.txt", 1)
                    .toJavaRDD()
                    .filter(NON_BLANK_LINE);

            // Column names come from a space-separated string; every column is a nullable string.
            String schemaString = "name age";
            List<StructField> fields = new ArrayList<>();
            for (String fieldName : schemaString.split(" ")) {
                fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
            }
            StructType schema = DataTypes.createStructType(fields);

            // Convert each "name,age" line into a Row that matches the schema above.
            JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
                @Override
                public Row call(String record) throws Exception {
                    String[] attributes = record.split(",");
                    return RowFactory.create(attributes[0], attributes[1].trim());
                }
            });

            // Here the type information comes from the StructType rather than a bean class.
            Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
            peopleDataFrame.createOrReplaceTempView("people");

            Dataset<Row> results = spark.sql("SELECT name FROM people");
            Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
                @Override
                public String call(Row row) throws Exception {
                    return "Name: " + row.getString(0);
                }
            }, Encoders.STRING());
            namesDS.show();
            // +-------------+
            // |        value|
            // +-------------+
            // |Name: Michael|
            // |   Name: Andy|
            // | Name: Justin|
            // +-------------+
        }

        /** JavaBean used by the bean encoder and by reflection-based schema inference. */
        public static class Person implements Serializable {
            private static final long serialVersionUID = 1L;

            private String name;
            private int age;

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            public int getAge() {
                return age;
            }

            public void setAge(int age) {
                this.age = age;
            }
        }
    }
    

    2. Spark SQL 数据源查询（Parquet / JSON / JDBC）

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    // $example off:schema_merging$
    
    // $example on:basic_parquet_example$
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Encoders;
    // $example on:schema_merging$
    // $example on:json_dataset$
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    // $example off:json_dataset$
    // $example off:schema_merging$
    // $example off:basic_parquet_example$
    import org.apache.spark.sql.SparkSession;
    
    public class JavaSQLDataSourceExample {
    
      public static void main(String[] args) {
        SparkSession spark = SparkSession
          .builder()
          .appName("Java Spark SQL data sources example")
          .config("spark.some.config.option", "some-value")
          .master("local")
          .getOrCreate();
    
    //    runBasicDataSourceExample(spark);
    //    runBasicParquetExample(spark);
    //    runParquetSchemaMergingExample(spark);
        runJsonDatasetExample(spark);
    //    runJdbcDatasetExample(spark);
    
        spark.stop();
      }
    
      private static void runBasicDataSourceExample(SparkSession spark) {
        // parquet列式存储的文件
        Dataset<Row> usersDF = spark.read().load("users.parquet");
        usersDF.schema(); 				//表结构
        usersDF.foreach(x->System.out.println(x));
    //    [Alyssa,null,WrappedArray(3, 9, 15, 20)]
    //    		[Ben,red,WrappedArray()]
        
        //提取两个字段的 信息,单独存取在另一个parquet目录,目录中有个parquet文件
        usersDF.select("name", "favorite_color").write().save("parquet/namesAndFavColors.parquet");
    
        Dataset<Row> peopleDF =	spark.read().format("json").load("people.json");
        //将json格式的数据以parquet格式的文件保存
        peopleDF.select("name", "age").write().format("parquet").save("parquet/namesAndAges.parquet");
    
        //此处调用白鸥只需要用parquet当做表就行了,不用放在内存中使用
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`users.parquet`");
        sqlDF.foreach(x->System.out.println(x));
    //  [Alyssa,null,WrappedArray(3, 9, 15, 20)]
    //	[Ben,red,WrappedArray()]
        
        Dataset<Row> sqlDF1 = spark.sql("SELECT * FROM json.`people.json`");
        sqlDF1.foreach(x->System.out.println(x));
    //    [null,Michael]
    //    		[30,Andy]
    //    		[19,Justin]
        
      }
    
      private static void runBasicParquetExample(SparkSession spark) {
    	  
        Dataset<Row> peopleDF = spark.read().json("people.json");
    //	将json文件保存为parquet目录文件
        peopleDF.write().parquet("parquet/people.parquet");
    
        Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
    
        parquetFileDF.createOrReplaceTempView("parquetFile");
        Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
        Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
          public String call(Row row) {
            return "Name: " + row.getString(0);
          }
        }, Encoders.STRING());
        namesDS.show();
        // +------------+
        // |       value|
        // +------------+
        // |Name: Justin|
        // +------------+
    
      }
    
      private static void runParquetSchemaMergingExample(SparkSession spark) {
    
        List<Square> squares = new ArrayList<>();
        for (int value = 1; value <= 5; value++) {
          Square square = new Square();
          square.setValue(value);
          square.setSquare(value * value);
          squares.add(square);
        }
    
        Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
        squaresDF.write().parquet("parquet/data/test_table/key=1");				//以parquet格式保存squre  list
    
        List<Cube> cubes = new ArrayList<>();
        for (int value = 6; value <= 10; value++) {
          Cube cube = new Cube();
          cube.setValue(value);
          cube.setCube(value * value * value);
          cubes.add(cube);
        }
    
        Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
        cubesDF.write().parquet("parquet/data/test_table/key=2");
    //	将同一目录下的多个parquet目录文件合并后为1个    
        Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("parquet/data/test_table");
        mergedDF.printSchema();
        // The final schema consists of all 3 columns in the Parquet files together
        // with the partitioning column appeared in the partition directory paths
        // root
        //  |-- value: int (nullable = true)
        //  |-- square: int (nullable = true)
        //  |-- cube: int (nullable = true)
        //  |-- key: int (nullable = true)
            
        mergedDF.foreach(x->System.out.println(x));
    //    		[1,1,null,1]
    //    		[4,2,null,1]
    //    		[9,3,null,1]
    //    		[16,4,null,1]
    //    		[25,5,null,1]
    //    		[null,6,216,2]
    //    		[null,7,343,2]
    //    		[null,8,512,2]
    //    		[null,9,729,2]
    //    		[null,10,1000,2]
        
    
      }
    
      private static void runJsonDatasetExample(SparkSession spark) {
    
        Dataset<Row> people = spark.read().json("people.json");
    
        people.printSchema();
        // root
        //  |-- age: long (nullable = true)
        //  |-- name: string (nullable = true)
    
        people.createOrReplaceTempView("people");		//在内存中创建一张临时表
    
        Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
        namesDF.show();
        // +------+
        // |  name|
        // +------+
        // |Justin|
        // +------+
    
      }
    
      private static void runJdbcDatasetExample(SparkSession spark) {
        Dataset<Row> jdbcDF = spark.read()
          .format("jdbc")
          .option("url", "jdbc:postgresql:dbserver")
          .option("dbtable", "schema.tablename")
          .option("user", "username")
          .option("password", "password")
          .load();
      }
      
    
      public static class Square implements Serializable {
        private int value;
        private int square;
    
        // Getters and setters...
        // $example off:schema_merging$
        public int getValue() {
          return value;
        }
    
        public void setValue(int value) {
          this.value = value;
        }
    
        public int getSquare() {
          return square;
        }
    
        public void setSquare(int square) {
          this.square = square;
        }
        // $example on:schema_merging$
      }
      
      public static class Cube implements Serializable {
        private int value;
        private int cube;
    
        // Getters and setters...
        // $example off:schema_merging$
        public int getValue() {
          return value;
        }
    
        public void setValue(int value) {
          this.value = value;
        }
    
        public int getCube() {
          return cube;
        }
    
        public void setCube(int cube) {
          this.cube = cube;
        }
        // $example on:schema_merging$
      }
     
      
    }
  • 相关阅读:
    JVM精进之路
    Java8-java.time-常用API
    Java代码精进
    Java8——jdk——java.time包
    Java8——Optional
    Java8——Stream
    Java8——Lambda表达式
    Java高级-反射
    重新学习SpringMVC——补充
    LeetCode677. 键值映射(相关话题:Trie前缀树)
  • 原文地址:https://www.cnblogs.com/apppointint/p/8885295.html
Copyright © 2011-2022 走看看