zoukankan      html  css  js  c++  java
  • 大数据入门第二十二天——spark(三)自定义分区、排序与查找

    一、自定义分区

      1.概述

        默认的是Hash的分区策略,这点和Hadoop是类似的,具体的分区介绍,参见:https://blog.csdn.net/high2011/article/details/68491115

      2.实现

    package cn.itcast.spark.day3
    
    import java.net.URL
    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    import scala.collection.mutable
    
    /**
      * Created by root on 2016/5/18.
      */
    object UrlCountPartition {

      /**
        * Counts hits per URL in a tab-separated access log, routes every host to its own
        * partition through [[HostParitioner]], and writes the two most-hit URLs of each
        * partition to the output directory.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
          .setAppName("UrlCountPartition")
          .setMaster("local[2]")
        val sc = new SparkContext(sparkConf)

        // One (url, 1) pair per log line; the URL is the second tab-separated field.
        val urlOnes = sc.textFile("c://itcast.log").map { line =>
          val fields = line.split("	")
          (fields(1), 1)
        }
        val urlCounts = urlOnes.reduceByKey(_ + _)

        // Re-key by host so that the partitioner can group the URLs of one site together.
        val byHost = urlCounts.map { case (url, hits) =>
          (new URL(url).getHost, (url, hits))
        }

        // The distinct hosts are collected on the driver to build the partitioner.
        val hosts = byHost.map(_._1).distinct().collect()
        val partitioner = new HostParitioner(hosts)
        // Alternative kept from the original listing:
        //   byHost.partitionBy(new HashPartitioner(hosts.length))

        val topTwoPerPartition = byHost
          .partitionBy(partitioner)
          .mapPartitions { records =>
            // Sort ascending by hit count, then reverse, so the largest counts come first.
            val ascending = records.toList.sortBy { case (_, (_, hits)) => hits }
            ascending.reverse.take(2).iterator
          }

        topTwoPerPartition.saveAsTextFile("c://out4")
        //println(topTwoPerPartition.collect().toBuffer)
        sc.stop()
      }
    }
    
    /**
      * Partitioner that decides which partition a record goes to based on its host key.
      *
      * Every host in `ins` is mapped to its index in the array; if a host occurs more than
      * once, the index of its last occurrence wins. Keys that are `null`, or whose string
      * form is not one of the known hosts, are sent to partition 0.
      *
      * `equals` and `hashCode` are defined so that two partitioners built from the same hosts
      * compare equal, as Spark's built-in `HashPartitioner` does.
      *
      * @param ins the hosts to partition by; its length is the number of partitions
      */
    class HostParitioner(ins: Array[String]) extends Partitioner {

      /** Lookup table from host to partition index, filled once at construction time. */
      val parMap = new mutable.HashMap[String, Int]()
      parMap ++= ins.zipWithIndex

      /** Number of hosts that were registered in [[parMap]] during construction. */
      val count: Int = ins.length

      override def numPartitions: Int = ins.length

      /**
        * Returns the partition index for `key`.
        *
        * @param key the record key; it is matched against the hosts by its `toString` form
        * @return the index of the matching host, or 0 for `null` and unknown keys
        */
      override def getPartition(key: Any): Int = key match {
        case null  => 0 // avoid a NullPointerException on key.toString
        case other => parMap.getOrElse(other.toString, 0)
      }

      /** Two host partitioners are equal when they map the same hosts to the same partitions. */
      override def equals(other: Any): Boolean = other match {
        case that: HostParitioner =>
          that.numPartitions == numPartitions && that.parMap == parMap
        case _ => false
      }

      /** Consistent with [[equals]]: derived from the partition count and the lookup table. */
      override def hashCode(): Int = 31 * numPartitions + parMap.hashCode()
    }

       // 与Hadoop相通,不再赘述

    二、自定义排序

      基本上就是结合之前学过的隐式机制(这里是通过隐式值为 sortBy 提供 Ordering):这里使用样例类可以不用new就能得到实例,另外也可以用于模式匹配

    package cn.itcast.spark.day3
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    /**
      * Holds the implicit ordering used to sort [[Girl]] values.
      * Bring it into scope with `import OrderContext._`.
      */
    object OrderContext {

      /**
        * Orders by `faceValue` ascending; when the face values are equal, the older one
        * sorts first (`age` descending). Two values with the same face value and age compare
        * as equal (0), which the `Ordering` contract requires for consistent sorting.
        */
      implicit val girlOrdering: Ordering[Girl] = new Ordering[Girl] {
        override def compare(x: Girl, y: Girl): Int = {
          val byFaceValue = Integer.compare(x.faceValue, y.faceValue)
          if (byFaceValue != 0) byFaceValue
          else Integer.compare(y.age, x.age) // arguments swapped: larger age sorts first
        }
      }
    }
    
    
    /**
      * Created by root on 2016/5/18.
      */
    //排序规则:先按faceValue比较,faceValue相同时再比较年龄
    //name,faceValue,age
    
    
    object CustomSort {

      /**
        * Sorts a small in-memory data set in descending order of the implicit `Ordering[Girl]`
        * from [[OrderContext]] and prints the result.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {
        // Supplies the implicit Ordering[Girl] that sortBy needs.
        import OrderContext._

        val sparkConf = new SparkConf()
          .setAppName("CustomSort")
          .setMaster("local[2]")
        val sc = new SparkContext(sparkConf)

        // Each row is (name, faceValue, age, id).
        val rows = List(
          ("yuihatano", 90, 28, 1),
          ("angelababy", 90, 27, 2),
          ("JuJingYi", 95, 22, 3)
        )

        // The sort key is built from the second and third fields of each row.
        val sorted = sc
          .parallelize(rows)
          .sortBy(row => Girl(row._2, row._3), ascending = false)

        println(sorted.collect().toBuffer)
        sc.stop()
      }

    }
    
    /**
      * 第一种方式
      * @param faceValue
      * @param age
    
    case class Girl(val faceValue: Int, val age: Int) extends Ordered[Girl] with Serializable {
      override def compare(that: Girl): Int = {
        if(this.faceValue == that.faceValue) {
          that.age - this.age
        } else {
          this.faceValue -that.faceValue
        }
      }
    }
      */
    
    /**
      * Second approach: a plain data holder used as the sort key. It defines no ordering of
      * its own; the ordering is supplied separately as an implicit `Ordering[Girl]`.
      *
      * @param faceValue the face-value score
      * @param age       the age
      */
    case class Girl(
      faceValue: Int,
      age: Int
    ) extends Serializable

       // 复习隐式转换,基本也无新内容

    三、IP查找小练习

      参考:https://www.cnblogs.com/wnbahmbb/p/6250099.html

  • 相关阅读:
    [NOI2004] 郁闷的出纳员
    [洛谷P4556] 雨天的尾巴
    【转】进程、线程、 GIL全局解释器锁知识点整理
    Python3中的SocketServer
    socket 上传文件代码
    python socket 连续send,出现粘包问题
    【转】动态导入模块的两种方法
    【转】面向对象高级语法部分
    python的垃圾回收机制和析构函数__del__
    django 项目使用setting文件里定义的变量方法
  • 原文地址:https://www.cnblogs.com/jiangbei/p/8708806.html
Copyright © 2011-2022 走看看