  • Learning Mahout (3)

    Developing and running your first Mahout program

    The code:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package chen.test.kmeans;
    
    import java.util.List;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.clustering.Cluster;
    import org.apache.mahout.clustering.canopy.CanopyDriver;
    import org.apache.mahout.clustering.conversion.InputDriver;
    import org.apache.mahout.clustering.kmeans.KMeansDriver;
    import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
    import org.apache.mahout.common.AbstractJob;
    import org.apache.mahout.common.ClassUtils;
    import org.apache.mahout.common.HadoopUtil;
    import org.apache.mahout.common.commandline.DefaultOptionCreator;
    import org.apache.mahout.common.distance.DistanceMeasure;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
    import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
    import org.apache.mahout.utils.clustering.ClusterDumper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public final class TwoJob extends AbstractJob {
      
      private static final Logger log = LoggerFactory.getLogger(TwoJob.class);
      
      private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
      
      private TwoJob() {
      }
      
      public static void main(String[] args) throws Exception {
        if (args.length > 0) {
          log.info("Running with only user-supplied arguments");
          ToolRunner.run(new Configuration(), new TwoJob(), args);
        } else {
          log.info("Running with default arguments");
          Path output = new Path("output");
          Configuration conf = new Configuration();
          HadoopUtil.delete(conf, output);
          run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 2, 0.5, 10);
        }
      }
      
      @Override
      public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();
        addOption(DefaultOptionCreator.distanceMeasureOption().create());
        addOption(DefaultOptionCreator.numClustersOption().create());
        addOption(DefaultOptionCreator.t1Option().create());
        addOption(DefaultOptionCreator.t2Option().create());
        addOption(DefaultOptionCreator.convergenceOption().create());
        addOption(DefaultOptionCreator.maxIterationsOption().create());
        addOption(DefaultOptionCreator.overwriteOption().create());
        
        Map<String,List<String>> argMap = parseArguments(args);
        if (argMap == null) {
          return -1;
        }
        
        Path input = getInputPath();
        Path output = getOutputPath();
        String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
        if (measureClass == null) {
          measureClass = SquaredEuclideanDistanceMeasure.class.getName();
        }
        double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
        int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
        if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
          HadoopUtil.delete(getConf(), output);
        }
        DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
        if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
          int k = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
          run(getConf(), input, output, measure, k, convergenceDelta, maxIterations);
        } else {
          double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
          double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
          run(getConf(), input, output, measure, t1, t2, convergenceDelta, maxIterations);
        }
        return 0;
      }
      
      /**
       * Run the kmeans clustering job on an input dataset using the given number of clusters k and iteration
       * parameters. All output data will be written to the output directory, which will be initially deleted if it exists.
       * The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
       * containing equal length space delimited data that resides in a directory named "testdata", and writes output to a
       * directory named "output".
       * 
       * @param conf
       *          the Configuration to use
       * @param input
       *          the Path of the input directory
       * @param output
       *          the Path of the output directory
       * @param measure
       *          the DistanceMeasure to use
       * @param k
       *          the number of clusters in Kmeans
       * @param convergenceDelta
       *          the double convergence criteria for iterations
       * @param maxIterations
       *          the int maximum number of iterations
       */
      public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
          double convergenceDelta, int maxIterations) throws Exception {
        Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
        log.info("Preparing Input");
        InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
        log.info("Running random seed to get initial clusters");
        Path clusters = new Path(output, "random-seeds");
        clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
        log.info("Running KMeans with k = {}", k);
        KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, convergenceDelta,
            maxIterations, true, 0.0, false);
        // run ClusterDumper
        Path outGlob = new Path(output, "clusters-*-final");
        Path clusteredPoints = new Path(output, "clusteredPoints");
        log.info("Dumping out clusters from clusters: {} and clusteredPoints: {}", outGlob, clusteredPoints);
        ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
        
        // print the result
        clusterDumper.printClusters(null);
      }
      
      /**
       * Run the kmeans clustering job on an input dataset using the given distance measure, t1, t2 and iteration
       * parameters. All output data will be written to the output directory, which will be initially deleted if it exists.
       * The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects the file
       * synthetic_control.data, as obtained from
       * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series, to reside in a directory named "testdata",
       * and writes output to a directory named "output".
       * 
       * @param conf
       *          the Configuration to use
       * @param input
       *          the Path of the input directory
       * @param output
       *          the Path of the output directory
       * @param measure
       *          the DistanceMeasure to use
       * @param t1
       *          the canopy T1 threshold
       * @param t2
       *          the canopy T2 threshold
       * @param convergenceDelta
       *          the double convergence criteria for iterations
       * @param maxIterations
       *          the int maximum number of iterations
       */
      public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
          double convergenceDelta, int maxIterations) throws Exception {
        Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
        log.info("Preparing Input");
        InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
        log.info("Running Canopy to get initial clusters");
        Path canopyOutput = new Path(output, "canopies");
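        // run canopy with the job's conf; trailing arguments: runClustering = false, clusterClassificationThreshold = 0.0, runSequential = false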
        CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0,
            false);
        log.info("Running KMeans");
        KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(canopyOutput, Cluster.INITIAL_CLUSTERS_DIR
            + "-final"), output, convergenceDelta, maxIterations, true, 0.0, false);
        // run ClusterDumper
        ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
            "clusteredPoints"));
        clusterDumper.printClusters(null);
      }
    }

    The code above is the example from the previous post, using k-means to perform the clustering.

    The build.xml:

    <project name="mahout_test" default="jar">
    
       <property name="root.dir" value="." />
       <property name="src.dir" value="${root.dir}/src" />
       <property name="lib.dir" value="${root.dir}/lib" />
       <property name="build.dir" value="${root.dir}/build" />
       
       <target name="clean" depends="">
          <echo>root = ${root.dir}</echo>
          <delete dir="${build.dir}" />
          
          <mkdir dir="${build.dir}" />
    
       </target>
       
       <target name="build" depends="clean">
          <javac fork="true" debug="true" srcdir="${src.dir}" destdir="${build.dir}">
             <classpath>
                <fileset dir="${lib.dir}" includes="*.jar" />
             </classpath>
          </javac>
          
       </target>
       
       <target name="jar" depends="build">
          <mkdir dir="${build.dir}/lib" />
          <!--
          <copy file="${lib.dir}/mahout-core-0.9.jar" todir="${build.dir}/lib" />
          <copy file="${lib.dir}/mahout-integration-0.9.jar" todir="${build.dir}/lib" />
          <copy file="${lib.dir}/hadoop-core-1.2.1.jar" todir="${build.dir}/lib" />
          -->
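          <!-- jars under lib/ inside the job jar are put on the task classpath by "hadoop jar" -->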
           
          <copy file="${lib.dir}/mahout-examples-0.9-job.jar" todir="${build.dir}/lib" />
          <!--
          <copy file="${lib.dir}/mahout-integration-0.9.jar" todir="${build.dir}/lib" />
          -->
          <jar destfile="${root.dir}/mahout_test.jar" basedir="${build.dir}">
             <manifest>
                <!--
                <attribute name="Main-Class" value="chen/test/Job" />
                -->
             </manifest>
          </jar>
       </target>
       
    </project>

    Build command:

    ant -f build.xml

    After the build, a mahout_test.jar file is generated under ${root.dir}.

    The jars the build depends on: mahout-core-0.9-job.jar, mahout-examples-0.9-job.jar, and hadoop-core-1.2.1.jar.

    Of these, mahout-core-0.9.jar is only needed for the org.slf4j.Logger and org.slf4j.LoggerFactory classes, so you can depend on slf4j-api-1.4.3.jar from Hadoop's lib directory instead of mahout-core-0.9-job.jar.

    The key to packaging a Mahout program is to include mahout-examples-0.9-job.jar in the generated jar; otherwise, running it with hadoop jar **.jar will fail.

    <copy file="${lib.dir}/mahout-examples-0.9-job.jar" todir="${build.dir}/lib" />
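
    You can confirm that the generated jar really contains it by listing the jar's contents with the JDK's jar tool (the grep filter is just for readability):

    jar tf mahout_test.jar | grep mahout-examples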

    Many of the classes in mahout-examples-0.9-job.jar overlap with those in mahout-core-0.9-job.jar, which is a real trap: if both jars are loaded at the same time, it fails with errors saying the corresponding classes cannot be found.

    This problem had me stuck for a long time.

    Also, do not specify a Main-Class when building the jar, or it will also fail. I haven't looked into the reason; if you know it, please leave a comment.

    Run command:

    bin/hadoop jar /mnt/hgfs/mnt/chenfool/mahout_test.jar chen.test.kmeans.TwoJob
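
    Since run(String[]) parses the standard Mahout options, the arguments can also be passed explicitly. A hypothetical invocation, with option names assumed from DefaultOptionCreator (verify them against the job's usage output):

    bin/hadoop jar /mnt/hgfs/mnt/chenfool/mahout_test.jar chen.test.kmeans.TwoJob --input testdata --output output --distanceMeasure org.apache.mahout.common.distance.EuclideanDistanceMeasure --numClusters 2 --convergenceDelta 0.5 --maxIter 10 --overwrite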

    The runtime environment is similar to the previous post's: a vector file must also exist under the /user/${user}/testdata directory in HDFS.
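
    For example, assuming synthetic_control.data from the previous post is in the current directory, it can be uploaded with the standard HDFS put command (the relative path testdata resolves to /user/${user}/testdata):

    bin/hadoop fs -put synthetic_control.data testdata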

  • Original post: https://www.cnblogs.com/chenfool/p/3806601.html