zoukankan      html  css  js  c++  java
  • spark DataFrame 读写和保存数据

    一。读写Parquet(DataFrame) 

      Spark SQL可以支持Parquet、JSON、Hive等数据源,并且可以通过JDBC连接外部数据源。前面的介绍中,我们已经涉及到了JSON、文本格式的加载,这里不再赘述。这里介绍Parquet,下一节会介绍JDBC数据库连接。

      Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合的组件有:
        * 查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
        * 计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
        * 数据模型: Avro, Thrift, Protocol Buffers, POJOs
      Spark已经为我们提供了parquet样例数据,就保存在“/usr/local/spark/examples/src/main/resources/”这个目录下,有个users.parquet文件,这个文件格式比较特殊,如果你用vim编辑器打开,或者用cat命令查看文件内容,肉眼是一堆乱七八糟的东西,是无法理解的。只有被加载到程序中以后,Spark会对这种格式进行解析,然后我们才能理解其中的数据。
      下面代码演示了如何从parquet文件中加载数据生成DataFrame。

    >>> parquetFileDF = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet"
    >>> parquetFileDF.createOrReplaceTempView("parquetFile")
     
    >>> namesDF = spark.sql("SELECT * FROM parquetFile")
     
    >>> namesDF.rdd.foreach(lambda person: print(person.name))
     
    Alyssa
    Ben
     
    >>> peopleDF = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
     
    >>> peopleDF.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")
     

    二。jdbc

      

    >>> jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
    >>> jdbcDF.show()
    >>> from pyspark.sql.types import Row
    >>> from pyspark.sql.types import StructType
    >>> from pyspark.sql.types import StructField
    >>> from pyspark.sql.types import StringType
    >>> from pyspark.sql.types import IntegerType
    >>> studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
    //下面要设置模式信息
    >>> schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
    >>> rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
    //建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
    >>> studentDF = spark.createDataFrame(rowRDD, schema)
    >>> prop = {}
    >>> prop['user'] = 'root'
    >>> prop['password'] = 'hadoop'
    >>> prop['driver'] = "com.mysql.jdbc.Driver"
    >>> studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)
  • 相关阅读:
    Java实现 LeetCode 740 删除与获得点数(递推 || 动态规划?打家劫舍Ⅳ)
    Python oct() 函数
    Python hex() 函数
    Python ord() 函数
    Python unichr() 函数
    Python chr() 函数
    arm,asic,dsp,fpga,mcu,soc各自的特点
    摄像头标定技术
    自主泊车技术分析
    畸变的单目摄像机标定
  • 原文地址:https://www.cnblogs.com/dhName/p/10699824.html
Copyright © 2011-2022 走看看