  • Spark RDDs vs DataFrames vs SparkSQL

    Introduction

    A performance comparison of Spark RDDs, DataFrames, and SparkSQL.

    Two comparisons

    1. Random lookup of a single record

    2. Aggregation followed by a sorted output

    Each of the two problems above is solved with the following three Spark approaches, and their performance is compared.

    1. Using RDDs

    2. Using DataFrames

    3. Using SparkSQL

    Data source

    • 9 million unique records stored across 3 files in HDFS

    • 11 fields per record
    • Total size: 1.4 GB
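
    The dataset itself is not published. For anyone who wants to reproduce the setup, below is a minimal sketch of a generator for a similarly shaped sample file. It only assumes what the parsing code further down implies: pipe-delimited lines with 11 fields (cust_id, order_id, email_hash, ssn_hash, product_id, product_desc, country, state, shipping_carrier, shipping_type, shipping_class). The file name, record count, and field values are illustrative placeholders.

    #!/usr/bin/env python
    # Sketch of a sample-data generator (assumed format: 11 pipe-delimited fields,
    # matching the parsing in the scripts below). All values are placeholders.
    import random

    NUM_RECORDS = 1000  # the actual benchmark used 9 million records
    PRODUCTS = ["widget", "gadget", "sprocket", "gizmo"]

    with open("customer_orders_sample.txt", "w") as out:
        for i in range(NUM_RECORDS):
            fields = [
                str(random.randint(1, 100000)),   # cust_id
                str(i + 1),                       # order_id (unique)
                "email_hash_%d" % i,              # email_hash
                "ssn_hash_%d" % i,                # ssn_hash
                str(random.randint(1, 500)),      # product_id
                random.choice(PRODUCTS),          # product_desc
                "US",                             # country
                "CA",                             # state
                "UPS",                            # shipping_carrier
                "ground",                         # shipping_type
                "standard",                       # shipping_class
            ]
            out.write("|".join(fields) + "\n")

    The generated file can then be copied into HDFS (for example with hdfs dfs -put) so that the /data/customer_orders* glob used in the scripts picks it up.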

    Test environment

    • HDP 2.4

    • Hadoop version 2.7

    • Spark 1.6

    • HDP Sandbox

    Results

    • Plain RDDs outperformed both DataFrames and SparkSQL

    • DataFrames and SparkSQL performed roughly the same

    • DataFrames and SparkSQL are more intuitive to work with than RDD operations (see the short side-by-side sketch after this list)

    • Each job was run on its own, with no interference from other jobs
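
    As a quick illustration of the readability point above, here is a minimal side-by-side sketch of the single-record filter used in the full scripts below, once against an RDD and once against a DataFrame; lines and orders_df are assumed to be defined exactly as in those scripts.

    ## RDD: the condition is expressed positionally inside a lambda
    matches_rdd = (lines.map(lambda line: line.split('|'))
                        .filter(lambda fields: int(fields[1]) == 96922894))

    ## DataFrame: the same condition reads as a declarative expression on a named column
    matches_df = orders_df.where(orders_df['order_id'] == 96922894)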

    The two operations

    1. Random lookup against 1 order ID from 9 million unique order IDs

    2. GROUP all the different products with their total COUNTS and SORT DESCENDING by total count

    Code

    RDD Random Lookup

    #!/usr/bin/env python
     
    from time import time
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("rdd_random_lookup")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## filter where the order_id, the second field, is equal to 96922894
    print lines.map(lambda line: line.split('|')).filter(lambda line: int(line[1]) == 96922894).collect()
     
    tt = str(time() - t0)
    print "RDD lookup performed in " + tt + " seconds"

    DataFrame Random Lookup

    #!/usr/bin/env python
     
    from time import time
    from pyspark.sql import *
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("data_frame_random_lookup")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    sqlContext = SQLContext(sc)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## create data frame
    orders_df = sqlContext.createDataFrame(
        lines.map(lambda l: l.split("|"))
             .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                                product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                                shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))
     
    ## filter where the order_id, the second field, is equal to 96922894
    orders_df.where(orders_df['order_id'] == 96922894).show()
     
    tt = str(time() - t0)
    print "DataFrame performed in " + tt + " seconds"

    SparkSQL Random Lookup

    #!/usr/bin/env python
     
    from time import time
    from pyspark.sql import *
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("spark_sql_random_lookup")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    sqlContext = SQLContext(sc)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## create data frame
    orders_df = sqlContext.createDataFrame(
        lines.map(lambda l: l.split("|"))
             .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                                product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                                shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))
     
    ## register data frame as a temporary table
    orders_df.registerTempTable("orders")
     
    ## filter where the order_id, the second field, is equal to 96922894
    print sqlContext.sql("SELECT * FROM orders where order_id = 96922894").collect()
     
    tt = str(time() - t0)
    print "SparkSQL performed in " + tt + " seconds"

    RDD with GroupBy, Count, and Sort Descending

    #!/usr/bin/env python
     
    from time import time
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("rdd_aggregation_and_sort")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    counts = (lines.map(lambda line: line.split('|'))
                   .map(lambda x: (x[5], 1))
                   .reduceByKey(lambda a, b: a + b)
                   .map(lambda x: (x[1], x[0]))
                   .sortByKey(ascending=False))
     
    for x in counts.collect():
      print x[1] + '\t' + str(x[0])
     
    tt = str(time() - t0)
    print "RDD GroupBy performed in " + tt + " seconds"

    DataFrame with GroupBy, Count, and Sort Descending

    #!/usr/bin/env python
     
    from time import time
    from pyspark.sql import *
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("data_frame_aggregation_and_sort")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    sqlContext = SQLContext(sc)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## create data frame
    orders_df = sqlContext.createDataFrame(
        lines.map(lambda l: l.split("|"))
             .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                                product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                                shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))
     
    results = orders_df.groupBy(orders_df['product_desc']).count().sort("count",ascending=False)
     
    for x in results.collect():
      print x
     
    tt = str(time() - t0)
    print "DataFrame performed in " + tt + " seconds"

    SparkSQL with GroupBy, Count, and Sort Descending

    #!/usr/bin/env python
     
    from time import time
    from pyspark.sql import *
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("spark_sql_aggregation_and_sort")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    sqlContext = SQLContext(sc)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## create data frame
    orders_df = sqlContext.createDataFrame(lines.map(lambda l: l.split("|")) 
    .map(lambda r: Row(product=r[5])))
     
    ## register data frame as a temporary table
    orders_df.registerTempTable("orders")
     
    results = sqlContext.sql("SELECT product, count(*) AS total_count FROM orders GROUP BY product ORDER BY total_count DESC")
     
    for x in results.collect():
      print x
     
    tt = str(time() - t0)
    print "SparkSQL performed in " + tt + " seconds"

    Source: https://community.hortonworks.com/articles/42027/rdd-vs-dataframe-vs-sparksql.html
