zoukankan      html  css  js  c++  java
  • spark常用操作(二)

    // Read data with Spark.
    // DataFrameReader.text(path) yields a Dataset<Row> with a single string column, while
    // DataFrameReader.textFile(path) yields a Dataset<String>. Neither accepts a partition
    // count; that argument belongs to the RDD API (SparkContext.textFile(path, minPartitions)).
    Dataset<Row> df = spark.read().text(currentSrcPath);
    // Each format gets its own variable so the reads can coexist in one scope.
    Dataset<Row> jsonDf = spark.read().json(path);
    Dataset<Row> orcDf = spark.read().orc(path);
    Dataset<Row> parquet = spark.read().parquet(path);
    
    // Write data with Spark. Every call below uses mode("overwrite") and the same outputPath,
    // so the three lines read as alternatives (text / parquet / orc), not as a sequence.
    // NOTE(review): the text sink presumably requires a single string column — confirm df's schema.
    df.write().mode("overwrite").text(outputPath);
    df.write().mode("overwrite").parquet(outputPath);
    df.write().mode("overwrite").orc(outputPath);
    
    // Convert an RDD to Dataset<Row>, using AdjustSchema.row as the schema.
    // NOTE(review): rowRDD is presumably an RDD of Row matching that schema — confirm.
    Dataset<Row> df = spark.createDataFrame(rowRDD, AdjustSchema.row);
    
    // Convert a list to a Dataset: a one-element list holding startTime rendered as text
    // becomes a Dataset<String> via the built-in string encoder.
    List<String> startTimeAsText = Collections.singletonList(Long.toString(startTime));
    Dataset<String> dataset = spark.createDataset(startTimeAsText, Encoders.STRING());
    // Obtain a Hadoop FileSystem from the Hadoop configuration carried by Spark's SparkContext.
    FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
    // Build a schema (StructType) from a list of StringType fields.
    // NOTE(review): the trailing `true` on each field is presumably the nullable flag — confirm.
    // The `...` line below is an elision in the original snippet (further fields omitted).
    public static StructType row = DataTypes.createStructType(
                Arrays.asList(
                        DataTypes.createStructField("phone_name", StringType, true),
                        DataTypes.createStructField("app_id", StringType, true)
    ...
    ));
    // Convert an RDD / JavaRDD to a DataFrame (Dataset<Row>).
    // Bean-class variant: the second argument is Person.class.
    Dataset<Row> personDF = spark.createDataFrame(personRDD, Person.class);
    // Schema variant; the returned Dataset is discarded here.
    // NOTE(review): the same personRDD is passed with Person.class above, but the schema-based
    // overload presumably expects an RDD of Row — confirm personRDD's element type.
    spark.createDataFrame(personRDD, PersonSchema);
    // Same bean-class conversion, starting from a JavaRDD.
    personDF = spark.createDataFrame(personJavaRDD, Person.class);
    
    // Convert an RDD to a typed Dataset, using a bean encoder for Person.
    Encoder<Person> personEncoder = Encoders.bean(Person.class);
    // personJavaRDD.rdd() is passed rather than the JavaRDD itself.
    personDS = spark.createDataset(personJavaRDD.rdd(), personEncoder);
    
    // Build a Dataset<Row> directly from a list, with Person.class as the bean class.
    // NOTE(review): personList is presumably a List<Person> — confirm.
    Dataset<Row> personDF = spark.createDataFrame(personList, Person.class);
    
    // Convert JavaRDD<Row> to Dataset<Row>.
    // Each Person is mapped to a Row whose values are (age, name), in that order.
    JavaRDD<Row> personRowRdd = personJavaRDD.map(person -> RowFactory.create(person.age, person.name));
    // NOTE(review): rowAgeNameSchema presumably declares its fields in the same (age, name) order — confirm.
    personDF = spark.createDataFrame(personRowRdd, rowAgeNameSchema);
    
    // Dataset<Person> -> JavaRDD<Person>: the typed Dataset is exposed as a JavaRDD via toJavaRDD().
    personJavaRDD = personDS.toJavaRDD();
    
    // Dataset<Row> -> JavaRDD<Person>: look up "name" and "age" on every Row by field name
    // and construct a Person from the two values.
    personJavaRDD = personDF.toJavaRDD().map(sourceRow -> {
        final String personName = sourceRow.<String>getAs("name");
        final int personAge = sourceRow.<Integer>getAs("age");
        return new Person(personName, personAge);
    });
    
    // Dataset<Person> -> Dataset<Row>
    // The Row encoder is built from rowSchema; each Person is emitted as a Row of (name, age).
    // The values go straight into RowFactory.create's varargs, so no temporary list is needed.
    // NOTE(review): rowSchema presumably declares its fields in the same (name, age) order — confirm.
    ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);
    Dataset<Row> personDF_fromDS = personDS.map(
            (MapFunction<Person, Row>) person -> RowFactory.create(person.name, person.age),
            rowEncoder
    );
    
    //Dataset<Row> -> Dataset<Person>
    personDS = personDF.map(new MapFunction<Row, Person>() {
              @Override
              public Person call(Row value) throws Exception {
                  return new Person(value.getAs("name"), value.getAs("age"));
              }
          }, personEncoder);
  • 相关阅读:
    收藏:iBLC编码器
    蛙蛙推荐:怎样调试asp.net黄页错
    蛙蛙推荐:IE下3px bug研究
    蛙蛙推荐:winform入门
    蛙蛙推荐:蛙蛙牌云存储服务
    整理:个人知识管理相关链接
    蛙蛙推荐:c#编写网络电话
    蛙蛙推荐:蛙蛙教你解析网络包
    有用的SQL 语句(转) dodo
    .net 点击刷新验证码问题 dodo
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/12851952.html
Copyright © 2011-2022 走看看