zoukankan      html  css  js  c++  java
  • RDD & java 类 (反射)构建 DataFrame ---java code

    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    
    /**
     * 使用反射的方式将RDD转换成为DataFrame
     * 1、自定义的类必须是public
     * 2、自定义的类必须是可序列化的
     * 3、RDD转成DataFrame的时候,他会根据自定义类中的字段名进行排序。
     * @author zfg
     *
     */
    
    public class RDD2DataFrameByReflection {
    
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		SQLContext sqlcontext = new SQLContext(sc);
    		
    		JavaRDD<String> lines = sc.textFile("Peoples.txt");
    		
    		JavaRDD<Person> personsRdd = lines.map(new Function<String, Person>() {
    			
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Person call(String line) throws Exception {
    				String[] split = line.split(",");
    				Person p = new Person();
    				p.setId(Integer.valueOf(split[0].trim()));
    				p.setNam(split[1]);
    				p.setAge(Integer.valueOf(split[2].trim()));
    				return p;
    			}
    			
    		});
    		
    		//传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
    		//在底层通过反射的方式或得Person的所有field,结合RDD本身,就生成了DataFrame
    		DataFrame df = sqlcontext.createDataFrame(personsRdd, Person.class);
    	 
    		//命名table的名字为person
    		df.registerTempTable("personTable");
    		
    		
    		DataFrame resultDataFrame = sqlcontext.sql("select * from personTable where age > 7");
    		
    		resultDataFrame.show();
    		
    		//将df转成rdd
    		JavaRDD<Row> resultRDD = resultDataFrame.javaRDD();
    
    		JavaRDD<Person> result = resultRDD.map(new Function<Row, Person>() {
    
    			@Override
    			public Person call(Row row) throws Exception {
    				 Person p = new Person();
    				 p.setAge(row.getInt(0));
    				 p.setId(row.getInt(1));
    				 p.setNam(row.getString(2));
    				return p;
    			}
    		});
    		
    		 List<Person> personList = result.collect();
    		 
    		 for (Person person : personList) {
    			System.out.println(person.toString());
    		} 
    	}
    }
     

  • 相关阅读:
    “12306”的架构到底有多牛逼
    数字治理
    浅谈web网站架构演变过程
    MapReduce基本原理
    Flink+Hologres亿级用户实时UV精确去重最佳实践
    全链路压测体系建设方案的思考与实践
    如何做好一场技术演讲?
    “控本焦虑”的工程企业 用钉钉宜搭找到了低成本数字化的“捷径”
    友盟+《小程序用户增长白皮书》:从五个角度入手分析小程序数据
    数字化让618有了洞悉消费者内心的“大脑”
  • 原文地址:https://www.cnblogs.com/TendToBigData/p/10501291.html
Copyright © 2011-2022 走看看