zoukankan      html  css  js  c++  java
  • Spark分区实例(teacher)

    package URL1
    
    import org.apache.spark.Partitioner
    
    import scala.collection.mutable
    
    class MyPartitioner(val num:Array[String]) extends Partitioner{
    val parMap=new mutable.HashMap[String,Int]()
      var count=0
      for(i<-num){
        parMap.put(i,count)
        count += 1
      }
    
      //分区数目
      override def numPartitions: Int = num.length
    
      //分区的规则
      //def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1
      override def getPartition(key: Any): Int = {
        // 将对象转换为指定类型;
        val tople=key.asInstanceOf[(String,String)]
        val subject=tople._1
        this.parMap(subject)
    
    }
    }
    package URL1
    
    class Orders extends Ordering[((String,String),Int)]{
      override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
        x._2-y._2
      }
    }
    package URL1
    
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    object teacher {
      def main(args: Array[String]): Unit = {
        val cof=new SparkConf()
          .setAppName(this.getClass.getSimpleName)
          .setMaster("local[*]")
        val sc=new SparkContext(cof)
    
        val lines=sc.textFile("E:\teacher.log")
    
        val result1:RDD[((String,String),Int)]=lines.map( tp=>{
          val teacher=tp.split("/").last
          val host=new URL(tp).getHost
          val subject=host.substring(0,host.indexOf("."))
          ((subject,teacher),1)
        })
    
        //科目
        val subject=result1.map(tp=>tp._1._1).distinct().collect()
    
        //分区
        val partitions=new MyPartitioner(subject)
    
        //业务逻辑
        //1.全局TOPN
       // val result2=result1.reduceByKey(partitions,_+_).sortBy(-_._2).take(2).foreach(println)
    
        //1.全局TOPN
        val result3=result1.foreachPartition(tp=>{
          val treeSet=new mutable.TreeSet[((String,String),Int)]()(new Orders)
    
          tp.foreach(tp=>{
            treeSet.add(tp)
            if(treeSet.size>2){
              treeSet.dropRight(1)
            }
          })
    
          treeSet.foreach(println)
        })
    
    
    sc.stop()
      }
    }

    teacher.log

    复制代码
    http://bigdata.baidu.cn/zhangsan
    http://bigdata.baidu.cn/zhangsan
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/wangwu
    http://bigdata.baidu.cn/wangwu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/wangwu
    http://bigdata.baidu.cn/wangwu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/wangwu
    http://bigdata.baidu.cn/wangwu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://php.baidu.cn/laoli
    http://php.baidu.cn/laoliu
    http://php.baidu.cn/laoli
    http://php.baidu.cn/laoli
    复制代码
  • 相关阅读:
    八、drop和alter
    undefined reference to ****
    cgdb的认识
    ping: unknown host www.baidu.com
    ubuntu mysql汉字写入只写入了一个字符
    gdb map.insert方法运行异常:program received signal segmentation fault
    ubuntu环境下c++ 模板特化的编写
    putty fatal error software caused connection
    ubuntu共享文件夹不能被访问,其他主机ping不通该服务器
    ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES) 问题
  • 原文地址:https://www.cnblogs.com/wangshuang123/p/11084360.html
Copyright © 2011-2022 走看看