Hadoop Netflix Data Statistical Analysis 2 (repost)

    Map phase

    The mapper loads a movieID → production date table from the DistributedCache, then for each rating record emits <userID, movieID,rating,ratingDelay>, where ratingDelay is the number of years between a movie's production and the user's rating.
    package com.taobao;
    import java.io.*;
    import java.util.*;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.StringUtils;

    public class MyMapper {
        public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

            Path[] localFiles = new Path[0];
            // movieID -> production date, loaded from the cached movie_titles file
            HashMap<String, String> dateMap = new HashMap<String, String>();

            public void configure(JobConf job) {
                if (job.getBoolean("HadoopDriver.distributedCacheFile", false)) {
                    try {
                        localFiles = DistributedCache.getLocalCacheFiles(job);
                    } catch (IOException ioe) {
                        System.err.println("Caught exception while getting cached files: "
                            + StringUtils.stringifyException(ioe));
                    }
                    // guard against a missing or failed cache file
                    if (localFiles.length > 0) {
                        BufferedReader reader = null;
                        try {
                            reader = new BufferedReader(new FileReader(localFiles[0].toString()));
                            String cachedLine;
                            while ((cachedLine = reader.readLine()) != null) {
                                // cached line format: movieID,productionDate[,title]
                                StringTokenizer cachedIterator = new StringTokenizer(cachedLine, ",");
                                String cachedMovieID = cachedIterator.nextToken();
                                String productionDate = cachedIterator.nextToken();
                                dateMap.put(cachedMovieID, productionDate);
                            }
                        } catch (IOException ioe) {
                            System.err.println("Caught exception while parsing the cached file: "
                                + StringUtils.stringifyException(ioe));
                        } finally {
                            if (reader != null) {
                                try { reader.close(); } catch (IOException ignored) { }
                            }
                        }
                    }
                }
            }

            private Text word = new Text();

            public void map(LongWritable key, Text value,
                            OutputCollector<Text, Text> output,
                            Reporter reporter) throws IOException {
                // input line format: movieID,userID,rating,dateRated (yyyy-mm-dd)
                String line = value.toString();
                StringTokenizer itr = new StringTokenizer(line, ",");
                String movieID = itr.nextToken();
                String userID = itr.nextToken();
                word.set(userID);
                String rating = itr.nextToken();
                String dateRated = itr.nextToken().replaceAll("-", "");
                String productionDate = dateMap.get(movieID);
                try {
                    int prodDate = Integer.parseInt(productionDate);
                    int ratedDate = Integer.parseInt(dateRated.substring(0, 4));
                    int ratingDelay = ratedDate - prodDate;
                    String outputStr = movieID + "," + rating + "," + ratingDelay;
                    // output <userID movieID,rating,ratingDelay> to the reducer
                    output.collect(word, new Text(outputStr));
                }
                // catch NumberFormatException and return:
                // this skips collecting output for movies with "NULL" production dates
                catch (NumberFormatException nfe) {
                    System.err.println("Caught NumberFormatException: "
                        + StringUtils.stringifyException(nfe));
                }
            }
        }
    }
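
    To sanity-check the map logic without a cluster, the following standalone sketch (not part of the original post) seeds the date cache by hand and feeds the mapper a single record. The sample values are hypothetical; they only follow the formats the mapper parses: a cached line of movieID,productionDate and an input line of movieID,userID,rating,yyyy-mm-dd.

    package com.taobao;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class MapLocalTest {
        public static void main(String[] args) throws Exception {
            MyMapper.MapClass mapper = new MyMapper.MapClass();
            // seed the movieID -> production date cache directly,
            // bypassing configure() and the DistributedCache
            mapper.dateMap.put("8", "1996");

            // collector that just prints each <key, value> pair
            OutputCollector<Text, Text> printer = new OutputCollector<Text, Text>() {
                public void collect(Text k, Text v) {
                    System.out.println(k + "\t" + v);
                }
            };

            // hypothetical record: movie 8 rated 4 by user 1744889 on 2004-10-11
            mapper.map(new LongWritable(0), new Text("8,1744889,4,2004-10-11"),
                       printer, Reporter.NULL);
            // prints: 1744889    8,4,8   (rating delay = 2004 - 1996 = 8 years)
        }
    }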
    Reduce phase

    For each user, the reducer counts the ratings and computes the average rating and the average rating delay.
    package com.taobao;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class MyReducer {
        public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterator<Text> values,
                               OutputCollector<Text, Text> output,
                               Reporter reporter) throws IOException {

                double ratingTotal = 0.0;
                int ratingCount = 0;
                double ratingDelay = 0.0;

                // iterate through each value passed from the mapper:
                // "movieID,rating,ratingDelay"
                while (values.hasNext()) {
                    String line = values.next().toString();
                    StringTokenizer itr = new StringTokenizer(line, ",");
                    // skip the movieID
                    itr.nextToken();
                    // add up the ratings
                    ratingTotal += Integer.parseInt(itr.nextToken());
                    // increment ratingCount
                    ratingCount++;
                    // add up the rating delays
                    ratingDelay += Integer.parseInt(itr.nextToken());
                }

                // average rating, kept as a double to preserve the decimal part
                // (Netflix stores ratings as integers)
                double ratingAvg = ratingTotal / ratingCount;

                // average rating delay across all movies rated by this user
                ratingDelay = ratingDelay / ratingCount;

                // output <userID ratingCount,ratingAverage,ratingDelay>
                String outValue = ratingCount + "," + ratingAvg + "," + ratingDelay;
                output.collect(key, new Text(outValue));
            }
        }
    }
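
    The reduce logic can be exercised the same way. This standalone sketch (again not part of the original post) folds three hypothetical mapper outputs for one user into the <ratingCount,ratingAvg,ratingDelay> summary.

    package com.taobao;
    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class ReduceLocalTest {
        public static void main(String[] args) throws Exception {
            MyReducer.Reduce reducer = new MyReducer.Reduce();

            // hypothetical mapper outputs for user 1744889: "movieID,rating,ratingDelay"
            Iterator<Text> values = Arrays.asList(
                new Text("8,4,8"),
                new Text("12,3,2"),
                new Text("30,5,11")).iterator();

            reducer.reduce(new Text("1744889"), values,
                new OutputCollector<Text, Text>() {
                    public void collect(Text k, Text v) {
                        System.out.println(k + "\t" + v);
                    }
                }, Reporter.NULL);
            // prints: 1744889    3,4.0,7.0   (3 ratings, average 4.0, average delay 7.0)
        }
    }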

    Driver (main program)

    The driver wires the mapper and reducer together, parses the -m/-r/-d options, and registers the cached movie_titles file via the DistributedCache.
    package com.taobao;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.filecache.DistributedCache;
    public class HadoopDriver extends Configured implements Tool {

         
         
          static int printUsage() {
            System.out.println("HadoopDriver [-m <maps>] [-r <reduces>] [- d <distributedCache>] <input> <output>");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
          }
         
          public int run(String[] args) throws Exception {
            JobConf conf = new JobConf(getConf(), MyMapper.class);
            conf.setJobName("HadoopDriver");
         
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(Text.class);
            conf.setMapperClass(MyMapper.MapClass.class);       
            conf.setReducerClass(MyReducer.Reduce.class);
           
            List<String> other_args = new ArrayList<String>();
            for(int i=0; i < args.length; ++i) {
              try {
                if ("-m".equals(args[i])) {
                  conf.setNumMapTasks(Integer.parseInt(args[++i]));
                } else if ("-r".equals(args[i])) {
                  conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else if ("-d".equals(args[i])) {
                    DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
                    conf.setBoolean("HadoopDriver.distributedCacheFile", true);
                } else {
                  other_args.add(args[i]);
                }
              } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
              } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " +
                                   args[i-1]);
                return printUsage();
              }
            }
            // Make sure there are exactly 2 parameters left.
            if (other_args.size() != 2) {
              System.out.println("ERROR: Wrong number of parameters: " +
                                 other_args.size() + " instead of 2.");
              return printUsage();
            }
            FileInputFormat.setInputPaths(conf, other_args.get(0));
            FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
               
            JobClient.runJob(conf);
            return 0;
          }
         
         
          public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new HadoopDriver(), args);
            System.exit(res);
          }

        }






    $hadoop jar ./hadoopnetflix.jar com.taobao.HadoopDriver -d /group/tbdev/afan/netflix/movie_titles.txt /group/tbdev/afan/netflix/netflix.txt /group/tbdev/afan/netflix/output/
    11/04/26 09:44:53 INFO mapred.FileInputFormat: Total input paths to process : 1
    11/04/26 09:44:53 INFO mapred.JobClient: Running job: job_201104091858_654015
    11/04/26 09:44:54 INFO mapred.JobClient:  map 0% reduce 0%
    11/04/26 09:47:01 INFO mapred.JobClient:  map 2% reduce 0%
    11/04/26 09:47:03 INFO mapred.JobClient:  map 3% reduce 0%
    11/04/26 09:47:06 INFO mapred.JobClient:  map 5% reduce 0%
    11/04/26 09:47:07 INFO mapred.JobClient:  map 12% reduce 0%
    11/04/26 09:47:08 INFO mapred.JobClient:  map 13% reduce 0%
    11/04/26 09:47:11 INFO mapred.JobClient:  map 16% reduce 0%
    11/04/26 09:47:12 INFO mapred.JobClient:  map 22% reduce 0%
    11/04/26 09:47:13 INFO mapred.JobClient:  map 23% reduce 0%
    11/04/26 09:47:16 INFO mapred.JobClient:  map 32% reduce 0%
    11/04/26 09:47:20 INFO mapred.JobClient:  map 34% reduce 0%
    11/04/26 09:47:22 INFO mapred.JobClient:  map 39% reduce 0%
    11/04/26 09:47:23 INFO mapred.JobClient:  map 40% reduce 0%
    11/04/26 09:47:26 INFO mapred.JobClient:  map 43% reduce 0%
    11/04/26 09:47:27 INFO mapred.JobClient:  map 45% reduce 0%
    11/04/26 09:47:28 INFO mapred.JobClient:  map 46% reduce 0%
    11/04/26 09:47:31 INFO mapred.JobClient:  map 48% reduce 0%
    11/04/26 09:47:32 INFO mapred.JobClient:  map 53% reduce 0%
    11/04/26 09:47:33 INFO mapred.JobClient:  map 54% reduce 0%
    11/04/26 09:47:36 INFO mapred.JobClient:  map 57% reduce 0%
    11/04/26 09:47:38 INFO mapred.JobClient:  map 61% reduce 0%
    11/04/26 09:47:39 INFO mapred.JobClient:  map 62% reduce 0%
    11/04/26 09:47:41 INFO mapred.JobClient:  map 64% reduce 0%
    11/04/26 09:47:42 INFO mapred.JobClient:  map 68% reduce 0%
    11/04/26 09:47:43 INFO mapred.JobClient:  map 70% reduce 0%
    11/04/26 09:47:44 INFO mapred.JobClient:  map 71% reduce 0%
    11/04/26 09:47:46 INFO mapred.JobClient:  map 73% reduce 0%
    11/04/26 09:47:47 INFO mapred.JobClient:  map 75% reduce 0%
    11/04/26 09:47:48 INFO mapred.JobClient:  map 76% reduce 0%
    11/04/26 09:47:52 INFO mapred.JobClient:  map 80% reduce 0%
    11/04/26 09:47:53 INFO mapred.JobClient:  map 83% reduce 0%
    11/04/26 09:47:54 INFO mapred.JobClient:  map 84% reduce 0%
    11/04/26 09:47:57 INFO mapred.JobClient:  map 86% reduce 0%
    11/04/26 09:47:58 INFO mapred.JobClient:  map 88% reduce 0%
    11/04/26 09:47:59 INFO mapred.JobClient:  map 89% reduce 0%
    11/04/26 09:48:02 INFO mapred.JobClient:  map 91% reduce 0%
    11/04/26 09:48:03 INFO mapred.JobClient:  map 92% reduce 0%
    11/04/26 09:48:07 INFO mapred.JobClient:  map 94% reduce 0%
    11/04/26 09:48:08 INFO mapred.JobClient:  map 95% reduce 0%
    11/04/26 09:48:10 INFO mapred.JobClient:  map 96% reduce 0%
    11/04/26 09:48:11 INFO mapred.JobClient:  map 97% reduce 0%
    11/04/26 09:48:14 INFO mapred.JobClient:  map 98% reduce 0%
    11/04/26 09:48:16 INFO mapred.JobClient:  map 99% reduce 0%
    11/04/26 09:48:25 INFO mapred.JobClient:  map 100% reduce 0%
    11/04/26 09:50:19 INFO mapred.JobClient:  map 100% reduce 2%
    11/04/26 09:50:24 INFO mapred.JobClient:  map 100% reduce 4%
    11/04/26 09:51:15 INFO mapred.JobClient:  map 100% reduce 5%
    11/04/26 09:51:16 INFO mapred.JobClient:  map 100% reduce 6%
    11/04/26 09:51:18 INFO mapred.JobClient:  map 100% reduce 7%
    11/04/26 09:51:20 INFO mapred.JobClient:  map 100% reduce 13%
    11/04/26 09:51:21 INFO mapred.JobClient:  map 100% reduce 19%
    11/04/26 09:51:22 INFO mapred.JobClient:  map 100% reduce 22%
    11/04/26 09:51:23 INFO mapred.JobClient:  map 100% reduce 23%
    11/04/26 09:51:24 INFO mapred.JobClient:  map 100% reduce 29%
    11/04/26 09:51:25 INFO mapred.JobClient:  map 100% reduce 33%
    11/04/26 09:51:26 INFO mapred.JobClient:  map 100% reduce 36%
    11/04/26 09:51:28 INFO mapred.JobClient:  map 100% reduce 38%
    11/04/26 09:51:29 INFO mapred.JobClient:  map 100% reduce 40%
    11/04/26 09:51:30 INFO mapred.JobClient:  map 100% reduce 42%
    11/04/26 09:51:31 INFO mapred.JobClient:  map 100% reduce 44%
    11/04/26 09:51:34 INFO mapred.JobClient:  map 100% reduce 46%
    11/04/26 09:51:40 INFO mapred.JobClient:  map 100% reduce 56%
    11/04/26 09:51:42 INFO mapred.JobClient:  map 100% reduce 65%
    11/04/26 09:51:43 INFO mapred.JobClient:  map 100% reduce 69%
    11/04/26 09:51:45 INFO mapred.JobClient:  map 100% reduce 75%
    11/04/26 09:51:46 INFO mapred.JobClient:  map 100% reduce 81%
    11/04/26 09:51:47 INFO mapred.JobClient:  map 100% reduce 82%
    11/04/26 09:51:48 INFO mapred.JobClient:  map 100% reduce 85%
    11/04/26 09:51:49 INFO mapred.JobClient:  map 100% reduce 93%
    11/04/26 09:51:50 INFO mapred.JobClient:  map 100% reduce 96%
    11/04/26 09:51:51 INFO mapred.JobClient:  map 100% reduce 98%
    11/04/26 09:51:55 INFO mapred.JobClient:  map 100% reduce 99%
    11/04/26 09:51:56 INFO mapred.JobClient:  map 100% reduce 100%
    11/04/26 09:52:01 INFO mapred.JobClient: Job complete: job_201104091858_654015
    11/04/26 09:52:01 INFO mapred.JobClient: Counters: 17
    11/04/26 09:52:01 INFO mapred.JobClient:   File Systems
    11/04/26 09:52:01 INFO mapred.JobClient:     HDFS bytes read=2461706873
    11/04/26 09:52:01 INFO mapred.JobClient:     HDFS bytes written=21132919
    11/04/26 09:52:01 INFO mapred.JobClient:     Local bytes read=5409689109
    11/04/26 09:52:01 INFO mapred.JobClient:     Local bytes written=7008943577
    11/04/26 09:52:01 INFO mapred.JobClient:   Job Counters
    11/04/26 09:52:01 INFO mapred.JobClient:     Launched reduce tasks=355
    11/04/26 09:52:01 INFO mapred.JobClient:     Rack-local map tasks=5
    11/04/26 09:52:01 INFO mapred.JobClient:     Launched map tasks=10
    11/04/26 09:52:01 INFO mapred.JobClient:     Data-local map tasks=2
    11/04/26 09:52:01 INFO mapred.JobClient:   Map-Reduce Framework
    11/04/26 09:52:01 INFO mapred.JobClient:     Reduce input groups=480031
    11/04/26 09:52:01 INFO mapred.JobClient:     Combine output records=0
    11/04/26 09:52:01 INFO mapred.JobClient:     Map input records=94879665
    11/04/26 09:52:01 INFO mapred.JobClient:     Reduce output records=480031
    11/04/26 09:52:01 INFO mapred.JobClient:     Map output bytes=1642570896
    11/04/26 09:52:01 INFO mapred.JobClient:     Map input bytes=2461674096
    11/04/26 09:52:01 INFO mapred.JobClient:     Combine input records=0
    11/04/26 09:52:01 INFO mapred.JobClient:     Map output records=94878816
    11/04/26 09:52:01 INFO mapred.JobClient:     Reduce input records=94878816
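    One note on the counters above: "Combine input records=0" shows the job runs without a combiner, so every map output record is shuffled to the reducers. The Reduce class cannot simply be reused as a combiner because it computes averages, and averages do not compose across partial runs; partial sums do. The sketch below is a hypothetical variant, not part of the original job: it pre-aggregates per-user sums on the map side and would also require changing MyReducer.Reduce to fold the "C,count,ratingSum,delaySum" records, so it is a design change rather than a drop-in.

    package com.taobao;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class MyCombiner extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            long count = 0, ratingSum = 0, delaySum = 0;
            while (values.hasNext()) {
                StringTokenizer itr = new StringTokenizer(values.next().toString(), ",");
                String first = itr.nextToken();
                if ("C".equals(first)) {
                    // an already-combined partial: "C,count,ratingSum,delaySum"
                    // (the framework may run the combiner more than once)
                    count += Long.parseLong(itr.nextToken());
                    ratingSum += Long.parseLong(itr.nextToken());
                    delaySum += Long.parseLong(itr.nextToken());
                } else {
                    // a raw mapper output: "movieID,rating,ratingDelay"
                    ratingSum += Long.parseLong(itr.nextToken());
                    delaySum += Long.parseLong(itr.nextToken());
                    count++;
                }
            }
            output.collect(key, new Text("C," + count + "," + ratingSum + "," + delaySum));
        }
    }

    It would be enabled in the driver with conf.setCombinerClass(MyCombiner.class).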
    Output file format: <UserID totalNumRatings,avgRating,ratingDelay>
    $hadoop fs -text /group/tbdev/afan/netflix/output/part-00299 | more
    1000843 146,3.363013698630137,10.993150684931507
    1001491 225,3.6577777777777776,8.782222222222222
    1001653 433,3.531177829099307,15.249422632794458
    1001734 349,3.5644699140401146,10.071633237822349
    1001815 61,4.065573770491803,6.540983606557377
    1003930 37,4.081081081081081,6.162162162162162
    1006027 6,3.5,8.5
    1008061 17,4.117647058823529,19.058823529411764
    100818  43,4.162790697674419,21.046511627906977
    1009033 1028,3.6585603112840466,14.012645914396888
    1010149 97,2.7010309278350517,1.865979381443299
    1010725 902,3.712860310421286,7.368070953436807
    1011373 24,4.041666666666667,4.791666666666667
    1014703 329,3.7416413373860182,9.19756838905775
    1017052 589,3.2580645161290325,7.910016977928692
    1019410 237,3.3122362869198314,9.248945147679326
    102195  120,3.7583333333333333,17.566666666666666
    1022146 45,4.4222222222222225,6.355555555555555
    1022227 56,3.8035714285714284,15.339285714285714
    1023118 78,4.564102564102564,9.320512820512821
    1024342 64,3.734375,28.34375
    1025071 214,3.5654205607476634,6.0327102803738315
    1026043 5,4.2,5.2
    1027510 55,3.672727272727273,10.581818181818182
    102771  29,3.8620689655172415,19.586206896551722
Original source: https://www.cnblogs.com/qq78292959/p/2076599.html