zoukankan      html  css  js  c++  java
  • Spark向HDFS中存储数据

    程序如下:

    import org.apache.spark.sql.Row;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    
    
    public class QueryAndStoreHDFSData {
        static SparkConf sparkConf = new SparkConf().setAppName("HDFSQuery").setMaster("local[2]");
        static JavaSparkContext sc = new JavaSparkContext(sparkConf);
        static SQLContext sqlContext = new SQLContext(sc);
        public static void main(String[] args){
    //        JavaRDD<String> poi = sc.textFile("hdfs://node2:9000/user/flume/events/2015-11-27-21/events-.1448629506841");
            DataFrame df = sqlContext.read().json("hdfs://node2:9000/user/flume/events/2015-11-26-21/events-.1448543965316");
            // 打印模式
            df.printSchema();
            // 将数据框架注册成一个表
            df.registerTempTable("poi");
            // 使用sql语句从表中读取数据
            DataFrame poi = sqlContext.sql("SELECT * FROM poi WHERE cid=57425749418");
            JavaRDD<Row> row = poi.javaRDD();
            
            //将RDD中的数据存入HDFS(也可以指定其他目录和格式)
            row.saveAsTextFile("hdfs://node2:9000/user/poi.txt");
            
            row.foreach(new VoidFunction<Row>(){
                @Override
                public void call(Row r) throws Exception {
                    System.out.println(r.mkString());        
                }
                
            });
        }
    }
  • 相关阅读:
    C语言ll作业01
    C语言寒假大作战04
    C语言寒假大作战03
    C语言寒假大作战02
    C语言寒假大作战01
    C语言I作业12—学期总结
    C语言I博客作业11
    C语言I博客作业10
    第三章预习笔记-运算方法和运算部件
    非数值数据的编码表示
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/5003290.html
Copyright © 2011-2022 走看看