zoukankan      html  css  js  c++  java
  • Spark 中的join方式(pySpark)

     spark基础知识请参考spark官网:http://spark.apache.org/docs/1.2.1/quick-start.html

        无论是mapreduce还是spark ,分布式框架的性能优化方向大致分为:负载均衡、网络传输和磁盘I/O 这三块。而spark是基于内存的计算框架,因此在编写应用时需要充分利用其内存计算特征。本篇主要针对

    spark应用中的join问题进行讨论,关于集群参数的优化会在另一篇文章中提及。

        在传统的数据库平台和分布式计算平台,join的性能消耗都是很可观的,对spark来说如果join的表比较大,那么在shuffle时网络及磁盘压力会明显提升,严重时可能会造成excutor失败导致任务无法进行下去,

    对这种join的优化方法主要是采用map和filter来改变join的实现方式,减少shuffle阶段的网络和磁盘I/O。下面以表的数据量大小分两部分来讨论。

       大表:数据量较大的表

       小表:数据量较小的表

    一、大表与小表之间的join

       这种join是大部分业务场景的主要join方式,将小表以broadcast的形式分发到每个executor后对大表进行filter操作,以下对每种join进行示例说明(兼容表中ID不唯一的情况)。

      1、leftOuterJoin 

      >>>d1=sc.parallelize([(1,2),(2,3),(2,4),(3,4)])

      >>>d2=sc.parallelize([(1,'a'),(2,'b'),(1,'d'),(5,'2')])

      原生实现方式:

      >>>d1.leftOuterJoin(d2).collect()

      >>>[(1, (2, 'a')), (1, (2, 'd')), (2, (4, 'b')), (2, (3, 'b')), (3, (4, None))]

       map实现方式(小表在右的实现方式,小表在左的情况会稍微复杂些,需要多一些操作操作,实际场景中不多见):

    复制代码
    def  doJoin(row):
        result=[]
        if row[1][1] is not None:
            for i in row[1][1]:
                result+=[(row[0],(row[1][0],i))]
      else:
                result+=[row]
      return result
    
    d2_map={}
    for i in d2.groupByKey().collect():
        d2_map[i[0]]=i[1]
    d2_broadcast=sc.broadcast(d2_map)
    d2_dict=d2_broadcast.value
    d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).flatMap(doJoin).collect()               
    复制代码

    >>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b')), (3, (4, None))]

    2、join

    这里的join指的是innerjoin即只取出匹配到的数据项,只需要在上面的实现方式中加个filter即可

    d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).filter(lambda row:row[1][1] is not None).flatMap(doJoin).collect()

    >>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b'))]

    二、大表与大表之间的join(Reduce-join)

    大表之间的join无法通过缓存数据来达到优化目的,因此需要把优化的重点放在分区效率及key的设计上

    1、join的key值尽量使用数值类型,减少分区及shuffle的操作时间,在join时数值类型的key值在匹配时更快

    2、将过滤条件放在join之前,使得join的数据量尽量最少

    3、在join之前将两个表按相同分区数进行重新分区

    reduce-join:指将两个表按key值进行分区,相同key的数据会被分在同一个分区,最后使用mapPartition进行join操作。

     4、如果需要减少分区和并行度,请使用coalesce 而非repartition 方法。

    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
    * which can avoid performing a shuffle.

    三、其它优化方式

    1、同一份数据被多次用到,在读入时进行缓存,后面直接使用,例如配置表,如果数据量不大则进行broadcast,否则使用cache

    2、尽量减少重复计算,同样的计算逻辑只计算一次

    3、几个优化参数

    spark.akka.frameSize 1000                       集群间通信 一帧数据的大小,设置太小可能会导致通信延迟

    spark.akka.timeout 100                             通信等待最长时间(秒为单位)
    spark.akka.heartbeat.pauses 600                 心跳失败最大间隔(秒为单位)
    spark.serializer org.apache.spark.serializer.KryoSerializer    序列化方式(sprak自己的实现方式)
    spark.sql.autoBroadcastJoinThreshold -1           禁止自动broadcast表
    spark.shuffle.consolidateFiles true             shuffle 自动合并小文件

    四、后续优化方向

    1、内存优化:对象所占用的内存,访问对象的消耗以及垃圾回收(garbage collection)所占用的开销

    2、优化数据结构

    3、优化RDD存储

    4、并行度

  • 相关阅读:
    最舒适的路线(并查集)
    POJ 2411 状态压缩DP
    NYOJ 708 ones
    HUD 1024 Max Sum Plus Plus
    最长上升子序列
    HDU 4717 The Moving Points
    重新开始写随笔
    读书的意义
    读《如何阅读一本书》笔记
    读《GRAY HAT PYTHON》笔记
  • 原文地址:https://www.cnblogs.com/ExMan/p/14355829.html
Copyright © 2011-2022 走看看