// Demo pipeline (Spark 1.x API: SQLContext / DataFrame):
//  1. Build two in-memory JSON datasets: basic info (name, age) and scores (name, sex, score).
//  2. Filter students with score >= 90.
//  3. Look up their basic info via a Spark SQL "IN" query, join the two pair-RDDs on name.
//  4. Persist the joined rows as Parquet and print them.
// NOTE(review): Spark 2+ replaces SQLContext/DataFrame with SparkSession/Dataset<Row>.

// In-memory JSON records. Quotes inside the literals must be escaped with \"
// (the original unescaped quotes were a compile error).
List<String> basicList = new ArrayList<String>();
basicList.add("{\"name\": \"zzq\",\"age\": 15}");
basicList.add("{\"name\": \"zzq1\",\"age\": 25}");
basicList.add("{\"name\": \"zzq2\",\"age\": 35}");

List<String> scoreList = new ArrayList<String>();
scoreList.add("{\"name\": \"zzq\",\"sex\": \"男\",\"score\": 110}");
scoreList.add("{\"name\": \"zzq1\",\"sex\": \"女\",\"score\": 90}");
scoreList.add("{\"name\": \"zzq2\",\"sex\": \"男\",\"score\": 70}");

SparkConf sparkConf = new SparkConf()
        .setAppName("StudentsScore")
        .setMaster("local");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(javaSparkContext);

JavaRDD<String> rdd_basicList = javaSparkContext.parallelize(basicList);
JavaRDD<String> rdd_scoreList = javaSparkContext.parallelize(scoreList);

// Read the score JSON and keep only rows with score >= 90.
DataFrame df_scoreList = sqlContext.read().json(rdd_scoreList);
JavaRDD<Row> rdd_filter_score = df_scoreList.filter(df_scoreList.col("score").geq(90)).javaRDD();

// Key each surviving row by name. A Pair RDD value is a single object; to carry
// more than one field, nest a TupleX as the value — here Tuple2<sex, score>.
// Column order of a JSON-derived DataFrame is alphabetical (name, score, sex),
// hence getString(0)=name, getLong(1)=score, getString(2)=sex.
JavaPairRDD<String, Tuple2<String, Long>> rdd_pair_score =
        rdd_filter_score.mapToPair(new PairFunction<Row, String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Tuple2<String, Long>> call(Row row) throws Exception {
                return new Tuple2<String, Tuple2<String, Long>>(
                        row.getString(0),
                        new Tuple2<String, Long>(row.getString(2), row.getLong(1)));
            }
        });

// Register the basic-info DataFrame as a temp table so it can be queried with SQL.
DataFrame df_basicList = sqlContext.read().json(rdd_basicList);
df_basicList.registerTempTable("df_basicList_table");

// Build "select name,age from ... where name in ('a','b',...)" from the collected
// high-score names. NOTE(review): string-concatenated SQL — safe only because the
// values originate from our own in-memory data; use bind parameters for untrusted input.
StringBuilder sqlStrB = new StringBuilder();
sqlStrB.append("select name,age from df_basicList_table where name in ( ");
List<Tuple2<String, Tuple2<String, Long>>> local_rdd_pair_score = rdd_pair_score.collect();
Iterator<Tuple2<String, Tuple2<String, Long>>> itr = local_rdd_pair_score.iterator();
while (itr.hasNext()) {
    Tuple2<String, Tuple2<String, Long>> currItem = itr.next();
    sqlStrB.append("\"");
    sqlStrB.append(currItem._1());
    sqlStrB.append("\"");
    if (itr.hasNext()) {
        sqlStrB.append(",");
    }
}
sqlStrB.append(" ) ");

DataFrame df_filter_basicList = sqlContext.sql(sqlStrB.toString());
JavaRDD<Row> rdd_filter_basic = df_filter_basicList.javaRDD();

// Key the basic-info rows by name as well: Tuple2<name, age>.
JavaPairRDD<String, Long> rdd_pair_basic =
        rdd_filter_basic.mapToPair(new PairFunction<Row, String, Long>() {
            @Override
            public Tuple2<String, Long> call(Row row) throws Exception {
                return new Tuple2<String, Long>(row.getString(0), row.getLong(1));
            }
        });

// Inner join on name: value shape is Tuple2<Tuple2<sex, score>, age>.
JavaPairRDD<String, Tuple2<Tuple2<String, Long>, Long>> all_studentsInfo =
        rdd_pair_score.join(rdd_pair_basic);

// -------------------------------- save: start --------------------------------
// Flatten the joined tuples into Rows matching the explicit schema below.
JavaRDD<Row> row_all_studentsInfo =
        all_studentsInfo.map(new Function<Tuple2<String, Tuple2<Tuple2<String, Long>, Long>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Tuple2<String, Long>, Long>> v1) throws Exception {
                return RowFactory.create(v1._1(), v1._2()._1()._1(), v1._2()._1()._2(), v1._2()._2());
            }
        });

List<StructField> fieldList = new ArrayList<StructField>();
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fieldList.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
fieldList.add(DataTypes.createStructField("score", DataTypes.LongType, true));
fieldList.add(DataTypes.createStructField("age", DataTypes.LongType, true));
StructType temp = DataTypes.createStructType(fieldList);

DataFrame df_save = sqlContext.createDataFrame(row_all_studentsInfo, temp);
df_save.write().save("hdfs://xxxx..........parquet");// persist joined result (default format: Parquet)
// -------------------------------- save: end ----------------------------------

// Print each joined record: name -- sex -- score -- age.
all_studentsInfo.foreach(new VoidFunction<Tuple2<String, Tuple2<Tuple2<String, Long>, Long>>>() {
    @Override
    public void call(Tuple2<String, Tuple2<Tuple2<String, Long>, Long>> stringTuple2Tuple2)
            throws Exception {
        System.out.println(">>>>>>>>>>>>" + stringTuple2Tuple2._1()
                + " -- " + stringTuple2Tuple2._2()._1()._1()
                + " -- " + stringTuple2Tuple2._2()._1()._2()
                + " -- " + stringTuple2Tuple2._2()._2());
    }
});