zoukankan      html  css  js  c++  java
  • mahout算法源码分析之Itembased Collaborative Filtering(二)RowSimilarityJob

    Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit。

    本篇开始之前先来验证前篇blog的分析结果,编写下面的测试文件来进行对上篇三个job的输出进行读取:

    package mahout.fansy.item;
    
    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.hadoop.io.Writable;
    
    import mahout.fansy.utils.read.ReadArbiKV;
    import junit.framework.TestCase;
    
    public class ReadPreparePreferenceMatrixJob extends TestCase {
    	
    	// 测试 ITEMID_INDEX 输出:
    	public void testITEMID_INDEX() throws IOException{
    		String path="hdfs://ubuntu:9000/user/mahout/item/temp/preparePreferenceMatrix/itemIDIndex/part-r-00000";
    		Map<Writable,Writable> map= ReadArbiKV.readFromFile(path);
    		System.out.println("ITEMID_INDEX=================");
    		System.out.println(map);
    	}
    	
    	// 测试 userVectors 输出:
    		public void testUSER_VECTORS() throws IOException{
    			String path="hdfs://ubuntu:9000/user/mahout/item/temp/preparePreferenceMatrix/userVectors/part-r-00000";
    			Map<Writable,Writable> map= ReadArbiKV.readFromFile(path);
    			System.out.println("USER_VECTORS================");
    			System.out.println(map);
    		}
    		
    	// 测试 ratingMatrix 输出:
    			public void testRATING_MATRIX() throws IOException{
    				String path="hdfs://ubuntu:9000/user/mahout/item/temp/preparePreferenceMatrix/ratingMatrix/part-r-00000";
    				Map<Writable,Writable> map= ReadArbiKV.readFromFile(path);
    				System.out.println("USER_VECTORS================");
    				System.out.println(map);
    			}
    }
    

    运行的结果如下:

    ITEMID_INDEX=================
    {102=102, 103=103, 101=101, 106=106, 107=107, 104=104, 105=105}
    USER_VECTORS================
    {1={103:2.5,102:3.0,101:5.0}, 2={101:2.0,104:2.0,103:5.0,102:2.5}, 3={101:2.5,107:5.0,105:4.5,104:4.0}, 4={101:5.0,106:4.0,104:4.5,103:3.0}, 5={106:4.0,105:3.5,104:4.0,103:2.0,102:3.0,101:4.0}}
    RATING_MATRIX================
    {102={5:3.0,2:2.5,1:3.0}, 103={5:2.0,4:3.0,2:5.0,1:2.5}, 101={5:4.0,4:5.0,3:2.5,2:2.0,1:5.0}, 106={5:4.0,4:4.0}, 107={3:5.0}, 104={5:4.0,4:4.5,3:4.0,2:2.0}, 105={5:3.5,3:4.5}}
    

    由上面的结果可以看出确实和分析的一样。同时注意到hdfs://ubuntu:9000/user/mahout/item1/temp/preparePreferenceMatrix/numUsers.bin这个文件的大小是0k,同时里面看不到内容,所以对这个文件进行读取,编写下面的测试代码:

    package mahout.fansy.item;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.common.HadoopUtil;
    
    import junit.framework.TestCase;
    
    public class TestHadoopUtils extends TestCase {
    	
    	// 测试HadoopUtil的writeInt方法
    	public void testWriteInt() throws IOException{
    		String path="hdfs://ubuntu:9000/user/mahout/item1/temp/preparePreferenceMatrix/numUsers.bin";
    		int numberOfUsers=5;
    		Configuration conf=new Configuration();
    		conf.set("mapred.job.tracker", "ubuntu:9001");
    	//	HadoopUtil.writeInt(numberOfUsers, new Path(path), conf);
    		int number=HadoopUtil.readInt(new Path(path), conf);
    		System.out.println(number);
    	}
    }
    

    测试的结果是:5,这个是因为数据太小,然后HDFS就不显示了?

    接着源码进行分析:

    numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());

    源码首先把所有的用户数全部读取出来,然后进行下一个phase。进入下一个phase后,这里首先判断下user是否是-1,那么就是用UserVector的输出记录数作为user的值:

    if (numberOfUsers == -1) {
            numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
                    PathType.LIST, null, getConf());
          }

    然后是下一个RowSimilarityJob类了,其作用应该是:calculate the co-occurrence matrix,计算共生矩阵(啥叫共生矩阵?请google。。。我也不知道),其调用代码如下:

    ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
                  "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
                  "--output", similarityMatrixPath.toString(),
                  "--numberOfColumns", String.valueOf(numberOfUsers),
                  "--similarityClassname", similarityClassname,
                  "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
                  "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
                  "--threshold", String.valueOf(threshold),
                  "--tempDir", getTempPath().toString()});
        }

    打开RowSimilarityJob,可以看到这个类同样继承AbstractJob类,那么直接进入run方法吧,可以看到这个方法里面含有三个shouldRunPhase,每个含有一个prepareJob函数,即这个run里面也有三个Job,下面来一个个看。

    (1)// weightsPath

    Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
              VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);

    输入就是前面的RATING_MATRIX的输出,格式为:<key,value>   :   itemID-->vector[userID:prefValue,userID:prefVlaue,...];

    (1.1)看mapper:

    (1.1.1)setup函数,这个函数就是做一些初始化工作,包括vector:norms; nonZeroEntries; maxValues;,以及变量similarity和threshold。

    (1.1.2)map函数,这个函数就是对vector和变量做各种操作:

    protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
            throws IOException, InterruptedException {
    
          Vector rowVector = similarity.normalize(vectorWritable.get());
    
          int numNonZeroEntries = 0;
          double maxValue = Double.MIN_VALUE;
    
          Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
          while (nonZeroElements.hasNext()) {
            Vector.Element element = nonZeroElements.next();
            RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
            partialColumnVector.setQuick(row.get(), element.get());
            ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
    
            numNonZeroEntries++;
            if (maxValue < element.get()) {
              maxValue = element.get();
            }
          }
    
          if (threshold != NO_THRESHOLD) {
            nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
            maxValues.setQuick(row.get(), maxValue);
          }
          norms.setQuick(row.get(), similarity.norm(rowVector));
    
          ctx.getCounter(Counters.ROWS).increment(1);
        }

    首先,similarity.normalize()函数其实返回的还是vector本身,并没有做其他操作,因为使用的EuclideanDistanceSimilarity,这个类的normalize函数直接返回;while循环里面含有写入文件的操作,这个写入的格式其实就是<key,value>  -->  <userID,[itemid:prefValue],同时比较同一个项目的各个用户的评分prefValue,求出最大值放入maxValue中。接下来就是threshold的判断了,由于在实战中是按照默认的值,所以这两个值是相同的,所以if里面的代码就不执行了。设置这个应该是和reducer中的操作相关的;然后就是norms设置了,其中similarity.norm(rowVector)是返回rowVector中项的平方和。最后ROWS计数器自增1。

    (1.1.3)cleanup函数,这个主要输出三行即可:

     protected void cleanup(Context ctx) throws IOException, InterruptedException {
          super.cleanup(ctx);
          // dirty trick
          ctx.write(new IntWritable(NORM_VECTOR_MARKER), new VectorWritable(norms));
          ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new VectorWritable(nonZeroEntries));
          ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new VectorWritable(maxValues));
        }

    分别是:

    -2147483647 [0x80000001]  -->vector( maxValues)
    -2147483646 [0x80000002]  -->vector( nonzeroEntries)
    -2147483648 [0x80000000]  -->vector( norms)

    由于threshold和NO_THRESHOLD相等,所以if里面的代码没有执行,所以maxValues和nonzeroEntries中的应该是空。

    (1.2)看commbiner://MergeVectorsCombiner

    (1.2.1)就一个reduce函数:

    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
            throws IOException, InterruptedException {
          ctx.write(row, new VectorWritable(Vectors.merge(partialVectors)));
        }

    这个是把相同的key整合,输出为<key,value>  -->    <userid,vector[itemid:prefValue,itemid:prefValue,...]

    (1.3)reducer:

    (1.3.1)setup:初始化三个变量的路径:norms; nonZeroEntries; maxValues;

    (1.3.2)reduce:

    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
            throws IOException, InterruptedException {
          Vector partialVector = Vectors.merge(partialVectors);
    
          if (row.get() == NORM_VECTOR_MARKER) {
            Vectors.write(partialVector, normsPath, ctx.getConfiguration());
          } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
            Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
          } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
            Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
          } else {
            ctx.write(row, new VectorWritable(partialVector));
          }
        }

    在reduce中先把还没有整合的vector[itemid:prefValue,itemid:prefValue,...]再整合一下,然后判断key是否是setup中的三个变量,是的话就把value值写入相应的文件,否则则直接输出即可,所以这个job一共产生了四个文件,一个是job的输出,其他三个就是三个变量文件的输出了。

    job的输出路径是:weightsPath,格式:<key,value>   -->       <userid,vector[itemid:prefValue,itemid:prefValue,...]。

    (2)//pairwiseSimilarityPath

    调用代码:

    Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
              IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);

    可以看到它的输入就是上一个weightsPath的输出了,格式:<key,value>   -->       <userid,vector[itemid:prefValue,itemid:prefValue,...];
    (2.1)mapper: //CooccurrencesMapper

    (2.1.1)setup:初始化变量similarity;numNonZeroEntries;maxValues;threshold;,其中中间两个应该是空,最后一个应该是默认值,第一个是欧氏距离;

    (2.1.2)map:

    protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
            throws IOException, InterruptedException {
          Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
          Arrays.sort(occurrences, BY_INDEX);
    
          int cooccurrences = 0;
          int prunedCooccurrences = 0;
          for (int n = 0; n < occurrences.length; n++) {
            Vector.Element occurrenceA = occurrences[n];
            Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
            for (int m = n; m < occurrences.length; m++) {
              Vector.Element occurrenceB = occurrences[m];
              if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
                dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
                cooccurrences++;
              } else {
                prunedCooccurrences++;
              }
            }
            ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
          }
          ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
          ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
        }

    map首先把当前user的所有item以及评分对转换到一个数组中,然后根据item大小对数组进行排序;然后就是两层的for循环了,其中的if里面肯定是要执行dots. 。。。的,因为threshold和NO_THRESHOLD肯定是相等的。然后来说两层for循环是干什么的:针对用户2的所有评分项目排序后为[101:2.0,102:2.5,103:5.0,104:2.0],那么输出就是101-->[102:prefValue101*prefValue102,103:prefValue101*prefValue103,104:prefValue101*prefValue104],然后是102 --> [103:prefValue102*prefValue103,104:prefValue102*prefValue104],。。。以此类推。其中similarity.aggregate返回就是两个参数的乘积。

    输出格式为   <key,value>   -->  <itemid,vector[itemid:pref*,itemid,pref*,...]

    (2.2)combiner: //VectorSumReducer

     (2.2.1) reduce函数:

    protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
        throws IOException, InterruptedException {
        Vector vector = null;
        for (VectorWritable v : values) {
          if (vector == null) {
            vector = v.get();
          } else {
            vector.assign(v.get(), Functions.PLUS);
          }
        }
        ctx.write(key, new VectorWritable(vector));
      }

    这个就是把map的输出整理一下,把相同key的vector中对应的项相加,比如 102 --> [103:1prefValue102*1prefValue103,104:1prefValue102*1prefValue104] ,102 --> [103:prefValue102*prefValue103,104:prefValue102*prefValue104,105:prefValue102*prefValue105],那么整合就是102 --> [103:prefValue102*prefValue103+1prefValue102*1prefValue103,104:prefValue102*prefValue104+1prefValue102*1prefValue104,105:prefValue102*prefValue105]。

    (2.3)reducer://SimilarityReducer

    (2.3.1)setup:初始化 变量similarity;numberOfColumns;excludeSelfSimilarity;norms;treshold;,其中的numberOfColumns就是numberUsers(实战中是5),而excludeSelfSimilarity在调用的时候被设置为True了。

    (2.3.2)reduce:

    protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
            throws IOException, InterruptedException {
          Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
          Vector dots = partialDotsIterator.next().get();
          while (partialDotsIterator.hasNext()) {
            Vector toAdd = partialDotsIterator.next().get();
            Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
            while (nonZeroElements.hasNext()) {
              Vector.Element nonZeroElement = nonZeroElements.next();
              dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
            }
          }
    
          Vector similarities = dots.like();
          double normA = norms.getQuick(row.get());
          Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
          while (dotsWith.hasNext()) {
            Vector.Element b = dotsWith.next();
            double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
            if (similarityValue >= treshold) {
              similarities.set(b.index(), similarityValue);
            }
          }
          if (excludeSelfSimilarity) {
            similarities.setQuick(row.get(), 0);
          }
          ctx.write(row, new VectorWritable(similarities));
        }

    首先dots就是获得某个item的第一个combiner的value输出,由于在实战中使用的数据比较少,所以前面只是用了一个reducer,一个combinner,这样在combiner中的value就只有一条记录,所以两层while循环其实是和combiner的功能一样,都是把vector中对应的项加起来;normA就是item的评分的平方和(norms向量可以参考前面);接着的while循环是求item和item的相似度(等下详细讲);然后excludeSelfSimilarity是true(即不对自身计算相似度),所以直接设置自身的相似度为0。比如针对这样的combiner输出<item105,vector[item106:p*105*106,item107:p*105*107]最终的结果输出应该是这样的<item105,vector[item105:0,item106:simi*105*106,item107:simi*105*107]>

    总结来说,这个输出路径是:pairwiseSimilarityPath,输出格式是<key,value> -->  <itemid,vector[itemid:simi,itemid:simi,...]>

    (3)

    Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
              IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
              VectorWritable.class);

    可以看到输入是上面(2)的输出,即pairwiseSimilarityPath,格式是<key,value> -->  <itemid,vector[itemid:simi,itemid:simi,...]>,输出是similarityMatrixPath.toString();

    (3.1)mapper://UnsymmetrifyMapper

    (3.1.1)setup:就设置maxSimilaritiesPerRow变量,这里知道这个参数是在哪里设置了,但是是做什么用的呢?看map函数吧

    (3.1.2)map:

    protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
            throws IOException, InterruptedException {
          Vector similarities = similaritiesWritable.get();
          // For performance reasons moved transposedPartial creation out of the while loop and reusing the same vector
          Vector transposedPartial = similarities.like();
          TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
          Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
          while (nonZeroElements.hasNext()) {
            Vector.Element nonZeroElement = nonZeroElements.next();
            topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
            transposedPartial.setQuick(row.get(), nonZeroElement.get());
            ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
            transposedPartial.setQuick(row.get(), 0.0);
          }
          Vector topKSimilarities = similarities.like();
          for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
            topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
          }
          ctx.write(row, new VectorWritable(topKSimilarities));
        }

    首先看while循环,比如针对这样的输入<item105,vector[item105:0,item106:simi*105*106,item107:simi*105*107]>,ctx.write的内容就是<item106,vector[item105:simi*105*106]>,<item107,vector[item105:simi*105*107]>;后面的for循环和新定义的TopK的作用是把simi*105*106和simi*105*107做比较,然后按照一定的顺序进行排序输出(应该是大的拍前面,比如107的simi比较大,那么输出就应该是<item105,item107:simi*105*107,vector[item105:0,item106:simi*105*106]>);所以这个mapper是有两种类型的输出(这里的类型不是指key、value的类型)。

    (3.2)combiner://MergeToTopKSimilaritiesReducer

    (3.2.1)setup:和mapper的初始化一样;

    (3.2.2)reduce:

    protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
            throws IOException, InterruptedException {
          Vector allSimilarities = Vectors.merge(partials);
          Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
          ctx.write(row, new VectorWritable(topKSimilarities));
        }

    这里的merge又把所有的项目整合了起来,就等于是和输入一样了(这里应该不是,所以这里还应该编写一个follow仿制代码测试一下),然后topKElements就是把所有的排序,然后输出,所以他的输出应该就是和(3.1.2)中说到的for和TopK共同的作用了吧。输出比如107的simi比较大,那么输出就应该是<item105,item107:simi*105*107,vector[item105:0,item106:simi*105*106]>;

    (3.3)reducer:// MergeToTopKSimilaritiesReducer,在combiner中提到的reduce中的merge函数如果又把所有的整合一下的话,那么就没有多大的意义了,这里没有多大的意义是指mapper没有做两种类型的输出了,只输出第二种类型即可。这个还有待验证。。。



    分享,成长,快乐

    转载请注明blog地址:http://blog.csdn.net/fansy1990



  • 相关阅读:
    php pdo备份还原数据库方法
    php抛出异常
    手机H5支持视频的比特率
    linux 安装imagick方法 php5.4以上都能用
    lnmp一键安装包 成功运行thinkphp的方法
    NAVICAT文件名目录或卷标语法不正确怎么办
    Linux下捕捉键盘事件
    linux 下shell脚本备份文件
    MQTT学习笔记
    windows生成dump文件
  • 原文地址:https://www.cnblogs.com/riskyer/p/3364681.html
Copyright © 2011-2022 走看看