zoukankan      html  css  js  c++  java
  • Spark 编程基础

    1. 初始化Spark

    import org.apache.spark.{SparkContext, SparkConf}
    
    val conf=new SparkConf().setAppName("RDD1").setMaster("local")
    val sc=new SparkContext(conf)

    2. 创建RDD的方法

    内存:Parallelize 或者 makeRDD

    外部文件:textFile

    //1.  both Parallelize and makeRDD could create RDD from In-Memory
     val distData=sc.parallelize(data)                   // parallelize
     val distData1=sc.makeRDD(data)                 // makeRDD 
    
    //2 textFile could create RDD from files
    val distFile=sc.textFile("E:/Java_WS/ScalaDemo/data/wc.txt") 
    

    3. 保存Spark结果

    RDD可以使用 saveAsTextFile()保存下来;

    非RDD,可以借助 Parallelize/makeRDD转化为RDD,再保存下来

    myRDD.saveTextFile("Path/test.txt")
    
    val precision=new Array[String](100)
    sc.parallelize(precision).saveAsTextFile("E:/Spark/models/precision.txt")

    4. 键值对

    下面两者等价:

    myRDD. map (s=> (s,1))
    myRDD. map (_,1)
    

    reduceByKey 和sortByKey、groupByKey

    distFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)   
    distFile.flatMap(_.split(" ")).map(s=>(s,1)).sortByKey().collect().foreach(println)
    distFile.flatMap(_.split(" ")).map(s=>(s,1)).groupByKey().foreach(println)

    1)返回key 以及 每个key的个数 (key, cnt)

    2)返回 (key,value) 排序后的

    3)返回(key, (value1,value2...))

    5. RDD 持久化  

     persist() 或 cache()

     unpersist() 可以删除缓存RDD

    6. 广播变量和累加器

    • 通过sc.broadcast(v) 和 sc.accumulator(初始值,comments)定义
    • 通过value访问其值。
    • 广播变量不能修改了
    • 累加器只能通过add 或者 +=修改
    //SparkContext.broadcast(v)  is a broadcast variable, could replace v in any place of the cluster
    val broadcastVar=sc.broadcast(Array(1,2,3))
    println(broadcastVar.value(0),broadcastVar.value(1),broadcastVar.value(2))
        
    val accum=sc.accumulator(0,"My Accumulator")
    sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
    println(accum.value)
    

    7. UDF 和UDAF

    8. 提交spark任务

    例子:spark-submit (详细参考

    ./bin/spark-submit 
      --master yarn-cluster 
      --num-executors 100 
      --executor-memory 6G 
      --executor-cores 4 
      --driver-memory 1G 
      --conf spark.default.parallelism=1000 
      --conf spark.storage.memoryFraction=0.5 
      --conf spark.shuffle.memoryFraction=0.3 
    --class YourClass
    YourJar
    JarParameter1
    JarParameter2
    • num-executors

    参数说明:用于设置Spark作业总共要用多少个Executor进程来执行。如果不设置默认会使用很少,影响作业的效率。

    • executor-memory

    参数说明:用于设置每个Executor进程的内存。一般来说设置4G~8G较为合适。

    • executor-cores

    参数说明:该参数用于设置每个Executor进程的CPU core数量,决定task的执行效率。一般2~4个。

    • driver-momory

    参数说明:该参数用于设置Driver进程的内存。通常不需要设置,或者设置为1G即可。

    • spark.default.parallelism

    参数说明:该参数用于设置每个stage的默认task数量。需要设置,默认的话会根据HDFS的blocks数设置,偏少。

    建议设置为num-executors * executor-cores的2~3倍较为合适

    • spark.storage.memoryFraction

    参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6

    • spark.shuffle.memoryFraction

    参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2

  • 相关阅读:
    iOS 自动化测试踩坑(二):Appium 架构原理、环境命令、定位方式
    干货 | 掌握 Selenium 元素定位,解决 Web 自动化测试痛点
    代理技术哪家强?接口 Mock 测试首选 Charles
    浅谈MVC缓存
    PetaPoco 快速上手
    解释器模式(26)
    享元模式(25)
    中介者模式(24)
    职责链模式(23)
    命令模式(22)
  • 原文地址:https://www.cnblogs.com/skyEva/p/5867472.html
Copyright © 2011-2022 走看看