  • PySpark SQL Basic Operations

    Notes for future reference:

    Reposted from: https://www.jianshu.com/p/177cbcb1cb6f

     

     

    Pulling Data

    Load packages:

    from __future__ import print_function
    
    import datetime
    import pandas as pd
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import HiveContext
    import pyspark.sql.functions as F
    from sqlalchemy import create_engine
    
    conf = SparkConf().setAppName("abc")
    sc = SparkContext(conf=conf)
    hiveCtx = HiveContext(sc)
    
    # Create a DataFrame from a list of dicts
    d = [{'name': 'Alice', 'age': 1}, {'name': 'Bob', 'age': 5}]
    df = hiveCtx.createDataFrame(d)
    df.show()
    
    sql = ""  # the SQL statement used to pull the data
    df = hiveCtx.sql(sql)
    

      

     

    Data Exploration

    df.show()         # shows the first 20 rows by default
    df.count()        # number of rows
    df.printSchema()  # prints the schema
    df.columns        # list of column names
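
    For the toy DataFrame created above, these calls print roughly the following (a sketch; exact formatting can vary slightly between Spark versions):

    >>> df.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  1|Alice|
    |  5|  Bob|
    +---+-----+
    
    >>> df.printSchema()
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)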

     

    Data Processing

    df.select('age', 'name')                      # add .show() to actually see the result
    df.select(df.age.alias('age_value'), 'name')  # rename a column with alias
    df.filter(df.name == 'Alice')                 # filter rows by a condition
    

      

     

    Functions and UDFs

    pyspark.sql.functions provides many commonly used functions that cover the vast majority of day-to-day data-processing needs; you can also write your own UDFs and use them directly (a minimal UDF sketch follows the built-in examples below).

    Built-in Functions

    According to the official documentation, here are descriptions of some of these functions:

    'lit': 'Creates a :class:`Column` of literal value.',
    'col': 'Returns a :class:`Column` based on the given column name.',
    'column': 'Returns a :class:`Column` based on the given column name.',
    'asc': 'Returns a sort expression based on the ascending order of the given column name.',
    'desc': 'Returns a sort expression based on the descending order of the given column name.',
    
    'upper': 'Converts a string expression to upper case.',
    'lower': 'Converts a string expression to lower case.',
    'sqrt': 'Computes the square root of the specified float value.',
    'abs': 'Computes the absolute value.',
    
    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
    'first': 'Aggregate function: returns the first value in a group.',
    'last': 'Aggregate function: returns the last value in a group.',
    'count': 'Aggregate function: returns the number of items in a group.',
    'sum': 'Aggregate function: returns the sum of all values in the expression.',
    'avg': 'Aggregate function: returns the average of the values in a group.',
    'mean': 'Aggregate function: returns the average of the values in a group.',
    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
    
     
    
    ---------------------------
    
     
    
    df.select(F.max(df.age))
    df.select(F.min(df.age))
    df.select(F.avg(df.age))            # F.mean gives the same result
    df.select(F.countDistinct(df.age))  # count of distinct values
    df.select(F.count(df.age))          # plain count; in practice null values are dropped before counting
    
    from pyspark.sql import Window
    # row_number() window function: numbers rows within each partition, ordered by "time"
    df.withColumn("row_number", F.row_number().over(Window.partitionBy("a", "b", "c", "d").orderBy("time"))).show()
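
    As noted above, you can also write your own UDFs. Below is a minimal sketch, assuming we want to bucket the toy DataFrame's age column; the age_group name and the cutoff value are purely illustrative:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # wrap an ordinary Python function as a column-level UDF with an explicit return type
    age_group = udf(lambda age: 'child' if age < 18 else 'adult', StringType())
    df.withColumn('age_group', age_group(df.age)).show()
    
    # to call the same function from SQL, register it on the context
    # (registerFunction is the Spark 1.x API; newer versions use spark.udf.register)
    hiveCtx.registerFunction('age_group_sql', lambda age: 'child' if age < 18 else 'adult', StringType())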
    

      

    Writing Data Out

    Writing to a partitioned cluster table

    # convert to an RDD and write it as text files under the partition's HDFS path
    all_bike.rdd.map(lambda line: u','.join(map(lambda x: unicode(x), line))).saveAsTextFile('/user/hive/warehouse/bi.db/bikeid_without_3codes_a_d/dt={}'.format(t0_uf))
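
    Writing files straight into the partition directory does not by itself make the partition visible in the Hive metastore; depending on how the table is set up, you may also need to add the partition explicitly, along these lines (a sketch reusing the table name and t0_uf from the example above):

    hiveCtx.sql("ALTER TABLE bi.bikeid_without_3codes_a_d ADD IF NOT EXISTS PARTITION (dt='{}')".format(t0_uf))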
    

      

    Another approach is to register the DataFrame as a temporary table first, and then write into the table partition with a Hive SQL statement:

    bike_change_2days.registerTempTable('bike_change_2days')
    hiveCtx.sql("insert into bi.bike_changes_2days_a_d partition(dt='%s') select citycode,biketype,detain_bike_flag,bike_tag_onday,bike_tag_yesterday,bike_num from bike_change_2days" % (date))
    

    Writing to a non-partitioned cluster table

    df_spark.write.mode("append").insertInto('bi.pesudo_bike_white_list')  # insert into the target cluster table directly with write.mode
    

     

    You can also convert the PySpark DataFrame to a Pandas DataFrame first and then insert it into a database with pandas' to_sql method, as sketched below.
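
    A minimal sketch of that route, assuming a SQLAlchemy-compatible target database; the connection string and table name below are placeholders, not values from the original post:

    from sqlalchemy import create_engine  # already imported in the setup above
    
    engine = create_engine('mysql+pymysql://user:password@host:3306/dbname')  # placeholder connection string
    pdf = df.toPandas()                                                       # collect the data to the driver as a Pandas DataFrame
    pdf.to_sql('target_table', engine, if_exists='append', index=False)      # append rows into the target table

    Since toPandas() collects everything to the driver, this route only makes sense when the result set is reasonably small.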

    Writing to local files

    df.write.csv()
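
    Note that df.write.csv() needs an output path to actually run. A slightly fuller sketch; the path and options are only illustrative, and the .csv() writer method requires Spark 2.0+ (older versions need the spark-csv package):

    df.write.mode('overwrite').option('header', 'true').csv('/tmp/pyspark_csv_out')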

     

     

    Converting to and from Pandas DataFrames

    If you are familiar with the Pandas package and the intermediate data handled by PySpark is not too large, you can convert directly to a Pandas DataFrame and carry on with your usual Pandas workflow.
    
    df.toPandas()  # convert a PySpark DataFrame to a Pandas DataFrame
    
    import pandas as pd
    df_p = pd.DataFrame(dict(num=range(3), char=['a', 'b', 'c']))
    df_s = hiveCtx.createDataFrame(df_p)  # convert a Pandas DataFrame to a PySpark DataFrame
    type(df_s)
    

      
