zoukankan      html  css  js  c++  java
  • 性能:Transform层面

    数据处理的并行度

    1、BlockRDD的分区数
    (1)通过Receiver接受数据的特点决定
    (2)也可以自己通过repartition设置
    2、ShuffleRDD的分区数
    (1)默认的分区数为spark.default.parallelism(core的大小)
    (2)通过我们自己设置决定
     val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    
        val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)
    
        val lines = lines1.union(lines2)
    
        lines.repartition(100)  //通过repartition设置  
    
        //处理的逻辑,就是简单的进行word count
        val words = lines.repartition(100).flatMap(_.split(" "))
         //自己设置决定ShuffleRDD的分区数 以及分区算法,默认是core的数量 
        val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))   //并发度是10个分区,根据集群资源情况调节
    

    数据的序列化

    两种需要序列化的数据:
    1、输入数据
    默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中(以序列化的方式存储在内存中,内存不够放在DISK中)
    2、Streaming操作中产生的缓存RDD
    默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
    使用Kryo序列化机制,比Java序列化机制性能好
     
    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * WordCount程序,Spark Streaming消费TCP Server发过来的实时数据的例子:
      *
      * 1、在master服务器上启动一个Netcat server
      * `$ nc -lk 9998` (如果nc命令无效的话,我们可以用yum install -y nc来安装nc)
      *
      * 2、用下面的命令在在集群中将Spark Streaming应用跑起来
      * spark-submit --class com.twq.wordcount.JavaNetworkWordCount 
      * --master spark://master:7077 
      * --deploy-mode client 
      * --driver-memory 512m 
      * --executor-memory 512m 
      * --total-executor-cores 4 
      * --executor-cores 2 
      * /home/hadoop-twq/spark-course/streaming/spark-streaming-basic-1.0-SNAPSHOT.jar
      */
    object KryoNetworkWordCount {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("KryoNetworkWordCount")
    
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")    //指定spark.serializer.KryoSerializer
        sparkConf.set("spark.kryo.registrator", "com.twq.spark.rdd.example.ClickTrackerKryoRegistrator")  //  自定义的数据类型通过Kryo序列化
    
        val sc = new SparkContext(sparkConf)
    
        // Create the context with a 1 second batch size
        val ssc = new StreamingContext(sc, Seconds(1))
    
        //如果一个batchInterval中的数据量不大,并且没有window等操作,则可以使用MEMORY_ONLY
        val lines = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_ONLY_SER)
    
        //处理的逻辑,就是简单的进行word count
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    
        //将结果输出到控制台
        wordCounts.print()
    
        //启动Streaming处理流
        ssc.start()
    
        //等待Streaming程序终止
        ssc.awaitTermination()
      }
    }
    
    class ClickTrackerKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[TrackerLog])
      }
    }
    
    case class TrackerLog(id: String, name: String)
    

      

    内存调优

    1、需要内存大小
    和transform类型有关系
    数据存储的级别
     
    2、GC
    driver端和executor端都使用CMS垃圾收集器
    CMS(Concurrent Mark Sweep 标记清除算法)收集器是一种以获取最短回收停顿时间为目标的收集器
    (通过--driver-java-options和spark.executor.extraJavaOptions)
  • 相关阅读:
    浅谈树状数组与线段树
    BZOJ1367:[Baltic2004]sequence
    浅谈左偏树
    BZOJ4003:[JLOI2015]城池攻占
    BZOJ2809:[APIO2012]dispatching
    BZOJ1455:罗马游戏
    模拟ssh远程执行命令
    基于TCP协议的socket套接字编程
    计算机网络基础知识
    元类( 控制对象产生和控制类产生)模板
  • 原文地址:https://www.cnblogs.com/tesla-turing/p/11488346.html
Copyright © 2011-2022 走看看