zoukankan      html  css  js  c++  java
  • spark2.0系列《一》—— RDD VS. DataFrame VS. DataSet

      虽说,spark我也不陌生,之前一直用python跑的spark,基本的core和SQL操作用的也是比较熟练。但是这一切的基础都是在RDD上进行操作,即使是进行SQL操作也是将利用SpaekContext类中的textFile方法读取txt文件返回RDD对象,然后使用SQLContext实例化载利用函数createDataFrame将格式化后的数据转化为dataFrame或者利用createDataset将数据转换为dataset。真不是一般的麻烦。。。话不多说,比如以下python代码示例:

     1 # -*-coding:utf-8-*-
     2 # Created by wuying on 2017/3/28
     3 
     4 from pyspark.sql import Row
     5 from pyspark import SparkContext
     6 from pyspark.sql import SQLContext
     7 from pyspark.sql.functions import *
     8 
     9 
    10 def create_df(sqlContext, raw_data):
    11     """
    12     :param row_data: original data
    13     :return: data frame
    14     """
    15     lineLists = raw_data.map(lambda x: x.split(','))
    16    //筛选部分有用的数据字段作为表头
    17     row_data = lineLists.map(lambda x: Row(
    18     recordCode = x[0],
    19     logicCode  = x[1],
    20     deviceCode = x[2],
    21     compId     = x[2][:3],
    22     siteId     = x[2][:6],
    23     transType  = x[4],
    24     cardTime   = x[8],
    25     compName   = x[12],
    26     siteName   = x[13],
    27     carCode    = x[14]
    28     )
    29     )
    30     SZT_df = sqlContext.createDataFrame(row_data)
    31     SZT_df.registerTempTable("SZT_df")
    32 
    33     return SZT_df
    34 
    35 
    36 if __name__ == '__main__':
    37     # Create DataFrame
    38     # Load data from hdfs
    39     inputFile = "P_GJGD_SZT_20170101"  //数据来源于地铁打卡
    40     sc = SparkContext(master="local[*]", appName="AppTest", pyFiles=["prepared.py"])
    41     raw_data = sc.textFile(inputFile)
    42     sqlContext = SQLContext(sc)
    43     SZT_df = create_df(sqlContext, raw_data)
    44     print SZT_df.dtypes

      1、RDD,英文全称是“Resilient Distributed Dataset”,即弹性分布式数据集,听起来高大上的名字,简而言之就是大数据案例下的一种数据对象,RDD这个API在spark1.0中就已经存在,因此比较老的版本的tutorial中用的都是RDD作为原始数据处理对象,而在spark-shell中已经实例化好的sc对象一般通过加载数据产生的RDD这个对象的基础上进行数据分析。当然,打草稿情况(未接触企业级系统)下RDD API还是足够我们对一般的数据进行转换,清洗以及计数,里面有较为丰富的函数可以调用,比如常用的map, filter, groupBy等等,具体实现见pyspark。所以,这个RDD的简单安全且易于理解使得很多人都是用RDD打开spark这个高大上之神器的大门(包括我~~)。

      首先,它不好操作,以我目前的知识水平而言,我宁愿选dataFrame。因为dataFrame方便且高速,比如SQL语句,自从用了SQL,再也不想一步步map,一步步filter了。其次,据说,RDD无论是在集群上执行任务还是存储到硬盘上。它都会默认使用java对象序列化(提高数据操作的性能),而序列化单个java和scala对象的开销过大,并且需要将数据及其结构在各节点之间传输,而生成和销毁个别对象需要进行垃圾收集这期间的开销也非常大。

      2、DataFrame。说到dataFrame,我就想到R和pandas(python)中常用的数据框架就是dataFrame,估计后来spark的设计者从R和pandas这个两个数据科学语言中的数据dataFrame中吸取灵感,不同的是dataFrame是从底层出发为大数据应用设计出的RDD的拓展,因此它具有RDD所不具有的几个特性(Spark 1.3以后):

    • 处理数据能力从千字节到PB量级不等
    • 支持各种数据格式和存储系统
    • 通过SPARK SQL Catalyst优化器进行高效率优化和代码生成
    • 通过SPARK对所有大数据工具基础架构进行无缝集成
    • 提供Python,Scala,Java 和R的api

      简而言之,我们可以将dataFrame当作是关系数据库中表或者是R或者Python中的dataFrame数据结构。实际上,有了dataFrame我们相当于spark可以管理数据视图,以后传输数据只要在各个节点穿数据数据而不需要传数据结构,这种方式比java序列化有效的多。

      直接上个scala代码瞅瞅:

     1 package cn.sibat.metro
     2 import org.apache.spark.sql.SparkSession
     3 
     4 /**
     5   * Created by wing1995 on 2017/4/20
     6   */
     7 
     8 object Test {
     9   def main(args: Array[String]) = {
    10     val spark = SparkSession
    11         .builder()
    12         .config("spark.sql.warehouse.dir", "file:/file:E:/bus")
    13         .appName("Spark SQL Test")
    14         .master("local[*]")
    15         .getOrCreate()
    16 
    17     import spark.implicits._
    18 
    19     val df = spark.sparkContext
    20       .textFile("E:\trafficDataAnalysis\SZTDataCheck\testData.txt")
    21       .map(_.split(","))
    22       .map(line => SZT(line(0), line(1), line(2), line(2).substring(0, 3), line(2).substring(0, 6), line(4), line(8), line(12), line(13), line(14)))
    23       .toDF()
    24     df.show()
    25     df.printSchema()
    26   }
    27 }
    28 
    29 case class SZT(recordCode: String, logicCode: String, terminalCode: String, compId: String, siteId: String,
    30                transType: String, cardTime: String, compName: String, siteName: String, vehicleCode: String
    31               )

      代码真是清新可人啊,直接SparkSession实例化然后再怎么转其他格式,怎么读其他数据都可以。。。

      3、Dataset(Spark 1.6)

      跟DataFrame很像,不是很熟悉,貌似是为了兼容SCALA中的RDD和JAVA的面向对象而设计,事实证明Scala在Spark中的优势是java取代不了的,即使java8已经做出不少改进。然而,Scala作为原生态语言,仍然是Spark使用者的主流。所以,接下来的博客陆续以Scala为主。

      个人是比较喜欢简洁而有趣的Scala,为数据科学而设计!

  • 相关阅读:
    web页面与多页应用(布局示例普通文档流)
    web页面与多页应用(一)
    Flutter,webview里面实现上传和下载的功能
    Flutter项目删除了相关的dart文件之后运行flutter run或者 F5编译运行时会报这个错误.... were declared as an inputs, but did not exist. Check the definition of target:kernel_snapshot for errors
    vue项目中,点击输入框的时候,弹出的键盘挡住了输入框,需要把输入框展示在可见区域中,不被遮挡
    在IE浏览器上,min-hheight:unset/line-hight:unset不生效问题解决,把unset换成auto,问题只要时IE浏览器设置unset不生效
    本人修改了,需要把新的AndroidManifest.xml 覆盖原来的,AndroidManifest.xml 覆盖问题
    使用Filter来过滤掉需要排除的数组对象
    深拷贝和浅拷贝
    正则表达式
  • 原文地址:https://www.cnblogs.com/wing1995/p/6803630.html
Copyright © 2011-2022 走看看