zoukankan      html  css  js  c++  java
  • pyspark dataframe 常用操作

    spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能。当然主要对类SQL的支持。

     
    在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选、合并,重新入库。
     
    首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数。
     
    而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到HIVE中。
     

    1、union、unionAll、unionByName,row 合并(上下拼接)

    data_all = data_neg.unionByName(data_pos)

    2、dataframe 样本抽样

    data_all.sample(False, 0.5, 1000).count()

    3、条件过滤

    data_all.filter("label >= 1").count()

    4、注册为临时表,再使用spark.sql 对dataframe进行操作

    res = predictions.select("user_log_acct", split_udf('probability').alias('probability'))

    res.registerTempTable("tmp")
    spark.sql("insert overwrite table dev.dev_result_temp select user_log_acct,probability from tmp")

    spark.stop()

     

    创建和保存spark dataframe:

    spark.createDataFrame(data, schema=None, samplingRatio=None),直接创建
    其中data是行或元组或列表或字典的RDD、list、pandas.DataFrame。

    df = spark.createDataFrame([
            (1, 144.5, 5.9, 33, 'M'),
            (2, 167.2, 5.4, 45, 'M'),
            (3, 124.1, 5.2, 23, 'F'),
            (4, 144.5, 5.9, 33, 'M'),
            (5, 133.2, 5.7, 54, 'F'),
            (3, 124.1, 5.2, 23, 'F'),
            (5, 129.2, 5.3, 42, 'M'),
        ], ['id', 'weight', 'height', 'age', 'gender']) #直接创建Dataframe
    
    df = spark.createDataFrame([{'name':'Alice','age':1},
    	{'name':'Polo','age':1}]) #从字典创建
    
    schema = StructType([
        StructField("id", LongType(), True),    
        StructField("name", StringType(), True),
        StructField("age", LongType(), True),
        StructField("eyeColor", StringType(), True)
    ])
    df = spark.createDataFrame(csvRDD, schema) #指定schema。
    
    

     spark.read 从文件中读数据

    >>> airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='	')
    >>> rdd = sc.textFile('python/test_support/sql/ages.csv') #可以用这种方法将用逗号分隔的rdd转为dataframe
    >>> df2 = spark.read.csv(rdd)
    >>> df = spark.read.format('json').load('python/test_support/sql/people.json') 
    >>> df1 = spark.read.json('python/test_support/sql/people.json')
    >>> df1.dtypes
    [('age', 'bigint'), ('name', 'string')]
    >>> rdd = sc.textFile('python/test_support/sql/people.json')
    >>> df2 = spark.read.json(rdd) 
    >>> df = spark.read.text('python/test_support/sql/text-test.txt')
    >>> df.collect()
    [Row(value='hello'), Row(value='this')]
    >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
    >>> df.collect()
    [Row(value='hello
    this')]
    

      

    Spark function

    1)foreach(f),应用f函数,将df的每一行作为f函数的输入

    例如:

    def f(person):

        print(person.name)

    df.foreach(f)

    2) apply(udf)
    3) map(f),应用f函数,作用对象为rdd的每一行

     

    参考:https://blog.csdn.net/kittyzc/article/details/82862089 

  • 相关阅读:
    堆栈学习
    需要阅读的书籍
    Rust Book Lang Ch.19 Fully Qualified Syntax, Supertraits, Newtype Pattern, type aliases, never type, dynamic sized type
    Rust Lang Book Ch.19 Placeholder type, Default generic type parameter, operator overloading
    Rust Lang Book Ch.19 Unsafe
    Rust Lang Book Ch.18 Patterns and Matching
    Rust Lang Book Ch.17 OOP
    Rust Lang Book Ch.16 Concurrency
    Rust Lang Book Ch.15 Smart Pointers
    HDU3966-Aragorn's Story-树链剖分-点权
  • 原文地址:https://www.cnblogs.com/Allen-rg/p/10216928.html
Copyright © 2011-2022 走看看