zoukankan      html  css  js  c++  java
  • Spark- 计算每个学科最受欢迎的老师

    日志类型

    测试数据
    http://bigdata.myit.com/zhangsan
    http://bigdata.myit.com/zhangsan
    http://bigdata.myit.com/zhangsan
    http://bigdata.myit.com/zhangsan
    http://bigdata.myit.com/zhangsan
    http://java.myit.com/lisi
    http://java.myit.com/lisi
    http://java.myit.com/lisi

    计算每个学科最受欢迎的老师

    package mypro
    
    import java.net.URL
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
     * Created by 166 on 2017/9/5.
     */
    object FavTeacher {
      def main(args: Array[String]) {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")//local[*]代表用多个线程跑,2代表用两个线程跑
        val sc = new SparkContext(conf)
    
        //读取数据
        val lines: RDD[String] = sc.textFile(args(0))
        //整理数据
        val subjectAndTeacher:RDD[(String,String)]=lines.map(line=> {
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          val teacher = url.getPath.substring(1)   //去掉路径前面的"/"
          (subject, teacher)
        })
    
        //聚合
        val reduce = subjectAndTeacher.map((_,1)).reduceByKey(_+_)
        //println(reduce.collect().toBuffer)
    
        //按学科分组
        val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduce.groupBy(_._1._1)//迭代器不能排序,需要将它变成List。
    
        //二次排序
        val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(2))//用scala的语法,会把数据全部加载到内存后再做排序,数据量大的时候会有性能问题,内存溢出的问题,不建议这样使用,
        val arr: Array[(String, List[((String, String), Int)])] = result.collect()
        println(arr.toBuffer)
        
      }
    }

     另种角度来实现,过滤多次提交

    package com.rz.spark.base
    
    import java.net.URL
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    // 过滤多次提交
    object GroupFavTeacher2 {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
        val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val topN = args(1).toInt
    
        val subject = Array("bigdata","javaee","php")
        // 读取数据
        val lines: RDD[String] = sc.textFile(args(0))
        // 整理数据  http://bigdata.myit.cn/laozhang
        val subjectAndTeacher= lines.map(line => {
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          val teacher = url.getPath.substring(1) // 去掉前面的/
          ((subject, teacher),1)
        })
    
        // 聚合
        val reduced = subjectAndTeacher.reduceByKey(_+_)
      
      // 缓存到内存,因为多次过滤都是使用同一个rdd,缓存到内存可以提高反复使用的性能
      val cache = reduced.cache()
    for (sb <- subject){ val sorted = cache.filter(_._1._1 == sb).sortBy(_._2,false).take(topN) println(sorted.toBuffer) } sc.stop() } }

      使用自定义分区器将每个学科的数据shuffle到独自的分区,在分区内进行排序取topN

    package com.rz.spark.base
    
    import java.net.URL
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    // 自定义分区器
    import scala.collection.mutable
    // 过滤多次提交
    object GroupFavTeacher3 {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
        val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val topN = args(1).toInt
    
        val subject = Array("bigdata","javaee","php")
        // 读取数据
        val lines: RDD[String] = sc.textFile(args(0))
        // 整理数据  http://bigdata.myit.cn/laozhang
        val subjectAndTeacher= lines.map(line => {
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          val teacher = url.getPath.substring(1) // 去掉前面的/
          ((subject, teacher),1)
        })
    
        // 聚合
        val reduced = subjectAndTeacher.reduceByKey(_+_)
    
        // 计算我们有多少学科
        val sujects: Array[String] = reduced.map(_._1._1).distinct().collect()
    
        // 自定义一个分区器,并且按照指定的分区器进行分区
        val subjectPartitoner = new SubjectPartitoner(sujects)
    
        // partitionBy按照指定的分区规则进行分区
        val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(subjectPartitoner)
    
        // 如果一次拿出一个分区(可以操作一个分区的数据)
        val sorted = partitioned.mapPartitions(it => {
          // 将迭代器转成List,然后排序,再转成迭代器返回
          it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序
        })
        val result = sorted.collect()
    
        println(result.toBuffer)
        sc.stop()
      }
    
      // 自定义分区器
      class SubjectPartitoner(sbs: Array[String]) extends Partitioner{
        // 相当于主构造器(new 的时候会执行一次)
        // 用于存放规则的一个map
        val rules = new mutable.HashMap[String, Int]()
        var i = 0
        for (sb <- sbs){
          rules.put(sb,i)
          i += 1
        }
    
        // 返回分区的数量(下一个RDD有多少分区)
        override def numPartitions: Int = sbs.length
    
        // 根据传入的key计算分区标号
        // Key是一个无组(String, String)
        override def getPartition(key: Any): Int ={
          // 获取学科名称
          val subject = key.asInstanceOf[(String, String)]._1
          // 根据规则计算分区编号
          rules(subject)
        }
      }
    
    }

    上面的方式会有多次shuffle,reduceByKey聚合数据的时候shuffle一次,使用自定义分区器重新对数据进行分析又shuffle了一次。我们可以尽可能的减少shuffle的过程,我们可以在reduceByKey的时候手动使用自定分区器进行分区,reduceByKey默认使用的是。HashPartitioner。

    package com.rz.spark.base
    
    import java.net.URL
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    // 自定义分区器且减少shuffle
    import scala.collection.mutable
    
    // 过滤多次提交
    object GroupFavTeacher4 {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
        val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val topN = args(1).toInt
    
        val subject = Array("bigdata","javaee","php")
        // 读取数据
        val lines: RDD[String] = sc.textFile(args(0))
        // 整理数据  http://bigdata.myit.cn/laozhang
        val subjectAndTeacher= lines.map(line => {
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          val teacher = url.getPath.substring(1) // 去掉前面的/
          ((subject, teacher),1)
        })
    
        // 计算我们有多少学科
        val sujects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()
    
        // 自定义一个分区器,并且按照指定的分区器进行分区
        val subjectPartitoner = new SubjectPartitoner2(sujects)
    
        // 聚合,聚合是按照指定的分区器进行分区
        // 该RDD一个分区内仅有一个学科的数据
        val reduced: RDD[((String, String), Int)] = subjectAndTeacher.reduceByKey(subjectPartitoner,_+_)
    
    
        // 如果一次拿出一个分区(可以操作一个分区的数据)
        val sorted = reduced.mapPartitions(it => {
          // 将迭代器转成List,然后排序,再转成迭代器返回
          it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序
        })
    
        // 收集数据
        val result = sorted.collect()
    
        println(result.toBuffer)
        sc.stop()
      }
    
      // 自定义分区器
      class SubjectPartitoner2(sbs: Array[String]) extends Partitioner{
        // 相当于主构造器(new 的时候会执行一次)
        // 用于存放规则的一个map
        val rules = new mutable.HashMap[String, Int]()
        var i = 0
        for (sb <- sbs){
          rules.put(sb,i)
          i += 1
        }
    
        // 返回分区的数量(下一个RDD有多少分区)
        override def numPartitions: Int = sbs.length
    
        // 根据传入的key计算分区标号
        // Key是一个无组(String, String)
        override def getPartition(key: Any): Int ={
          // 获取学科名称
          val subject = key.asInstanceOf[(String, String)]._1
          // 根据规则计算分区编号
          rules(subject)
        }
      }
    
    }
  • 相关阅读:
    DevExpress VCL for Delphi 各版本收集下载
    Delphi XE 5,Rad Studio XE 5 官方下载(附破解),更新 Update 1,Help Update 1
    PostMessage 向Windows窗口发送Alt组合键
    Windows XP UDF 2.5 补丁,播放蓝光ISO光盘必备
    60个开发者不容错过的免费资源库
    [转]游戏多开的原理
    Delphi加载驱动
    窗口截图
    Drectx 3D窗口后台截图
    利用进程ID获取主线程ID
  • 原文地址:https://www.cnblogs.com/RzCong/p/7822998.html
Copyright © 2011-2022 走看看