zoukankan      html  css  js  c++  java
  • pyspark SparkSession及dataframe基本操作

    from pyspark import SparkContext, SparkConf
    import os
    from pyspark.sql.session import SparkSession
    from pyspark.sql import Row
    
    def CreateSparkContex():
    	sparkconf = SparkConf().setAppName("MYPRO").set("spark.ui.showConsoleProgress", "false")
    	sc = SparkContext(conf=sparkconf)
    	print("master:" + sc.master)
    	sc.setLogLevel("WARN")
    	Setpath(sc)
    	spark = SparkSession.builder.config(conf=sparkconf).getOrCreate()
    	return sc, spark
    
    def Setpath(sc):
    	global Path
    	if sc.master[:5] == "local":
    		Path = "file:/C:/spark/sparkworkspace"
    	else:
    		Path = "hdfs://test"
    
    
    
    if __name__ == "__main__":
    	print("Here we go!
    ")
    	sc, spark = CreateSparkContex()
    	readcsvpath = os.path.join(Path, 'iris.csv')
    	dfcsv = spark.read.csv(readcsvpath, header=True,
    	schema=("`Sepal.Length` DOUBLE,`Sepal.Width` DOUBLE,`Petal.Length` DOUBLE,`Petal.Width` DOUBLE,`Species` string"))
    	#指定数据类型读取
    	
    	dfcsv.show(3)
    	
    	dfcsv.registerTempTable('Iris')#创建并登陆临时表
    	spark.sql("select * from Iris limit 3").show()#使用sql语句查询
    	spark.sql("select Species,count(1) from Iris group by Species").show()
    	
    	df = dfcsv.alias('Iris1')#创建一个别名
    	df.select('Species', '`Sepal.Width`').show(4)#因表头有特殊字符需用反引号``转义
    	df.select(df.Species,df['`Sepal.Width`']).show(4)
    	dfcsv.select(df.Species).show(4)#原始名、别名的组合
    	df[df.Species, df['`Sepal.Width`']].show(4)
    	df[['Species']]#与pandas相同
    	df['Species']#注意这是一个字段名
    	
    	#########增加字段
    	df[df['`Sepal.Length`'], df['`Sepal.Width`'], df['`Sepal.Length`'] - df['`Sepal.Width`']].show(4)
    	df[df['`Sepal.Length`'], df['`Sepal.Width`'],
    	   (df['`Sepal.Length`'] - df['`Sepal.Width`']).alias('rua')].show(4)#重命名
    	
    	#########筛选数据
    	df[df.Species == 'virginica'].show(4)#与pandas筛选一样
    	df[(df.Species == 'virginica') & (df['`Sepal.Width`']>1)].show(4)#多条件筛选
    	df.filter(df.Species == 'virginica').show(4)#也可以用fileter方法筛选
    	spark.sql("select * from Iris where Species='virginica'").show(4)#sql筛选
    	
    	##########多字段排序
    	spark.sql("select * from Iris order by `Sepal.Length` asc ").show(4)#升序
    	spark.sql("select * from Iris order by `Sepal.Length` desc ").show(4)#降序
    	spark.sql("select * from Iris order by `Sepal.Length` asc,`Sepal.Width` desc ").show(4)#升降序
    	
    	df.select('`Sepal.Length`', '`Sepal.Width`').orderBy('`Sepal.Width`',ascending=0).show(4)#按降序
    	df.select('`Sepal.Length`', '`Sepal.Width`').orderBy('`Sepal.Width`').show(4)  # 升序
    	df.select('`Sepal.Length`', '`Sepal.Width`').orderBy('`Sepal.Width`', ascending=1).show(4)  # 按升序,默认的
    	df.select('`Sepal.Length`', '`Sepal.Width`').orderBy(df['`Sepal.Width`'].desc()).show(4)  # 按降序
    	
    	df.select('`Sepal.Length`', '`Sepal.Width`').orderBy(
    		['`Sepal.Length`','`Sepal.Width`'], ascending=[0,1]).show(4)#两个字段按先降序再升序
    	df.orderBy(df['`Sepal.Length`'].desc(),df['`Sepal.Width`']).show(4)
    	
    	##########去重
    	spark.sql("select distinct Species from Iris").show()
    	spark.sql("select distinct Species,`Sepal.Width` from Iris").show()
    	
    	df.select('Species').distinct().show()
    	df.select('Species','`Sepal.Width`').distinct().show()
    	df.select('Species').drop_duplicates().show()#同上,与pandas用法相同
    	df.select('Species').dropDuplicates().show()#同上
    	
    	##########分组统计
    	spark.sql("select Species,count(1) from Iris group by Species").show()
    	df[['Species']].groupby('Species').count().show()
    	df.groupby(['Species']).agg({'`Sepal.Width`': 'sum'}).show()
    	df.groupby(['Species']).agg({'`Sepal.Width`': 'sum', '`Sepal.Length`': 'mean'}).show()
    	
    	#########联结数据
    	dic=[['virginica','A1'],['versicolor','A2'],['setosa','A3']]
    	rrd=sc.parallelize(dic)
    	df2=rrd.map(lambda p: Row(lei=p[0],al=p[1]))
    	df2frame=spark.createDataFrame(df2)
    	df2frame.show()
    	df2frame.registerTempTable('dictable')
    	spark.sql("select * from Iris u left join dictable z on u.Species=z.lei").show()
    	df.join(df2frame, df.Species == df2frame.lei, 'left_outer').show()
    	
    	sc.stop()
    	spark.stop()
    

     

  • 相关阅读:
    使用Ansible安装部署nginx+php+mysql之安装php(2)
    使用Ansible安装部署nginx+php+mysql之安装nginx(1)
    Ansible常见问题处理
    4.2、Ansible常用模块
    3.2、Ansible单命令测试
    2、Ansible配置文件详解
    4.1、Ansible模块
    3.3、Ansible命令参数详解
    3.1、Ansible命令简要说明及初步使用
    1、Ansible初识简要介绍及安装
  • 原文地址:https://www.cnblogs.com/mahailuo/p/9603401.html
Copyright © 2011-2022 走看看