zoukankan      html  css  js  c++  java
  • Spark分区实例(teacher)

    package URL1
    
    import org.apache.spark.Partitioner
    
    import scala.collection.mutable
    
    class MyPartitioner(val num:Array[String]) extends Partitioner{
    val parMap=new mutable.HashMap[String,Int]()
      var count=0
      for(i<-num){
        parMap.put(i,count)
        count += 1
      }
    
      //分区数目
      override def numPartitions: Int = num.length
    
      //分区的规则
      //def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1
      override def getPartition(key: Any): Int = {
        // 将对象转换为指定类型;
        val tople=key.asInstanceOf[(String,String)]
        val subject=tople._1
        this.parMap(subject)
    
    }
    }
    package URL1
    
    class Orders extends Ordering[((String,String),Int)]{
      override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
        x._2-y._2
      }
    }
    package URL1
    
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    object teacher {
      def main(args: Array[String]): Unit = {
        val cof=new SparkConf()
          .setAppName(this.getClass.getSimpleName)
          .setMaster("local[*]")
        val sc=new SparkContext(cof)
    
        val lines=sc.textFile("E:\teacher.log")
    
        val result1:RDD[((String,String),Int)]=lines.map( tp=>{
          val teacher=tp.split("/").last
          val host=new URL(tp).getHost
          val subject=host.substring(0,host.indexOf("."))
          ((subject,teacher),1)
        })
    
        //科目
        val subject=result1.map(tp=>tp._1._1).distinct().collect()
    
        //分区
        val partitions=new MyPartitioner(subject)
    
        //业务逻辑
        //1.全局TOPN
       // val result2=result1.reduceByKey(partitions,_+_).sortBy(-_._2).take(2).foreach(println)
    
        //1.全局TOPN
        val result3=result1.foreachPartition(tp=>{
          val treeSet=new mutable.TreeSet[((String,String),Int)]()(new Orders)
    
          tp.foreach(tp=>{
            treeSet.add(tp)
            if(treeSet.size>2){
              treeSet.dropRight(1)
            }
          })
    
          treeSet.foreach(println)
        })
    
    
    sc.stop()
      }
    }

    teacher.log

    复制代码
    http://bigdata.baidu.cn/zhangsan
    http://bigdata.baidu.cn/zhangsan
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/wangwu
    http://bigdata.baidu.cn/wangwu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/wangwu
    http://bigdata.baidu.cn/wangwu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/lisi
    http://bigdata.baidu.cn/wangwu
    http://bigdata.baidu.cn/wangwu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/xiaoxu
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://javaee.baidu.cn/laoyang
    http://php.baidu.cn/laoli
    http://php.baidu.cn/laoliu
    http://php.baidu.cn/laoli
    http://php.baidu.cn/laoli
    复制代码
  • 相关阅读:
    Gin 使用 websocket
    7天用Go从零实现Web框架Gee教程
    docker-compose 搭建 Redis Sentinel 测试环境
    关闭禁用 Redis 危险命令
    Redis Cluster 设置密码
    使用 twine 上传自己的 python 包到 pypi
    Redis 5.0.7 讲解,单机、集群模式搭建
    Redis 单机模式,主从模式,哨兵模式(sentinel),集群模式(cluster),第三方模式优缺点分析
    django 重写 mysql 连接库实现连接池
    编译安装httpd-2.4
  • 原文地址:https://www.cnblogs.com/wangshuang123/p/11084360.html
Copyright © 2011-2022 走看看