zoukankan      html  css  js  c++  java
  • hadoop在实现kmeans算法——一个mapreduce实施

    写mapreduce程序实现kmeans算法。我们的想法可能是

    1. 次迭代后的质心

    2. map里。计算每一个质心与样本之间的距离,得到与样本距离最短的质心,以这个质心作为key,样本作为value,输出

    3. reduce里,输入的key是质心,value是其它的样本,这时又一次计算聚类中心,将聚类中心put到一个所有变量t中。

    4. 在main里比較前一次的质心和本次的质心是否发生变化,假设变化,则继续迭代,否则退出。

    本文的思路基本上是依照上面的步骤来做的,仅仅只是有几个问题须要解决

    1. hadoop是不存在自己定义的全局变量的。所以上面定义一个全局变量存放质心的想法是实现不了的。所以一个替代的思路是将质心存放在文件里

    2. 存放质心的文件在什么地方读取,假设在map中读取。那么能够肯定我们是不能用一个mapreduce实现一次迭代。所以我们选择在main函数里读取质心,然后将质心set到configuration中。configuration在map和reduce都是可读

    3. 怎样比較质心是否发生变化,是在main里比較么,读取本次质心和上一次质心的文件然后进行比較。这样的方法是能够实现的,可是显得不够高富帅,这个时候我们用到了自己定义的counter,counter是全局变量,在map和reduce中可读可写,在上面的思路中,我们看到reduce是有上次迭代的质心和刚刚计算出来的质心的。所以直接在reduce中进行比較就全然能够。假设没发生变化,counter加1。仅仅要在main里比較获取counter的值即可了。

    梳理一下,详细的过程例如以下

    1. main函数读取质心文件

    2. 将质心的字符串放到configuration

    3. 在mapper类重写setup方法,获取到configuration的质心内容。解析成二维数组的形式。代表质心

    4. mapper类中的map方法读取样本文件,跟全部的质心比較。得出每一个样本跟哪个质心近期,然后输出<质心,样本>

    5. reducer类中又一次计算质心,假设又一次计算出来的质心跟进来时的质心一致,那么自己定义的counter加1

    6. main中获取counter的值,看是否等于质心,假设不相等,那么继续迭代,否在退出

    详细的实现例如以下

    1. pom依赖

    这个要跟集群的一致。由于假设不一致在计算其它问题的时候没有问题。可是在使用counter的时候会出现故障

    java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.Counter, but class was expected

    原因是:事实上从2.0開始。org.apache.hadoop.mapreduce.Counter从1.0版本号的class改为interface,能够看一下你导入的这个类是class还是interface,假设是class那么就是导包导入的不正确,须要改动

    2. 样本

    实例样本例如以下

    1,1
    2,2
    3,3
    -3,-3
    -4,-4
    -5,-5
    

    3. 质心

    这个质心是从样本中随机找的

    1,1
    2,2
    

    4. 代码实现

    首先定义一个Center类,这个类主要存放了质心的个数k,还有两个从hdfs上读取质心文件的方法,一个用来读取初始的质心。这个实在文件里,另一个是用来读取每次迭代后的质心目录,这个是在目录中的,代码例如以下

    Center类

    public class Center {
    
    	protected static int k = 2;		//质心的个数
    	
    	/**
    	 * 从初始的质心文件里载入质心,并返回字符串。质心之间用tab切割
    	 * @param path
    	 * @return
    	 * @throws IOException
    	 */
    	public String loadInitCenter(Path path) throws IOException {
    		
    		StringBuffer sb = new StringBuffer();
    		
    		Configuration conf = new Configuration();
    		FileSystem hdfs = FileSystem.get(conf);
    		FSDataInputStream dis = hdfs.open(path);
    		LineReader in = new LineReader(dis, conf);
    		Text line = new Text();
    		while(in.readLine(line) > 0) {
    			sb.append(line.toString().trim());
    			sb.append("	");
    		}
    		
    		return sb.toString().trim();
    	}
    	
    	/**
    	 * 从每次迭代的质心文件里读取质心,并返回字符串
    	 * @param path
    	 * @return
    	 * @throws IOException
    	 */
    	public String loadCenter(Path path) throws IOException {
    		
    		StringBuffer sb = new StringBuffer();
    		
    		Configuration conf = new Configuration();
    		FileSystem hdfs = FileSystem.get(conf);
    		FileStatus[] files = hdfs.listStatus(path);
    		
    		for(int i = 0; i < files.length; i++) {
    			
    			Path filePath = files[i].getPath();
    			if(!filePath.getName().contains("part")) continue;
    			FSDataInputStream dis = hdfs.open(filePath);
    			LineReader in = new LineReader(dis, conf);
    			Text line = new Text();
    			while(in.readLine(line) > 0) {
    				sb.append(line.toString().trim());
    				sb.append("	");
    			}
    		}
    		
    		return sb.toString().trim();
    	}
    }

    KmeansMR类

    public class KmeansMR {
    
    	private static String FLAG = "KCLUSTER";
    		
    	public static class TokenizerMapper 
        extends Mapper<Object, Text, Text, Text>{
    		
    		double[][] centers = new double[Center.k][];
    		String[] centerstrArray = null;
    		
    		@Override
    		public void setup(Context context) {
    			
    			//将放在context中的聚类中心转换为数组的形式。方便使用
    			String kmeansS = context.getConfiguration().get(FLAG);
    			centerstrArray = kmeansS.split("	");
    			for(int i = 0; i < centerstrArray.length; i++) {
    				String[] segs = centerstrArray[i].split(",");
    				centers[i] = new double[segs.length];
    				for(int j = 0; j < segs.length; j++) {
    					centers[i][j] = Double.parseDouble(segs[j]);
    				}
    			}
    		}
    		
    		public void map(Object key, Text value, Context context
                     ) throws IOException, InterruptedException {
    			
    			String line = value.toString();
    			String[] segs = line.split(",");
    			double[] sample = new double[segs.length];
    			for(int i = 0; i < segs.length; i++) {
    				sample[i] = Float.parseFloat(segs[i]);
    			}
    			//求得距离近期的质心
    			double min = Double.MAX_VALUE;
    			int index = 0;
    			for(int i = 0; i < centers.length; i++) {
    				double dis = distance(centers[i], sample);
    				if(dis < min) {
    					min = dis;
    					index = i;
    				}
    			}
    			
    			context.write(new Text(centerstrArray[index]), new Text(line));
    		}
    	}
    
    	public static class IntSumReducer 
        extends Reducer<Text,Text,NullWritable,Text> {
    
    		Counter counter = null;
    		
    		public void reduce(Text key, Iterable<Text> values, 
                        Context context
                        ) throws IOException, InterruptedException {
    			
    			double[] sum = new double[Center.k];
    			int size = 0;
    			//计算相应维度上值的加和。存放在sum数组中
    			for(Text text : values) {
    				String[] segs = text.toString().split(",");
    				for(int i = 0; i < segs.length; i++) {
    					sum[i] += Double.parseDouble(segs[i]);
    				}
    				size ++;
    			}
    			
    			//求sum数组中每一个维度的平均值。也就是新的质心
    			StringBuffer sb = new StringBuffer();
    			for(int i = 0; i < sum.length; i++) {
    				sum[i] /= size;
    				sb.append(sum[i]);
    				sb.append(",");
    			}
    			
    			/**推断新的质心跟老的质心是否是一样的*/
    			boolean flag = true;
    			String[] centerStrArray = key.toString().split(",");
    			for(int i = 0; i < centerStrArray.length; i++) {
    				if(Math.abs(Double.parseDouble(centerStrArray[i]) - sum[i]) > 0.00000000001) {
    					flag = false;
    					break;
    				}
    			}
    			//假设新的质心跟老的质心是一样的,那么相应的计数器加1
    			if(flag) {
    				counter = context.getCounter("myCounter", "kmenasCounter");
    				counter.increment(1l);
    			}
    			context.write(null, new Text(sb.toString()));
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    
    		Path kMeansPath = new Path("/dsap/middata/kmeans/kMeans");	//初始的质心文件
    		Path samplePath = new Path("/dsap/middata/kmeans/sample");	//样本文件
    		//载入聚类中心文件
    		Center center = new Center();
    		String centerString = center.loadInitCenter(kMeansPath);
    		
    		int index = 0;	//迭代的次数
    		while(index < 5) {
    			
    			Configuration conf = new Configuration();
    			conf.set(FLAG, centerString);	//将聚类中心的字符串放到configuration中
    			
    			kMeansPath = new Path("/dsap/middata/kmeans/kMeans" + index);	//本次迭代的输出路径。也是下一次质心的读取路径
    			
    			/**推断输出路径是否存在。假设存在,则删除*/
    			FileSystem hdfs = FileSystem.get(conf);
    			if(hdfs.exists(kMeansPath)) hdfs.delete(kMeansPath);
    
    			Job job = new Job(conf, "kmeans" + index); 
    			job.setJarByClass(KmeansMR.class);
    			job.setMapperClass(TokenizerMapper.class);
    			job.setReducerClass(IntSumReducer.class);
    			job.setOutputKeyClass(NullWritable.class);
    			job.setOutputValueClass(Text.class);
    			job.setMapOutputKeyClass(Text.class);
    		    job.setMapOutputValueClass(Text.class);
    		    FileInputFormat.addInputPath(job, samplePath);
    		    FileOutputFormat.setOutputPath(job, kMeansPath);
    			job.waitForCompletion(true);
    			
    			/**获取自己定义counter的大小,假设等于质心的大小。说明质心已经不会发生变化了,则程序停止迭代*/
    			long counter = job.getCounters().getGroup("myCounter").findCounter("kmenasCounter").getValue();
    			if(counter == Center.k)	System.exit(0);
    			/**又一次载入质心*/
    			center = new Center();
    			centerString = center.loadCenter(kMeansPath);
    			
    			index ++;
    		}
    		System.exit(0);
    	}
    	
    	public static double distance(double[] a, double[] b) {
    		
    		if(a == null || b == null || a.length != b.length) return Double.MAX_VALUE;
    		double dis = 0;
    		for(int i = 0; i < a.length; i++) {
    			dis += Math.pow(a[i] - b[i], 2);
    		}
    		return Math.sqrt(dis);
    	}
    }	

    5. 结果

    产生了两个目录。各自是第一次、第二次迭代后的聚类中心


    最后的聚类中心的内容例如以下


    版权声明:本文博客原创文章。博客,未经同意,不得转载。

  • 相关阅读:
    MySql不同版本安装
    逆向知识第十四讲,(C语言完结)结构体在汇编中的表现形式
    逆向知识十三讲,汇编中数组的表现形式,以及还原数组
    逆向知识第十二讲,识别全局变量,静态全局变量,局部静态变量,以及变量.
    逆向知识十一讲,识别函数的调用约定,函数参数,函数返回值.
    常见注入手法第三讲,远程线程注入
    病毒分析第二讲,分析病毒的主要功能
    病毒分析第一讲,分析病毒注意事项,以及简单分析主要功能
    逆向知识第十讲,循环在汇编中的表现形式,以及代码还原
    逆向实战第一讲,寻找OllyDbg调试工具的Bug并修复
  • 原文地址:https://www.cnblogs.com/hrhguanli/p/4638363.html
Copyright © 2011-2022 走看看