zoukankan      html  css  js  c++  java
  • spark-DataFrame之RDD和DataFrame之间的转换

    package cn.spark.study.core.mycode_dataFrame;

    import java.io.Serializable;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    /**
     * Demonstrates converting an RDD to a DataFrame via reflection-based (JavaBean) schema
     * inference, querying it with SQL, and converting the result back to an RDD of beans.
     *
     * <p>Input lines are expected to look like {@code id,name,age}.
     */
    public class RDD2DataFrameReflection implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Input file read when no path is supplied on the command line. */
        private static final String DEFAULT_INPUT_PATH = "D:/students.txt";

        /**
         * Runs the demo against a local Spark master.
         *
         * @param args optional; {@code args[0]}, if present, overrides the input file path
         *     (default {@code D:/students.txt})
         */
        public static void main(String[] args) {
            String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
            SparkConf conf = new SparkConf().setAppName("RDD2DataFrameReflection").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            try {
                run(sc, inputPath);
            } finally {
                // Always release the Spark context, even if the job throws.
                sc.stop();
            }
        }

        /**
         * Loads students from {@code inputPath}, selects those with {@code age <= 18} via SQL,
         * and prints each matching student.
         */
        private static void run(JavaSparkContext sc, String inputPath) {
            SQLContext sqlContext = new SQLContext(sc);
            JavaRDD<String> lines = sc.textFile(inputPath);

            // Skip blank lines (e.g. a trailing newline), which cannot be parsed into a student.
            JavaRDD<String> nonBlankLines = lines.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String line) throws Exception {
                    return !line.trim().isEmpty();
                }
            });

            JavaRDD<student> students = nonBlankLines.map(new Function<String, student>() {
                @Override
                public student call(String line) throws Exception {
                    return parseStudent(line);
                }
            });

            // Convert the RDD to a DataFrame, inferring the schema from the bean class by reflection.
            DataFrame studentDF = sqlContext.createDataFrame(students, student.class);
            // Register the DataFrame as a temporary table so SQL can be run against it.
            studentDF.registerTempTable("students");
            DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");

            // Convert the query result back into an RDD.
            JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD();
            JavaRDD<student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, student>() {
                @Override
                public student call(Row row) throws Exception {
                    // The column order of a bean-inferred schema need not match the bean's field
                    // declaration order (e.g. it may come out as age, id, name), so look each
                    // column up by name rather than by position.
                    student stu = new student();
                    stu.setId(row.getInt(row.fieldIndex("id")));
                    stu.setName(row.getString(row.fieldIndex("name")));
                    stu.setAge(row.getInt(row.fieldIndex("age")));
                    return stu;
                }
            });

            List<student> studentList = teenagerStudentRDD.collect();
            for (student stu : studentList) {
                System.out.println(stu);
            }
        }

        /**
         * Parses one {@code id,name,age} line into a {@code student}; surrounding whitespace
         * on each field is trimmed.
         *
         * @throws IllegalArgumentException if the line has fewer than three comma-separated fields
         * @throws NumberFormatException if the id or age field is not an integer
         */
        private static student parseStudent(String line) {
            String[] fields = line.split(",");
            if (fields.length < 3) {
                throw new IllegalArgumentException("Expected id,name,age but got: " + line);
            }
            student stu = new student();
            stu.setId(Integer.valueOf(fields[0].trim()));
            stu.setName(fields[1].trim());
            stu.setAge(Integer.valueOf(fields[2].trim()));
            return stu;
        }
    }

  • 相关阅读:
    ibatis 中isNull, isNotNull与isEmpty, isNotEmpty区别
    关于异常Microsoft.CSharp.RuntimeBinder.RuntimeBinderException
    php 利用fsockopen GET/POST 提交表单及上传文件
    php发送get、post请求获取内容的几种方法
    修改WampServer的默认端口
    SQL Server2008附加数据库之后显示为只读时解决方法
    Linux一键安装web环境全攻略(阿里云服务器)
    如何从Windows远程上传文件到Linux(例如CentOS 7)
    在CentOS上搭建PHP服务器环境
    linux(系统centos6.5)常用命令总结
  • 原文地址:https://www.cnblogs.com/yaohaitao/p/5681496.html
Copyright © 2011-2022 走看看