  • Hadoop Netflix Data Statistical Analysis 1 (reposted)


    1. Map phase

    Input: MovieID,UserID,Rating,Date

    Output: <MovieID, "Rating,Date"> (the key is the MovieID; the value is the rating and date joined by a comma)
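
    For example (the values are illustrative, not taken from the real dataset), the input line 1,1488844,3,2005-09-06 is emitted by the mapper as key 1 with value 3,2005-09-06.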

    import java.io.*;
    import java.util.*;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    public class MyMapper {

        public static class MapClass extends MapReduceBase
                implements Mapper<LongWritable, Text, Text, Text> {

            private Text word = new Text();

            public void map(LongWritable key, Text value,
                            OutputCollector<Text, Text> output,
                            Reporter reporter) throws IOException {
                // Convert the Text value to a String
                String line = value.toString();
                // Each line is one rating record: "movieID,userID,rating,date",
                // with the fields separated by ","
                StringTokenizer itr = new StringTokenizer(line, ",");
                String name = itr.nextToken();
                // Use the movieID as the key
                word.set(name);
                // ratingAndDate holds the rating and date of this record
                String ratingAndDate = "";
                // Skip the userID
                itr.nextToken();
                ratingAndDate = itr.nextToken();
                ratingAndDate += "," + itr.nextToken();
                // Emit <movieID, "rating,date"> to the reducer
                output.collect(word, new Text(ratingAndDate));
            }
        }
    }
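
    To see the parsing in isolation, here is a minimal standalone sketch (not part of the MapReduce job; the record value and class name are made up) that runs the same tokenizing logic on a single line:

    import java.util.StringTokenizer;

    public class MapLogicSketch {
        public static void main(String[] args) {
            // Illustrative record: movieID,userID,rating,date
            String line = "1,1488844,3,2005-09-06";
            StringTokenizer itr = new StringTokenizer(line, ",");
            String movieID = itr.nextToken();        // key
            itr.nextToken();                         // skip userID
            String ratingAndDate = itr.nextToken();  // rating
            ratingAndDate += "," + itr.nextToken();  // append date
            // Prints: 1    3,2005-09-06
            System.out.println(movieID + "\t" + ratingAndDate);
        }
    }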

    2. Reduce phase
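
    The reducer receives all "rating,date" values for one movieID, computes the first and last rating dates, the number of ratings, and the average rating, and joins in the production date and title of the movie. The title information comes from a movie-title file shipped through the DistributedCache; judging from the parsing in configure() below, each line of that file has the form movieID,productionDate,title.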

    import java.io.IOException;
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.StringTokenizer;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.StringUtils;

    // Reducer output format:
    // <movieID  firstDateRated,lastDateRated,productionDate,numberOfRatings,averageRating,movieTitle>
    public class MyReducer {

        public static class Reduce extends MapReduceBase
                implements Reducer<Text, Text, Text, Text> {

            // Local paths of the files in the DistributedCache
            Path[] localFiles = new Path[0];
            // movieTitles maps movieID to the movie info read from movie_titles.txt
            HashMap<String, String> movieTitles = new HashMap<String, String>();

            public void configure(JobConf job) {
                if (job.getBoolean("netflixDriver.distributedCacheFile", false)) {
                    // Get the local paths of the cached files
                    try {
                        localFiles = DistributedCache.getLocalCacheFiles(job);
                    } catch (IOException ioe) {
                        System.err.println("Caught exception while getting cached files "
                                + StringUtils.stringifyException(ioe));
                    }
                    // If the DistributedCache actually contains a file
                    if (localFiles.length > 0) {
                        try {
                            // movie_titles.txt is the file placed in the DistributedCache
                            BufferedReader reader = new BufferedReader(new FileReader(localFiles[0].toString()));
                            // Current line of the cached file
                            String cachedLine = "";
                            while ((cachedLine = reader.readLine()) != null) {
                                StringTokenizer cachedIterator = new StringTokenizer(cachedLine, ",");
                                // First field is the movieID
                                String movieID = cachedIterator.nextToken();
                                // The rest of the line is the production date and title
                                String dateAndTitle = cachedIterator.nextToken();
                                while (cachedIterator.hasMoreTokens()) {
                                    dateAndTitle += "," + cachedIterator.nextToken();
                                }
                                movieTitles.put(movieID, dateAndTitle);
                            }
                            reader.close();
                        } catch (IOException ioe) {
                            System.err.println("Caught Exception while parsing the cached file "
                                    + StringUtils.stringifyException(ioe));
                        }
                    }
                }
            }

            public void reduce(Text key, Iterator<Text> values,
                               OutputCollector<Text, Text> output,
                               Reporter reporter) throws IOException {
                int firstDate = 0;
                int lastDate = 0;
                double rating = 0.0;
                int ratingCount = 0;
                String line;
                String dateStr = "";
                while (values.hasNext()) {
                    line = values.next().toString();
                    StringTokenizer itr = new StringTokenizer(line, ",");
                    // Accumulate the rating
                    rating += Integer.parseInt(itr.nextToken());
                    // Normalize the date "yyyy-mm-dd" to the integer yyyymmdd
                    dateStr = itr.nextToken();
                    dateStr = dateStr.replaceAll("-", "");
                    if (firstDate == 0) {
                        firstDate = Integer.parseInt(dateStr);
                        lastDate = firstDate;
                    }
                    if (Integer.parseInt(dateStr) > lastDate) {
                        lastDate = Integer.parseInt(dateStr);
                    }
                    if (Integer.parseInt(dateStr) < firstDate) {
                        firstDate = Integer.parseInt(dateStr);
                    }
                    ratingCount++;
                }
                // Look up the production date and title cached from movie_titles.txt
                String movieInfo = movieTitles.get(key.toString());
                StringTokenizer tokenizer = new StringTokenizer(movieInfo, ",");
                String prodDate = tokenizer.nextToken();
                String movieTitle = tokenizer.nextToken();
                while (tokenizer.hasMoreTokens()) {
                    movieTitle += "," + tokenizer.nextToken();
                }
                // Compute the average rating of the movie
                rating = rating / ratingCount;
                String dateRange = Integer.toString(firstDate) + "," + Integer.toString(lastDate);
                dateRange += "," + prodDate;
                dateRange += "," + ratingCount;
                dateRange += "," + rating;
                dateRange += "," + movieTitle;
                Text dateRangeText = new Text(dateRange);
                // Emit <movieID  firstDateRated,lastDateRated,productionDate,numberOfRatings,averageRating,movieTitle>
                output.collect(key, dateRangeText);
            }
        }
    }
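
    The aggregation inside reduce() can be exercised on its own. A standalone sketch (illustrative values and a hypothetical class name) that reproduces the first/last-date, count, and average computation for one movie's values:

    import java.util.Arrays;
    import java.util.List;
    import java.util.StringTokenizer;

    public class ReduceLogicSketch {
        public static void main(String[] args) {
            // Illustrative "rating,date" values for a single movieID
            List<String> values = Arrays.asList("3,2005-09-06", "5,2004-12-01", "4,2005-01-15");
            int firstDate = 0, lastDate = 0, ratingCount = 0;
            double rating = 0.0;
            for (String line : values) {
                StringTokenizer itr = new StringTokenizer(line, ",");
                rating += Integer.parseInt(itr.nextToken());
                int date = Integer.parseInt(itr.nextToken().replaceAll("-", ""));
                if (firstDate == 0) { firstDate = date; lastDate = date; }
                if (date > lastDate) lastDate = date;
                if (date < firstDate) firstDate = date;
                ratingCount++;
            }
            // Prints: 20041201,20050906,3,4.0
            System.out.println(firstDate + "," + lastDate + "," + ratingCount + "," + (rating / ratingCount));
        }
    }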

    3. Main program (driver)

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import com.taobao.MyMapper;
    import com.taobao.MyReducer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.filecache.DistributedCache;

    public class netflixDriver extends Configured implements Tool {

        static int printUsage() {
            System.out.println("netflixDriver [-m <maps>] [-r <reduces>] [-d <cacheFile>] <input> <output>");
            ToolRunner.printGenericCommandUsage(System.out);
            return -1;
        }

        public int run(String[] args) throws Exception {
            JobConf conf = new JobConf(getConf(), MyMapper.class);
            conf.setJobName("netflixDriver");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(Text.class);

            conf.setMapperClass(MyMapper.MapClass.class);
            conf.setReducerClass(MyReducer.Reduce.class);

            List<String> other_args = new ArrayList<String>();
            for (int i = 0; i < args.length; ++i) {
                try {
                    if ("-m".equals(args[i])) {
                        conf.setNumMapTasks(Integer.parseInt(args[++i]));
                    } else if ("-r".equals(args[i])) {
                        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                    } else if ("-d".equals(args[i])) {
                        // Add the movie-title file to the DistributedCache and flag it for the reducer
                        DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
                        conf.setBoolean("netflixDriver.distributedCacheFile", true);
                    } else {
                        other_args.add(args[i]);
                    }
                } catch (NumberFormatException except) {
                    System.out.println("ERROR: Integer expected instead of " + args[i]);
                    return printUsage();
                } catch (ArrayIndexOutOfBoundsException except) {
                    System.out.println("ERROR: Required parameter missing from " + args[i - 1]);
                    return printUsage();
                }
            }
            if (other_args.size() != 2) {
                System.out.println("ERROR: Wrong number of parameters: " +
                        other_args.size() + " instead of 2.");
                return printUsage();
            }
            FileInputFormat.setInputPaths(conf, other_args.get(0));
            FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

            JobClient.runJob(conf);
            return 0;
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new netflixDriver(), args);
            System.exit(res);
        }
    }
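
    With the three classes packaged into a jar, the job would be submitted roughly as follows (the jar name and HDFS paths are hypothetical); -d ships the movie-title file through the DistributedCache and -r sets the number of reduce tasks:

    hadoop jar netflix.jar netflixDriver -d /cache/movie_titles.txt -r 4 /netflix/ratings /netflix/output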

  • Original source: https://www.cnblogs.com/qq78292959/p/2076600.html