zoukankan      html  css  js  c++  java
  • pyspark dataframe 常用操作

    spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能。当然主要对类SQL的支持。

     
    在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选、合并,重新入库。
     
    首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数。
     
    而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到HIVE中。
     

    1、union、unionAll、unionByName,row 合并(上下拼接)

    data_all = data_neg.unionByName(data_pos)

    2、dataframe 样本抽样

    data_all.sample(False, 0.5, 1000).count()

    3、条件过滤

    data_all.filter("label >= 1").count()

    4、注册为临时表,再使用spark.sql 对dataframe进行操作

    res = predictions.select("user_log_acct", split_udf('probability').alias('probability'))

    res.registerTempTable("tmp")
    spark.sql("insert overwrite table dev.dev_result_temp select user_log_acct,probability from tmp")

    spark.stop()

     

    创建和保存spark dataframe:

    spark.createDataFrame(data, schema=None, samplingRatio=None),直接创建
    其中data是行或元组或列表或字典的RDD、list、pandas.DataFrame。

    df = spark.createDataFrame([
            (1, 144.5, 5.9, 33, 'M'),
            (2, 167.2, 5.4, 45, 'M'),
            (3, 124.1, 5.2, 23, 'F'),
            (4, 144.5, 5.9, 33, 'M'),
            (5, 133.2, 5.7, 54, 'F'),
            (3, 124.1, 5.2, 23, 'F'),
            (5, 129.2, 5.3, 42, 'M'),
        ], ['id', 'weight', 'height', 'age', 'gender']) #直接创建Dataframe
    
    df = spark.createDataFrame([{'name':'Alice','age':1},
    	{'name':'Polo','age':1}]) #从字典创建
    
    schema = StructType([
        StructField("id", LongType(), True),    
        StructField("name", StringType(), True),
        StructField("age", LongType(), True),
        StructField("eyeColor", StringType(), True)
    ])
    df = spark.createDataFrame(csvRDD, schema) #指定schema。
    
    

     spark.read 从文件中读数据

    >>> airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='	')
    >>> rdd = sc.textFile('python/test_support/sql/ages.csv') #可以用这种方法将用逗号分隔的rdd转为dataframe
    >>> df2 = spark.read.csv(rdd)
    >>> df = spark.read.format('json').load('python/test_support/sql/people.json') 
    >>> df1 = spark.read.json('python/test_support/sql/people.json')
    >>> df1.dtypes
    [('age', 'bigint'), ('name', 'string')]
    >>> rdd = sc.textFile('python/test_support/sql/people.json')
    >>> df2 = spark.read.json(rdd) 
    >>> df = spark.read.text('python/test_support/sql/text-test.txt')
    >>> df.collect()
    [Row(value='hello'), Row(value='this')]
    >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
    >>> df.collect()
    [Row(value='hello
    this')]
    

      

    Spark function

    1)foreach(f),应用f函数,将df的每一行作为f函数的输入

    例如:

    def f(person):

        print(person.name)

    df.foreach(f)

    2) apply(udf)
    3) map(f),应用f函数,作用对象为rdd的每一行

     

    参考:https://blog.csdn.net/kittyzc/article/details/82862089 

  • 相关阅读:
    九宫格小游戏源码分享
    DeviceOne 竟然做出来如此复杂的App
    DeviceOne 让你一见钟情的App快速开发平台
    MySQL初始化
    MySQL的操作
    MySQL
    Library
    Python模块
    Anaconda的使用
    面向对象之成员修饰 特殊成员 methclass
  • 原文地址:https://www.cnblogs.com/Allen-rg/p/10216928.html
Copyright © 2011-2022 走看看