Spark RDDs vs DataFrames vs SparkSQL

    Introduction

    A performance comparison of Spark RDDs, DataFrames, and SparkSQL.

    Two tasks are compared

    1. Random lookup of a single record

    2. Aggregation followed by sorting of the output

    Each of the two tasks above is implemented with the following three Spark approaches, and their performance is compared.

    1. Using RDD’s

    2. Using DataFrames

    3. Using SparkSQL

    Data source

    • 9 million unique records stored across 3 files in HDFS

    • 11 fields per record
    • Total size: 1.4 GB (see the record-layout sketch below)
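
    For reference, here is a minimal sketch of what one record is assumed to look like: a pipe-delimited line with the 11 fields named in the Row(...) mapping used in the scripts below. The sample values are placeholders, not real data.

    # Sketch only (assumption): one pipe-delimited record with 11 fields;
    # field names follow the Row(...) mapping in the scripts below, values are placeholders.
    sample_line = "1001|96922894|<email_hash>|<ssn_hash>|42|Widget|US|CA|UPS|Ground|Standard"
    fields = sample_line.split("|")
    record = dict(
        cust_id=int(fields[0]), order_id=int(fields[1]), email_hash=fields[2], ssn_hash=fields[3],
        product_id=int(fields[4]), product_desc=fields[5], country=fields[6], state=fields[7],
        shipping_carrier=fields[8], shipping_type=fields[9], shipping_class=fields[10])
    print record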

    Test environment

    • HDP 2.4

    • Hadoop version 2.7

    • Spark 1.6

    • HDP Sandbox

    Test results

    • Raw RDDs outperformed DataFrames and SparkSQL

    • DataFrames and SparkSQL performed about the same

    • DataFrames and SparkSQL are more intuitive to work with than RDDs

    • Each job was run independently, with no interference from other jobs

    The two operations

    1. Random lookup against 1 order ID from 9 million unique order IDs

    2. GROUP all the different products with their total COUNTS and SORT DESCENDING by count

    Code

    RDD Random Lookup

    #!/usr/bin/env python
     
    from time import time
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("rdd_random_lookup")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## filter where the order_id, the second field, is equal to 96922894
    print lines.map(lambda line: line.split('|')).filter(lambda line: int(line[1]) == 96922894).collect()
     
    tt = str(time() - t0)
    print "RDD lookup performed in " + tt + " seconds"

    DataFrame Random Lookup

    #!/usr/bin/env python
     
    from time import time
    from pyspark.sql import *
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("data_frame_random_lookup")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    sqlContext = SQLContext(sc)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## create data frame
    orders_df = sqlContext.createDataFrame(
        lines.map(lambda l: l.split("|"))
             .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                                product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                                shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))
     
    ## filter where the order_id, the second field, is equal to 96922894
    orders_df.where(orders_df['order_id'] == 96922894).show()
     
    tt = str(time() - t0)
    print "DataFrame performed in " + tt + " seconds"

    SparkSQL Random Lookup

    #!/usr/bin/env python
     
    from time import time
    from pyspark.sql import *
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("spark_sql_random_lookup")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    sqlContext = SQLContext(sc)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## create data frame
    orders_df = sqlContext.createDataFrame(
        lines.map(lambda l: l.split("|"))
             .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                                product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                                shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))
     
    ## register data frame as a temporary table
    orders_df.registerTempTable("orders")
     
    ## filter where the order_id, the second field, is equal to 96922894
    print sqlContext.sql("SELECT * FROM orders where order_id = 96922894").collect()
     
    tt = str(time() - t0)
    print "SparkSQL performed in " + tt + " seconds"

    RDD with GroupBy, Count, and Sort Descending

    #!/usr/bin/env python
     
    from time import time
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("rdd_aggregation_and_sort")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## count records per product (field 6, the product description) and sort by count descending
    counts = lines.map(lambda line: line.split('|')) \
        .map(lambda x: (x[5], 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .map(lambda x: (x[1], x[0])) \
        .sortByKey(ascending=False)
     
    for x in counts.collect():
      print x[1] + '\t' + str(x[0])
     
    tt = str(time() - t0)
    print "RDD GroupBy performed in " + tt + " seconds"

    DataFrame with GroupBy, Count, and Sort Descending

    #!/usr/bin/env python
     
    from time import time
    from pyspark.sql import *
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("data_frame_aggregation_and_sort")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    sqlContext = SQLContext(sc)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## create data frame
    orders_df = sqlContext.createDataFrame(
        lines.map(lambda l: l.split("|"))
             .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                                product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                                shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))
     
    results = orders_df.groupBy(orders_df['product_desc']).count().sort("count",ascending=False)
     
    for x in results.collect():
      print x
     
    tt = str(time() - t0)
    print "DataFrame performed in " + tt + " seconds"

    SparkSQL with GroupBy, Count, and Sort Descending

    #!/usr/bin/env python
     
    from time import time
    from pyspark.sql import *
    from pyspark import SparkConf, SparkContext
     
    conf = (SparkConf()
      .setAppName("spark_sql_aggregation_and_sort")
      .set("spark.executor.instances", "10")
      .set("spark.executor.cores", 2)
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.executor.memory", "500MB"))
    sc = SparkContext(conf = conf)
     
    sqlContext = SQLContext(sc)
     
    t0 = time()
     
    path = "/data/customer_orders*"
    lines = sc.textFile(path)
     
    ## create data frame
    orders_df = sqlContext.createDataFrame(
        lines.map(lambda l: l.split("|"))
             .map(lambda r: Row(product=r[5])))
     
    ## register data frame as a temporary table
    orders_df.registerTempTable("orders")
     
    results = sqlContext.sql("SELECT product, count(*) AS total_count FROM orders GROUP BY product ORDER BY total_count DESC")
     
    for x in results.collect():
      print x
     
    tt = str(time() - t0)
    print "SparkSQL performed in " + tt + " seconds"
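
    Usage note: each script builds its own SparkConf and SparkContext, so under the assumed HDP Sandbox / YARN setup described above it can be submitted directly, e.g. spark-submit --master yarn rdd_random_lookup.py (the script file name here is only a placeholder).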

    Original article: https://community.hortonworks.com/articles/42027/rdd-vs-dataframe-vs-sparksql.html
