zoukankan      html  css  js  c++  java
  • spark-------------案例实操-------------Top10 热门品类

    引言

    品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。如

    鞋 点击数 下单数 支付数

    衣服 点击数 下单数 支付数

    电脑 点击数 下单数 支付数

    本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数

    正文

    实现方案一

    分别统计每个品类点击的次数,下单的次数和支付的次数:(品类,点击总数)(品类,下单总数)(品类,支付总数)

    package com.xiao.spark.core.rdd.req
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    object Spark01_Req01_TopTen {
    
      def main(args: Array[String]): Unit = {
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("part")
          val sc = new SparkContext(sparkConf)
          val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
    
          // 统计点击数量
          val clickRDD: RDD[String] = rdd.filter {
            line => {
              val datas: Array[String] = line.split("_")
              // 点击的品类 ID为-1,表示数据不是点击数据
              datas(6) != "-1"
            }
          }
          val clickCountRDD: RDD[(String, Int)] = clickRDD.map {
            line => {
              val datas: Array[String] = line.split("_")
              // (id,1)
              (datas(6), 1)
            }
          }.reduceByKey(_ + _)
          // 统计下单数量
          val orderRDD: RDD[String] = rdd.filter(
            line => {
              val datas: Array[String] = line.split("_")
              // 本次不是下单行为,则数据采用 null 表示
              datas(8) != "null"
            }
          )
          // 扁平化映射: 因为一次下单可能有多个商品
          // (id1,id2,id3) => (id1,1),(id2,1),(id3,1)
          val orderCountRDD: RDD[(String, Int)] = orderRDD.flatMap(
            line => {
              val datas: Array[String] = line.split("_")
              val cids: Array[String] = datas(8).split(",")
              cids.map((_, 1))
            }
          ).reduceByKey(_ + _)
    
          // 统计支付数量
          val payRDD: RDD[String] = rdd.filter(
            line => {
              val datas: Array[String] = line.split("_")
              // 本次不是付行为,则数据采用 null 表示
              datas(10) != "null"
            }
          )
          // 扁平化映射: 因为一次下单可能有多个商品
          // (id1,id2,id3) => (id1,1),(id2,1),(id3,1)
          val payCountRDD: RDD[(String, Int)] = payRDD.flatMap(
            line => {
              val datas: Array[String] = line.split("_")
              val cids: Array[String] = datas(10).split(",")
              cids.map((_, 1))
            }
          ).reduceByKey(_+_)
          // 5. 将品类进行排序,并且取前10名
          //    点击数量排序,下单数量排序,支付数量排序
          //    元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推
          //    ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
          //
          val value: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(orderCountRDD, payCountRDD)
          val analysisRDD: RDD[(String, (Int, Int, Int))] = value.mapValues {
            case (clickIter, orderIter, payIter) => {
              var clickCnt = 0
              val cIter = clickIter.iterator
              if (cIter.hasNext) {
                clickCnt = cIter.next()
              }
              var orderCnt = 0
              val oIter = orderIter.iterator
              if (oIter.hasNext) {
                orderCnt = oIter.next()
              }
              var payCnt = 0
              val pIter = payIter.iterator
              if (pIter.hasNext) {
                payCnt = pIter.next()
              }
              (clickCnt, orderCnt, payCnt)
            }
          }
          // 排序
          val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, false).take(10)
          resultRDD.foreach(println)
        sc.stop()
      }
    
    
    }
    

    运行结果:

    实现方案二

    一次性统计每个品类点击的次数,下单的次数和支付的次数:(品类,(点击总数,下单总数,支付总数))

    package com.xiao.spark.core.rdd.req
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark02_Req01_TopTen {
    
      def main(args: Array[String]): Unit = {
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("part")
          val sc = new SparkContext(sparkConf)
          val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
          rdd.cache()
          // 统计点击数量
          val clickRDD: RDD[String] = rdd.filter {
            line => {
              val datas: Array[String] = line.split("_")
              // 点击的品类 ID为-1,表示数据不是点击数据
              datas(6) != "-1"
            }
          }
          val clickCountRDD: RDD[(String, (Int, Int, Int))] = clickRDD.map {
            line => {
              val datas: Array[String] = line.split("_")
              // (id,1)
              (datas(6), 1)
            }
          }.reduceByKey(_ + _).map {
            case (id, count) => (id, (count, 0, 0))
          }
          // 统计下单数量
          val orderRDD: RDD[String] = rdd.filter(
            line => {
              val datas: Array[String] = line.split("_")
              // 本次不是下单行为,则数据采用 null 表示
              datas(8) != "null"
            }
          )
          // 扁平化映射: 因为一次下单可能有多个商品
          // (id1,id2,id3) => (id1,1),(id2,1),(id3,1)
          val orderCountRDD: RDD[(String, (Int,Int,Int))] = orderRDD.flatMap(
            line => {
              val datas: Array[String] = line.split("_")
              val cids: Array[String] = datas(8).split(",")
              cids.map((_, 1))
            }
          ).reduceByKey(_ + _).map {
            case (id, count) => (id, (0, count, 0))
          }
    
          // 统计支付数量
          val payRDD: RDD[String] = rdd.filter(
            line => {
              val datas: Array[String] = line.split("_")
              // 本次不是付行为,则数据采用 null 表示
              datas(10) != "null"
            }
          )
          // 扁平化映射: 因为一次下单可能有多个商品
          // (id1,id2,id3) => (id1,1),(id2,1),(id3,1)
          val payCountRDD: RDD[(String, (Int,Int,Int))] = payRDD.flatMap(
            line => {
              val datas: Array[String] = line.split("_")
              val cids: Array[String] = datas(10).split(",")
              cids.map((_, 1))
            }
          ).reduceByKey(_+_).map {
            case (id, count) => (id, (0, 0,count))
          }
          // 5. 聚合
          //    (品类ID, 点击数量) ==>( 品类ID, ( 点击数量, 0, 0 ) )
          //    (品类ID, 下单数量) ==>( 品类ID, ( 0, 下单数量, 0 ) )
          //    (品类ID, 支付数量) ==>( 品类ID, ( 0, 0, 支付数量 ) )
          val value: RDD[(String, (Int, Int, Int))] = clickCountRDD.union(orderCountRDD).union(payCountRDD)
          val resultRDD: RDD[(String, (Int, Int, Int))] = value.reduceByKey(
            (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
          )
          resultRDD.sortBy(_._2,false).take(10).foreach(println)
          sc.stop()
      }
    
    
    }
    

    运行结果:

    实现方案三:

    使用累加器的方式聚合数据

    package com.xiao.spark.core.rdd.req
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.AccumulatorV2
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    object Spark04_Req01_TopTen {
    
      def main(args: Array[String]): Unit = {
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("part")
          val sc = new SparkContext(sparkConf)
          val rdd: RDD[String] = sc.textFile("datas/user_visit_action.txt")
          val accumulator = new myAccumulator()
          sc.register(accumulator)
          rdd.foreach(
            line => {
              val datas: Array[String] = line.split("_")
              if (datas(6) != "-1"){
                accumulator.add((datas(6),"click"))
              }else if(datas(8) != "null"){
                val ids: Array[String] = datas(8).split(",")
                ids.foreach(
                  id => {
                    accumulator.add((id,"order"))
                  }
                )
              }else if(datas(10) != "null"){
                val ids: Array[String] = datas(10).split(",")
                ids.foreach(
                  id => {
                    accumulator.add((id,"pay"))
                  }
                )
              }
            }
          )
          // 只取hotCategor
          val categories: mutable.Iterable[HotCategory] = accumulator.value.map(_._2)
        // 自定义排序规则
        categories.toList.sortWith(
            (left,right) => {
              if (left.clickCnt > right.clickCnt){
                true
              } else if(left.clickCnt == right.clickCnt){
                if (left.orderCnt > right.orderCnt){
                  true
                }else if(left.orderCnt == right.orderCnt){
                  left.payCnt > right.payCnt
                }else{
                  false
                }
              }else{
                false
              }
            }
          ).take(10).foreach(println)
          sc.stop()
      }
    
      case class HotCategory(cid : String, var clickCnt : Int, var orderCnt : Int ,var payCnt : Int)
    
      // 自定义累加器
      class myAccumulator extends AccumulatorV2[(String,String),mutable.Map[String,HotCategory]] {
        var hcMap =  mutable.Map[String, HotCategory]()
        override def isZero: Boolean = {
            hcMap.isEmpty
        }
    
        override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
            new myAccumulator()
        }
    
        override def reset(): Unit = {
            hcMap.clear()
        }
    
        override def add(v: (String, String)): Unit = {
          val category: HotCategory = hcMap.getOrElse(v._1, new HotCategory(v._1, 0, 0, 0))
          if (v._2 == "click"){
            category.clickCnt += 1
          }else if (v._2 == "order"){
            category.orderCnt += 1
          }else if (v._2 == "pay"){
            category.payCnt += 1
          }
          hcMap.update(v._1,category)
        }
    
    
    
        override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
          var map1 = this.hcMap
          var map2 = other.value
          map2.foreach{
            case (cid,hc) =>{
              val category: HotCategory = map1.getOrElse(cid, new HotCategory(cid, 0, 0, 0))
              category.clickCnt += hc.clickCnt
              category.orderCnt += hc.orderCnt
              category.payCnt += hc.payCnt
              map1.update(cid,category)
            }
          }
    
    
        }
    
        override def value: mutable.Map[String, HotCategory] = {
            hcMap
        }
      }
    
    
    }
    

    运行结果:

  • 相关阅读:
    Java之修改文件内容:字符串逐行替换
    Java之"instanceof"和"isInstance"代码举例
    spring之跨模块引用配置文件
    Java之相对路径找不到文件问题解决方法
    html 页面模块的常用命名
    移动端资源集锦
    手机web不同屏幕字体大小高度自适应
    meta标签整理
    css文字超出自动显示省略号
    display:inline、block、inline-block的区别
  • 原文地址:https://www.cnblogs.com/yangxiao-/p/14358684.html
Copyright © 2011-2022 走看看