zoukankan      html  css  js  c++  java
  • Spark- 计算每个学科最受欢迎的老师

    日志类型

    测试数据
    http://bigdata.myit.com/zhangsan
    http://bigdata.myit.com/zhangsan
    http://bigdata.myit.com/zhangsan
    http://bigdata.myit.com/zhangsan
    http://bigdata.myit.com/zhangsan
    http://java.myit.com/lisi
    http://java.myit.com/lisi
    http://java.myit.com/lisi

    计算每个学科最受欢迎的老师

    package mypro
    
    import java.net.URL
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
     * Created by 166 on 2017/9/5.
     */
    object FavTeacher {
      def main(args: Array[String]) {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")//local[*]代表用多个线程跑,2代表用两个线程跑
        val sc = new SparkContext(conf)
    
        //读取数据
        val lines: RDD[String] = sc.textFile(args(0))
        //整理数据
        val subjectAndTeacher:RDD[(String,String)]=lines.map(line=> {
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          val teacher = url.getPath.substring(1)   //去掉路径前面的"/"
          (subject, teacher)
        })
    
        //聚合
        val reduce = subjectAndTeacher.map((_,1)).reduceByKey(_+_)
        //println(reduce.collect().toBuffer)
    
        //按学科分组
        val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduce.groupBy(_._1._1)//迭代器不能排序,需要将它变成List。
    
        //二次排序
        val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(2))//用scala的语法,会把数据全部加载到内存后再做排序,数据量大的时候会有性能问题,内存溢出的问题,不建议这样使用,
        val arr: Array[(String, List[((String, String), Int)])] = result.collect()
        println(arr.toBuffer)
        
      }
    }

     另种角度来实现,过滤多次提交

    package com.rz.spark.base
    
    import java.net.URL
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    // 过滤多次提交
    object GroupFavTeacher2 {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
        val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val topN = args(1).toInt
    
        val subject = Array("bigdata","javaee","php")
        // 读取数据
        val lines: RDD[String] = sc.textFile(args(0))
        // 整理数据  http://bigdata.myit.cn/laozhang
        val subjectAndTeacher= lines.map(line => {
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          val teacher = url.getPath.substring(1) // 去掉前面的/
          ((subject, teacher),1)
        })
    
        // 聚合
        val reduced = subjectAndTeacher.reduceByKey(_+_)
      
      // 缓存到内存,因为多次过滤都是使用同一个rdd,缓存到内存可以提高反复使用的性能
      val cache = reduced.cache()
    for (sb <- subject){ val sorted = cache.filter(_._1._1 == sb).sortBy(_._2,false).take(topN) println(sorted.toBuffer) } sc.stop() } }

      使用自定义分区器将每个学科的数据shuffle到独自的分区,在分区内进行排序取topN

    package com.rz.spark.base
    
    import java.net.URL
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    // 自定义分区器
    import scala.collection.mutable
    // 过滤多次提交
    object GroupFavTeacher3 {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
        val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val topN = args(1).toInt
    
        val subject = Array("bigdata","javaee","php")
        // 读取数据
        val lines: RDD[String] = sc.textFile(args(0))
        // 整理数据  http://bigdata.myit.cn/laozhang
        val subjectAndTeacher= lines.map(line => {
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          val teacher = url.getPath.substring(1) // 去掉前面的/
          ((subject, teacher),1)
        })
    
        // 聚合
        val reduced = subjectAndTeacher.reduceByKey(_+_)
    
        // 计算我们有多少学科
        val sujects: Array[String] = reduced.map(_._1._1).distinct().collect()
    
        // 自定义一个分区器,并且按照指定的分区器进行分区
        val subjectPartitoner = new SubjectPartitoner(sujects)
    
        // partitionBy按照指定的分区规则进行分区
        val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(subjectPartitoner)
    
        // 如果一次拿出一个分区(可以操作一个分区的数据)
        val sorted = partitioned.mapPartitions(it => {
          // 将迭代器转成List,然后排序,再转成迭代器返回
          it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序
        })
        val result = sorted.collect()
    
        println(result.toBuffer)
        sc.stop()
      }
    
      // 自定义分区器
      class SubjectPartitoner(sbs: Array[String]) extends Partitioner{
        // 相当于主构造器(new 的时候会执行一次)
        // 用于存放规则的一个map
        val rules = new mutable.HashMap[String, Int]()
        var i = 0
        for (sb <- sbs){
          rules.put(sb,i)
          i += 1
        }
    
        // 返回分区的数量(下一个RDD有多少分区)
        override def numPartitions: Int = sbs.length
    
        // 根据传入的key计算分区标号
        // Key是一个无组(String, String)
        override def getPartition(key: Any): Int ={
          // 获取学科名称
          val subject = key.asInstanceOf[(String, String)]._1
          // 根据规则计算分区编号
          rules(subject)
        }
      }
    
    }

    上面的方式会有多次shuffle,reduceByKey聚合数据的时候shuffle一次,使用自定义分区器重新对数据进行分析又shuffle了一次。我们可以尽可能的减少shuffle的过程,我们可以在reduceByKey的时候手动使用自定分区器进行分区,reduceByKey默认使用的是。HashPartitioner。

    package com.rz.spark.base
    
    import java.net.URL
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    // 自定义分区器且减少shuffle
    import scala.collection.mutable
    
    // 过滤多次提交
    object GroupFavTeacher4 {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
        val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val topN = args(1).toInt
    
        val subject = Array("bigdata","javaee","php")
        // 读取数据
        val lines: RDD[String] = sc.textFile(args(0))
        // 整理数据  http://bigdata.myit.cn/laozhang
        val subjectAndTeacher= lines.map(line => {
          val url = new URL(line)
          val host = url.getHost
          val subject = host.substring(0, host.indexOf("."))
          val teacher = url.getPath.substring(1) // 去掉前面的/
          ((subject, teacher),1)
        })
    
        // 计算我们有多少学科
        val sujects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()
    
        // 自定义一个分区器,并且按照指定的分区器进行分区
        val subjectPartitoner = new SubjectPartitoner2(sujects)
    
        // 聚合,聚合是按照指定的分区器进行分区
        // 该RDD一个分区内仅有一个学科的数据
        val reduced: RDD[((String, String), Int)] = subjectAndTeacher.reduceByKey(subjectPartitoner,_+_)
    
    
        // 如果一次拿出一个分区(可以操作一个分区的数据)
        val sorted = reduced.mapPartitions(it => {
          // 将迭代器转成List,然后排序,再转成迭代器返回
          it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序
        })
    
        // 收集数据
        val result = sorted.collect()
    
        println(result.toBuffer)
        sc.stop()
      }
    
      // 自定义分区器
      class SubjectPartitoner2(sbs: Array[String]) extends Partitioner{
        // 相当于主构造器(new 的时候会执行一次)
        // 用于存放规则的一个map
        val rules = new mutable.HashMap[String, Int]()
        var i = 0
        for (sb <- sbs){
          rules.put(sb,i)
          i += 1
        }
    
        // 返回分区的数量(下一个RDD有多少分区)
        override def numPartitions: Int = sbs.length
    
        // 根据传入的key计算分区标号
        // Key是一个无组(String, String)
        override def getPartition(key: Any): Int ={
          // 获取学科名称
          val subject = key.asInstanceOf[(String, String)]._1
          // 根据规则计算分区编号
          rules(subject)
        }
      }
    
    }
  • 相关阅读:
    [转]asp.net页面缓存技术
    UL和LI在div中的高度的IE6下兼容性
    jquery制作的横向图片滚动带横向滚动条TackerScroll
    电脑可以上网,但是qq登陆不上去?
    Introduction to discrete event system学习笔记4.6
    Introduction to Discrete event system学习笔记4.9
    Introduction to discrete event systemsstudy 4.5
    Symbolic synthesis of obserability requirements for diagnosability B.Bittner,M.Bozzano,A.Cimatti,and X.Olive笔记4.16
    Introduction to discrete event system学习笔记4.8pm
    Introduction to discrete event system学习笔记 4.8
  • 原文地址:https://www.cnblogs.com/RzCong/p/7822998.html
Copyright © 2011-2022 走看看