"""PySpark walkthrough: common DataFrame / Spark SQL operations on iris.csv.

Each section performs an operation with the DataFrame API and, where
applicable, with an equivalent Spark SQL query against the temp view ``Iris``.
"""
from pyspark import SparkContext, SparkConf
import os
from pyspark.sql.session import SparkSession
from pyspark.sql import Row

# Data directory URI; assigned by Setpath() when the SparkContext is created.
Path = None

# The iris column names contain '.', which Spark would otherwise parse as
# struct-field access, so they must be backtick-quoted when referenced by name.
SEPAL_LENGTH = "`Sepal.Length`"
SEPAL_WIDTH = "`Sepal.Width`"

# Explicit schema so the four measurements are typed DOUBLE (without a schema,
# and with inferSchema off by default, CSV columns are read as strings).
IRIS_SCHEMA = (
    "`Sepal.Length` DOUBLE,`Sepal.Width` DOUBLE,`Petal.Length` DOUBLE,"
    "`Petal.Width` DOUBLE,`Species` string"
)


def CreateSparkContex():
    """Create the SparkContext and SparkSession used by the demo.

    Also sets the module-level data directory ``Path`` via ``Setpath``.

    Returns:
        tuple: ``(sc, spark)`` -- the SparkContext and a SparkSession.
    """
    sparkconf = (
        SparkConf()
        .setAppName("MYPRO")
        .set("spark.ui.showConsoleProgress", "false")  # no progress bars on the console
    )
    sc = SparkContext(conf=sparkconf)
    print("master:" + sc.master)
    sc.setLogLevel("WARN")
    Setpath(sc)
    # getOrCreate() picks up the SparkContext created above instead of a new one.
    spark = SparkSession.builder.config(conf=sparkconf).getOrCreate()
    return sc, spark


def Setpath(sc):
    """Choose the data directory URI based on the Spark master.

    Sets the module-level global ``Path`` (kept for backward compatibility)
    and also returns it.

    Args:
        sc: an active SparkContext; only ``sc.master`` is read.

    Returns:
        str: ``"file:/C:/spark/sparkworkspace"`` when the master starts with
        ``"local"``, otherwise ``"hdfs://test"``.
    """
    global Path
    if sc.master.startswith("local"):
        Path = "file:/C:/spark/sparkworkspace"
    else:
        Path = "hdfs://test"
    return Path


def _join_uri(base, name):
    """Append *name* to the URI *base* using '/' as the separator.

    ``os.path.join`` is avoided because it inserts a backslash on Windows,
    which is wrong for ``file:`` / ``hdfs:`` URIs.
    """
    return base.rstrip("/") + "/" + name


def _demo_select(df, dfcsv):
    """Column selection: by name, by Column object, and pandas-style indexing."""
    df.select("Species", SEPAL_WIDTH).show(4)  # names with special chars need backticks
    df.select(df.Species, df[SEPAL_WIDTH]).show(4)
    dfcsv.select(df.Species).show(4)  # mixing the original DataFrame and its alias
    df[df.Species, df[SEPAL_WIDTH]].show(4)
    # The next two expressions produce no output; they only illustrate types.
    df[["Species"]]  # like pandas: a list selects a (one-column) DataFrame
    df["Species"]  # note: a single name gives a Column expression, not data


def _demo_derived_columns(df):
    """Add a computed column (Sepal.Length - Sepal.Width), unnamed and aliased."""
    length, width = df[SEPAL_LENGTH], df[SEPAL_WIDTH]
    df[length, width, length - width].show(4)
    df[length, width, (length - width).alias("rua")].show(4)  # renamed column


def _demo_filter(spark, df):
    """Row filtering via boolean indexing, .filter() and SQL WHERE."""
    df[df.Species == "virginica"].show(4)  # boolean indexing, as in pandas
    df[(df.Species == "virginica") & (df[SEPAL_WIDTH] > 1)].show(4)  # multiple conditions
    df.filter(df.Species == "virginica").show(4)
    spark.sql("select * from Iris where Species='virginica'").show(4)


