zoukankan      html  css  js  c++  java
  • spark-sql集合的“条件过滤”,“合并”,“动态类型映射DataFrame”,“存储”

      // Sample "basic info" records: one JSON object per student with name and age.
            // Inner double quotes must be escaped so each entry is a single Java string literal.
            List<String> basicList = new ArrayList<String>();
            basicList.add("{\"name\": \"zzq\",\"age\": 15}");
            basicList.add("{\"name\": \"zzq1\",\"age\": 25}");
            basicList.add("{\"name\": \"zzq2\",\"age\": 35}");

            // Sample "score" records: one JSON object per student with name, sex and score.
            List<String> scoreList = new ArrayList<String>();
            scoreList.add("{\"name\": \"zzq\",\"sex\": \"男\",\"score\": 110}");
            scoreList.add("{\"name\": \"zzq1\",\"sex\": \"女\",\"score\": 90}");
            scoreList.add("{\"name\": \"zzq2\",\"sex\": \"男\",\"score\": 70}");
    
            SparkConf sparkConf = new SparkConf()
                    .setAppName("StudentsScore")
                    .setMaster("local");
    
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
            SQLContext sqlContext = new SQLContext(javaSparkContext);
    
    
            JavaRDD<String> rdd_basicList = javaSparkContext.parallelize(basicList);
            JavaRDD<String> rdd_scoreList = javaSparkContext.parallelize(scoreList);
    
    
            DataFrame df_scoreList = sqlContext.read().json(rdd_scoreList);
            JavaRDD<Row> rdd_filter_score = df_scoreList.filter(df_scoreList.col("score").geq(90)).javaRDD();
    
            // mapToPair yields a Tuple2 (key, value); to carry more attributes, use a TupleX as the value, as below.
            // Column 0 is read as the key, column 2 as a string and column 1 as a long.
            // NOTE(review): this presumably relies on the inferred JSON columns being ordered name, score, sex - confirm.
            JavaPairRDD<String, Tuple2<String, Long>> rdd_pair_score = rdd_filter_score.mapToPair(new PairFunction<Row, String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Tuple2<String, Long>> call(Row scoreRow) throws Exception {
                    String key = scoreRow.getString(0);
                    Tuple2<String, Long> attributes = new Tuple2<String, Long>(scoreRow.getString(2), scoreRow.getLong(1));
                    return new Tuple2<String, Tuple2<String, Long>>(key, attributes);
                }
            });
    
            // Read the basic (name/age) JSON into a DataFrame and register it as a temp table for SQL.
            DataFrame df_basicList = sqlContext.read().json(rdd_basicList);
            df_basicList.registerTempTable("df_basicList_table");

            // Collect the filtered score pairs locally; their keys become the values of the IN list.
            List<Tuple2<String, Tuple2<String, Long>>> local_rdd_pair_score = rdd_pair_score.collect();

            StringBuilder sqlStrB = new StringBuilder();
            sqlStrB.append("select name,age from df_basicList_table where ");
            if (local_rdd_pair_score.isEmpty()) {
                // "name in ( )" with no values is not valid SQL, so select nothing explicitly.
                sqlStrB.append("1 = 0");
            } else {
                sqlStrB.append("name in ( ");
                Iterator<Tuple2<String, Tuple2<String, Long>>> itr = local_rdd_pair_score.iterator();
                while (itr.hasNext()) {
                    Tuple2<String, Tuple2<String, Long>> currItem = itr.next();
                    // Wrap each name in double quotes so it is passed as a string literal.
                    // NOTE(review): the name is spliced in unescaped; a name containing a double quote
                    // would break the statement - only use with trusted values.
                    sqlStrB.append("\"");
                    sqlStrB.append(currItem._1());
                    sqlStrB.append("\"");
                    if (itr.hasNext()) {
                        sqlStrB.append(",");
                    }
                }
                sqlStrB.append(" ) ");
            }

            DataFrame df_filter_basicList = sqlContext.sql(sqlStrB.toString());
            // Convert the SQL result (columns selected as name, age) into (name, age) pairs.
            JavaPairRDD<String, Long> rdd_pair_basic = df_filter_basicList.javaRDD().mapToPair(new PairFunction<Row, String, Long>() {
                @Override
                public Tuple2<String, Long> call(Row basicRow) throws Exception {
                    String key = basicRow.getString(0);
                    long value = basicRow.getLong(1);
                    return new Tuple2<String, Long>(key, value);
                }
            });

            // Join the score pairs with the basic pairs on the shared key.
            JavaPairRDD<String, Tuple2<Tuple2<String, Long>, Long>> all_studentsInfo = rdd_pair_score.join(rdd_pair_basic);
    
    
            // Storage -------------------------------start----------------------------------
            // Flatten each joined record into a four-column Row, in the same order as the schema below.
            JavaRDD<Row> row_all_studentsInfo = all_studentsInfo.map(new Function<Tuple2<String, Tuple2<Tuple2<String, Long>, Long>>, Row>() {
                @Override
                public Row call(Tuple2<String, Tuple2<Tuple2<String, Long>, Long>> joined) throws Exception {
                    Tuple2<String, Long> scorePart = joined._2()._1();
                    Long basicPart = joined._2()._2();
                    return RowFactory.create(joined._1(), scorePart._1(), scorePart._2(), basicPart);
                }
            });

            // Build the schema at runtime: name, sex, score, age (all nullable).
            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("score", DataTypes.LongType, true));
            fields.add(DataTypes.createStructField("age", DataTypes.LongType, true));
            StructType studentSchema = DataTypes.createStructType(fields);

            DataFrame df_save = sqlContext.createDataFrame(row_all_studentsInfo, studentSchema);

            // Persist the DataFrame to the given path.
            df_save.write().save("hdfs://xxxx..........parquet");
            // Storage -------------------------------end----------------------------------
    
            // Print every joined record as: key -- score-side string -- score-side long -- basic-side long.
            all_studentsInfo.foreach(new VoidFunction<Tuple2<String, Tuple2<Tuple2<String, Long>, Long>>>() {
                @Override
                public void call(Tuple2<String, Tuple2<Tuple2<String, Long>, Long>> student) throws Exception {
                    Tuple2<String, Long> scorePart = student._2()._1();
                    StringBuilder line = new StringBuilder(">>>>>>>>>>>>");
                    line.append(student._1());
                    line.append("  -- ").append(scorePart._1());
                    line.append("  --  ").append(scorePart._2());
                    line.append("  --  ").append(student._2()._2());
                    System.out.println(line.toString());
                }
            });
  • 相关阅读:
    angular 组件间数据共享
    Linux 常用命令
    angular 子路由跳转出现Navigation triggered outside Angular zone, did you forget to call ‘ngZone.run() 的问题修复
    angular :ngIf 的else用法
    利用 filter 来去重
    webpack打包时删除console.log,和debugger
    git忽略而不提交文件的3种情形
    jenkins 构建日程表配置
    vue之多页面的开发
    vue-cli3使用jq
  • 原文地址:https://www.cnblogs.com/zzq-include/p/8727431.html
Copyright © 2011-2022 走看看