zoukankan      html  css  js  c++  java
  • spark sql 基本用法

     一、通过结构化数据创建DataFrame:

    publicstaticvoid main(String[] args) {
       SparkConf conf = new SparkConf()

    .setAppName("DataFrameCreate").setMaster("local");  

            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
            
            DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");  //结构化数据直接加载为DataFrame
            
            df.show();  
        }

    二、通过RDD创建DataFrame的两种创建方式

     (数据源students.txt的数据截图)

    2.1通过已知类型的schema创建DataFrame,代码如下:

    public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("RDD2DataFrameReflection");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);

            JavaRDD<String> lines = sc.textFile("D://students.txt");
            
            //将lines转换成 JavaRDD<Student>
            JavaRDD<Student> students = lines.map(new Function<String, Student>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Student call(String line) throws Exception {
                    // TODO Auto-generated method stub
                    String[] strPlits = line.split(",");
                    Student stu = new Student();
                    
                    stu.setId(Integer.valueOf(strPlits[0]));
                    stu.setName(strPlits[1]);
                    stu.setAge(Integer.valueOf(strPlits[2]));
                    
                    return stu;
                }
                
            });
                    
            // 使用反射方式,将RDD转换为DataFrame
            
    // 这里要求,JavaBean必须实现Serializable接口,是可序列化的

            
    //根据student的schema 和 RDD创建DataFrame
            DataFrame studentsDF = sqlContext.createDataFrame(students, Student.class);
            studentsDF.show();
        }

    2.2手动创建schema的方式创建DataFrame

     public static void main(String[] args) {

            //...  省略创建sqlContext的过程

          

    JavaRDD<String> lines = sc.textFile("D://students.txt");
            
            //将普通RDD装换成JavaRDD<Row>
            JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Row call(String line) throws Exception {
                    String[] strArray = line.split(",");
                    Row row= RowFactory.create(
                            Integer.valueOf(strArray[0]),    //id
                            strArray[1],    //name
                            Integer.valueOf(strArray[2]));    //age
                
                    return row;
                }
            });
            
            //第二步 创建元类型, 即创建schema
            List<StructField> structFields = new ArrayList<StructField>();
            structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));  
            structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));  
            structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  
            StructType structType = DataTypes.createStructType(structFields);

            //根据元数据类型将JavaRDD<Row>转化成DataFrame
            DataFrame studentDF = sqlCotnext.createDataFrame(rowRDD, structType);

            studentDF.show();
        }

    -》DataFrame、RDD、List互转

    JavaRDD<Row> rows = studentDF.javaRDD();

    List<Row> studentList = rows.collect(); 

    三、DataFrame基本用法

            // 打印DataFrame中所有的数据(select * from ...)

    df.show();
            // 打印DataFrame的元数据(Schema)
            df.printSchema();
            // 查询某列所有的数据
            df.select("name").show();  
            // 查询某几列所有的数据,并对列进行计算
            df.select(df.col("name"), df.col("age").plus(1)).show();
            // 根据某一列的值进行过滤
            df.filter(df.col("age").gt(18)).show();
            // 根据某一列进行分组,然后进行聚合
            df.groupBy(df.col("age")).count().show();

      DataFrame studentDF = sqlCotnext.createDataFrame(rowRDD, structType);
            studentDF.show();

            studentDF.registerTempTable("students"); //将DataFrame注册为零时表,取名students
            
            //对students零时表做sql查询
            DataFrame oldStudentDF = sqlCotnext.sql("select * from students where age>18");

            
            oldStudentDF.show();
  • 相关阅读:
    软件工程之开发过程
    软件工程设计之四则运算
    Android笔记-5-EditText密码和Checkbox二选一
    Android笔记-4-实现登陆页面并跳转和简单的注册页面
    Android笔记-3-EditText的属性介绍
    Android笔记-2-TextView的属性详解
    Android笔记-1
    Microsoft Build 2015
    网络受限是个什么东东?
    几乎所有编程语言的hello, world程序(3)
  • 原文地址:https://www.cnblogs.com/key1309/p/5352310.html
Copyright © 2011-2022 走看看