def _demo_sort(spark, df):
    """Single- and multi-column sorting via SQL ORDER BY and .orderBy()."""
    spark.sql("select * from Iris order by `Sepal.Length` asc ").show(4)
    spark.sql("select * from Iris order by `Sepal.Length` desc ").show(4)
    spark.sql(
        "select * from Iris order by `Sepal.Length` asc,`Sepal.Width` desc "
    ).show(4)
    sepals = df.select(SEPAL_LENGTH, SEPAL_WIDTH)
    sepals.orderBy(SEPAL_WIDTH, ascending=False).show(4)  # descending
    sepals.orderBy(SEPAL_WIDTH).show(4)  # ascending (the default)
    sepals.orderBy(SEPAL_WIDTH, ascending=True).show(4)  # ascending, explicit
    sepals.orderBy(df[SEPAL_WIDTH].desc()).show(4)  # descending via Column
    # Sepal.Length descending, then Sepal.Width ascending.
    sepals.orderBy([SEPAL_LENGTH, SEPAL_WIDTH], ascending=[False, True]).show(4)
    df.orderBy(df[SEPAL_LENGTH].desc(), df[SEPAL_WIDTH]).show(4)


def _demo_distinct(spark, df):
    """De-duplication via SQL DISTINCT, .distinct() and .dropDuplicates()."""
    spark.sql("select distinct Species from Iris").show()
    spark.sql("select distinct Species,`Sepal.Width` from Iris").show()
    df.select("Species").distinct().show()
    df.select("Species", SEPAL_WIDTH).distinct().show()
    df.select("Species").drop_duplicates().show()  # same as above; pandas-style name
    df.select("Species").dropDuplicates().show()  # same as above


def _demo_groupby(spark, df):
    """Grouped counts and aggregations via SQL GROUP BY and .groupby()."""
    spark.sql("select Species,count(1) from Iris group by Species").show()
    df[["Species"]].groupby("Species").count().show()
    df.groupby(["Species"]).agg({SEPAL_WIDTH: "sum"}).show()
    df.groupby(["Species"]).agg({SEPAL_WIDTH: "sum", SEPAL_LENGTH: "mean"}).show()


def _demo_join(sc, spark, df):
    """Left-join iris to a small species -> code lookup table (SQL and API)."""
    codes = [["virginica", "A1"], ["versicolor", "A2"], ["setosa", "A3"]]
    code_rows = sc.parallelize(codes).map(lambda p: Row(lei=p[0], al=p[1]))
    code_df = spark.createDataFrame(code_rows)
    code_df.show()
    code_df.createOrReplaceTempView("dictable")
    spark.sql("select * from Iris u left join dictable z on u.Species=z.lei").show()
    df.join(code_df, df.Species == code_df.lei, "left_outer").show()


def main():
    """Run every demo section in order, always stopping Spark at the end."""
    print("Here we go! ")
    sc, spark = CreateSparkContex()
    try:
        dfcsv = spark.read.csv(
            _join_uri(Path, "iris.csv"), header=True, schema=IRIS_SCHEMA
        )
        dfcsv.show(3)
        # Register a temp view so the data can be queried with Spark SQL.
        dfcsv.createOrReplaceTempView("Iris")
        spark.sql("select * from Iris limit 3").show()
        spark.sql("select Species,count(1) from Iris group by Species").show()

        df = dfcsv.alias("Iris1")  # an aliased handle on the same data
        _demo_select(df, dfcsv)
        _demo_derived_columns(df)
        _demo_filter(spark, df)
        _demo_sort(spark, df)
        _demo_distinct(spark, df)
        _demo_groupby(spark, df)
        _demo_join(sc, spark, df)
    finally:
        # SparkSession.stop() also stops the underlying SparkContext.
        spark.stop()


if __name__ == "__main__":
    main()