  • Mahout algorithm source analysis: Item-based Collaborative Filtering (1) PreparePreferenceMatrixJob

    Mahout version: 0.7; Hadoop version: 1.0.4; JDK: 1.7.0_25 64-bit.

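    This post analyzes the source of RecommenderJob. The class extends AbstractJob, so it overrides the run method. Opening run, you can see that, as in the other job classes, it starts by setting defaults for the basic parameters and reading their values. Then comes the first job, which is guarded by a call to shouldRunNextPhase(). Opening that function shows the following source: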

    protected static boolean shouldRunNextPhase(Map<String, List<String>> args, AtomicInteger currentPhase) {
        int phase = currentPhase.getAndIncrement();
        String startPhase = getOption(args, "--startPhase");
        String endPhase = getOption(args, "--endPhase");
        boolean phaseSkipped = (startPhase != null && phase < Integer.parseInt(startPhase))
            || (endPhase != null && phase > Integer.parseInt(endPhase));
        if (phaseSkipped) {
          log.info("Skipping phase {}", phase);
        }
        return !phaseSkipped;
      }

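    Here phase holds the current phase number, and the counter is advanced by one on every call (for background on phases, see the post "mahout中phase的含义", i.e. "The meaning of phase in Mahout"). The function compares phase with startPhase and endPhase and returns true or false accordingly. The hands-on example runs with the defaults (neither startPhase nor endPhase is set), so every call to this function in RecommenderJob returns true.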
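
    The comparison can be tried in isolation. The sketch below is not Mahout code: PhaseGateSketch and shouldRun are hypothetical names, and the option parsing is replaced by two nullable Integer arguments, where null stands for "option not given".

```java
import java.util.concurrent.atomic.AtomicInteger;

final class PhaseGateSketch {
    // Same comparison as shouldRunNextPhase; null means the option was not given.
    static boolean shouldRun(Integer startPhase, Integer endPhase, AtomicInteger currentPhase) {
        int phase = currentPhase.getAndIncrement();
        boolean skipped = (startPhase != null && phase < startPhase)
            || (endPhase != null && phase > endPhase);
        return !skipped;
    }

    public static void main(String[] args) {
        AtomicInteger unset = new AtomicInteger();
        // Neither bound given: every phase runs.
        for (int i = 0; i < 3; i++) {
            System.out.println("unset, phase " + i + " runs: " + shouldRun(null, null, unset));
        }
        AtomicInteger bounded = new AtomicInteger();
        // startPhase = 1 and endPhase = 1: only phase 1 runs.
        for (int i = 0; i < 3; i++) {
            System.out.println("1..1, phase " + i + " runs: " + shouldRun(1, 1, bounded));
        }
    }
}
```

    Because the counter is incremented on every call, the phase numbers follow the order in which the guarded blocks appear in run.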

    Now the call to the first job:

    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
          ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
                  "--input", getInputPath().toString(),
                  "--output", prepPath.toString(),
                  "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
                  "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
                  "--booleanData", String.valueOf(booleanData),
                  "--tempDir", getTempPath().toString()});
    
          numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
        }

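    The main class of this job is PreparePreferenceMatrixJob, and its arguments are input, output, maxPrefsPerUser, minPrefsPerUser, booleanData and tempDir. So let's open PreparePreferenceMatrixJob. It also extends AbstractJob, so go straight to its run method. Among the options set up in run there is a ratingShift; the caller does not pass it, so it takes its default of 0.0. A quick scan shows three prepareJob calls in total, so this class launches three jobs. Let's go through them one by one: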

    (1)//convert items to an internal index

    Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
                ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,
                VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);

    Input format: userid,itemid,value

    First the mapper:

    protected void map(LongWritable key,
                         Text value,
                         Context context) throws IOException, InterruptedException {
        String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
        long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
        int index = TasteHadoopUtils.idToIndex(itemID);
        context.write(new VarIntWritable(index), new VarLongWritable(itemID));
      }

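    The map first extracts the itemID, which is tokens[1]. When transpose is true it takes tokens[0] as the itemID instead, which is presumably for some other use case; the caller does not set this parameter, so it keeps its default of false and tokens[1] is used. The index is then derived from the itemID by TasteHadoopUtils.idToIndex(), whose body is return 0x7FFFFFFF & Longs.hashCode(id);. For a non-negative ID within the int range (at most 2147483647) this returns the ID itself, so item 101 from the hands-on example also gets index 101.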
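
    To see the arithmetic without Mahout or Guava on the classpath, here is a minimal sketch. IdToIndexSketch and longHash are hypothetical names; longHash folds the upper 32 bits into the lower 32, which is how Guava documents Longs.hashCode.

```java
final class IdToIndexSketch {
    // Fold the high 32 bits of the long into the low 32 bits.
    static int longHash(long value) {
        return (int) (value ^ (value >>> 32));
    }

    // Clear the sign bit so the index is never negative.
    static int idToIndex(long id) {
        return 0x7FFFFFFF & longHash(id);
    }

    public static void main(String[] args) {
        System.out.println(idToIndex(101L));                // prints 101
        System.out.println(idToIndex((1L << 32) + 100L));   // prints 101 as well: a collision
    }
}
```

    The second call shows that an ID outside the int range can land on an index already taken by a small ID.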

    Now the reducer:

    protected void reduce(VarIntWritable index,
                            Iterable<VarLongWritable> possibleItemIDs,
                            Context context) throws IOException, InterruptedException {
        long minimumItemID = Long.MAX_VALUE;
        for (VarLongWritable varLongWritable : possibleItemIDs) {
          long itemID = varLongWritable.get();
          if (itemID < minimumItemID) {
            minimumItemID = itemID;
          }
        }
        if (minimumItemID != Long.MAX_VALUE) {
          context.write(index, new VarLongWritable(minimumItemID));
        }
      }

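    At first glance this step seems unnecessary, since for the example data the reducer still just emits 101-->101. It matters when several different itemIDs hash to the same index: the reducer then keeps only the smallest of them, so each index maps to exactly one itemID.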
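
    The map and reduce steps of this job can be imitated in memory. In the sketch below, ItemIdIndexSketch and buildIndex are hypothetical names and a TreeMap stands in for the shuffle; it keeps the smallest itemID per index, as the reducer above does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ItemIdIndexSketch {
    static int idToIndex(long id) {
        return 0x7FFFFFFF & (int) (id ^ (id >>> 32));
    }

    // Group item IDs by index and keep the minimum ID for each index.
    static Map<Integer, Long> buildIndex(List<Long> itemIds) {
        Map<Integer, Long> indexToId = new TreeMap<>();
        for (long itemId : itemIds) {
            indexToId.merge(idToIndex(itemId), itemId, Long::min);
        }
        return indexToId;
    }

    public static void main(String[] args) {
        // (1L << 32) + 100 hashes to index 101, the same index as item 101.
        List<Long> ids = Arrays.asList(101L, (1L << 32) + 100L, 102L);
        System.out.println(buildIndex(ids)); // prints {101=101, 102=102}
    }
}
```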

    The output file is ITEMID_INDEX, with output format <key,value>   :   VarIntWritable-->VarLongWritable

    That completes the analysis of this job.

    (2)//convert user preferences into a vector per user

    Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS), TextInputFormat.class,
                ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
                ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);

    Input format: userid,itemid,value

    Now the mapper (ToItemPrefsMapper extends ToEntityPrefsMapper and is itself essentially empty, so look at ToEntityPrefsMapper):

    public void map(LongWritable key,
                      Text value,
                      Context context) throws IOException, InterruptedException {
        String[] tokens = DELIMITER.split(value.toString());
        long userID = Long.parseLong(tokens[0]);
        long itemID = Long.parseLong(tokens[1]);
        if (itemKey ^ transpose) {
          // If using items as keys, and not transposing items and users, then users are items!
          // Or if not using items as keys (users are, as usual), but transposing items and users,
          // then users are items! Confused?
          long temp = userID;
          userID = itemID;
          itemID = temp;
        }
        if (booleanData) {
          context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
        } else {
          float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
          context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
        }
      }

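    Of all this code, the last two statements matter most. The first computes the rating value. ratingShift is a constant added to every parsed rating; what it is meant for is not obvious from this code, but since it is 0.0 here it has no effect. The final output is userID-->[itemID,prefValue].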
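
    The parsing rules of this mapper can be sketched on plain strings. PrefLineSketch and parse are hypothetical names, and the comma-or-tab delimiter is an assumption, since the post does not show the real DELIMITER constant. The item/user swap is left out.

```java
import java.util.regex.Pattern;

final class PrefLineSketch {
    // Assumed delimiter (comma or tab); the actual constant is not shown in the post.
    private static final Pattern DELIMITER = Pattern.compile("[,\t]");

    // Renders what the mapper would emit for one input line.
    static String parse(String line, boolean booleanData, float ratingShift) {
        String[] tokens = DELIMITER.split(line);
        long userID = Long.parseLong(tokens[0]);
        long itemID = Long.parseLong(tokens[1]);
        if (booleanData) {
            return userID + "-->" + itemID;
        }
        // A missing third column counts as a preference of 1.0.
        float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
        return userID + "-->[" + itemID + "," + prefValue + "]";
    }

    public static void main(String[] args) {
        System.out.println(parse("1,101,5.0", false, 0.0f));  // prints 1-->[101,5.0]
        System.out.println(parse("1,101", false, 0.0f));      // prints 1-->[101,1.0]
        System.out.println(parse("1,101,5.0", true, 0.0f));   // prints 1-->101
        System.out.println(parse("1,101,5.0", false, -3.0f)); // prints 1-->[101,2.0]
    }
}
```

    The last call shows that a non-zero shift moves every rating by the same amount. Re-centering a rating scale would be one use for that, but this is a guess and not something the source states.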

    Now the reducer:

    protected void reduce(VarLongWritable userID,
                            Iterable<VarLongWritable> itemPrefs,
                            Context context) throws IOException, InterruptedException {
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
          int index = TasteHadoopUtils.idToIndex(itemPref.get());
          float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
          userVector.set(index, value);
        }
    
        if (userVector.getNumNondefaultElements() >= minPreferences) {
          VectorWritable vw = new VectorWritable(userVector);
          vw.setWritesLaxPrecision(true);
          context.getCounter(Counters.USERS).increment(1);
          context.write(userID, vw);
        }
      }

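    First, why the mapper's output value is an EntityPrefWritable while the Iterable here is declared over VarLongWritable: the former extends the latter. The reducer then writes all of a user's ratings into one vector, using the index derived from the itemID as the vector position and prefValue as the value. Finally it checks whether the vector holds at least minPreferences items (which shows what that parameter means) and only then writes it out. It also increments the Counters.USERS counter, which counts the users that were written.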
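
    A rough in-memory equivalent of this reducer, with a TreeMap standing in for RandomAccessSparseVector. UserVectorSketch and toUserVector are hypothetical names; the sketch only illustrates the minPreferences filter and is not the Mahout implementation.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class UserVectorSketch {
    static int idToIndex(long id) {
        return 0x7FFFFFFF & (int) (id ^ (id >>> 32));
    }

    // Builds index -> prefValue for one user; empty if the user has too few preferences.
    static Optional<Map<Integer, Float>> toUserVector(long[] itemIDs, float[] prefValues, int minPreferences) {
        Map<Integer, Float> userVector = new TreeMap<>();
        for (int i = 0; i < itemIDs.length; i++) {
            userVector.put(idToIndex(itemIDs[i]), prefValues[i]);
        }
        if (userVector.size() >= minPreferences) {
            return Optional.of(userVector);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        long[] items = {101L, 102L, 103L};
        float[] prefs = {5.0f, 3.0f, 2.5f};
        System.out.println(toUserVector(items, prefs, 1)); // prints Optional[{101=5.0, 102=3.0, 103=2.5}]
        System.out.println(toUserVector(items, prefs, 4)); // prints Optional.empty
    }
}
```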

    The output of this job is USER_VECTORS, with format <key,value>   :   userid-->vector[itemid:prefValue,itemid:prefValue,...]

    The code then reads the number of users:

    int numberOfUsers = (int) toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
        HadoopUtil.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf());

    (3)//build the rating matrix

    Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
                ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class,
                IntWritable.class, VectorWritable.class);

    The input is the output of the second job, with format <key,value>   :  userid-->vector[itemid:prefValue,itemid:prefValue,...]

    First the mapper:

    protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
          throws IOException, InterruptedException {
        Vector userRatings = vectorWritable.get();
    
        int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
        userRatings = Vectors.maybeSample(userRatings, sampleSize);
        int numElementsAfterSampling = userRatings.getNumNondefaultElements();
    
        int column = TasteHadoopUtils.idToIndex(rowIndex.get());
        VectorWritable itemVector = new VectorWritable(new RandomAccessSparseVector(Integer.MAX_VALUE, 1));
        itemVector.setWritesLaxPrecision(true);
    
        Iterator<Vector.Element> iterator = userRatings.iterateNonZero();
        while (iterator.hasNext()) {
          Vector.Element elem = iterator.next();
          itemVector.get().setQuick(column, elem.get());
          ctx.write(new IntWritable(elem.index()), itemVector);
        }
    
        ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
        ctx.getCounter(Elements.USER_RATINGS_NEGLECTED).increment(numElementsBeforeSampling - numElementsAfterSampling);
      }

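    About the call userRatings = Vectors.maybeSample(userRatings, sampleSize);: if sampleSize is not set it takes the value Integer.MAX_VALUE, and maybeSample then returns the original vector, because the number of non-default entries in a vector can never exceed Integer.MAX_VALUE: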

    public static Vector maybeSample(Vector original, int sampleSize) {
        if (original.getNumNondefaultElements() <= sampleSize) {
          return original;
        }
        Vector sample = original.like();
        Iterator<Vector.Element> sampledElements =
            new FixedSizeSamplingIterator<Vector.Element>(sampleSize, original.iterateNonZero());
        while (sampledElements.hasNext()) {
          Vector.Element elem = sampledElements.next();
          sample.setQuick(elem.index(), elem.get());
        }
        return sample;
      }
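
    The post does not show FixedSizeSamplingIterator, so the following is only a guess at the idea: a standard reservoir sample that keeps at most size elements from an iterator of unknown length. FixedSizeSampleSketch and sample are hypothetical names, and whether Mahout's class works exactly this way is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

final class FixedSizeSampleSketch {
    // Reservoir sampling: every element ends up in the result with equal probability.
    static <T> List<T> sample(int size, Iterator<T> source, Random random) {
        List<T> buffer = new ArrayList<>();
        int seen = 0;
        while (source.hasNext()) {
            T item = source.next();
            seen++;
            if (buffer.size() < size) {
                buffer.add(item);             // still filling the reservoir
            } else {
                int slot = random.nextInt(seen);
                if (slot < size) {
                    buffer.set(slot, item);   // replace a random earlier pick
                }
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        List<Integer> prefs = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // Limit larger than the input: everything is kept, in order.
        System.out.println(sample(100, prefs.iterator(), new Random()));
        // Limit smaller than the input: exactly 3 elements survive.
        System.out.println(sample(3, prefs.iterator(), new Random()).size());
    }
}
```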

    In the map function, column is the userid, the output key elem.index() is the itemID, and itemVector.get().setQuick(column, elem.get()) sets itemVector to the form [userID:prefValue]. The mapper output is therefore itemID-->vector[userID:prefValue]. There are also two counters; because numElementsBeforeSampling - numElementsAfterSampling = 0 here, the counter Elements.USER_RATINGS_NEGLECTED stays at zero.

    Now the reducer:

      

    protected void reduce(IntWritable row, Iterable<VectorWritable> vectors, Context ctx)
          throws IOException, InterruptedException {
        VectorWritable vectorWritable = VectorWritable.merge(vectors.iterator());
        vectorWritable.setWritesLaxPrecision(true);
        ctx.write(row, vectorWritable);
      }

    The merge function combines the mapper outputs into the form itemID-->vector[userID:prefValue,userID:prefValue,...];

    So the output of this job is RATING_MATRIX, with format <key,value>   :   itemID-->vector[userID:prefValue,userID:prefValue,...];
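
    Taken together, the mapper and reducer of this third job transpose the user-by-item matrix. The sketch below does the same on nested maps; TransposeSketch, toItemVectors and example are hypothetical names, and the sample ratings are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

final class TransposeSketch {
    // Input: userIndex -> (itemIndex -> prefValue). Output: itemIndex -> (userIndex -> prefValue).
    static Map<Integer, Map<Integer, Float>> toItemVectors(Map<Integer, Map<Integer, Float>> userVectors) {
        Map<Integer, Map<Integer, Float>> itemVectors = new TreeMap<>();
        for (Map.Entry<Integer, Map<Integer, Float>> user : userVectors.entrySet()) {
            for (Map.Entry<Integer, Float> pref : user.getValue().entrySet()) {
                // "map": emit itemIndex -> {userIndex: prefValue}; "reduce": merge into the item's row.
                itemVectors.computeIfAbsent(pref.getKey(), k -> new TreeMap<>())
                           .put(user.getKey(), pref.getValue());
            }
        }
        return itemVectors;
    }

    // Made-up sample data: user 1 rated items 101 and 102, user 2 rated item 101.
    static Map<Integer, Map<Integer, Float>> example() {
        Map<Integer, Map<Integer, Float>> users = new TreeMap<>();
        Map<Integer, Float> user1 = new TreeMap<>();
        user1.put(101, 5.0f);
        user1.put(102, 3.0f);
        Map<Integer, Float> user2 = new TreeMap<>();
        user2.put(101, 2.0f);
        users.put(1, user1);
        users.put(2, user2);
        return users;
    }

    public static void main(String[] args) {
        System.out.println(toItemVectors(example())); // prints {101={1=5.0, 2=2.0}, 102={1=3.0}}
    }
}
```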

    One correction: sampleSize does get a value here, rather than staying at the default Integer.MAX_VALUE:

    if (hasOption("maxPrefsPerUser")) {
          int samplingSize = Integer.parseInt(getOption("maxPrefsPerUser"));
          toItemVectors.getConfiguration().setInt(ToItemVectorsMapper.SAMPLE_SIZE, samplingSize);
        }

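    So this value is configurable, and now you know what maxPrefsPerUser is for. Its default is 100, though, and the hands-on example has only 7 items in total, so numElementsBeforeSampling - numElementsAfterSampling = 0 still holds.

    That completes the analysis of this job as well.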


    Share, grow, enjoy.

    When reposting, please credit the blog: http://blog.csdn.net/fansy1990



  • Original article: https://www.cnblogs.com/phisy/p/3363317.html