zoukankan      html  css  js  c++  java
  • PySpark SQL 基本操作

    记录备忘:

    转自: https://www.jianshu.com/p/177cbcb1cb6f 

     

     

    数据拉取

    加载包:

    from __future__ import print_function
    
    import pandas as pd
    
    from pyspark.sql import HiveContext
    
    from pyspark import SparkContext,SparkConf
    
    from sqlalchemy import create_engine
    
    import datetime
    
    import pyspark.sql.functions as F
    
     
    
    conf = SparkConf().setAppName("abc")
    
    sc = SparkContext(conf=conf)
    
    hiveCtx = HiveContext(sc)
    
     
    
    # 创建dataframe
    
    d = [{'name': 'Alice', 'age': 1},{'name': 'Bob', 'age': 5}]
    
    df = sqlContext.createDataFrame(d)
    
    df.show() 
    
     
    
    sql = "" # 拉数SQL
    
    df  = hiveCtx.sql(sql)
    

      

     

    数据探索

    df.show() # 不加参数默认展示前20行
    
    df.count() 
    
    df.printSchema() 
    
    df.columns

     

    数据处理

    df.select('age','name') # 带show才能看到结果
    
    df.select(df.age.alias('age_value'),'name')
    
    df.filter(df.name=='Alice')
    

      

     

    函数和UDF

    pyspark.sql.functions里有许多常用的函数,可以满足日常绝大多数的数据处理需求;当然也支持自己写的UDF,直接拿来用。

    自带函数

    根据官方文档,以下是部分函数说明:

    'lit': 'Creates a :class:`Column` of literal value.',
    
    'col': 'Returns a :class:`Column` based on the given column name.',
    
    'column': 'Returns a :class:`Column` based on the given column name.',
    
    'asc': 'Returns a sort expression based on the ascending order of the given column name.',
    
    'desc': 'Returns a sort expression based on the descending order of the given column name.',
    
     
    
    'upper': 'Converts a string expression to upper case.',
    
    'lower': 'Converts a string expression to upper case.',
    
    'sqrt': 'Computes the square root of the specified float value.',
    
    'abs': 'Computes the absolutle value.',
    
     
    
    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
    
    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
    
    'first': 'Aggregate function: returns the first value in a group.',
    
    'last': 'Aggregate function: returns the last value in a group.',
    
    'count': 'Aggregate function: returns the number of items in a group.',
    
    'sum': 'Aggregate function: returns the sum of all values in the expression.',
    
    'avg': 'Aggregate function: returns the average of the values in a group.',
    
    'mean': 'Aggregate function: returns the average of the values in a group.',
    
    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
    
     
    
    ---------------------------
    
     
    
    df.select(F.max(df.age))
    
    df.select(F.min(df.age))
    
    df.select(F.avg(df.age)) # 也可以用mean,一样的效果
    
    df.select(F.countDistinct(df.age)) # 去重后统计
    
    df.select(F.count(df.age)) # 直接统计,经试验,这个函数会去掉缺失值会再统计
    
     
    
    from pyspark.sql import Window
    
    df.withColumn("row_number", F.row_number().over(Window.partitionBy("a","b","c","d").orderBy("time"))).show() # row_number()函数
    

      

    数据写出

    写入集群分区表

    all_bike.rdd.map(lambda line: u','.join(map(lambda x:unicode(x),line))).saveAsTextFile('/user/hive/warehouse/bi.db/bikeid_without_3codes_a_d/dt={}'.format(t0_uf)) #转化为RDD写入HDFS路径
    

      

    还有一种方法,是先把dataframe创建成一个临时表,再用hive sql的语句写入表的分区

    bike_change_2days.registerTempTable('bike_change_2days')
    sqlContext.sql("insert into bi.bike_changes_2days_a_d partition(dt='%s') select citycode,biketype,detain_bike_flag,bike_tag_onday,bike_tag_yesterday,bike_num from bike_change_2days"%(date))
    

    写入集群非分区表

    df_spark.write.mode("append").insertInto('bi.pesudo_bike_white_list') # 直接使用write.mode方法insert到指定的集群表
    

     

    可以先将PySpark DataFrame转化成Pandas DataFrame,然后用pandasto_sql方法插入数据库

    写出本地

    df.write.csv()

     

     

    Pandas DataFrame互相转换

    如果你熟悉Pandas包,并且PySpark处理的中间数据量不是太大,那么可以直接转换成pandas DataFrame,然后转化成常规操作。
    df.toPandas() # PySpark DataFrame转化成Pandas DataFrame
    
    
    
    import pandas as pd
    df_p = pd.DataFrame(dict(num=range(3),char=['a','b','c']))
    df_s = sqlContext.createDataFrame(df_p) # pandas dataframe转化成PySpark DataFrame
    type(df_s)
    

      

  • 相关阅读:
    NanoProfiler
    NanoProfiler
    Open Source Cassandra Gitbook for Developer
    Android Fragment使用(四) Toolbar使用及Fragment中的Toolbar处理
    Android Fragment使用(三) Activity, Fragment, WebView的状态保存和恢复
    Android Fragment使用(二) 嵌套Fragments (Nested Fragments) 的使用及常见错误
    Android Fragment使用(一) 基础篇 温故知新
    Set up Github Pages with Hexo, migrating from Jekyll
    EventBus源码解析 源码阅读记录
    Android M Permission 运行时权限 学习笔记
  • 原文地址:https://www.cnblogs.com/Allen-rg/p/12693653.html
Copyright © 2011-2022 走看看