  • Spark operation examples

    Launch commands

    ./bin/spark-shell
    ./bin/spark-shell --master yarn-client    // launch on YARN in client mode (newer Spark versions use --master yarn --deploy-mode client)

    Example 1

    val df=sql("select * from default.orders")
    
    df.select("user_id").distinct.count()
    
    // selectExpr accepts Hive SQL expressions
    df.selectExpr("max(cast(user_id as int))").show()
    
    df.groupBy("order_dow").count().show()

    cache and persist: keeping data in memory

    val priors = spark.sql("select * from default.order_products_prior")
    val df2 = df.join(priors,"order_id").cache
    val df1 = df.groupBy("order_dow").count().cache()
    df2.unpersist()  // release it from memory
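
    A minimal sketch (assuming the same df and priors as above; df3 is an illustrative name): persist with an explicit StorageLevel for data that may not fit entirely in memory.

    import org.apache.spark.storage.StorageLevel
    val df3 = df.join(priors, "order_id").persist(StorageLevel.MEMORY_AND_DISK)
    df3.count()       // an action materializes the persisted data
    df3.unpersist()   // release it when no longer needed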

    Example 2

    import org.apache.spark.sql.SparkSession
    
    object TestFunc {
      def main(args: Array[String]): Unit = {
    //    Instantiate a SparkSession; spark-shell (the client) creates one automatically:
    //    Spark session available as 'spark'.
        val spark = SparkSession
          .builder()
          .appName("test")
          .master("local[2]")
          .enableHiveSupport()
          .getOrCreate()
    
        val df = spark.sql("select * from badou.orders")
        val priors = spark.sql("select * from badou.order_products_prior")
    
        """
      |4. For each user, sort the orders by the value of the order_hour_of_day column
          |1  2 08
          |1  3 07
          |
          |1 [(2,08),(3,07)]
          |
      |=> 1 [(3,07),(2,08)]  building toward a user's top-3 favourite products
          |rdd: (user_id,(order_number,order_hour_of_day))   
        """.stripMargin
    
        import spark.implicits._
        val orderNumberSort = df.select("user_id","order_number","order_hour_of_day")
      .rdd.map(x=>(x(0).toString,(x(1).toString,x(2).toString)))    // convert the DataFrame to an RDD
          .groupByKey()
      .mapValues(_.toArray.sortWith(_._2 < _._2).slice(0, 2))    // sort by hour (string comparison) and keep the first 2
          .toDF("user_id","order_sort_by_hour")
    
    //    udf
        import org.apache.spark.sql.functions._
        val plusUDF = udf((col1:String,col2:String)=>col1.toInt+col2.toInt)
        df.withColumn("plus",plusUDF(col("order_number"),col("order_dow"))).show()
    
    
      }
    }
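
    An alternative sketch (not in the original): the same per-user ordering can be done with a window function instead of groupByKey, which avoids collecting all of a user's records into one in-memory collection. Column names are the ones used above; orderNumberSortDF is an illustrative name.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    val w = Window.partitionBy("user_id").orderBy(col("order_hour_of_day"))
    val orderNumberSortDF = df
      .withColumn("rn", row_number().over(w))
      .filter(col("rn") <= 2)   // keep the first 2 rows per user, like slice(0, 2) above
      .select("user_id", "order_number", "order_hour_of_day")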

     word count

    val file = sc.textFile("/data/The_Man_of_Property.txt")
    //file.take(3)
    //file.flatMap(line=>line.split(" ")).take(10)
    //file.flatMap(line=>line.split(" ")).map((_,1)).take(10)
    //file.flatMap(line=>line.split(" ")).map((_,1)).reduceByKey(_+_).take(10)
    file.flatMap(line=>line.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/data/output/wc")
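
    A small follow-up sketch: sort the counts in descending order and take the 10 most frequent words instead of saving everything.

    file.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(10)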

    Jieba word segmentation

    import com.huaban.analysis.jieba.{JiebaSegmenter, SegToken}
    import com.huaban.analysis.jieba.JiebaSegmenter.SegMode
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._
    
    object JiebaKry {
      def main(args: Array[String]): Unit = {
    //    Register the Jieba segmenter class for Kryo serialization
        val conf = new SparkConf()
          .registerKryoClasses(Array(classOf[JiebaSegmenter]))
          .set("spark.rpc.message.maxSize","800")
    //    Build the SparkSession and pass in the SparkConf defined above
        val spark = SparkSession
          .builder()
          .appName("Jieba UDF")
          .enableHiveSupport()
          .config(conf)
          .getOrCreate()
    
    // Jieba segmentation helper: takes a DataFrame and returns it with one extra column, seg (the segmented text)
        def jieba_seg(df:DataFrame,colname:String): DataFrame ={
    
          val segmenter = new JiebaSegmenter()
          val seg = spark.sparkContext.broadcast(segmenter)
          val jieba_udf = udf{(sentence:String)=>
            val segV = seg.value
            segV.process(sentence.toString,SegMode.INDEX)
              .toArray().map(_.asInstanceOf[SegToken].word)
              .filter(_.length>1).mkString("/")
          }
          df.withColumn("seg",jieba_udf(col(colname)))
        }
    
    val df = spark.sql("select sentence,label from badou.news_noseg limit 300")
        val df_seg = jieba_seg(df,"sentence")
        df_seg.show()
    //    df_seg.write.mode("overwrite").saveAsTable("badou.news_jieba")
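
    //    A follow-up sketch (not in the original): split the "/"-joined seg column back into
    //    words and count term frequencies with the DataFrame API; termCounts is an illustrative name.
    val termCounts = df_seg
      .select(explode(split(col("seg"), "/")).as("word"))
      .groupBy("word")
      .count()
      .orderBy(desc("count"))
    termCounts.show()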
      }
    }

    Simple data processing (feature extraction)

    package offline
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._
    object SimpleFeature {
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession
          .builder()
          .appName("test")
          .master("local[2]")
          .enableHiveSupport()
          .getOrCreate()
        val priors = spark.sql("select * from badou.order_products_prior")
        val orders = spark.sql("select * from badou.orders")
    /** product features
      * 1. sales count: prod_cnt
      * 2. number of times the product was reordered: prod_sum_rod
      * 3. reordered rate: prod_rod_rate   avg = sum / count, in [0,1]
     */
    //    sales count prod_cnt
        val prodCnt = priors.groupBy("product_id").count().withColumnRenamed("count","prod_cnt")
    //    prod_sum_rod, prod_rod_rate and prod_cnt in one aggregation
        val prodRodCnt = priors.selectExpr("product_id","cast(reordered as int)")
          .groupBy("product_id")
          .agg(sum("reordered").as("prod_sum_rod"),
            avg("reordered").as("prod_rod_rate"),
            count("product_id").as("prod_cnt")
          )
    
        /** user features:
          * 1. average gap in days between each user's orders
          * 2. total number of orders per user
          * 3. deduplicated set of products each user has bought: user_id, set{prod1, prod2, ...}
          * 4. total number of products per user, and the deduplicated product count
          * 5. average number of products per order for each user
          */
    //  Handle anomalous values: replace empty strings in days_since_prior_order with 0.0
        val ordersNew = orders.selectExpr("*",
      "if(days_since_prior_order='',0.0,days_since_prior_order) as dspo")
      .drop("days_since_prior_order")
    
    //    1. average gap in days between each user's orders
        val userGap = ordersNew.selectExpr("user_id","cast(dspo as int) as dspo")
          .groupBy("user_id").avg("dspo").withColumnRenamed("avg(dspo)","u_avg_day_gap")
    //    2. total number of orders per user
        val userOrdCnt = orders.groupBy("user_id").count()
    //    3. deduplicated set of products per user (user_id, product_id pairs)
        val opDF = orders.join(priors,"order_id")
        val up = opDF.select("user_id","product_id")
    
        import spark.implicits._
    //    up.rdd.map() converts the DataFrame into an RDD;
    //    rdd.toDF() converts the RDD back to a DataFrame; a Tuple2 is returned here, so the DF has two columns
        val userUniOrdRecs = up.rdd.map{x=>(x(0).toString,x(1).toString)}
          .groupByKey()
          .mapValues(_.toSet.mkString(","))
          .toDF("user_id","prod_records")
    //    4. total number of products per user, and the deduplicated product count
        val userAllProd = up.groupBy("user_id").count()
    
        val userUniOrdCnt = up.rdd.map{x=>(x(0).toString,x(1).toString)}
          .groupByKey()
          .mapValues(_.toSet.size)
          .toDF("user_id","prod_dist_cnt")
    
    //    When two computations share the same groupByKey logic, check whether they can be merged.
    //    Merge the "deduplicated product set" and "deduplicated product count" statistics.
    //    Option 1: factor out the shared groupByKey into a cached RDD
        val userRddGroup = up.rdd.map(x=>(x(0).toString,x(1).toString)).groupByKey().cache()
    //    val userUniOrdRecs = userRddGroup.mapValues(_.toSet.mkString(",")).toDF("user_id","prod_records")
    //    val userUniOrdCnt = userRddGroup.mapValues(_.toSet.size).toDF("user_id","prod_dist_cnt")
        userRddGroup.unpersist() // release the cached RDD once it is no longer needed (like `del userRddGroup` in Python)
    
        // Option 2: compute both in a single pass
        val userProRcdSize = up.rdd.map{x=>(x(0).toString,x(1).toString)}.groupByKey()
          .mapValues{records=>
            val rs = records.toSet;
            (rs.size,rs.mkString(","))
          }.toDF("user_id","tuple")
          .selectExpr("user_id","tuple._1 as prod_dist_size","tuple._2 as prod_records")
    
        val usergroup = up.groupBy("user_id")
          .agg(size(collect_set("product_id")).as("prod_dist_size"),
            collect_set("product_id").as("prod_records"))
    //    5. average number of products per order for each user
    //    1) first count how many products each order contains
        val ordProdCnt = priors.groupBy("order_id").count()
    //    2) then average that count over each user's orders
        val userPerOrdProdCnt = orders.join(ordProdCnt,"order_id")
          .groupBy("user_id")
          .agg(avg("count").as("u_avg_ord_prods"))
      }
    
      def feat(priors:DataFrame,orders:DataFrame):DataFrame={
        priors
      }
    
    }

     

     References

    八斗大数据 (Badou Big Data)
