zoukankan      html  css  js  c++  java
  • Spark SQL 读取json 里面的数据 ,jason 是 结构的数据

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import scala.Tuple2;
    
    /**
     * JSON数据源
     * @author Administrator
     *
     */
    public class JSONDataSource {
    
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf()
    				.setAppName("JSONDataSource")
    //				.set("spark.default.parallelism", "100")
    				.setMaster("local");  
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		SQLContext sqlContext = new SQLContext(sc);
    		
    		DataFrame studentScoresDF = sqlContext.read().json("student.json");  
    		
    		studentScoresDF.registerTempTable("student_scores");
    		DataFrame goodStudentScoresDF = sqlContext.sql(
    				"select name,count(score) from student_scores where score>=80 group by name");
    		
    		List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(
    				
    				new Function<Row, String>() {
    					
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public String call(Row row) throws Exception {
    						return row.getString(0);
    					}
    					
    				}).collect();
    		
    		List<String> studentInfoJSONs = new ArrayList<String>();
    		studentInfoJSONs.add("{"name":"Michael", "age":18}");  
    		studentInfoJSONs.add("{"name":"Andy", "age":17}");  
    		studentInfoJSONs.add("{"name":"Justin", "age":19}");
    		JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
    		DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD);
    		
    		studentInfosDF.registerTempTable("student_infos");  
    		
    		String sql = "select name,age from student_infos where name in (";
    		for(int i = 0; i < goodStudentNames.size(); i++) {
    			sql += "'" + goodStudentNames.get(i) + "'";
    			if(i < goodStudentNames.size() - 1) {
    				sql += ",";
    			}
    		}
    		sql += ")";
    		
    		DataFrame goodStudentInfosDF = sqlContext.sql(sql);
    		
    		JavaPairRDD<String, Tuple2<Integer, Integer>> goodStudentsRDD = 
    				
    				goodStudentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
    
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public Tuple2<String, Integer> call(Row row) throws Exception {
    						return new Tuple2<String, Integer>(row.getString(0), 
    								Integer.valueOf(String.valueOf(row.getLong(1))));  
    					}
    					
    				}).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
    		
    					private static final long serialVersionUID = 1L;
    		
    					@Override
    					public Tuple2<String, Integer> call(Row row) throws Exception {
    						return new Tuple2<String, Integer>(row.getString(0),
    								Integer.valueOf(String.valueOf(row.getLong(1))));   
    					}
    					
    				}));
    		
    		JavaRDD<Row> goodStudentRowsRDD = goodStudentsRDD.map(
    				
    				new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {
    
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Row call(
    							Tuple2<String, Tuple2<Integer, Integer>> tuple)
    							throws Exception {
    						return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
    					}
    					
    				});
    		
    		List<StructField> structFields = new ArrayList<StructField>();
    		structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); 
    		structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));  
    		structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  
    		StructType structType = DataTypes.createStructType(structFields);
    		
    		DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType);
    		
    		goodStudentsDF.show();
    //		goodStudentsDF.write().format("json").save("hdfs://hadoop1:9000/output/good-students");  
    	}
    	
    }
    

  • 相关阅读:
    RN-Android构建失败:Caused by: org.gradle.api.ProjectConfigurationException: A problem occurred configuring root project 'AwesomeProject'.
    Android更新包下载成功后不出现安装界面
    真机调试: The application could not be installed: INSTALL_FAILED_TEST_ONLY
    react native 屏幕尺寸转换
    Android Studio生成签名文件,自动签名,以及获取SHA1和MD5值
    React Native安卓真机调试
    git提交代码报错Permission denied, please try again
    The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.
    命令行设置快捷命令
    Linux 常用指令
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501295.html
Copyright © 2011-2022 走看看