zoukankan      html  css  js  c++  java
  • Spark SQL 读取json 里面的数据 ,jason 是 结构的数据

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import scala.Tuple2;
    
    /**
     * JSON数据源
     * @author Administrator
     *
     */
    public class JSONDataSource {
    
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf()
    				.setAppName("JSONDataSource")
    //				.set("spark.default.parallelism", "100")
    				.setMaster("local");  
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		SQLContext sqlContext = new SQLContext(sc);
    		
    		DataFrame studentScoresDF = sqlContext.read().json("student.json");  
    		
    		studentScoresDF.registerTempTable("student_scores");
    		DataFrame goodStudentScoresDF = sqlContext.sql(
    				"select name,count(score) from student_scores where score>=80 group by name");
    		
    		List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(
    				
    				new Function<Row, String>() {
    					
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public String call(Row row) throws Exception {
    						return row.getString(0);
    					}
    					
    				}).collect();
    		
    		List<String> studentInfoJSONs = new ArrayList<String>();
    		studentInfoJSONs.add("{"name":"Michael", "age":18}");  
    		studentInfoJSONs.add("{"name":"Andy", "age":17}");  
    		studentInfoJSONs.add("{"name":"Justin", "age":19}");
    		JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
    		DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD);
    		
    		studentInfosDF.registerTempTable("student_infos");  
    		
    		String sql = "select name,age from student_infos where name in (";
    		for(int i = 0; i < goodStudentNames.size(); i++) {
    			sql += "'" + goodStudentNames.get(i) + "'";
    			if(i < goodStudentNames.size() - 1) {
    				sql += ",";
    			}
    		}
    		sql += ")";
    		
    		DataFrame goodStudentInfosDF = sqlContext.sql(sql);
    		
    		JavaPairRDD<String, Tuple2<Integer, Integer>> goodStudentsRDD = 
    				
    				goodStudentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
    
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public Tuple2<String, Integer> call(Row row) throws Exception {
    						return new Tuple2<String, Integer>(row.getString(0), 
    								Integer.valueOf(String.valueOf(row.getLong(1))));  
    					}
    					
    				}).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
    		
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public Tuple2<String, Integer> call(Row row) throws Exception {
    						return new Tuple2<String, Integer>(row.getString(0),
    								Integer.valueOf(String.valueOf(row.getLong(1))));   
    					}
    					
    				}));
    		
    		JavaRDD<Row> goodStudentRowsRDD = goodStudentsRDD.map(
    				
    				new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {
    
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Row call(
    							Tuple2<String, Tuple2<Integer, Integer>> tuple)
    							throws Exception {
    						return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
    					}
    					
    				});
    		
    		List<StructField> structFields = new ArrayList<StructField>();
    		structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); 
    		structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));  
    		structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  
    		StructType structType = DataTypes.createStructType(structFields);
    		
    		DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType);
    		
    		goodStudentsDF.show();
    //		goodStudentsDF.write().format("json").save("hdfs://hadoop1:9000/output/good-students");  
    	}
    	
    }
    

  • 相关阅读:
    BZOJ4039 : 集会
    BZOJ3655 : 神经错乱数
    World Finals 2017爆OJ记
    Petrozavodsk Summer-2016. Ural FU Dandelion Contest
    XVII Open Cup named after E.V. Pankratiev. Grand Prix of America (NAIPC-2017)
    递归的逻辑(3)——递归与分治
    递归的逻辑(2)——特征方程和递归算法
    递归的逻辑(1)——递归关系模型
    整数的故事(4)——Karastuba算法
    整数的故事(3)——最小公倍数与哥德巴赫猜想
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501295.html
Copyright © 2011-2022 走看看