In pom.xml:
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>14.0.1</version>
</dependency>
I. toDF
1. Specify every column name directly
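import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("app").getOrCreate()

// No header row: toDF replaces the default names _c0, _c1, ... (one name per column)
val df = spark.read.format("csv")
  .load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv")
  .toDF("id", "orddate", "itemid", "status")

// With a header row: option("header", "true") takes the column names from the first line
val dfUsers = spark.read.format("csv")
  .option("header", "true")
  .load("hdfs://192.168.56.111:9000/party/data/users.csv")
  .cache()
dfUsers.show(false)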
2. Select only the columns you need
// Without a header, Spark names the columns _c0, _c1, ...; keep _c0 and _c2 and rename them
val orders = spark.read.format("csv")
  .load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv")
  .select("_c0", "_c2")
  .toDF("orderid", "userid")
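II. Running SQL statements against a DataFrame
1. Register the DataFrame as a temporary view (a global temp view must be referenced as global_temp.orders)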
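// Global variant: shared across SparkSessions of the application, queried as global_temp.orders
// df.createOrReplaceGlobalTempView("orders")
// Session-scoped view, queried simply as orders
df.createOrReplaceTempView("orders")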
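2. Query with spark.sql()
// dayofweek() returns 1 = Sunday ... 7 = Saturday: Sunday is labelled 'Sun', Monday..Saturday become 1..6
spark.sql("select if(dayofweek(orddate)==1,'Sun',dayofweek(orddate)-1) as s, count(id) from orders group by dayofweek(orddate)").show(false)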
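To write a multi-line query for spark.sql(), use stripMargin: on each line, the leading whitespace up to and including the "|" is removed, so the indentation of the Scala source does not end up in the SQL string.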
val sql = """select
            |userid, eventid,
            |max(case when statu='invited' then 1 else 0 end) invited,
            |max(case when statu='yes' then 1 else 0 end) yes,
            |max(case when statu='maybe' then 1 else 0 end) maybe,
            |max(case when statu='no' then 1 else 0 end) no
            |from
            |dwd_events.dwd_eventAttendees group by userid, eventid""".stripMargin
spark.sql(sql)
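The effect of stripMargin can be checked without Spark. A minimal pure-Scala sketch; the object name StripMarginDemo and the shortened query are illustrative only:

```scala
object StripMarginDemo {
  // Each line keeps only what follows its leading whitespace and '|';
  // the first line has no '|' and is kept unchanged.
  val sql: String =
    """select
      |  userid,
      |  eventid
      |from dwd_events.dwd_eventAttendees""".stripMargin

  def main(args: Array[String]): Unit = {
    // Prints four lines; the source indentation before '|' is gone
    println(sql)
  }
}
```

Anything written after the "|" (here two spaces) is preserved, so indentation inside the SQL is still under your control.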
III. Using the DataFrame API
【Implementing CASE WHEN, option 1】
1. A custom UDF:
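- How is it called? Its arguments are Column objects: wdchange($"weekdate")
- What types can the input and output be?
Both udf{(sc: Int) => ... /* returns a String */} and udf{(sc: String) => ... /* returns an Int */} work.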
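import org.apache.spark.sql.functions._
import spark.implicits._ // needed for the $"col" syntax

// Maps Spark's dayofweek() number (1 = Sunday ... 7 = Saturday) to a Chinese weekday name
val wdchange = udf { (num: Int) =>
  num match {
    case 1 => "七"
    case 2 => "一"
    case 3 => "二"
    case 4 => "三"
    case 5 => "四"
    case 6 => "五"
    case _ => "六"
  }
}

// groupBy first, then agg; every column produced is shown unless you select at the end
df
  // dayofweek() matches the numbering wdchange expects; date_format($"orddate", "u")
  // numbers Monday as 1 (Java SimpleDateFormat convention) and would shift every label by a day
  .groupBy(dayofweek($"orddate").alias("weekdate"))
  // agg accepts (column, function) pairs ...
  // .agg("id" -> "sum", "id" -> "count")
  // ... or Column expressions
  .agg(count("id").alias("countId"), sum("id").alias("sumId"))
  // withColumn appends a new column; $"..." builds a Column object
  .withColumn("wd", wdchange($"weekdate"))
  .show(false)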
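The UDF body is an ordinary Scala function, so its mapping can be checked without Spark. A small sketch, assuming the input follows Spark SQL's dayofweek() numbering (1 = Sunday ... 7 = Saturday); ISO-style numbering such as java.time.DayOfWeek (1 = Monday ... 7 = Sunday) has to be converted first. The object and function names are hypothetical:

```scala
object WeekdayNames {
  // Spark SQL dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday.
  // Same mapping as the wdchange UDF body, as a plain function.
  def fromSparkDayOfWeek(num: Int): String = num match {
    case 1 => "七" // Sunday
    case 2 => "一"
    case 3 => "二"
    case 4 => "三"
    case 5 => "四"
    case 6 => "五"
    case _ => "六" // 7 = Saturday
  }

  // ISO numbering (java.time.DayOfWeek.getValue): 1 = Monday ... 7 = Sunday.
  // Converting ISO -> Spark numbering lets the same mapping be reused.
  def isoToSparkDayOfWeek(iso: Int): Int = iso % 7 + 1

  def main(args: Array[String]): Unit = {
    // Monday is 1 in ISO numbering and 2 in Spark numbering
    println(fromSparkDayOfWeek(isoToSparkDayOfWeek(1)))
  }
}
```

Keeping the mapping in a plain function like this makes it unit-testable; wrapping it with udf(...) afterwards does not change its logic.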
【Implementing CASE WHEN, option 2】
2. Using when(...).otherwise(...)
import org.apache.spark.sql.functions._
import spark.implicits._

// Conditions are checked in order and the first match wins; otherwise() covers 7 = Saturday
df
  .groupBy(when(dayofweek($"orddate") === 1, "七")
    .when(dayofweek($"orddate") === 2, "一")
    .when(dayofweek($"orddate") === 3, "二")
    .when(dayofweek($"orddate") === 4, "三")
    .when(dayofweek($"orddate") === 5, "四")
    .when(dayofweek($"orddate") === 6, "五")
    .otherwise("六").alias("day"))
  .agg(count("id").alias("countId"), sum("id").alias("sumId"))
  .show(false)
Result:
+---+-------+------------+
|day|countId|sumId |
+---+-------+------------+
|六 |9984 |3.35875381E8|
|三 |9758 |3.46913986E8|
|四 |9862 |3.32166696E8|
|二 |9964 |3.45180682E8|
|一 |9292 |3.23174788E8|
|五 |10288 |3.53314523E8|
|七 |9735 |3.3584223E8 |
+---+-------+------------+
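3. Another example
Goal: total countprice per city, i.e. sum(countprice) grouped by city. Reasoning about the cardinalities:
- orderid : countprice is 1:n, so aggregating by orderid gives orderid : sum(countprice) = 1:1
- orderid : userid is n:1, so sum(countprice) : userid is also n:1
- userid : city is 1:1 (each user has one city), so sum(countprice) : city is n:1, and groupBy(city) reduces it to one total per city (1:1)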
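// With String arguments, groupBy can only name columns that exist in the current DataFrame;
// Column arguments may also be expressions built from them (e.g. dayofweek($"orddate")).
// groupBy returns a RelationalGroupedDataset, which has no show(): call agg(...) first to get a DataFrame back.
def groupBy(col1: String, cols: String*): RelationalGroupedDataset
def groupBy(cols: Column*): RelationalGroupedDataset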
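import org.apache.spark.sql.functions._
import spark.implicits._

// orderitem(orderid, countprice, ...), orders(orderid, userid), customers(userid, city)
orderitem
  // .select($"orderid", $"countprice".cast(DataTypes.DoubleType))
  .groupBy("orderid").agg(sum("countprice").alias("sumPrice")) // one row per order
  .join(orders, Seq("orderid"), "inner")                        // attach userid
  .join(customers, Seq("userid"), "inner")                      // attach city
  .groupBy($"city").agg(sum("sumPrice").alias("cityPrice"))     // one row per city
  .orderBy(desc("cityPrice"))
  .show()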
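The same aggregate, join, aggregate pipeline can be traced on plain Scala collections to see why the cardinality reasoning works. A sketch on hypothetical toy data (the order ids, user ids and city names are made up), with Map lookups standing in for the inner joins:

```scala
object CityRevenue {
  // Hypothetical toy data shaped like orderitem / orders / customers
  val orderItems = Seq((1, 10.0), (1, 5.0), (2, 7.5), (3, 2.5)) // (orderid, countprice), 1:n per order
  val orders     = Map(1 -> "u1", 2 -> "u1", 3 -> "u2")         // orderid -> userid, n:1
  val customers  = Map("u1" -> "cityA", "u2" -> "cityB")        // userid -> city

  def cityTotals: Map[String, Double] = {
    // Step 1: one entry per order, orderid -> sum(countprice)
    val perOrder: Map[Int, Double] =
      orderItems.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).sum }
    // Step 2: attach userid, then city (inner-join semantics: unmatched keys are dropped)
    val withCity: Seq[(String, Double)] = perOrder.toSeq.flatMap { case (id, total) =>
      for {
        user <- orders.get(id)
        city <- customers.get(user)
      } yield city -> total
    }
    // Step 3: group by city and sum the per-order totals
    withCity.groupBy(_._1).map { case (city, rows) => city -> rows.map(_._2).sum }
  }

  def main(args: Array[String]): Unit =
    cityTotals.toSeq.sortBy(-_._2).foreach(println) // descending, like orderBy(desc(...))
}
```

Summing per order before the joins keeps the joined data at one row per order rather than one row per order item.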
4. Using window functions
For example, numbering rows in good_id order: row_number().over(Window.orderBy("good_id")).alias("gid")
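import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Here df has columns name, date, count. One window per month; with orderBy, the default frame
// runs from the start of the partition to the current row, so sum() is a running total within the month
val wnd = Window.partitionBy(month($"date")).orderBy("date")
df.select($"name", $"date", $"count", sum("count").over(wnd)).show(false)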
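Why a window function? An aggregate such as sum or min normally needs a GROUP BY, which collapses the rows. To keep every row and attach the aggregate as an extra column instead, evaluate it over a window with over().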
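// min is an aggregate: without groupBy it needs over(); an empty over() spans the whole DataFrame
// (Spark moves all rows into a single partition to evaluate it)
.withColumn("min_login_time", min($"last_login_time").over())
// datediff is an ordinary row-wise function, so no window is needed
.withColumn("lasttime", datediff($"last_login_time", $"min_login_time"))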
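To see what the two window patterns compute without starting Spark, here is a pure-Scala sketch on hypothetical rows. It mimics sum over Window.partitionBy(month).orderBy(date) as a per-month running total, and min(...).over() plus datediff as a whole-table minimum attached to every row. It assumes dates are unique within a month (with tied dates, Spark's default RANGE frame gives the tied rows the same total):

```scala
import java.time.LocalDate

object WindowSketch {
  // Hypothetical rows: (name, date, count)
  val rows = Seq(
    ("a", LocalDate.of(2020, 1, 3), 5),
    ("b", LocalDate.of(2020, 1, 1), 2),
    ("c", LocalDate.of(2020, 2, 1), 4),
    ("d", LocalDate.of(2020, 1, 2), 1)
  )

  // Running total of count inside each month (month number only, like month($"date")); all rows kept
  def runningByMonth: Seq[(String, LocalDate, Int, Int)] =
    rows.groupBy(_._2.getMonthValue).values.toSeq.flatMap { part =>
      val sorted = part.sortBy(_._2.toEpochDay)
      val totals = sorted.scanLeft(0)(_ + _._3).tail // drop the initial 0
      sorted.zip(totals).map { case ((n, d, c), t) => (n, d, c, t) }
    }

  // Global minimum date appended to every row, plus the day difference (like datediff(date, min))
  def withGlobalMin: Seq[(String, LocalDate, LocalDate, Long)] = {
    val minDate = rows.map(_._2).minBy(_.toEpochDay)
    rows.map { case (n, d, _) => (n, d, minDate, d.toEpochDay - minDate.toEpochDay) }
  }

  def main(args: Array[String]): Unit = {
    runningByMonth.sortBy(_._2.toEpochDay).foreach(println)
    withGlobalMin.foreach(println)
  }
}
```

In both functions the output has as many rows as the input, which is the point of using over() instead of groupBy.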
IV. Using Java's Calendar with an RDD
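import java.text.SimpleDateFormat
import java.util.Calendar

// Convert a date-time string to a Chinese weekday name (Java Calendar API)
def daychange(dateStr: String): String = {
  val arr = Array("日", "一", "二", "三", "四", "五", "六")
  val cal = Calendar.getInstance()
  // HH is the 24-hour clock (hh would be the 12-hour clock)
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  cal.setTime(sdf.parse(dateStr))
  // DAY_OF_WEEK: 1 = Sunday ... 7 = Saturday
  arr(cal.get(Calendar.DAY_OF_WEEK) - 1)
}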
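The same conversion can also be written with java.time instead of Calendar and SimpleDateFormat; this swaps in a different JDK API, and DateTimeFormatter is immutable and thread-safe, so one instance can be shared. A sketch; the input format (optionally followed by ".0") is an assumption about orddate, and the names DayChange/dayChange are hypothetical:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object DayChange {
  // java.time.DayOfWeek.getValue: 1 = Monday ... 7 = Sunday, so index 0 is Monday
  private val names = Array("一", "二", "三", "四", "五", "六", "日")
  // Immutable and thread-safe, unlike SimpleDateFormat
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  def dayChange(dateStr: String): String = {
    // Assumes "yyyy-MM-dd HH:mm:ss" possibly followed by ".0"; only the first 19 chars are parsed
    val dt = LocalDateTime.parse(dateStr.take(19), fmt)
    names(dt.getDayOfWeek.getValue - 1)
  }

  def main(args: Array[String]): Unit =
    println(dayChange("2013-07-25 00:00:00")) // 2013-07-25 was a Thursday
}
```

Unlike SimpleDateFormat.parse, LocalDateTime.parse rejects trailing text, which is why the sketch trims the string to 19 characters first.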
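// Java + RDD
// Calling map on a DataFrame yields a Dataset (the tuple encoder comes from spark.implicits._);
// daychange must be reachable from the executors, e.g. defined in a top-level object
import spark.implicits._

df.select("id", "orddate")
  .map(x => (daychange(x(1).toString), x(0).toString))
  .rdd
  .groupByKey()
  .mapValues(_.size)
  .collect() // bring the per-weekday counts to the driver so println prints locally
  .foreach { case (day, n) => println((day, n)) }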