zoukankan      html  css  js  c++  java
  • pyspark SQL

     1 from pyspark.sql import HiveContext
     2 from pyspark import SparkContext,SparkConf
     3 import pyspark.sql.functions as F
     4 from  pyspark.sql import SparkSession
     5 
     6 conf = SparkConf().setAppName("abc")
     7 sc = SparkContext(conf=conf)
     8 hiveCtx = HiveContext(sc)
     9 df  = hiveCtx.sql(sql) #用Hive拉数
    10 
    11 df.cache()  # 数据载入缓存
    12 df.show()   # 不加参数默认展示前20行
    13 df.count()  # 统计行数
    14 df.printSchema() # 查看schema
    15 df.columns # 查看字段
    16 df.dtypes # 查看字段类型
    17 df.select('age','name') # 带show才能看到结果
    18 df.select(df.age.alias('age_value'),'name').show() #别名
    19 df.filter(df.name=='Alice').show() # 筛选
    20 df.drop_duplicates() #删除重复记录
    21 df.distinct() #去重
    22 df.drop('id') #删除列
    23 df.na.drop(thresh=2).show() #如果一行至少2个缺失值才删除该行
    24 df.na.fill('unknown').show() #对所有列用同一个值填充缺失值
    25 df.na.fill({'name':'--', 'age':0}).show() # 不同的列用不同的值填充
    26 df.groupby('name').agg(F.max(df['age'])) # 分组计算
    27 df.groupby('name').agg(F.max(df['age'])) # join
    28 df.describe("age").show() # 描述性统计分析
    29 spark.catalog.listTables()  #查看temptable
    30 
    31 df.select(df.age+1,'age','name') # 增加列
    32 df.select(F.lit(0).alias('id'),'age','name') # 增加列
    33 df.unionAll(df2) # 增加行
     1 #spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
     2 spark=SparkSession.builder.appName("boye").getOrCreate()
     3 #d = [{"name": "Alice", "age": 12},{"name": "Bob", "age": 53}]
     4 #df = spark.createDataFrame(d)
     5 df =spark.read.json("file:///usr/local/test/01.json")
     6 #df = spark.read.csv(path=path,schema=["id","name"],sep="	",header=False)
     7 df.show()
     8 df.createTempView("student")
     9 df.createOrReplaceTempView("student") #全局临时表,spark.sql("select avg(age) from global_temp.student").show()
    10 spark.newSession().sql("SELECT * FROM global_temp.student").show()
    11 df.createGlobalTempView("student")
    12 spark.sql("select * from student where age<20").show()

    【Example1】

    spark=SparkSession.builder.appName("boye").getOrCreate()
    sc = spark.sparkContext
    textFile = sc.textFile("file:///usr/local/test/urls")
    rdd = textFile.map(lambda x:x.split("	")).filter(lambda x:len(x)==2)
    df = spark.createDataFrame(rdd,schema=["rowkey","url"])
    df.write.format("json").mode("overwrite").save("file:///usr/local/test/outPut") #保存数据
    df.write.save(path='/usr/local/test/csv', format='csv', mode='overwrite', sep='	') #保存为csv文件
    df.write.mode("overwrite").saveAsTable("ss") #永久保存

    【Example2】

    fields = [
                ('uid',StringType()),
                ('url',StringType()),
                #('age',IntegerType())
            ]
        schema = StructType([StructField(e[0],e[1],True) for e in fields])
        df = spark.read.csv(path="/usr/local/test/urls",schema=schema,sep="	",header=False)
  • 相关阅读:
    Qt -- 鼠标移入移出事件 enterEvent、leaveEvent
    QT -- QPainter介绍
    Qt -- 浅析QFontMetrics 获取字体宽度,高度
    函数声明后面的const用法
    QT -- 读取file数据/写数据到file
    QT -- QLineEdit按下回车键获取信息
    C++ -- fgets,fputs,fputc,fgetc总结
    QT -- QString / std::string转换为const char*
    C++ -- fopen函数用法
    HTML DOM树
  • 原文地址:https://www.cnblogs.com/boye169/p/14540912.html
Copyright © 2011-2022 走看看