zoukankan      html  css  js  c++  java
  • spark复习总结01

    1.MapReduce和spark的对比

    MapReduceSpark
    数据存储结构:磁盘hdfs文件系统的split 使用内存构建弹性分布式数据集RDD,对数据进行运算和cache
    编程范式:Map+Reduce DAG(有向无环图):Transformation+action
    计算中间数据放在磁盘,io及序列化,反序列化代价大 计算中间数据在内存中维护,存储速度是磁盘的几个数量级
    Task以进程的方式维护,任务启动就要数秒 Task以线程的方式维护,对小数量集的读取能达到亚秒级的延迟

    2.初始化spark

    SparkConf conf = new SparkConf().setAppName("LocalWordCount").setMaster("local");// 指定运行在本地
    JavaSparkContext sparkContext = new JavaSparkContext(conf);

      master为Spark、Mesos、YARN URL或local。使用local值时,表示在本地模式下运行程序。

      也可以在省略,在spark-submit时进行指定。

    3.使用spark-shell,会自动创建sparkContext sc

      --marter 设置运行模式

    $ ./bin/spark-shell --master local[4]   //4为cup核数

      --jars 将jar包添加到classpath,多个jar中间用,隔开

    $ ./bin/spark-shell --master local[4] --jars code.jar

      --packages 通过maven坐标添加jar,多个用逗号隔开

    $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

      --repositories 添加外部的repository

    4.RDD的创建

      4.1 通过并行化集合创建

     //通过并行化集合来创建RDD
    context.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

      将集合并行化时可以指定分区数,spark会在每个分区上启动一个task。

      通常集群的每个CPU设置2~4个分区

      通常,spark基于集群自动设置分区数,也可以手动设置,parallelize(array,10)

      4.2 通过读取外部文件创建

    // 通过读取外部文件创建RDD
    sparkContext.textFile("src/main/resources/wordcount.txt", 5);

      如果使用的是本地文件,在其他worker节点上该文件必须在相同的目录下

      textFile()中可以是文件和目录,同时可以使用通配符sparkContext.textFile("src/main/resources/*.txt", 5);

      分区数默认情况spark为每个块创建一个分区,也可以手动设置,但不能小于块数。

      同时读取不同目录下的文件,使用wholeTextFiles(),仅适用于小文档

      

    /**
     * 读取不同目录下的文件,返回<fileName,content>的RDD,仅适用于小文档
     * 
     * [(file:/D:/workhome/workhome2/beifeng-spark/src/main/resources/wordcount.txt,
          hello scala java
          hello spark
          hello yarn
          hadoop spark yarn), 
      (file:/D:/workhome/workhome2/beifeng-spark/src/main/resources/other/aa.txt,
          hadoop yarn
          scale java)]
    */
    JavaPairRDD<String, String> files=sparkContext.wholeTextFiles("src/main/resources/wordcount.txt,src/main/resources/other/aa.txt");
           
    //返回以每个文档为元素的RDD
    JavaRDD<String> textRdd=files.map(new Function<Tuple2<String,String>, String>() {
    
                @Override
                public String call(Tuple2<String, String> v1) throws Exception {
                    //将文档中的换行符替换为空格,将回车符去掉
                    return v1._2.replaceAll("
    "," ").replaceAll("
    ", "");
                }
    });
            
    //返回以每个单词为元素的RDD
     JavaRDD<String> words=textRdd.flatMap(new FlatMapFunction<String, String>() {
    
                @Override
                public Iterable<String> call(String text) throws Exception {
                    return Arrays.asList(text.split(" "));
                }
     });

     5.cache(),persist()和checkpoint()之间的区别

      5.1 cache()和checkpoint()的区别

        适用场景:

          cache:会被重复使用但不太大的RDD,只会使用内存

          checkpoint:运算时间过长或者运算量太大才能得到的RDD,或者依赖其他RDD过多的RDD

        缓存机制:

          cache:每计算出一个要cache的RDD就直接将其cache到内存。

          checkpoint:等到job结束后另外开启专门的job去完成checkpoint,即checkpoint的RDD会计算两次,

              因此在使用rdd.checkpoint的时候,建议加上rdd.cache(),第二次的计算的时候就会直接使用cache中的内容

      5.2 persisit()和checkpoint()的区别

        persist()可以将RDD的partition持久化到磁盘,但该partition由blockManager管理,一旦driver program执行结束,也就是executor所在的进程停止,blockManager也会停止,别cache到磁盘上的RDD也会被清空。

        checkpoint()可以将RDD持久化到HDFS或本地文件夹,如果不手动清除永久存在。

      5.3 cache()和persisit()的区别

        cache()底层调用persist(StorageLevel.MEMORY_ONLY)

  • 相关阅读:
    leetcode
    Vim i和a差别
    HDU-4643-GSM(DFS)
    使用可编程电源自动化输出电源
    如何使用全局变量
    异步和同步
    启动界面
    鼠标右键弹出菜单
    Qt+excel
    Qt+数据库
  • 原文地址:https://www.cnblogs.com/lifeone/p/7115519.html
Copyright © 2011-2022 走看看