Startup commands
./bin/spark-shell                         // start spark-shell locally
./bin/spark-shell --master yarn-client    // start spark-shell on YARN
Example 1
val df = sql("select * from default.orders")
df.select("user_id").distinct.count()
// selectExpr accepts Hive SQL expressions
df.selectExpr("max(cast(user_id as int))").show()
df.groupBy("order_dow").count().show()
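The same aggregations can also be written as plain SQL by registering the DataFrame as a temporary view. A minimal sketch, assuming the same df as above; the view name orders_view is arbitrary:

// Register df as a temporary view and query it with spark.sql
df.createOrReplaceTempView("orders_view")
spark.sql("select max(cast(user_id as int)) as max_user_id from orders_view").show()
spark.sql("select order_dow, count(*) as cnt from orders_view group by order_dow").show()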
cache and persist: putting data into memory
val priors = spark.sql("select * from default.order_products_prior")
val df2 = df.join(priors, "order_id").cache
val df1 = df.groupBy("order_dow").count().cache()
df2.unpersist()  // release it from memory
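For DataFrames, cache() is just persist() with the default MEMORY_AND_DISK storage level; persist() lets you choose the level explicitly. A minimal sketch, assuming the same df and priors as above (df3 is a hypothetical name):

import org.apache.spark.storage.StorageLevel
// persist() with an explicit storage level; cache() would use the default level
val df3 = df.join(priors, "order_id").persist(StorageLevel.MEMORY_AND_DISK)
df3.count()       // an action materializes the cached data
df3.unpersist()   // release it when no longer needed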
Example 2
import org.apache.spark.sql.SparkSession

object TestFunc {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (the spark-shell client creates one automatically:
    // "Spark session available as 'spark'.")
    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.sql("select * from badou.orders")
    val priors = spark.sql("select * from badou.order_products_prior")

    """
      |4. For each user, sort order_dow by the value of order_hour_of_day
      |1 2 08
      |1 3 07
      |
      |1 [(2,08),(3,07)]
      |
      |=> 1 [(3,07),(2,08)]   e.g. the top 3 products a user buys most often
      |rdd: (user_id,(order_number,order_hour_of_day))
    """.stripMargin

    import spark.implicits._
    val orderNumberSort = df.select("user_id", "order_number", "order_hour_of_day")
      .rdd.map(x => (x(0).toString, (x(1).toString, x(2).toString)))  // DataFrame to RDD
      .groupByKey()
      .mapValues(_.toArray.sortWith(_._2 < _._2).slice(0, 2))
      .toDF("user_id", "order_sort_by_hour")

    // udf
    import org.apache.spark.sql.functions._
    val plusUDF = udf((col1: String, col2: String) => col1.toInt + col2.toInt)
    df.withColumn("plus", plusUDF(col("order_number"), col("order_dow"))).show()
  }
}
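The same per-user sort can stay entirely in the DataFrame API with a window function, which avoids shuffling whole value collections through groupByKey. A minimal sketch, assuming the same df as above; the variable names are illustrative:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank each user's rows by order_hour_of_day and keep the two earliest per user
val w = Window.partitionBy("user_id").orderBy(col("order_hour_of_day"))
val orderNumberSortDF = df
  .select("user_id", "order_number", "order_hour_of_day")
  .withColumn("rk", row_number().over(w))
  .filter(col("rk") <= 2)
  .drop("rk")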
word count
val file = sc.textFile("/data/The_Man_of_Property.txt")
// file.take(3)
// file.flatMap(line => line.split(" ")).take(10)
// file.flatMap(line => line.split(" ")).map((_,1)).take(10)
// file.flatMap(line => line.split(" ")).map((_,1)).reduceByKey(_+_).take(10)
file.flatMap(line => line.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/data/output/wc")
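To inspect the most frequent words instead of writing all counts to HDFS, the result can be sorted by count in descending order. A minimal sketch, assuming the same file RDD as above:

// Top 10 words by frequency
file.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(10)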
Jieba word segmentation
import com.huaban.analysis.jieba.{JiebaSegmenter, SegToken}
import com.huaban.analysis.jieba.JiebaSegmenter.SegMode
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object JiebaKry {
  def main(args: Array[String]): Unit = {
    // Register the jieba segmenter class for Kryo serialization
    val conf = new SparkConf()
      .registerKryoClasses(Array(classOf[JiebaSegmenter]))
      .set("spark.rpc.message.maxSize", "800")
    // Build the SparkSession with the conf defined above
    val spark = SparkSession
      .builder()
      .appName("Jieba UDF")
      .enableHiveSupport()
      .config(conf)
      .getOrCreate()

    // Jieba segmentation helper: takes a DataFrame and a column name, returns the
    // same DataFrame with one extra column `seg` holding the segmented text
    def jieba_seg(df: DataFrame, colname: String): DataFrame = {
      val segmenter = new JiebaSegmenter()
      val seg = spark.sparkContext.broadcast(segmenter)
      val jieba_udf = udf { (sentence: String) =>
        val segV = seg.value
        segV.process(sentence.toString, SegMode.INDEX)
          .toArray().map(_.asInstanceOf[SegToken].word)
          .filter(_.length > 1).mkString("/")
      }
      df.withColumn("seg", jieba_udf(col(colname)))
    }

    val df = spark.sql("select sentence,label from badou.news_noseg limit 300")
    val df_seg = jieba_seg(df, "sentence")
    df_seg.show()
    // df_seg.write.mode("overwrite").saveAsTable("badou.news_jieba")
  }
}
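Downstream feature extraction usually wants the tokens as an array rather than the "/"-joined string. A minimal sketch, assuming the df_seg produced above; the words column name and the CountVectorizer settings are illustrative, not part of the original pipeline:

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.sql.functions._

// Split the "/"-joined segmentation result back into an array column
val withWords = df_seg.withColumn("words", split(col("seg"), "/"))

// Bag-of-words term counts over the segmented tokens
val cv = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(10000)
val featured = cv.fit(withWords).transform(withWords)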
Simple data processing (feature extraction)
package offline

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object SimpleFeature {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    val priors = spark.sql("select * from badou.order_products_prior")
    val orders = spark.sql("select * from badou.orders")

    /** product features
      * 1. sales volume: prod_cnt
      * 2. number of repeat purchases (reordered): prod_sum_rod
      * 3. reorder rate: prod_rod_rate = sum/count, in [0,1]
      */
    // sales volume prod_cnt
    val prodCnt = priors.groupBy("product_id").count().withColumnRenamed("count", "prod_cnt")
    // prod_sum_rod
    val prodRodCnt = priors.selectExpr("product_id", "cast(reordered as int)")
      .groupBy("product_id")
      .agg(sum("reordered").as("prod_sum_rod"),
        avg("reordered").as("prod_rod_rate"),
        count("product_id").as("prod_cnt"))

    /** user features:
      * 1. average gap (in days) between each user's orders
      * 2. total number of orders per user
      * 3. the deduplicated set of products each user bought: user_id, set{prod1,prod2,...}
      * 4. total number of products per user, and the deduplicated product count
      * 5. average number of products per order for each user
      */
    // handle bad values: replace empty days_since_prior_order with 0.0
    val ordersNew = orders.selectExpr("*",
      "if(days_since_prior_order='',0.0,days_since_prior_order) as dspo")
      .drop("days_since_prior_order")

    // 1. average gap (in days) between each user's orders
    val userGap = ordersNew.selectExpr("user_id", "cast(dspo as int) as dspo")
      .groupBy("user_id").avg("dspo").withColumnRenamed("avg(dspo)", "u_avg_day_gap")

    // 2. total number of orders per user
    val userOrdCnt = orders.groupBy("user_id").count()

    // 3. deduplicated set of products each user bought
    val opDF = orders.join(priors, "order_id")
    val up = opDF.select("user_id", "product_id")
    import spark.implicits._
    // up.rdd.map() converts the DataFrame to an RDD;
    // rdd.toDF() converts back; the tuple2 becomes two columns in the DataFrame
    val userUniOrdRecs = up.rdd.map { x => (x(0).toString, x(1).toString) }
      .groupByKey()
      .mapValues(_.toSet.mkString(","))
      .toDF("user_id", "prod_records")

    // 4. total number of products per user, and the deduplicated product count
    val userAllProd = up.groupBy("user_id").count()
    val userUniOrdCnt = up.rdd.map { x => (x(0).toString, x(1).toString) }
      .groupByKey()
      .mapValues(_.toSet.size)
      .toDF("user_id", "prod_dist_cnt")

    // When two computations share the same groupByKey logic, consider merging them.
    // Merge the "deduplicated set" and "deduplicated count" computations:
    // Option 1: factor out the shared groupByKey and cache it
    val userRddGroup = up.rdd.map(x => (x(0).toString, x(1).toString)).groupByKey().cache()
    userRddGroup.unpersist()   // analogous to `del userRddGroup` in Python
    // val userUniOrdRecs = userRddGroup.mapValues(_.toSet.mkString(",")).toDF("user_id","prod_records")
    // val userUniOrdCnt = userRddGroup.mapValues(_.toSet.size).toDF("user_id","prod_dist_cnt")

    // Option 2: compute both in a single pass
    val userProRcdSize = up.rdd.map { x => (x(0).toString, x(1).toString) }.groupByKey()
      .mapValues { records =>
        val rs = records.toSet
        (rs.size, rs.mkString(","))
      }.toDF("user_id", "tuple")
      .selectExpr("user_id", "tuple._1 as prod_dist_size", "tuple._2 as prod_records")

    val usergroup = up.groupBy("user_id")
      .agg(size(collect_set("product_id")).as("prod_dist_size"),
        collect_set("product_id").as("prod_records"))

    // 5. average number of products per order for each user
    // 1) count products per order
    val ordProdCnt = priors.groupBy("order_id").count()
    // 2) average that count per user
    val userPerOrdProdCnt = orders.join(ordProdCnt, "order_id")
      .groupBy("user_id")
      .agg(avg("count").as("u_avg_ord_prods"))
  }

  def feat(priors: DataFrame, orders: DataFrame): DataFrame = {
    priors
  }
}
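The feat stub above only returns priors; a natural next step is joining the product-level and user-level features onto the (user_id, product_id) pairs to form one feature table. A minimal sketch, assuming the prodRodCnt, userGap, userPerOrdProdCnt and up DataFrames computed in main; the exact column selection is illustrative:

// Hypothetical assembly of a feature table keyed by (user_id, product_id)
val userFeat = userGap.join(userPerOrdProdCnt, "user_id")
val trainFeat = up.distinct()
  .join(prodRodCnt, "product_id")
  .join(userFeat, "user_id")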
References
八斗大数据 (Badou Big Data)