(借鉴于网络资料,有修改)
一、概念介绍
K-means算法是硬聚类算法,是典型的局域原型的目标函数聚类方法的代表,它是数据点到原型的某种距离作为优化的目标函数,利用函数求极值的方法得到迭代运算的调整规则。K-means算法以欧式距离作为相似度测度,它是求对应某一初始聚类中心向量V最有分类,使得评价指标J最小。算法采用误差平方和准则函数作为聚类准则函数。
K-means算法是很典型的基于距离的聚类算法,采用距离作为相似性的评价指标,即认为两个对象的距离越近,其相似度就越大。该算法认为簇是由距离靠近的对象组成的,因此把得到紧凑且独立的簇作为最终目标。
k个初始类聚类中心点的选取对聚类结果具有较大的影响,因为在该算法第一步中是随机的选取任意k个对象作为初始聚类的中心,初始地代表一个簇。该算法在每次迭代中对数据集中剩余的每个对象,根据其与各个簇中心的距离将每个对象重新赋给最近的簇。当考察完所有数据对象后,一次迭代运算完成,新的聚类中心被计算出来。如果在一次迭代前后,评价指标J的值没有发生变化,说明算法已经收敛。
二、基本思想
1.数学描述
给定d维实数向量(),后面就将这个实数向量称作点吧,短!K-Means算法会根据事先制定的参数k,将这些点划分出k个Cluster(k ≤ n),而划分的标准是最小化点与Cluster重心(均值)的距离平方和,假设这些Cluster为:,则数学描述如下:
,其中为第i个Cluster的“重心”(Cluster中所有点的平均值)。
聚类的效果类似下图:
具体可见:http://en.wikipedia.org/wiki/K-means_clustering
2.K-means算法
它是一种迭代的算法:
(1)、根据事先给定的k值建立初始划分,得到k个Cluster,比如,可以随机选择k个点作为k个Cluster的重心,又或者用Canopy Clustering得到的Cluster作为初始重心(当然这个时候k的值由Canopy Clustering得结果决定);
(2)、计算每个点到各个Cluster重心的距离,将它加入到最近的那个Cluster;
(3)、重新计算每个Cluster的重心;
(4)、重复过程2~3,直到各个Cluster重心在某个精度范围内不变化或者达到最大迭代次数。
别看算法简单,很多复杂算法的实际效果或许都不如它,而且它的局部性较好,容易并行化,对大规模数据集很有意义;算法时间复杂度是:O(nkt),其中:n 是聚类点个数,k 是Cluster个数,t 是迭代次数。
三、并行化K-means
K-Means较好地局部性使它能很好的被并行化。第一阶段,生成Cluster的过程可以并行化,各个Slaves读取存在本地的数据集,用上述算法生成Cluster集合,最后用若干Cluster集合生成第一次迭代的全局Cluster集合,然后重复这个过程直到满足结束条件,第二阶段,用之前得到的Cluster进行聚类操作。
用map-reduce描述是:datanode在map阶段读出位于本地的数据集,输出每个点及其对应的Cluster;combiner操作对位于本地包含在相同Cluster中的点进行reduce操作并输出,reduce操作得到全局Cluster集合并写入HDFS。
四、Mahout的K-means
mahout实现了标准K-Means Clustering,思想与前面相同,一共使用了2个map操作、1个combine操作和1个reduce操作,每次迭代都用1个map、1个combine和一个reduce操作得到并保存全局Cluster集合,迭代结束后,用一个map进行聚类操作。
1.数据结构模型
Mahout聚类算法将对象以Vector的方式表示,它同时支持dense vector和sparse vector,一共有三种表示方式(它们拥有共同的基类AbstractVector,里面实现了有关Vector的很多操作):
(1)、DenseVector
它实现的时候用一个double数组表示Vector(private double[] values), 对于dense data可以使用它;
(2)、RandomAccessSparseVector
它用来表示一个可以随机访问的sparse vector,只存储非零元素,数据的存储采用hash映射:OpenIntDoubleHashMap;
关于OpenIntDoubleHashMap,其key为int类型,value为double类型,解决冲突的方法是double hashing,
(3)、SequentialAccessSparseVector
它用来表示一个顺序访问的sparse vector,同样只存储非零元素,数据的存储采用顺序映射:OrderedIntDoubleMapping;
关于OrderedIntDoubleMapping,其key为int类型,value为double类型,存储的方式让我想起了Libsvm数据表示的形式:非零元素索引:非零元素的值,这里用一个int数组存储indices,用double数组存储非零元素,要想读写某个元素,需要在indices中查找offset,由于indices应该是有序的,所以查找操作用的是二分法。
2.K-means变量含义
可以从Cluster.java及其父类,对于Cluster,mahout实现了一个抽象类AbstractCluster封装Cluster,具体说明可以参考上一篇文章,这里做个简单说明:
(1)、private int id; #每个K-Means算法产生的Cluster的id
(2)、private long numPoints; #Cluster中包含点的个数,这里的点都是Vector
(3)、private Vector center; #Cluster的重心,这里就是平均值,由s0和s1计算而来。
(4)、private Vector Radius; #Cluster的半径,这个半径是各个点的标准差,反映组内个体间的离散程度,由s0、s1和s2计算而来。
(5)、private double s0; #表示Cluster包含点的权重之和,
(6)、private Vector s1; #表示Cluster包含点的加权和,
(7)、private Vector s2; #表示Cluster包含点平方的加权和,
(8)、public void computeParameters(); #根据s0、s1、s2计算numPoints、center和Radius:
这几个操作很重要,最后三步很必要,在后面会做说明。
(9)、public void observe(VectorWritable x, double weight); #每当有一个新的点加入当前Cluster时都需要更新s0、s1、s2的值
(10)、public ClusterObservation getObservations(); #这个操作在combine操作时会经常被用到,它会返回由s0、s1、s2初始化的ClusterObservation对象,表示当前Cluster中包含的所有被观察过的点
3.K-means的Map-Reduce实现
K-Means Clustering的实现同样包含单机版和MR两个版本,单机版就不说了,MR版用了两个map操作、一个combine操作和一个reduce操作,是通过两个不同的job触发,用Dirver来组织的,map和reduce阶段执行顺序是:
(1)对于K初始化的形成
K-Means算法需要一个对数据点的初始划分,mahout里用了两种方法(以Iris dataset前3个feature为例):
A、使用RandomSeedGenerator类
在指定clusters目录生成k个初始划分并以Sequence File形式存储,其选择方法希望能尽可能不让孤立点作为Cluster重心,大概流程如下:
图2
B、使用Canopy Clustering
Canopy Clustering常常用来对初始数据做一个粗略的划分,它的结果可以为之后代价较高聚类提供帮助,Canopy Clustering可能用在数据预处理上要比单纯拿来聚类更有用,比如对K-Means来说提供k值,另外还能很好的处理孤立点,当然,需要人工指定的参数由k变成了T1、T2,T1和T2所起的作用是缺一不可的,T1决定了每个Cluster包含点的数目,这直接影响了Cluster的“重心”和“半径”,而T2则决定了Cluster的数目,T2太大会导致只有一个Cluster,而太小则会出现过多的Cluster。通过实验,T1和T2取值会严重影响到算法的效果,如何确定T1和T2,似乎可以用AIC、BIC或者交叉验证去做。。。
(2).配置Cluster信息
K-Means算法的MR实现,第一次迭代需要将随机方法或者Canopy Clustering方法结果目录作为kmeans第一次迭代的输入目录,接下来的每次迭代都需要将上次迭代的输出目录作为本次迭代的输入目录,这就需要能在每次kmeans map和kmeans reduce操作前从该目录得到Cluster的信息,这个功能由KMeansUtil.configureWithClusterInfo实现,它依据指定的HDFS目录将Canopy Cluster或者上次迭代Cluster的信息存储到一个Collection中,这个方法在之后的每个map和reduce操作中都需要。
(3).KMeansMapper
public class KMeansMapper extends Mapper<WritableComparable<?>, VectorWritable, Text, ClusterObservations> { private KMeansClusterer clusterer; private final Collection<Cluster> clusters = new ArrayList<Cluster>(); @Override protected void map(WritableComparable<?> key, VectorWritable point, Context context) throws IOException, InterruptedException { this.clusterer.emitPointToNearestCluster(point.get(), this.clusters, context); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); try { ClassLoader ccl = Thread.currentThread().getContextClassLoader(); DistanceMeasure measure = ccl.loadClass(conf.get(KMeansConfigKeys.DISTANCE_MEASURE_KEY)) .asSubclass(DistanceMeasure.class).newInstance(); measure.configure(conf); this.clusterer = new KMeansClusterer(measure); String clusterPath = conf.get(KMeansConfigKeys.CLUSTER_PATH_KEY); if (clusterPath != null && clusterPath.length() > 0) { KMeansUtil.configureWithClusterInfo(conf, new Path(clusterPath), clusters); if (clusters.isEmpty()) { throw new IllegalStateException("No clusters found. Check your -c path."); } } } catch (ClassNotFoundException e) { throw new IllegalStateException(e); } catch (IllegalAccessException e) { throw new IllegalStateException(e); } catch (InstantiationException e) { throw new IllegalStateException(e); } } void setup(Collection<Cluster> clusters, DistanceMeasure measure) { this.clusters.clear(); this.clusters.addAll(clusters); this.clusterer = new KMeansClusterer(measure); } }
A、KMeansMapper接收的是(WritableComparable<?>, VectorWritable) Pair,setup方法利用KMeansUtil.configureWithClusterInfo得到上一次迭代的Clustering结果,map操作需要依据这个结果聚类。
B、每个slave机器会分布式的处理存在硬盘上的数据,依据之前得到得Cluster信息,用emitPointToNearestCluster方法将每个点加入到与其距离最近的Cluster,输出结果为(与当前点距离最近Cluster的ID, 由当前点包装而成的ClusterObservations) Pair,值得注意的是Mapper只是将点加入最近的Cluster,并以(key,value)形式注明此点所离最近的cluster,等待combiner,reducer搜集,没有更新Cluster重心等参数。
(4).KMeansCombiner
public class KMeansCombiner extends Reducer<Text, ClusterObservations, Text, ClusterObservations> { @Override protected void reduce(Text key, Iterable<ClusterObservations> values, Context context) throws IOException, InterruptedException { Cluster cluster = new Cluster(); for (ClusterObservations value : values) { cluster.observe(value); } context.write(key, cluster.getObservations()); } }
combiner操作是一个本地的reduce操作,发生在map之后,reduce之前:
(5).KMeansReducer
public class KMeansReducer extends Reducer<Text, ClusterObservations, Text, Cluster> { private Map<String, Cluster> clusterMap; private double convergenceDelta; private KMeansClusterer clusterer; @Override protected void reduce(Text key, Iterable<ClusterObservations> values, Context context) throws IOException, InterruptedException { Cluster cluster = clusterMap.get(key.toString()); for (ClusterObservations delta : values) { cluster.observe(delta); } // force convergence calculation boolean converged = clusterer.computeConvergence(cluster, convergenceDelta); if (converged) { context.getCounter("Clustering", "Converged Clusters").increment(1); } cluster.computeParameters(); context.write(new Text(cluster.getIdentifier()), cluster); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); try { ClassLoader ccl = Thread.currentThread().getContextClassLoader(); DistanceMeasure measure = ccl.loadClass(conf.get(KMeansConfigKeys.DISTANCE_MEASURE_KEY)) .asSubclass(DistanceMeasure.class).newInstance(); measure.configure(conf); this.convergenceDelta = Double.parseDouble(conf.get(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY)); this.clusterer = new KMeansClusterer(measure); this.clusterMap = new HashMap<String, Cluster>(); String path = conf.get(KMeansConfigKeys.CLUSTER_PATH_KEY); if (path.length() > 0) { Collection<Cluster> clusters = new ArrayList<Cluster>(); KMeansUtil.configureWithClusterInfo(conf, new Path(path), clusters); setClusterMap(clusters); if (clusterMap.isEmpty()) { throw new IllegalStateException("Cluster is empty!"); } } } catch (ClassNotFoundException e) { throw new IllegalStateException(e); } catch (IllegalAccessException e) { throw new IllegalStateException(e); } catch (InstantiationException e) { throw new IllegalStateException(e); } } private void setClusterMap(Collection<Cluster> clusters) { clusterMap = new HashMap<String, Cluster>(); for (Cluster cluster : clusters) { clusterMap.put(cluster.getIdentifier(), cluster); } clusters.clear(); } public void setup(Collection<Cluster> clusters, DistanceMeasure measure) { setClusterMap(clusters); this.clusterer = new KMeansClusterer(measure); } }
很直白的的操作,只是在setup的时候稍复杂。
A、setup操作的目的是读取初始划分或者上次迭代的结果,构建Cluster信息,同时做了Map<Cluster的ID,Cluster>映射,方便从ID找Cluster。
B、reduce操作非常直白,将从combiner传来的<Cluster ID,ClusterObservations>进行汇总;
computeConvergence用来判断当前Cluster是否收敛,即新的“重心”与老的“重心”距离是否满足之前传入的精度要求;
注意到有个cluster.computeParameters()操作,这个操作非常重要,它保证了本次迭代的结果不会影响到下次迭代,也就是保证了能够“重新计算每个Cluster的重心”这一步骤。
前三个操作得到新的Cluster信息;
后三个步骤清空S0、S1、S2信息,保证下次迭代所需的Cluster信息是“干净”的。
之后,reduce将(Cluster ID, Cluster) Pair写入到HDFS中以”clusters-迭代次数“命名的文件夹中,供后面迭代时候使用。
Reduce操作搜集前面Combiner输出的信息,并再一次对Canopy重心等信息进行了更新
(6).KMeansClusterMapper
之前的MR操作用于构建Cluster信息,KMeansClusterMapper则用构造好的Cluster信息来聚类。
public class KMeansClusterMapper extends Mapper<WritableComparable<?>,VectorWritable,IntWritable,WeightedVectorWritable> { private final Collection<Cluster> clusters = new ArrayList<Cluster>(); private KMeansClusterer clusterer; @Override protected void map(WritableComparable<?> key, VectorWritable point, Context context) throws IOException, InterruptedException { clusterer.outputPointWithClusterInfo(point.get(), clusters, context); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); try { ClassLoader ccl = Thread.currentThread().getContextClassLoader(); DistanceMeasure measure = ccl.loadClass(conf.get(KMeansConfigKeys.DISTANCE_MEASURE_KEY)) .asSubclass(DistanceMeasure.class).newInstance(); measure.configure(conf); String clusterPath = conf.get(KMeansConfigKeys.CLUSTER_PATH_KEY); if (clusterPath != null && clusterPath.length() > 0) { KMeansUtil.configureWithClusterInfo(conf, new Path(clusterPath), clusters); if (clusters.isEmpty()) { throw new IllegalStateException("No clusters found. Check your -c path."); } } this.clusterer = new KMeansClusterer(measure); } catch (ClassNotFoundException e) { throw new IllegalStateException(e); } catch (IllegalAccessException e) { throw new IllegalStateException(e); } catch (InstantiationException e) { throw new IllegalStateException(e); } } }
A、setup依然是从指定目录读取并构建Cluster信息;
B、map操作通过计算每个点到各Cluster“重心”的距离完成聚类操作,可以看到map操作结束,所有点就都被放在唯一一个与之距离最近的Cluster中了,因此之后并不需要reduce操作。
(7).KMeansDriver
这里值得注意的是buildCluster中的迭代过程,runIteration中设置前面KMeanMapper,KMeansCombiner,KMeanReducer所在job的参数。
其中buildCluster代码:
private static Path buildClustersMR(Configuration conf, Path input, Path clustersIn, Path output, DistanceMeasure measure, int maxIterations, String delta) throws IOException, InterruptedException, ClassNotFoundException { boolean converged = false; int iteration = 1; while (!converged && iteration <= maxIterations) { log.info("K-Means Iteration {}", iteration); // point the output to a new directory per iteration Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration); converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta); // now point the input to the old output directory clustersIn = clustersOut; iteration++; } return clustersIn; }
如果把前面的KMeansMapper、KMeansCombiner、KMeansReducer、KMeansClusterMapper看做是砖的话,KMeansDriver就是盖房子的人,它用来组织整个kmeans算法流程(包括单机版和MR版)。示意图如下:
图4