zoukankan      html  css  js  c++  java
  • spark解决hash碰撞

    问题背景:pairRDD

    调用spark存入文件的api时,最后结果文件的个数(就是分区的个数)取决于PairRDD中key的hash值,

    解决后可使相同key的数据进入同一个partition中

    解决方法:

    1. 自己重新定义一个partitioner
    
    
    // A custom Partitioner: routes every record whose key is a known host to a
    // fixed partition index, so all data for one host lands in one result file.
    class HostPartitioner(hosts: Array[String]) extends Partitioner {

      // host -> partition index, built once from the distinct host list.
      // zipWithIndex.toMap replaces the original var-counter/mutable-map loop.
      val hostMap: Map[String, Int] = hosts.zipWithIndex.toMap

      // One partition per known host.
      override def numPartitions: Int = hosts.length

      // Spark invokes this with each record's key (the host string here);
      // unknown hosts fall back to partition 0.
      override def getPartition(key: Any): Int = hostMap.getOrElse(key.toString, 0)
    }
    
     

    整个代码如下:

    package flowanalysis

    import java.net.URL

    import org.apache.spark.{Partitioner, SparkConf, SparkContext}

    import scala.collection.mutable

    /**
    * Created by willian on 2017/3/18.
    * 解决存入hadoop时产生的hash碰撞,以及由此导致文件内容没有按key进行分区的问题
    */
    object FlowAnalysisPartitioner {
      /**
       * Counts hits per URL in an access log, re-keys the counts by host, and
       * writes each host's top-3 URLs into that host's own partition file via
       * a custom [[HostPartitioner]].
       */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("flow_analysis").setMaster("local")
        val sc = new SparkContext(conf)

        // (url, 1) pairs — field 1 of each space-separated log line is the URL.
        // Bug fix: the original read `f(i = 1)`, an IntelliJ parameter-hint
        // artifact captured in the paste; the intended call is `f(1)`.
        val urlOnes = sc.textFile("/Users/willian/Desktop/project/spark/wordcount/src/main/resources/itcast.log").map(line => {
          val f = line.split(" ")
          (f(1), 1)
        })
        val urlCounts = urlOnes.reduceByKey(_ + _)

        // Re-key by host so the partitioner can group one host's URLs together.
        val byHost = urlCounts.map(tuple => {
          val url = tuple._1
          (new URL(url).getHost, (url, tuple._2))
        })

        // The distinct hosts define the partition layout (one partition each).
        val hosts = byHost.map(_._1).distinct().collect()
        val partitioner = new HostPartitioner(hosts)

        // One output file per host, keeping only the top-3 URLs by hit count.
        byHost.partitionBy(partitioner).mapPartitions(it => {
          it.toList.sortBy(_._2._2).reverse.take(3).iterator
        }).saveAsTextFile("/Users/willian/Desktop/project/spark/wordcount/src/main/output")

        // Bug fix: release the SparkContext before exiting.
        sc.stop()
      }
    }
    

    // A custom Partitioner: routes every record whose key is a known host to a
    // fixed partition index, so all data for one host lands in one result file.
    class HostPartitioner(hosts: Array[String]) extends Partitioner {

      // host -> partition index, built once from the distinct host list.
      // zipWithIndex.toMap replaces the original var-counter/mutable-map loop.
      val hostMap: Map[String, Int] = hosts.zipWithIndex.toMap

      // One partition per known host.
      override def numPartitions: Int = hosts.length

      // Spark invokes this with each record's key (the host string here);
      // unknown hosts fall back to partition 0.
      override def getPartition(key: Any): Int = hostMap.getOrElse(key.toString, 0)
    }
    
  • 相关阅读:
    Cookie数据的编码及解码
    删除单链表节点,时间复杂度为O(1)
    匹配URL
    C#文本框允许使用ctrl+A
    实现统计一个字符串所含的不同字符的总数
    调用win32 api 函数SendMessage() 实现消息直接调用
    关于C++的const对象
    从一个文本文件中找出使用频率最高的五个字符
    C++基础中的基础(深拷贝与浅拷贝)
    python+Django CRM客户关系管理系统开发(十)--左右移动选择框功能开发
  • 原文地址:https://www.cnblogs.com/zhangweilun/p/6576693.html
Copyright © 2011-2022 走看看