zoukankan      html  css  js  c++  java
  • Spark学习笔记

    Spark Core

    1.1 RDD

    概念:The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

    RDD创建:

    1. parallelizing an existing collection in your driver program
    2. referencing a dataset in an external storage system, such as a shared filesystem

      

    //第一种创建方法
    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);
    
    //第二种创建方法
    JavaRDD<String> distFile = sc.textFile("data.txt");
    

    RDD操作:

    1. transformations, which create a new dataset from an existing one
    2. actions, which return a value to the driver program after running a computation on the dataset
    JavaRDD<String> lines = sc.textFile("data.txt");
    JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
    int totalLength = lineLengths.reduce((a, b) -> a + b);
    

     map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program.

    RDD操作性能:

      All transformations in Spark are lazy.

      By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.

      Shuffle operations. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. (reduceByKey).

    1.2 Shared Variables

    Broadcast Variables:Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

    Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
    
    broadcastVar.value();
    // returns [1, 2, 3]
    

    Accumulators:Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel.A numeric accumulator can be created by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using the add method. However, they cannot read its value. Only the driver program can read the accumulator’s value, using its value method.sed to implement counters (as in MapReduce) or sums.

    LongAccumulator accum = jsc.sc().longAccumulator();
    
    sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
    
    accum.value();
    // returns 10
    

    Spark Sql

      Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API.

      A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.

      A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. While, in JAVA API, users need to use Dataset<Row> to represent a DataFrame.

    //DataFrames实例
    //people.json
    //{"name":"Michael"}
    //{"name":"Andy", "age":30}
    //{"name":"Justin", "age":19}
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
    
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate();
    // Displays the content of the DataFrame to stdout
    df.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    // Select everybody, but increment the age by 1
    df.select(col("name"), col("age").plus(1)).show();
    // +-------+---------+
    // |   name|(age + 1)|
    // +-------+---------+
    // |Michael|     null|
    // |   Andy|       31|
    // | Justin|       20|
    // +-------+---------+
    
    // Select people older than 21
    df.filter(col("age").gt(21)).show();
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+
    
    // Count people by age
    df.groupBy("age").count().show();
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
    

      

    //SQL实例
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people");
    
    Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
    sqlDF.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    
    //Dataset实例
    import java.util.Arrays;
    import java.util.Collections;
    import java.io.Serializable;
    
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    
    public static class Person implements Serializable {
      private String name;
      private int age;
    
      public String getName() {
        return name;
      }
    
      public void setName(String name) {
        this.name = name;
      }
    
      public int getAge() {
        return age;
      }
    
      public void setAge(int age) {
        this.age = age;
      }
    }
    // Create an instance of a Bean class
    Person person = new Person();
    person.setName("Andy");
    person.setAge(32);
    
    // Encoders are created for Java beans
    Encoder<Person> personEncoder = Encoders.bean(Person.class);
    Dataset<Person> javaBeanDS = spark.createDataset(
      Collections.singletonList(person),
      personEncoder
    );
    javaBeanDS.show();
    // +---+----+
    // |age|name|
    // +---+----+
    // | 32|Andy|
    // +---+----+
    
    
    // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
    String path = "examples/src/main/resources/people.json";
    Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
    peopleDS.show();
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    

      Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.

  • 相关阅读:
    获取DIV与浏览器顶部相聚一定位置之后移动DIV
    CSS定位小技巧
    jquery动态样式操作
    Python学习笔记1
    KNN算法的感受 2
    KNN算法的感受 1
    Matplotlib安装感想
    安装numpy只需一步简单的方法
    Ubuntu学习笔记3-图书知识点总结
    Hadoop源码如何查看
  • 原文地址:https://www.cnblogs.com/killianxu/p/11026447.html
Copyright © 2011-2022 走看看