转自:http://www.cnblogs.com/vivounicorn/archive/2011/09/23/2186483.html 为便于理解 有修改
一、基本思想
1、基于Canopy Method的聚类算法将聚类过程分为两个阶段
Stage1、聚类最耗费计算的地方是计算对象相似性的时候,Canopy Method在第一阶段选择简单、计算代价较低的方法计算对象相似性,将相似的对象放在一个子集中,这个子集被叫做Canopy ,通过一系列计算得到若干Canopy,Canopy之间可以是重叠的,但不会存在某个对象不属于任何Canopy的情况,可以把这一阶段看做数据预处理;
Stage2、在各个Canopy 内使用传统的聚类方法(如K-means),不属于同一Canopy 的对象之间不进行相似性计算。
从这个方法起码可以看出两点好处:首先,Canopy 不要太大且Canopy 之间重叠的不要太多的话会大大减少后续需要计算相似性的对象的个数;其次,类似于K-means这样的聚类方法是需要人为指出K的值的,通过Stage1得到的Canopy 个数完全可以作为这个K值,一定程度上减少了选择K的盲目性。
2、聚类精度
对传统聚类来说,例如K-means、Expectation-Maximization、Greedy Agglomerative Clustering,某个对象与Cluster的相似性是该点到Cluster中心的距离,那么聚类精度能够被很好保证的条件是:
对于每个Cluster都存在一个Canopy,它包含所有属于这个Cluster的元素。
如果这种相似性的度量为当前点与某个Cluster中离的最近的点的距离,那么聚类精度能够被很好保证的条件是:
对于每个Cluster都存在若干个Canopy,这些Canopy之间由Cluster中的元素连接(重叠的部分包含Cluster中的元素)。
数据集的Canopy划分完成后,类似于下图:
二、一般Canopy的算法
(1)、将数据集向量化得到一个list后放入内存,选择两个距离阈值:T1和T2,其中T1 > T2,对应上图,实线圈为T1,虚线圈为T2,T1和T2的值可以用交叉校验来确定;
(2)、从list中任取一点P,用低计算成本方法快速计算点P与所有Canopy之间的距离(如果当前不存在Canopy,则把点P作为一个Canopy),如果点P与某个Canopy距离在T1以内,则将点P加入到这个Canopy;
(3)、如果点P曾经与某个Canopy的距离在T2以内,则需要把点P从list中删除(不过在在新的mahout采用的 不加入新的Collection 这样后面处理的时候就不包含点P),这一步是认为点P此时与这个Canopy已经够近了,因此它不可以再做其它Canopy的中心了;
(4)、重复步骤2、3,直到list为空结束
三、并行策略
并行点是比较明显的,就是生成Canopy的过程可以并行,第一阶段,各个slave可以依据存储在本地的数据,各自在本地用上述算法生成若干Canopy(这个地方感觉使各个不同slave的Canopy中心距离尽可能大于T2,效果会更好些),最后在master机器将这些Canopy用相同算法汇总后得到最终的Canopy集合,第二阶段聚类操作就利用最终的Canopy集合进行。
用map-reduce描述就是:datanode在map阶段,利用上述算法在本地生成若干Canopy,之后通过reduce操作得到最终的Canopy集合。
四、Mahout的Canopy Clustering(这里对原文进行了简化)
mahout实现了一个Canopy Clustering,大致思路与前两节用的方法一样,用了两个map操作和一个reduce操作,首先用一个map和一个reduce生成全局Canopy集合,最后用一个map操作进行聚类。
1.数据结构模型
Mahout聚类算法将对象以Vector的方式表示,它同时支持dense vector和sparse vector,一共有三种表示方式(它们拥有共同的基类AbstractVector,里面实现了有关Vector的很多操作):
(1)、DenseVector
它实现的时候用一个double数组表示Vector(private double[] values), 对于dense data可以使用它;
(2)、RandomAccessSparseVector
它用来表示一个可以随机访问的sparse vector,只存储非零元素,数据的存储采用hash映射:OpenIntDoubleHashMap;
关于OpenIntDoubleHashMap,其key为int类型,value为double类型,解决冲突的方法是double hashing,
(3)、SequentialAccessSparseVector
它用来表示一个顺序访问的sparse vector,同样只存储非零元素,数据的存储采用顺序映射:OrderedIntDoubleMapping;
关于OrderedIntDoubleMapping,其key为int类型,value为double类型,存储的方式让我想起了Libsvm数据表示的形式:非零元素索引:非零元素的值,这里用一个int数组存储indices,用double数组存储非零元素,要想读写某个元素,需要在indices中查找offset,由于indices应该是有序的,所以查找操作用的是二分法。
2.Canopy变量含义
可以从Canopy.java文件及其父类中找到答案,Mahout在实现时候还是很巧妙的,一个Canopy包含的字段信息主要有:
1)、private int id; #Canopy的id
2)、private long numPoints; #Canopy中包含点的个数,这里的点都是Vector
3)、private Vector center; #Canopy的重心
4)、private Vector Radius; #Canopy的半径,这个半径是各个点的标准差,反映组内个体间的离散程度,它的计算依赖下面要说的s0、s1和s2。
它并不会真的去用一个list去存储其包含的点,因为将来的计算并不关心这些点是什么,而是与由这些点得到的三个值有关,这里用三个变量来表示:
5)、private double s0; #表示Canopy包含点的权重之和,
6)、private Vector s1; #表示各点的加权和,
7)、private Vector s2; #表示各点平方的加权和,
以下是它的核心操作:
8)、public void computeParameters(); #根据s0、s1、s2计算numPoints、center和Radius,其中numPoints=(int)s0,center=s1/s0,Radius=sqrt(s2*s0-s1*s1)/s0,简单点来,假设所有点权重都是1,那么:
,其中
,其中
9)、public void observe(VectorWritable x, double weight); #每当有一个新的点加入当前Canopy时都需要更新s0、s1、s2的值,这个比较简单。
3、Canopy Clustering的Map-Reduce实现
Canopy Clustering的实现包含单机版和MR两个版本,单机版就不多说了,MR版用了两个map操作和一个reduce操作,当然是通过两个不同的job实现的,map和reduce阶段执行顺序是:CanopyMapper –> CanopyReducer –> ClusterMapper,我想对照下面这幅图来理解:
-------------------------------------------横线中间内容,详见《Hadoop权威指南》-----------------------------------------
(1)、首先是InputFormat,这是从HDFS读取文件后第一个要考虑的问题,mahout中提供了三种方式,都继承于FileInputFormat<K,V>:
Format |
Description |
Key |
Value |
TextInputFormat |
Default format; reads lines of text files (默认格式,按行读取文件且不进行解析操作,基于行的文件比较有效) |
The byte offset of the line(行的字节偏移量) | The line contents (整个行的内容) |
KeyValueInputFormat |
Parses lines into key, val pairs (同样是按照行读取,但会搜寻第一个tab字符,把行拆分为(Key,Value) pair) |
Everything up to the first tab character(第一个tab字符前的所有字符) |
The remainder of the line (该行剩下的内容) |
SequenceFileInputFormat |
A Hadoop-specific high-performance binary format (Hadoop定义的高性能二进制格式) |
user-defined (用户自定义) |
user-defined (用户自定义) |
在这里,由于使用了很多自定义的类型,如:表示vector的VectorWritable类型,表示canopy的canopy类型,且需要进行高效的数据处理,所以输入输出文件选择SequenceFileInputFormat格式。由job对象的setInputFormatClass方法来设置,如:job.setInputFormatClass(SequenceFileInputFormat.class),一般在执行聚类算法前需要调用一个job专门处理原始文件为合适的格式,比如用InputDriver,这点后面再说。
(2)、Split
一个Split块为一个map任务提供输入数据,它是InputSplit类型的,默认情况下hadoop会把文件以64MB为基数拆分为若干Block,这些Block分散在各个节点上,于是一个文件就可以被多个map并行的处理,也就是说InputSplit定义了文件是被如何切分的。
(3)RR
RecordReader类把由Split传来的数据加载后转换为适合mapper读取的(Key,Value) pair,RecordReader实例是由InputFormat决定,RR被反复调用直到Split数据处理完,RR被调用后接着就会调用Mapper的map()方法。
“RecordReader实例是由InputFormat决定”这句话怎么理解呢?比如,在Canopy Clustering中,使用的是SequenceFileInputFormat,它会提供一个 SequenceFileRecordReader类型,利用SequenceFile.Reader将Key和Value读取出来,这里Key和Value的类型对应Mapper的map函数的Key和Value的类型,Sequence File的存储根据不同压缩策略分为:NONE:不压缩、RECORD:仅压缩每一个record中的value值、BLOCK:将一个block中的所有records压缩在一起
-----------------------------------------------------------------------------------------------------------------------------------------------------
(4)代码全分析
CanopyMapper部分
class CanopyMapper extends Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> { private final Collection<Canopy> canopies = new ArrayList<Canopy>(); private CanopyClusterer canopyClusterer; @Override protected void map(WritableComparable<?> key, VectorWritable point, Context context) throws IOException, InterruptedException { canopyClusterer.addPointToCanopies(point.get(), canopies); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); canopyClusterer = new CanopyClusterer(context.getConfiguration()); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (Canopy canopy : canopies) { context.write(new Text("centroid"), new VectorWritable(canopy.computeCentroid())); } super.cleanup(context); } }
CanopyMapper类里面定义了一个Canopy集合,用来存储通过map操作得到的本地Canopy。
setup方法在map操作执行前进行必要的初始化工作;
它的map操作很直白,就是将传来的(Key,Value) pair(以后就叫“点”吧,少写几个字)按照某种策略加入到某个Canopy中,这个策略在CanopyClusterer类里说明;
在map操作执行完后,调用cleanup操作,将中间结果写入上下文,注意这里的Key是一个固定的字符串“centroid”,将来reduce操作接收到的数据就只有这个Key,写入的value是所有Canopy的中心点(是个Vector)。
------这部分操作后,在各个slave初步形成局部的Canopy,其中很多与本地中心的距离L在T2<L<T1之间的,并没有划到本地Canopy,这些点将在Reduce阶段被全局的条件下划分,当然这里没有更新Canopy参数。
Combiner部分(这里面,我所见的mahout代码没有设置,但是一般来说,可以Combiner有助于减少Hadoop系统中的传输负载)
可以看做是一个local的reduce操作,接受前面map的结果,处理完后发出结果,可以使用reduce类或者自己定义新类,这里的汇总操作有时候是很有意义的,因为它们都是在本地执行,最后发送出得数据量比直接发出map结果的要小,减少网络带宽的占用,对将来shuffle操作也有益。在Canopy Clustering中不需要这个操作。
Partitioner & Shuffle 部分(详见Hadoop权威指南或Hadoop源代码)
当有多个reducer的时候,partitioner决定由mapper或combiner传来的(Key,Value) Pair会被发送给哪个reducer,接着Shuffle操作会把所有从相同或不同mapper或combiner传来的(Key,Value) Pair按照Key进行分组,相同Key值的点会被放在同一个reducer中,我觉得如何提高Shuffle的效率是hadoop可以改进的地方。在Canopy Clustering中,因为map后的数据只有一个Key值,也就没必要有多个reducer了,也就不用partition了。
CanopyReducer部分
public class CanopyReducer extends Reducer<Text, VectorWritable, Text, Canopy> { private final Collection<Canopy> canopies = new ArrayList<Canopy>(); private CanopyClusterer canopyClusterer; CanopyClusterer getCanopyClusterer() { return canopyClusterer; } @Override protected void reduce(Text arg0, Iterable<VectorWritable> values, Context context) throws IOException, InterruptedException { for (VectorWritable value : values) { Vector point = value.get(); canopyClusterer.addPointToCanopies(point, canopies); } for (Canopy canopy : canopies) { canopy.computeParameters(); context.write(new Text(canopy.getIdentifier()), canopy); } } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); canopyClusterer = new CanopyClusterer(context.getConfiguration()); canopyClusterer.useT3T4(); } }
CanopyReducer 类里面同样定义了一个Canopy集合,用来存储全局Canopy。
setup方法在reduce操作执行前进行必要的初始化工作,这里与mapper不同的地方是可以对阈值T1、T2(T1>T2)重新设置(这里用T3、T4表示),也就是说map阶段的阈值可以与reduce阶段的不同;
reduce操作用于map操作一样的策略将局部Canopy的中心点做重新划分,最后更新各个全局Canopy的numPoints、center、radius的信息,将(Canopy标示符,Canopy对象) Pair写入上下文中。
------Reduce阶段将前面Map阶段中距离:T2<L<T1的点进行全局条件的划分,是这些点能够跨slave划分(例如slavei的点划分在slavej上Canopy上更合理,Map阶段的本地性没有划分,这是就需要Reduce阶段去完成这个任务),同时更新Canopy参数。
OutputFormat部分
它与InputFormat类似,Hadoop会利用OutputFormat的实例把文件写在本地磁盘或HDFS上,它们都是继承自FileOutputFormat类。各个reducer会把结果写在HDFS某个目录下的单独的文件内,命名规则是part-r-xxxxx,这个是依据hadoop自动命名的,此外还会在同一目录下生成一个_SUCCESS文件,输出文件夹用FileOutputFormat.setOutputPath() 设置。
到此为止构建Canopy的job结束。即CanopyMapper –> CanopyReducer 阶段结束。
ClusterMapper部分
public class ClusterMapper extends Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable> { private CanopyClusterer canopyClusterer; @Override protected void map(WritableComparable<?> key, VectorWritable point, Context context) throws IOException, InterruptedException { canopyClusterer.emitPointToClosestCanopy(point.get(), canopies, context); } private final Collection<Canopy> canopies = new ArrayList<Canopy>(); /** * Configure the mapper by providing its canopies. Used by unit tests. */ public void config(Collection<Canopy> canopies) { this.canopies.clear(); this.canopies.addAll(canopies); } @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); canopyClusterer = new CanopyClusterer(context.getConfiguration()); Configuration conf = context.getConfiguration(); String clustersIn = conf.get(CanopyConfigKeys.CANOPY_PATH_KEY); // filter out the files if (clustersIn != null && clustersIn.length() > 0) { Path clusterPath = new Path(clustersIn, "*"); FileSystem fs = clusterPath.getFileSystem(conf); Path[] paths = FileUtil.stat2Paths(fs.globStatus(clusterPath, PathFilters.partFilter())); for (FileStatus file : fs.listStatus(paths, PathFilters.partFilter())) { for (Canopy value : new SequenceFileValueIterable<Canopy>(file .getPath(), conf)) { canopies.add(value); } } if (canopies.isEmpty()) { throw new IllegalStateException("Canopies are empty!"); } } } public boolean canopyCovers(Canopy canopy, Vector point) { return canopyClusterer.canopyCovers(canopy, point); } }
最后聚类阶段比较简单,只有一个map操作,以上一阶段输出的Sequence File为输入,setup方法做一些初始化工作并从上一阶段输出目录读取文件(见 CanopyDriver中的代码),重建Canopy集合信息并存储在一个Canopy集合(Collection)中,map操作就调用CanopyClusterer的emitPointToClosestCanopy方法实现聚类,将最终结果输出到一个Sequence File中。
这部分将所有点进行了划分,包括前面没有划分的点;
CanopyClusterer部分(这部分主要实现了一些前面做需要用到的函数)
这个类是实现Canopy算法的核心,其中:
1)、addPointToCanopies代码实现
public void addPointToCanopies(Vector point, Collection<Canopy> canopies) { boolean pointStronglyBound = false; for (Canopy canopy : canopies) { double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point); if (dist < t1) { log.debug("Added point: {} to canopy: {}", AbstractCluster.formatVector(point, null), canopy.getIdentifier()); canopy.observe(point); } pointStronglyBound = pointStronglyBound || dist < t2; } if (!pointStronglyBound) { log.debug("Created new Canopy:{} at center:{}", nextCanopyId, AbstractCluster.formatVector(point, null)); canopies.add(new Canopy(point, nextCanopyId++, measure)); } }
addPointToCanopies方法用来决定当前点应该加入到哪个Canopy中,在CanopyMapper和CanopyReducer 中用到,流程如下:
2)、emitPointToClosestCanopy代码实现
public void emitPointToClosestCanopy(Vector point, Iterable<Canopy> canopies, Mapper<?,?,IntWritable,WeightedVectorWritable>.Context context) throws IOException, InterruptedException { Canopy closest = findClosestCanopy(point, canopies); context.write(new IntWritable(closest.getId()), new WeightedVectorWritable(1, point)); context.setStatus("Emit Closest Canopy ID:" + closest.getIdentifier()); } protected Canopy findClosestCanopy(Vector point, Iterable<Canopy> canopies) { double minDist = Double.MAX_VALUE; Canopy closest = null; // find closest canopy for (Canopy canopy : canopies) { double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point); if (dist < minDist) { minDist = dist; closest = canopy; } } return closest; }
emitPointToClosestCanopy方法查找与当前点距离最近的Canopy,并将(Canopy的标示符,当前点Vector表示)输出,这个方法在聚类阶段ClusterMapper中用到。
CanopyDriver部分(buildClustersMR、clusterDataMR代码略)
public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2, double t3, double t4, boolean runClustering, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException { Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3, t4, runSequential); if (runClustering) { clusterData(conf, input, clustersOut, output, measure, t1, t2, runSequential); } }
其中buildClusters调用buildClustersMR,clusterData调用clusterDataMR,buildClustersMR和clusterDataMR负责定义必要的参数,传入必要conf文件配置选项等等
一般都会定义这么一个driver,用来定义和配置job,组织job执行,同时提供单机版和MR版。job执行顺序是:buildClusters –> clusterData
概括总结:第一阶段:buildCluster阶段 ---map首先根据T1、T2标识出部分各点应所属的Canopy,并在map结束贴上的CanopyID标签,reduce搜集map输出的各点,并更新各个Canopy的参数。第二阶段:ClusterData阶段-利用一个map将所有点进行(包括第一阶段没有处理的的点)进行聚类运算,找出每个点距离最近的Canopy,并存在一个Canopy集合(Collection)中,同时输出结果。
-----------------------------以上为Mahout下的聚类算法Canopy的MapReduce实现 关于如何让进行输入格式的处理 见原博客-