zoukankan      html  css  js  c++  java
  • spark-sql将Rdd转换为DataFrame进行操作的两种方法

       SparkConf sparkConf = new SparkConf()
                    .setMaster("local").setAppName("ClzMap");
    
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    
            JavaRDD<String> line_str = javaSparkContext.textFile("C:\Users\Administrator\Desktop\stud.txt");
    
            JavaRDD<KK> line_kk = line_str.map(new Function<String, KK>() {
                @Override
                public KK call(String s) throws Exception {
                    String attr[] = s.split(",");
                    KK k = new KK();
                    k.setName(attr[0]);
                    k.setAge(Integer.parseInt(attr[1]));
                    k.setYear(attr[2]);
                    return k;
                }
            });
    
            SQLContext sqlContext = new SQLContext(javaSparkContext);
    
            DataFrame df = sqlContext.createDataFrame(line_kk, KK.class);//反射的方式
    
            //在这理由两种方法进行数据过滤(1:使用DataFrame的javaApi,2:使用临时表的sql查询方式)
    
            //-------------------------第1种-----------------------
            DataFrame df_filter = df.filter(df.col("age").geq(19));
            //-------------------------end-----------------------
    
            //-------------------------第2种-----------------------
            DataFrame df_filter1 = df.filter(df.col("age").geq(19));
            df_filter1.registerTempTable("KK");//创建一个临时表,参数为表名
            sqlContext.sql("select  * from KK where age>=19");
            //-------------------------end-----------------------
    
            JavaRDD<Row> df_row = df_filter1.javaRDD();//将DataFrame转化成RDD
    
            JavaRDD<KK> df_kk = df_row.map(new Function<Row, KK>() {
                @Override
                public KK call(Row row) throws Exception {//row的顺序和原来的文件输入可能有不同
                    KK k = new KK();
                    k.setAge(row.getInt(0));
                    k.setName(row.getString(1));
                    k.setYear(row.getString(2));
                    return k;
                }
            });
    
            df_kk.foreach(new VoidFunction<KK>() {
                @Override
                public void call(KK kk) throws Exception {
                    System.out.println("getAge->" + kk.getAge());
                    System.out.println("getYear->" + kk.getYear());
                    System.out.println("getName->" + kk.getName());
                    System.out.println("=============");
                }
            });

    文本文件的内容:

       由上述代码可以看出,KK是一个实体类型并且可序列化(Serializable)!

    zzq,19,2016
    yyu,18,2016
    uui,90,2015



    ps:如果在运行期才能确定类型,则需要使用StructType动态构建类型,代码如下:
       //构建一个动态类型
            List<StructField> structFieldList = new ArrayList<StructField>();
            structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));//第三个参数决定这个属性是否可以为null
            structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
            structFieldList.add(DataTypes.createStructField("year", DataTypes.StringType, true));
            StructType structType = DataTypes.createStructType(structFieldList);
    
            SQLContext sqlContext = new SQLContext(javaSparkContext);
    
            DataFrame df = sqlContext.createDataFrame(line_row, structType);
  • 相关阅读:
    Linux 多路复用 select / poll
    Linux 驱动层实现阻塞和非阻塞
    Linux 中断下半部
    Nginx基本配置文件
    lnmp “.user.ini”无法删除解决方法
    阿里云服务器配置nginx和PHP
    PHP使用某个键值对二维数组排序
    Laravel 生成二维码的方法
    Redis五种数据类型-设置key的过期时间
    laravel中redis队列的使用
  • 原文地址:https://www.cnblogs.com/zzq-include/p/8717544.html
Copyright © 2011-2022 走看看