zoukankan      html  css  js  c++  java
  • Mahout源码MeanShiftCanopyDriver分析之二MeanShiftCanopyMapper仿造

    首先更正一点,昨天处理数据的时候是有问题的,直接从网页中拷贝的文件的空格是有问题的,直接拷贝然后新建的文件中的空格可能有一个两个、三个的,所以要把两个或者三个的都换为一个,在InputMapper中下面的代码:

    private static final Pattern SPACE = Pattern.compile(" ");
    String[] numbers = SPACE.split(values.toString());

    可以看到这个代码是以一个空格来区分的,可以在linux的terminal中输入下面的命令来进行替换:

    Sed -I "s/   / /g" `grep    -l synthetic_control.data`   -- 替换三个空格为一个空格
    Sed -I "s/  / /g" `grep    -l synthetic_control.data`    -- 替换两个空格为一个空格

    通过上面的命令,然后在上传,使用昨天的命令进行meanshiftCanopyDriver的调用。
    不过补充一点,因为在InputMapper中对这个数据的处理还有下面的代码:

     for (String value : numbers) {
          if (!value.isEmpty()) {
            doubles.add(Double.valueOf(value));
          }
        }

    这个代码就表示如果是空字符串的话,就不进行添加,所以说输入数据和前面保持一致也是可以的,即昨天的数据和今天修改的数据其结果一样。
    MeansShiftCanopyDriver的run方法跳转如下:

    run(159行)-->buildClusters(282行)-->buildClustersMR(353行)-->runIterationMR(412行),这里说明几点:

    在159行开始run方法进入后,进行第一个判断inputIsCanopies,如下:

    if (inputIsCanopies) {
          clustersIn = input;
        } else {
          createCanopyFromVectors(conf, input, clustersIn, measure, runSequential);
        }

    因为在前面的测试中我们已经使用了InputDriver把输入数据转换为了canopy,所以这里直接进入了clustersIn=input,然后往下面走;

    在282行的buildClusters方法进入后因为是默认在Hadoop中跑的程序,所以是使用MR算法的,进入到else中,如下:

    if (runSequential) {
          return buildClustersSeq(clustersIn, output, measure, kernelProfile, t1,
              t2, convergenceDelta, maxIterations, runClustering);
        } else {
          return buildClustersMR(conf, clustersIn, output, measure, kernelProfile,
              t1, t2, convergenceDelta, maxIterations, runClustering);
        }

    在353行中的方法buildClustersMR进入后,即开始进行循环,混合的主体是412行的runIterationMR方法。本篇主要分析此Job的Mapper和Reducer类,

    这两个类分别是MeanShiftCanopyMapper、MeanShiftCanopyReducer。下面的代码是MeanShiftCanopyMapper的仿造代码,可以直接使用此代码进行调试,这样就可以看到MeanShiftCanopyMapper的数据逻辑流了,今晚又太晚了,明天还要早起。就下次再分析了,代码如下:

    package mahout.fansy.meanshift;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.mahout.clustering.iterator.ClusterWritable;
    import org.apache.mahout.clustering.meanshift.MeanShiftCanopy;
    import org.apache.mahout.clustering.meanshift.MeanShiftCanopyClusterer;
    import org.apache.mahout.clustering.meanshift.MeanShiftCanopyConfigKeys;
    import org.apache.mahout.common.iterator.sequencefile.PathFilters;
    import org.apache.mahout.common.iterator.sequencefile.PathType;
    import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
    
    import com.google.common.collect.Lists;
    
    public class MeanShiftCanopyMapperFollow {
    
    	/**
    	 * MeanShiftCanopyMapper仿造代码
    	 * @author fansy
    	 * @param args
    	 */
    	public static void main(String[] args) {
    		cleanup();// 调试cleanup函数
    	}
    	
    	/**
    	 * 仿造map操作
    	 */
    	public static Collection<MeanShiftCanopy> map(){
    		Collection<MeanShiftCanopy> canopies = Lists.newArrayList();
    		List<ClusterWritable> data=getMapData(); // 获取map的输入值
    		MeanShiftCanopyClusterer clusterer=setup();  // 获取setup函数中经过设置的值
    		for (ClusterWritable clusterWritable : data){  // 这里设置断点,查看程序初始数据
    			MeanShiftCanopy canopy = (MeanShiftCanopy)clusterWritable.getValue();
    		      clusterer.mergeCanopy(canopy.shallowCopy(), canopies);
    		}
    		return canopies;
    	}
    	/**
    	 * 仿造setup函数
    	 * @return 返回经过设置值的MeanShiftCanopyClusterer
    	 */
    	public static MeanShiftCanopyClusterer setup(){
    		String measureClassName="org.apache.mahout.common.distance.EuclideanDistanceMeasure";
    		String kernelProfileClassName="org.apache.mahout.common.kernel.TriangularKernelProfile";
    		double convergenceDelta=0.5;
    		double t1=47.6;
    		double t2=1;
    		boolean runClustering=true;
    		Configuration conf =new Configuration();
    		
    		conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName);
    	    conf.set(MeanShiftCanopyConfigKeys.KERNEL_PROFILE_KEY,
    	        kernelProfileClassName);
    	    
    		conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, String
    	        .valueOf(convergenceDelta));
    	   
    		conf.set(MeanShiftCanopyConfigKeys.T1_KEY, String.valueOf(t1));
    	    conf.set(MeanShiftCanopyConfigKeys.T2_KEY, String.valueOf(t2));
    	
    	    conf.set(MeanShiftCanopyConfigKeys.CLUSTER_POINTS_KEY, String
    	        .valueOf(runClustering));
    		MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(conf);
    		
    		return clusterer;
    	}
    	/**
    	 * 仿造cleanup函数
    	 */
    	public static Map<Text,ClusterWritable> cleanup(){
    		int numReducers=1; // 自己设定,这里为了方便直接设置为1
    		Map<Text,ClusterWritable> map=new HashMap<Text,ClusterWritable>();
    		Collection<MeanShiftCanopy> canopies=map(); // 获得map的输出
    		MeanShiftCanopyClusterer clusterer =setup();// 获得setup的输出
    		int reducer = 0;
    	    for (MeanShiftCanopy canopy : canopies) {
    	      clusterer.shiftToMean(canopy);
    	      ClusterWritable clusterWritable = new ClusterWritable();
    	      clusterWritable.setValue(canopy);
    	      map.put(new Text(String.valueOf(reducer)), clusterWritable);
    	      reducer++;
    	      if (reducer >= numReducers) {
    	    	  reducer=0;
    	      }
    	    }
    	    return map;
    	}
    	/**
    	 * 获得map的输入数据,输入数据的value是ClusterWritable类型的
    	 * @return
    	 */
    	public static List<ClusterWritable> getMapData(){
    		Path input=new Path("hdfs://ubuntu:9000/user/test/input/real_input/part-m-00000"); //路径是经过InputDriver后的输出路径
    		Configuration conf=new Configuration();
    		conf.set("mapred.job.tracker", "ubuntu:9001");
    		List<ClusterWritable> clusters = new ArrayList<ClusterWritable>();
        	for (Writable value : new SequenceFileDirValueIterable<Writable>(input, PathType.LIST,
        	        PathFilters.partFilter(), conf)) {
        	      Class<? extends Writable> valueClass = value.getClass();
        	      if (valueClass.equals(ClusterWritable.class)) {
        	        ClusterWritable clusterWritable = (ClusterWritable) value;
        	        clusters.add( clusterWritable);
        	      } else {
        	        throw new IllegalStateException("can't read " + input);
        	      }
        	    }
        	return clusters;
    	}
    
    }
    


    今天培训还听讲师说不要抱怨,额,好吧,现在感觉天天都是1点半之后或者左右的时间睡觉了,严重感觉睡眠不足,哎,难道这就是程序员的名?今天讲师还说确定目标后有四个阶段:初始兴奋期、寂寞期、煎熬期、成功期,我现在还在哪个阶段熬着呀。额,好吧,慢慢来,坚持。。。


    分享,快乐,成长


    转载请注明出处:http://blog.csdn.net/fansy1990 




  • 相关阅读:
    团队绩效打分
    软件对标分析
    目前校园百晓生APP与CSDN软件的对比
    Alpha版
    团队项目第一阶段成果展示
    意见汇总
    团队第一阶段冲刺评价
    冲刺(十)
    【WPF学习】第五十八章 理解逻辑树和可视化树
    【WPF学习】第五十七章 使用代码创建故事板
  • 原文地址:https://www.cnblogs.com/pangblog/p/3278078.html
Copyright © 2011-2022 走看看