zoukankan      html  css  js  c++  java
  • spark解决hash碰撞

    问题背景:pairRDD

    调用spark存入文件的api时,最后结果文件的个数(就是分区的个数)取决于PairRDD中的key的hash值,

    解决后可使相同key的数据进入同一个partition中

    解决方法:

    1. 自己重新定义一个partitioner
    
    
    // A custom Partitioner that routes every record to the partition assigned
    // to its host. An internal map gives an O(1) host -> partition lookup.
    class HostPartitioner(hosts: Array[String]) extends Partitioner {
      // Assign each host a stable partition id equal to its position in `hosts`.
      // zipWithIndex replaces the original mutable `var count` loop.
      val hostMap = new mutable.HashMap[String, Int]()
      hostMap ++= hosts.zipWithIndex

      // One output partition per distinct host.
      override def numPartitions: Int = hosts.length

      // Spark calls this with each record's key (the host string);
      // hosts not present in the map fall back to partition 0.
      override def getPartition(key: Any): Int =
        hostMap.getOrElse(key.toString, 0)
    }
     

    整个代码如下:

    package flowanalysis

    import java.net.URL

    import org.apache.spark.{Partitioner, SparkConf, SparkContext}

    import scala.collection.mutable

    /**
     * Created by willian on 2017/3/18.
     * Avoids the hash-collision problem when saving to HDFS: with the default
     * hash partitioning, output files are not grouped by key. A custom
     * HostPartitioner sends all records of the same host to one partition.
     */
    object FlowAnalysisPartitioner {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("flow_analysis").setMaster("local")
        val sc = new SparkContext(conf)

        // Parse each log line into (url, 1); field 1 of the space-split line
        // is the URL column. (Original had `f(i = 1)` — a pasted IDE
        // parameter hint — fixed to plain `f(1)`.)
        val rdd = sc.textFile("/Users/willian/Desktop/project/spark/wordcount/src/main/resources/itcast.log").map(line => {
          val f = line.split(" ")
          (f(1), 1)
        })

        // Count hits per URL, then re-key by host: (host, (url, count)).
        val rdd1 = rdd.reduceByKey(_ + _)
        val rdd3 = rdd1.map(tuple => {
          val url = tuple._1
          val host = new URL(url).getHost
          (host, (url, tuple._2))
        })

        // Collect the distinct hosts so the partitioner knows how many
        // partitions to create and which host maps to which partition.
        val hostrdd = rdd3.map(_._1).distinct().collect()
        val partitioner = new HostPartitioner(hostrdd)

        // Repartition by host, then within each partition keep the top-3
        // URLs by hit count (descending) before writing the result files.
        rdd3.partitionBy(partitioner).mapPartitions(it => {
          it.toList.sortBy(_._2._2).reverse.take(3).iterator
        }).saveAsTextFile("/Users/willian/Desktop/project/spark/wordcount/src/main/output")
      }
    }

    // Custom Partitioner: one partition per distinct host, so that all
    // records sharing a host land in the same output file.
    class HostPartitioner(hosts: Array[String]) extends Partitioner {
      // Partition id for each host is its index in the `hosts` array;
      // zipWithIndex replaces the original mutable `var count` loop.
      val hostMap = new mutable.HashMap[String, Int]()
      hostMap ++= hosts.zipWithIndex

      // Number of partitions equals the number of known hosts.
      override def numPartitions: Int = hosts.length

      // Spark passes the record key (the host string) here; any host that is
      // somehow missing from the map defaults to partition 0.
      override def getPartition(key: Any): Int =
        hostMap.getOrElse(key.toString, 0)
    }
  • 相关阅读:
    CSS学习(五)
    1. Git-2.12.0-64-bit .exe下载
    90.bower解决js的依赖管理
    89.[NodeJS] Express 模板传值对象app.locals、res.locals
    88.NODE.JS加密模块CRYPTO常用方法介绍
    87.node.js操作mongoDB数据库示例分享
    50.AngularJs directive详解及示例代码
    49.AngularJs 指令directive之controller,link,compile
    48.AngularJS ng-src 指令
    86.express里面的app.configure作用
  • 原文地址:https://www.cnblogs.com/zhangweilun/p/6576693.html
Copyright © 2011-2022 走看看