package URL1 import org.apache.spark.Partitioner import scala.collection.mutable class MyPartitioner(val num:Array[String]) extends Partitioner{ val parMap=new mutable.HashMap[String,Int]() var count=0 for(i<-num){ parMap.put(i,count) count += 1 } //分区数目 override def numPartitions: Int = num.length //分区的规则 //def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1 override def getPartition(key: Any): Int = { // 将对象转换为指定类型; val tople=key.asInstanceOf[(String,String)] val subject=tople._1 this.parMap(subject) } }
package URL1 class Orders extends Ordering[((String,String),Int)]{ override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = { x._2-y._2 } }
package URL1 import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable object teacher { def main(args: Array[String]): Unit = { val cof=new SparkConf() .setAppName(this.getClass.getSimpleName) .setMaster("local[*]") val sc=new SparkContext(cof) val lines=sc.textFile("E:\teacher.log") val result1:RDD[((String,String),Int)]=lines.map( tp=>{ val teacher=tp.split("/").last val host=new URL(tp).getHost val subject=host.substring(0,host.indexOf(".")) ((subject,teacher),1) }) //科目 val subject=result1.map(tp=>tp._1._1).distinct().collect() //分区 val partitions=new MyPartitioner(subject) //业务逻辑 //1.全局TOPN // val result2=result1.reduceByKey(partitions,_+_).sortBy(-_._2).take(2).foreach(println) //1.全局TOPN val result3=result1.foreachPartition(tp=>{ val treeSet=new mutable.TreeSet[((String,String),Int)]()(new Orders) tp.foreach(tp=>{ treeSet.add(tp) if(treeSet.size>2){ treeSet.dropRight(1) } }) treeSet.foreach(println) }) sc.stop() } }
teacher.log
http://bigdata.baidu.cn/zhangsan
http://bigdata.baidu.cn/zhangsan
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://php.baidu.cn/laoli
http://php.baidu.cn/laoliu
http://php.baidu.cn/laoli
http://php.baidu.cn/laoli