  • Two ways to convert an RDD to a DataFrame in spark-sql

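    // Spark 1.x API (DataFrame / SQLContext); KK is the Serializable bean described below
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    public class ClzMap { // class name assumed from setAppName("ClzMap")
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf()
                    .setMaster("local").setAppName("ClzMap");

            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

            // Backslashes must be escaped inside a Java string literal
            JavaRDD<String> line_str = javaSparkContext.textFile("C:\\Users\\Administrator\\Desktop\\stud.txt");

            JavaRDD<KK> line_kk = line_str.map(new Function<String, KK>() {
                @Override
                public KK call(String s) throws Exception {
                    String[] attr = s.split(",");
                    KK k = new KK();
                    k.setName(attr[0]);
                    k.setAge(Integer.parseInt(attr[1]));
                    k.setYear(attr[2]);
                    return k;
                }
            });

            SQLContext sqlContext = new SQLContext(javaSparkContext);

            DataFrame df = sqlContext.createDataFrame(line_kk, KK.class); // schema inferred by reflection on KK

            // Two ways to filter the data: (1) the DataFrame Java API, (2) an SQL query over a temporary table.
            // Both yield the same rows; the rest of the example uses df_filter1.

            //------------------------- Method 1 -----------------------
            DataFrame df_filter = df.filter(df.col("age").geq(19));
            //-------------------------end-----------------------

            //------------------------- Method 2 -----------------------
            df.registerTempTable("KK"); // register a temporary table; the argument is the table name
            DataFrame df_filter1 = sqlContext.sql("select * from KK where age>=19");
            //-------------------------end-----------------------

            JavaRDD<Row> df_row = df_filter1.javaRDD(); // convert the DataFrame back to an RDD

            JavaRDD<KK> df_kk = df_row.map(new Function<Row, KK>() {
                @Override
                public KK call(Row row) throws Exception {
                    // Row columns need not follow the file order: with a bean schema,
                    // Spark 1.x orders them by property name (age, name, year)
                    KK k = new KK();
                    k.setAge(row.getInt(0));
                    k.setName(row.getString(1));
                    k.setYear(row.getString(2));
                    return k;
                }
            });

            df_kk.foreach(new VoidFunction<KK>() {
                @Override
                public void call(KK kk) throws Exception {
                    System.out.println("getAge->" + kk.getAge());
                    System.out.println("getYear->" + kk.getYear());
                    System.out.println("getName->" + kk.getName());
                    System.out.println("=============");
                }
            });

            javaSparkContext.stop();
        }
    }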
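The comment on `row.getInt(0)` warns that the Row's column order can differ from the file. With reflection-based `createDataFrame(rdd, KK.class)`, Spark 1.x reads the bean's properties through `java.beans.Introspector`, which in OpenJDK returns them sorted by name, and Spark drops the inherited `class` property. The stdlib-only sketch below assumes a `KK` bean with the `name`/`age`/`year` properties used above (the post never shows the class, so this is a reconstruction; `BeanColumnOrder` and `columnOrder` are illustrative names) and prints the order the properties come back in. That order, `age, name, year`, matches the indices 0, 1, 2 used in the Row mapping.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BeanColumnOrder {

    // Reconstructed KK entity (assumption): a Serializable JavaBean whose
    // getters/setters match the calls made in the Spark example.
    public static class KK implements Serializable {
        private String name;
        private int age;
        private String year;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
        public String getYear() { return year; }
        public void setYear(String year) { this.year = year; }
    }

    // Returns bean property names in the order Introspector reports them,
    // skipping "class" (from Object.getClass()), as Spark's bean inference does.
    public static List<String> columnOrder(Class<?> beanClass) {
        try {
            BeanInfo info = Introspector.getBeanInfo(beanClass);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (!"class".equals(pd.getName())) {
                    names.add(pd.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(columnOrder(KK.class)); // [age, name, year]
    }
}
```

If the positional indices feel fragile, reading columns by name instead of by position avoids depending on this ordering at all.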

    As the code above implies, KK is an entity (JavaBean) class, and it implements Serializable!

    Contents of the text file:

    zzq,19,2016
    yyu,18,2016
    uui,90,2015
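Both filtering methods apply the same predicate, `age >= 19`, so on these three sample lines only `zzq` (19) and `uui` (90) should survive, while `yyu` (18) is dropped. A plain-Java check of that predicate, with no Spark involved (`AgeFilterCheck` and `namesWithAgeAtLeast` are illustrative names, not part of the original code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AgeFilterCheck {

    // Keeps the name of every "name,age,year" line whose age is >= minAge:
    // the same condition as df.col("age").geq(19) and "where age>=19".
    public static List<String> namesWithAgeAtLeast(List<String> lines, int minAge) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            String[] attr = line.split(",");
            if (Integer.parseInt(attr[1].trim()) >= minAge) {
                result.add(attr[0]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> stud = Arrays.asList("zzq,19,2016", "yyu,18,2016", "uui,90,2015");
        System.out.println(namesWithAgeAtLeast(stud, 19)); // [zzq, uui]
    }
}
```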



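    P.S.: If the type can only be determined at runtime, build the schema dynamically with StructType:
        // Reuses javaSparkContext and line_str from above; extra imports: org.apache.spark.sql.RowFactory, org.apache.spark.sql.types.*
        // Build the schema dynamically
        List<StructField> structFieldList = new ArrayList<StructField>();
        structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true)); // third argument: whether the field is nullable
        structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFieldList.add(DataTypes.createStructField("year", DataTypes.StringType, true));
        StructType structType = DataTypes.createStructType(structFieldList);

        // line_row: each text line as a generic Row whose values follow the schema's order and types
        JavaRDD<Row> line_row = line_str.map(new Function<String, Row>() {
            @Override
            public Row call(String s) throws Exception {
                String[] attr = s.split(",");
                return RowFactory.create(attr[0], Integer.parseInt(attr[1]), attr[2]);
            }
        });

        SQLContext sqlContext = new SQLContext(javaSparkContext);
        DataFrame df = sqlContext.createDataFrame(line_row, structType);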
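What `StructType` buys you is that column names and types can come from data at runtime instead of from a compiled class. The plain-Java sketch below uses no Spark: `Column`, `ColType`, `parseSchema`, `toRow` and the `name:type` spec format are all made up for illustration. It shows the conversion step the Row RDD depends on: each value is converted to the Java type its field declares and placed in schema order (for example `age` as an `Integer`, matching `IntegerType`), which is what the values passed to `RowFactory.create` must look like.

```java
import java.util.ArrayList;
import java.util.List;

public class RuntimeSchemaSketch {

    // Hypothetical type set, standing in for DataTypes.StringType / DataTypes.IntegerType.
    public enum ColType { STRING, INT }

    // One column of a schema known only at runtime (name + type), loosely like StructField.
    public static final class Column {
        public final String name;
        public final ColType type;
        public Column(String name, ColType type) { this.name = name; this.type = type; }
    }

    // Parses a hypothetical spec such as "name:string,age:int,year:string" into columns.
    public static List<Column> parseSchema(String spec) {
        List<Column> cols = new ArrayList<>();
        for (String part : spec.split(",")) {
            String[] kv = part.split(":");
            ColType t = kv[1].trim().equalsIgnoreCase("int") ? ColType.INT : ColType.STRING;
            cols.add(new Column(kv[0].trim(), t));
        }
        return cols;
    }

    // Converts one text line into values ordered and typed like the schema.
    public static Object[] toRow(String line, List<Column> schema) {
        String[] attr = line.split(",");
        if (attr.length != schema.size()) {
            throw new IllegalArgumentException("expected " + schema.size() + " fields: " + line);
        }
        Object[] values = new Object[attr.length];
        for (int i = 0; i < attr.length; i++) {
            String raw = attr[i].trim();
            values[i] = schema.get(i).type == ColType.INT ? (Object) Integer.valueOf(raw) : raw;
        }
        return values;
    }

    public static void main(String[] args) {
        List<Column> schema = parseSchema("name:string,age:int,year:string");
        Object[] row = toRow("zzq,19,2016", schema);
        System.out.println(row[0] + " " + row[1] + " " + row[2]); // zzq 19 2016
    }
}
```

Passing the raw string "19" for a field declared as `IntegerType` is the usual way this goes wrong in Spark, so the type conversion has to happen before the Row is built.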
  • Original article: https://www.cnblogs.com/zzq-include/p/8717544.html