zoukankan      html  css  js  c++  java
  • Spark SQL 读取 json 里面的数据，json 是结构化的数据

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import scala.Tuple2;
    
    /**
     * JSON数据源
     * @author Administrator
     *
     */
    public class JSONDataSource {
    
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf()
    				.setAppName("JSONDataSource")
    //				.set("spark.default.parallelism", "100")
    				.setMaster("local");  
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		SQLContext sqlContext = new SQLContext(sc);
    		
    		DataFrame studentScoresDF = sqlContext.read().json("student.json");  
    		
    		studentScoresDF.registerTempTable("student_scores");
    		DataFrame goodStudentScoresDF = sqlContext.sql(
    				"select name,count(score) from student_scores where score>=80 group by name");
    		
    		List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(
    				
    				new Function<Row, String>() {
    					
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public String call(Row row) throws Exception {
    						return row.getString(0);
    					}
    					
    				}).collect();
    		
    		List<String> studentInfoJSONs = new ArrayList<String>();
    		studentInfoJSONs.add("{"name":"Michael", "age":18}");  
    		studentInfoJSONs.add("{"name":"Andy", "age":17}");  
    		studentInfoJSONs.add("{"name":"Justin", "age":19}");
    		JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
    		DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD);
    		
    		studentInfosDF.registerTempTable("student_infos");  
    		
    		String sql = "select name,age from student_infos where name in (";
    		for(int i = 0; i < goodStudentNames.size(); i++) {
    			sql += "'" + goodStudentNames.get(i) + "'";
    			if(i < goodStudentNames.size() - 1) {
    				sql += ",";
    			}
    		}
    		sql += ")";
    		
    		DataFrame goodStudentInfosDF = sqlContext.sql(sql);
    		
    		JavaPairRDD<String, Tuple2<Integer, Integer>> goodStudentsRDD = 
    				
    				goodStudentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
    
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public Tuple2<String, Integer> call(Row row) throws Exception {
    						return new Tuple2<String, Integer>(row.getString(0), 
    								Integer.valueOf(String.valueOf(row.getLong(1))));  
    					}
    					
    				}).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
    		
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public Tuple2<String, Integer> call(Row row) throws Exception {
    						return new Tuple2<String, Integer>(row.getString(0),
    								Integer.valueOf(String.valueOf(row.getLong(1))));   
    					}
    					
    				}));
    		
    		JavaRDD<Row> goodStudentRowsRDD = goodStudentsRDD.map(
    				
    				new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {
    
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Row call(
    							Tuple2<String, Tuple2<Integer, Integer>> tuple)
    							throws Exception {
    						return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
    					}
    					
    				});
    		
    		List<StructField> structFields = new ArrayList<StructField>();
    		structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); 
    		structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));  
    		structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  
    		StructType structType = DataTypes.createStructType(structFields);
    		
    		DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType);
    		
    		goodStudentsDF.show();
    //		goodStudentsDF.write().format("json").save("hdfs://hadoop1:9000/output/good-students");  
    	}
    	
    }
    

  • 相关阅读:
    Python实现杨辉三角
    第8.30节 重写Python __setattr__方法实现属性修改捕获
    转:Cookie详解
    第8.29节 使用MethodType将Python __setattr__定义的实例方法与实例绑定
    第8.28节 Python中使用__setattr__定义实例变量和实例方法
    第8.27节 Python中__getattribute__与property的fget、@property装饰器getter关系深入解析
    第8.26节 重写Python类中的__getattribute__方法实现实例属性访问捕获
    关于open函数文件打开模式的有意思的一个现象
    第8.25节 Python风格的__getattribute__属性访问方法语法释义及使用
    转:关于Python中的lambda,这篇阅读量10万+的文章可能是你见过的最完整的讲解
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501295.html
Copyright © 2011-2022 走看看