zoukankan      html  css  js  c++  java
  • 【Spark】使用spark进行K-means分析

    由于思路比较简单,而且代码我写的很详细了,直接贴代码吧。Mark.

    /**
     * @autor phh
     * 相似度通常以对象到类质心的距离作为相似性的评价指标
     * 算法流程如下:
     * 1、从n个数据对象中选取k个不同的点作为初始质心,每个质心看成是一个类别的标识点
     * 2、然后将数据集中的每一个点划分到距离最近的一个知心所对应的类别
     * 3、完成一次聚类后根据此次聚类的结果重新计算各个类别的新质心
     * 4、如果新的质心和之前的质心距离大于某个阈值,那么说明现在的聚类结果还没有达到最佳结果,继续直到不再变化
     */
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.calcite.rel.core.Collect
    import akka.dispatch.Foreach
    
    object Kmeans {
      def main(args:Array[String]){
        if(args.length < 0){
          System.err.println("usag error input and output");
        }
        
        val input = args(0) ;
        
        val conf = new SparkConf().setAppName("Kmeans-test").setMaster("local")
        val sc = new SparkContext(conf)
        
        
        //k指代聚类结果生成的类别个数
        val k=2
        //制定结束条件:要么距离阈值为0.1,要么最大迭代次数为5
        val e=0.1
        val maxIterations = 5
        //起始迭代是0
        var iteration =0
    
        //将数据从input中读取使用map生成K/V格式:
        //???
        val data = sc.textFile(input).map(x=>x.split(" ").map(_.toDouble)).cache
        println(data);
        //定义一个数组来保存质心
        var centers:Array[Array[Double]]=null
        
        //随机选取两个输入数据作为质心
        do{
          //???
          centers = data.takeSample(true, 2, System.nanoTime().toInt )
        }while(centers.map(_.deep).toSet.size != k)
          
        def euclideanDistance(xs:Array[Double],ys:Array[Double])={
          Math.sqrt((xs zip ys).map{
            case (x,y)=>Math.pow(y-x,2)
          }.sum)
        }
        
        var changed =true
        val dims =centers(0).length
        while(changed && iteration<maxIterations){
          iteration= iteration + 1 
          changed =false
        }
        
        val pointWithClass =data.map({point=>
          val closestCenterIndex =centers.zipWithIndex.map({case (center,
              index)=>{
                val distance =euclideanDistance(point, center)
                (distance,index)
                }}).reduce((d1,d2)=>if(d1._1>d2._2) d2 else d1)._2
                (closestCenterIndex,(point, 1))
              })
              val totalContribs =pointWithClass.reduceByKey({case ((xs,c1),(ys,c2))=>
                ((xs zip ys).map{case (x,y)=>x+y},c1+c2)}).collect
                
                val newCenters =totalContribs.map{
          case (centerIndex, (sum, counts))=>
            (centerIndex,sum.map(_/counts))}.sortBy(_._1).map(_._2)
            for(i<-0 until k){
              if(euclideanDistance(centers(i), newCenters(i))>e){
                changed=true;
                centers(i)=newCenters(i)
              }
            }
            
            centers.foreach(x=>println(x.mkString(",")))
      }
    }
    

      

  • 相关阅读:
    求两条链表有无交点和第一个交点
    重载自增运算符(前置自增++p和后置自增p++)
    二叉排序树和平衡二叉树
    红黑树
    java学习攻略
    Intellij IDEA / IntelliJ
    ngrinder test
    eclipsejeekeplerSR2win32x86_64 jsonedit plugin
    向叶子文文的.net之路学习(大量的转载)
    微软发布机制(转)从浅入深
  • 原文地址:https://www.cnblogs.com/panghaohan/p/6957226.html
Copyright © 2011-2022 走看看