zoukankan      html  css  js  c++  java
  • <Spark><Programming><Key/Value Pairs><RDD>

    Working with key/value Pairs

    Motivation

    • Pair RDDs are a useful building block in many programs, as they expose operations that allow u to act on each key in parallel or regroup data across network.
    • Eg: pair RDDs have a reduceByKey() method that can aggeragate data separately for each key; join() method that can merge two RDDs together by grouping elements with the same key.

    Creating Pair RDDs

    • Many formats we loading from will directly return pair RDDs for their k/v values.(许多格式的数据加载会直接返回pair RDDs)
    • By turning a regular RDD into a pair RDD  --> Using map() function  (通过转换得到)
    val pairs = lines.map(x => (x.split(" ")(0), x))

    Transformation on Pair RDDs

    • 我们同样可以给Spark传送函数,不过由于pair RDDs包含的是元组tuple,所以我们要传送的函数式操作在tuples之上的。实际上Pair RDDs就是RDDs of Tuple2 object。

    Aggregations

    • reduceByKey()和reduce()很相似:它们都接收一个函数并使用该函数来combine values。它们的不同在于:
      1. reduceByKey()并行地为数据集中每个key运行reduce操作。
      2. reduceByKey()属于transformation,它返回一个新的RDD。这样做是考虑到数据集有大量的keys。
    • foldByKey()与fold()相似:都使用与RDD中数据同类型的zero value,以及一个combination函数。
      1. foldByKey()所提供的zero会应用于每个value
    • 与MR的combiner概念类似:调用reduceByKey()和foldByKey()时会自动在每个机器本地进行combine。用户不需指定combiner。更通用的combineByKey()接口可以允许你自定义combiner
    val input = sc.textFile("input/")
    val words = input.flatMap(x => x.split(" "))
    val result = words.map(x = > (x, 1)).reduceByKey((x, y) => x + y)
    • combineByKey()是最general的per-key aggregation functions. combinerByKey():对一个partition中的每个元素,如果该元素的key原来没出现过,combineByKey()使用我们提供的函数createCombiner()来为该key创建一个accumulator初始值(要注意这里如果是一个已经出现过的key,那么不会执行此操作)。如果该key在处理该partition时已经出现过,那么将会使用给的的mergeValue()函数,操作在当前value和该key的accumulator之上。这样,每个partition被独立处理之后,对同一个key我们得到多个accumulators。当我们merging每个partition的结果时,会对相同key使用用户提供的mergeCombiners()函数。没看懂的话仔细看下面这段求平均值的例子!~
    val result = input.combineByKey(
        (v) => (v, 1),   // createCombiner(), value => (value, num)
        (acc:(Int, Int), v) => (acc._1 +v, acc._2 + 1),   // mergeValue(), for each key: ((valueSum, nums), value) => (valueSum +value, nums + 1)
        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners(), for each key: (valueSumOnOnePartition, nums), (valueSumOnAnotherPartition, nums) => (_+_, _+_)
        ).map{case (key, value) => (key, value._1 / value._2.toFloat)}   // (key,(valueSum, nums)) => (key, valueSum / float(nums)) 
    // we can use mapValues(v => (v._1 / v._2.toFloat)) result.collectAsMap().map(println(_))
    • 很多options可以根据key来combine数据,他们基本上都是基于combineByKey()来实现的,只是在上层提供了更简单的接口。
    • 如果我们知道数据并不会从中获益时, 我们可以disable map-side aggregation in combineByKey()。比如groupByKey()的map端聚合被disable,因为appending to list不会save any space.

    Tuning the level of parallelism

    • spark如何决定如何划分作业?
    •  每个RDD都有一个固定的partitions数目,它决定了执行RDD操作时的并行度。Spark会试图根据你的cluster size推断一个实用的缺省值。
    • 当执行聚合aggregation或分组grouping操作时,我们可以要求Spark使用指定的partitions数量。
    data = [("a", 3), ("b", 4), ("a", 1)]
    sc.parallelize(data).reduceByKey((x, y) => x + y)  // default parallelism
    sc.parallelize(data).reduceByKey((x, y) => x + y, 10) // custom parallelism

    Grouping Data

    • 带有key的数据最常用的用例就是根据key划分数据
    • groupByKey(): [k, V] -> [K, Iterable[V]]
    • groupBy(func): works on unpaired data,会根据传入的func计算key,并进行分组。
    val a = sc.parallelize(1 to 9, 3)
    a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect//分成两组
    /*结果 
    Array(
    (even,ArrayBuffer(2, 4, 6, 8)),
    (odd,ArrayBuffer(1, 3, 5, 7, 9))
    )
    */
    View Code
    • 如果你发现你在使用groupByKey()之后在values上使用reduce()或fold()。那么你可以更高效地用per-key聚合函数来替换它。比如说rdd.reduceByKey(func) 和rdd.groupBy Key().mapValues(value => value.reduce(func)) 产生的是同样的RDD,但是前者更高效,因为前者避免了对每个key创建a list of values的过程。
    • 除了从一个单一的RDD聚合数据,我们还可以从多个sharing 相同key的RDDs中聚合数据 --> 使用cogroup()函数。
      • cogroup(): RDD[(K, V)], RDD[(K, W)] --> RDD[(K, (Iterable[V], Iterable[W]))]
      • cogroup()是join、intersect等的底层实现

    Joins

    • 包含了左/右外连接,交叉连接和内连接。

    Sorting Data

    • 只要key上定义了排序准则,我们就可以对键值对进行排序
    • sortByKey(): 可以提供自定义的comparison函数
    val input: RDD[(Int, Venue)] =
    implicit val sortIntergersByString = new Ordering[Int]{
        override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
    }
    rdd.sortByKey()
    View Code

    Actions Available on Pair RDDs

    • 一些附加的pair RDDs可用的actions如下,它们可以利用键值对的天然优势
      • countByKey(): 为每个键值的元素统计数量
      • collectAsMap(): 将结果收集成map结构的,以便提供方便的查找功能
      • lookup(key): 返回key关联的value。eg: rdd.lookup(3)返回[4, 6]
      • ...

    Data Partitioning(Advanced)

    • 这一部分我们会了解到如何控制数据集在节点之间的partitioning。Spark的分区在所有的键值对RDD上可用。
    • 在分布式程序中,通信是十分昂贵的。因此合理的安放数据来减少网络流量可以大幅度提高性能。
    • 并不是在所有的应用中,partitioning都是很有帮助的。比如说一个给定的RDD只扫描一次,那么就没有必要预先为他划分分区。
    • 当一个数据多次在面向key的操作中被重用时,预先partitioning是十分有用的。
    • eg:在这个例子中我们有一个userData(UserID, LinkInfo)表(不更新),和一个events(UserID, LinkInfo)表(每n分钟实时更新)。在这个例子中我们可以先用partitionBy()对userData进行分区,这样在之后join的时候Spark知道该数据已经是hash-partitioned的,就不会在shuffle该数据了。而events表是实时更新,并且只使用一次,所以没有必要预先partition。
    // Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
    // This distributes elements of userData by the HDFS block where they are found,
    // and doesn't provide Spark with any way of knowing in which partition a
    // particular UserID is located.
    val sc = new SparkContext(...)
    val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
    .partitionBy(new HashPartitioner(100)) // Create 100 partitions .persist()
    
    // we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
    def processNewLogs(logFileName: String) {
    val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
    val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components !userInfo.topics.contains(linkInfo.topic)
    }.count()
      println("Number of visits to non-subscribed topics: " + offTopicVisits)
    }
    View Code
    • 关于partitionBy()
      • 要注意的是partitionBy()属于transformation,它返回的是一个新的RDD,所以我们需要persist()并保存为userData。注意一定要persist().
      • partitionBy()中的100表示分区数量,它会控制further opertions on the RDD的并行任务数。通常它被设置成你的cluster的cores数目
    • 如果预先知道partitioning的信息,很多操作都会从中获益。比如sortByKey()和groupByKey()分别会受益于range-partitioned和hash-partitioned RDD.
    • 但是,一些操作比如map()会导致新的RDD忘记parent的分区信息,因为那些操作理论上会改变每个记录的key。(但是像mapValues()这样的就会保存父RDD的partition信息)。

     Determining an RDD's Partitioner

    • 在Scala和Java中,我们可以通过partitioner property来确定它是如何partitioned的。
    • 对于Scala,看下面这个例子:
    • scala> data1.partitioner
      res2: Option[org.apache.spark.Partitioner] = None
      
      scala> data1.partitionBy(new HashPartitioner(2))
      res3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:30
      
      scala> data1.partitioner
      res4: Option[org.apache.spark.Partitioner] = None
      
      scala> val data2 = data1.partitionBy(new HashPartitioner(2))
      data2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:29
      
      scala> data2.partitioner
      res5: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
      View Code
      • RDD.partitioner:返回的是Option类。关于Option:Option是Scala提供的一个容器,其中可能包含也可能不包含item。我们可以在Option上调用isDefined() 来判断其是否含有值,并且使用get()来获取值。
      • @transient val partitioner: Option[Partitioner] = None
      • 其次就是要记得RDD是不可变的- -,要新建一个变量data2来保存partition后的数据。如果我们后续还想使用data2的话,要调用persist()。

    Operations that Benefit from Partitioning

    • 许多Spark操作包含了在网络间shuffle数据的过程,这些操作都会从partitioning中受益。
    • cogroup(), groupWith(), join(), groupByKey(), reduceByKey(), combineByKey(), lookup()...
    • 比如reduceByKey()如果操作在已经预分区的RDD上,那么每个key的values都可以本地计算了。
    • 对cogroup()和join()操作,预分区会使至少一个RDDs不需要被shuffle。如果两个RDDs都使用同样的partitioner,并且都cache在相同的机器上(比如其中一个是通过另一个使用mapValues()得到的),那么将不会有shuffle发生。

    Operations that Affect Partitioning

    • Spark知道操作是如何影响partitioning的,因此它会自动的为由某RDD转化而得的RDD设置partitioner
      • 比如:你join()两个RDDs,那么相同key的元素会被hash到同一个machine,因此Spark知道结果是hash-partitioned。那么对join所得的RDD进行reduceByKey()会很快
    • 但是一些transformation不能保证得到已知的partitioning,那么输出的RDD将不会有partitioner set。
      • 比如在hash-partitioned RDD上调用map()所得RDD将不会有partitioner。因为理论上map()会改变每个元素的key,而Spark不会分析你的函数来检查你是否保持key。因此,相应的,你可以使用mapValues()和flatMapValues()来保证每个元组的key保持不变。所以如果你没有改变元素的key,最好调用mapValues()和flatMapValues()
    • 保证result RDD 已经partitioned的操作有:cogroup(), groupWith(), join(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues()[如果其父RDD已经partitioned], flatMapValues()[如果其父RDD已经partitioned], filter()[如果其父RDD已经partitioned]。 而其他的操作会得到一个没有partitioner的RDD。

    Example: PageRank

    • PageRank是一个能从RDD partitioning受益的例子。
    • PageRank是一个多次join的迭代算法。算法有两个数据集:(pageID, linkList) elements, (pageID, rank) elements
    • // Assume that our neighbor list was saved as a Spark objectFile
      val links = sc.objectFile[(String, Seq[String])]("links") .partitionBy(new HashPartitioner(100))
      .persist()
      // Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD // will have the same partitioner as links
      var ranks = links.mapValues(v => 1.0)
      // Run 10 iterations of PageRank
      for(i<-0until10){
      val contributions = links.join(ranks).flatMap {
      case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size))
      }
      ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v) }
      // Write out the final ranks
      ranks.saveAsTextFile("ranks")
      View Code
    • 该程序有几个trick:
      • 对links进行pre-partition并persist,这样links再也不用shuffle。
      • 初始化ranks时使用mapValues()保留links的分区,这样第一次join的时候完全不用shuffle。
      • 对ranks进行reduceByKey()所得结果是hash-partitioned的,所以我们调研mapValues(),那么所得的ranks仍然是hash-partitioned的。

    Custom Partitioners

    • Spark提供了HashPartitioner和RangePartitioner来满足很多情况,但是你仍然可以自定义Partitioner。你可以使用领域知识来减少通信。
    • 比如,我们之前的PageRank算法中每个pageID使用URL表示的,那么相似的URLs(e.g., http://www.cnn.com/WORLD and http://www.cnn.com/US)可能被hash到完全不同的节点。但是有相同域名的URL可能更容易指向彼此。同时PageRank算法每次迭代都会从每个page发送信息到该page的neighbors。因此我们可以自定义一个partitioner,该partitioner只look at域名而不是整个URL。
    • 为实现自定义partitioner,你需要继承org.apache.spark.Partitioner,并且实现以下三个方法:
      • numPartitions:Int, 返回你将会创建的partitions的数目
      • getPartition(key: Any): Int, 返回给定key的partitionID
      • equals(), 标准Java equality方法。Spark通过该方法来比较两个Partitioner objects。
    • class DomainNamePartitioner(numParts: Int) extends Partitioner { 
          override def numPartitions: Int = numParts
          override def getPartition(key: Any): Int = {
          val domain = new Java.net.URL(key.toString).getHost() 
          val code = (domain.hashCode % numPartitions)
          if(code<0){
          code + numPartitions // Make it non-negative }
          else{
          code
          } 
      }
      // Java equals method to let Spark compare our Partitioner objects
      override def equals(other: Any): Boolean = other match { 
          case dnp: DomainNamePartitioner =>
          dnp.numPartitions == numPartitions 
          case _ =>
              false
      } }
      View Code
    • 使用很简单,直接将custom Partitioner类传到partitionBy()方法即可。
    满地都是六便士,她却抬头看见了月亮。
  • 相关阅读:
    C# Tips Written By Andrew Troelsen
    ASP.NET:性能与缓存
    New Feature In C# 2.0
    .NET Remoting中的通道注册
    通过应用程序域AppDomain加载和卸载程序集
    Some BrainTeaser in WinDev, Can you Solve them?
    ExtJs学习笔记(24)Drag/Drop拖动功能
    wap开发体会
    关于”System.ServiceModel.Activation.WebServiceHostFactory“与"<webHttp/>"以及RestFul/启用了Ajax的WCF服务
    验证码无刷新更换
  • 原文地址:https://www.cnblogs.com/wttttt/p/6827870.html
Copyright © 2011-2022 走看看