  • MapReduce Example: Content-Based Recommendation (Part 1)

    Environment:
      Hadoop 1.x, CentOS 6.5, a simulated distributed cluster built from three virtual machines

      Data: the Amazon product co-purchasing network metadata, downloaded from http://snap.stanford.edu/data/amazon-meta.html (may require a proxy to access)

    Goal:

      Extract from the data which products each user has bought, then recommend products to each user based on those purchases and the relatedness between products

     The downloaded data is organized in records like the one below. Note that the dataset itself misspells "customer" as "cutomer"; the parsing code later depends on that spelling.

    Id: 1
    ASIN: 0827229534
    title: Patterns of Preaching: A Sermon Sampler
    group: Book
    salesrank: 396585
    similar: 5 0804215715 156101074X 0687023955 0687074231 082721619X
    categories: 2
    |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]
    |Clergy[12360]|Preaching[12368]
    |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]
    |Clergy[12360]|Sermons[12370]
    reviews: total: 2 downloaded: 2 avg rating: 5
    2000-7-28 cutomer: A2JW67OY8U6HHK rating: 5 votes: 10 helpful: 9
    2003-12-14 cutomer: A2VE83MZF98ITY rating: 5 votes: 6 helpful: 5


    Approach:

      The program breaks down into two steps: 1. extract which products each user has bought; 2. using the data produced in step one, combine the user's level of interest with the associations between products to generate recommendations.

    This post covers the first step.

    The main difficulty in this step is writing a custom input format.

    1. Custom input format

      For data in the format shown above, we need a custom input format to extract the fields, registered on the job like this:

      job.setInputFormatClass(TestAmazonDataFormat.class);

      So how do we write a custom input format?

      Here we need to understand how files are handled in HDFS. Files are split when stored in HDFS, so when we operate on the data we need the information about the split we are given (file name, path, start offset, length, which nodes hold it, and so on).
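
    For reference, here is a minimal sketch (not from the original post) of the split information the new MapReduce API hands us; the variable names are illustrative:

        // inside createRecordReader()/initialize(), the InputSplit is a FileSplit
        FileSplit split = (FileSplit) inputSplit;
        Path path = split.getPath();           // file this split belongs to
        long start = split.getStart();         // byte offset of the split within the file
        long length = split.getLength();       // number of bytes in the split
        String[] hosts = split.getLocations(); // nodes that hold a local copy of this data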

    Passing in the split information:

    // Custom input format: hands each split to our record reader
    public class TestAmazonDataFormat extends FileInputFormat<Text, Text> {

        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext attempt)
                throws IOException, InterruptedException {
            TestAmazonDataReader datareader = new TestAmazonDataReader();
            datareader.initialize(inputSplit, attempt);    // pass the split information to the reader
            return datareader;
        }

        // Added fix (not in the original post): the record reader below always reads
        // the whole file and ignores split boundaries, so forbid splitting the file
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    }

    Reading the file:

    package ren.snail;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * @author Srinath Perera (hemapani@apache.org)
     */
    
    public class TestAmazonDataReader extends RecordReader<Text, Text> {
        // note: "cutomer" is the dataset's own misspelling of "customer",
        // so the regex must match that spelling
        private static Pattern pattern1 = Pattern.compile(
                "\\s+([^\\s]+)\\s+cutomer:\\s+([^\\s]+)\\s+rating:\\s+([^\\s]+)\\s+votes:\\s+([^\\s]+)\\s+helpful:\\s+([^\\s]+).*");
        private BufferedReader reader;
        private int count = 0;
        private Text key;
        private Text value;
        private StringBuffer currentLineData = new StringBuffer();
        String line = null;
    
        public TestAmazonDataReader() {
        }
    
        public void initialize(InputSplit inputSplit, TaskAttemptContext attempt) throws IOException, InterruptedException {
            Path path = ((FileSplit) inputSplit).getPath();
            // Build the FileSystem from the path's own URI: the split's path is an
            // hdfs:// URI, while the configured default may be the local file:/// system
            FileSystem fs = FileSystem.get(URI.create(path.toString()), attempt.getConfiguration());
            FSDataInputStream fsStream = fs.open(path);
            reader = new BufferedReader(new InputStreamReader(fsStream), 1024 * 100);
            // skip the file header: fast-forward to the first line that starts a record
            while ((line = reader.readLine()) != null) {
                if (line.startsWith("Id:")) {
                    break;
                }
            }
        }
    
        // define key and value: read one record, accumulating fields as "key=value#" pairs
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            currentLineData = new StringBuffer();
            count++;
            boolean readingreview = false;
            while ((line = reader.readLine()) != null) {
                if (line.trim().length() == 0) {
                    // a blank line marks the end of a record
                    value = new Text(currentLineData.toString());
                    return true;
                } else {
                    if (readingreview) {
                        // review lines: keep customer ID, rating, votes and helpful count
                        Matcher matcher = pattern1.matcher(line);
                        if (matcher.matches()) {
                            currentLineData.append("review=").append(matcher.group(2)).append("|")
                                    .append(matcher.group(3)).append("|")
                                    .append(matcher.group(4)).append("|")
                                    .append(matcher.group(5)).append("#");
                        } else {
                            System.out.println("review " + line + " does not match");
                        }
                    } else {
                        int indexOf = line.indexOf(":");
                        if (indexOf > 0) {
                            String key = line.substring(0, indexOf).trim();
                            String value = line.substring(indexOf + 1).trim();
                            if (value == null || value.length() == 0) {
                                continue;
                            }
                            if (value.indexOf("#") > 0) {
                                // "#" is our field separator, so escape it in the data
                                value = value.replaceAll("#", "_");
                            }

                            if (key.equals("ASIN") || key.equals("Id") || key.equals("title") || key.equals("group") || key.equals("salesrank")) {
                                if (key.equals("ASIN")) {
                                    this.key = new Text(value);
                                }
                                currentLineData.append(key).append("=").append(value.replaceAll(",", "")).append("#");
                            } else if (key.equals("similar")) {
                                String[] tokens = value.split("\\s+");
                                // skip the first token: it is the count of similar items, not an ASIN
                                if (tokens.length >= 2) {
                                    currentLineData.append(key).append("=");
                                    for (int i = 1; i < tokens.length; i++) {
                                        currentLineData.append(tokens[i].trim()).append("|");
                                    }
                                    currentLineData.append("#");
                                }
                            } else if (key.equals("reviews")) {
                                // the lines that follow are individual reviews
                                readingreview = true;
                            }
                        }
                    }
                }

            }
            return false;
        }
    
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }
    
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException, InterruptedException {
            // fix: the original returned the raw record count, but getProgress()
            // must return a fraction in [0, 1]; we do not track it, so report 0
            return 0;
        }
    
        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
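    To make the reader's output concrete, this is what it would emit for the sample record above (a hand-traced sketch; note that the first record's "Id:" line is consumed while skipping the header in initialize(), so only later records carry an Id field):

        key:   0827229534
        value: ASIN=0827229534#title=Patterns of Preaching: A Sermon Sampler#group=Book#salesrank=396585#similar=0804215715|156101074X|0687023955|0687074231|082721619X|#review=A2JW67OY8U6HHK|5|10|9#review=A2VE83MZF98ITY|5|6|5#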

    Map and Reduce

    The Map code relies on a helper class, AmazonCustomer, that parses the Amazon metadata; the original post does not include it. It simply parses the value string handed in by the input format. Since it is omitted, a minimal sketch of what it might look like follows, reconstructed from how Main and the sample output use it; every internal detail is an assumption, not the author's code.
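
    package ren.snail;

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Sketch of the omitted AmazonCustomer class (an assumption, not the author's
    // code). It must (a) turn one record-reader value into one AmazonCustomer per
    // reviewer, and (b) round-trip through toString() so the reducer can merge
    // customers; the field order in toString() may differ from the author's output.
    public class AmazonCustomer {
        public String customerID;
        public Set<ItemData> itemsBrought = new HashSet<ItemData>();

        // one bought item, serialized as "ASIN=...#title=...#...#rating=...#"
        public static class ItemData {
            public final String data;

            public ItemData(String data) {
                this.data = data;
            }

            @Override
            public boolean equals(Object o) {
                return o instanceof ItemData && data.equals(((ItemData) o).data);
            }

            @Override
            public int hashCode() {
                return data.hashCode();
            }
        }

        public AmazonCustomer() {
        }

        // rebuild a customer from the toString() form "customerID=X,review=...,review=...,";
        // the record reader strips commas from field values, so "," is a safe separator
        public AmazonCustomer(String serialized) {
            for (String field : serialized.split(",")) {
                if (field.startsWith("customerID=")) {
                    customerID = field.substring("customerID=".length());
                } else if (field.startsWith("review=")) {
                    itemsBrought.add(new ItemData(field.substring("review=".length())));
                }
            }
        }

        // one input value describes one item plus all of its reviews; emit one
        // AmazonCustomer per reviewer, each holding the item with that reviewer's rating
        public static List<AmazonCustomer> parseAItemLine(String line) {
            StringBuilder itemData = new StringBuilder();
            List<String[]> reviews = new ArrayList<String[]>();
            for (String field : line.split("#")) {
                if (field.startsWith("review=")) {
                    // review=customerID|rating|votes|helpful
                    reviews.add(field.substring("review=".length()).split("\\|"));
                } else if (field.length() > 0) {
                    itemData.append(field).append("#");
                }
            }
            List<AmazonCustomer> customers = new ArrayList<AmazonCustomer>();
            for (String[] review : reviews) {
                AmazonCustomer customer = new AmazonCustomer();
                customer.customerID = review[0];
                customer.itemsBrought.add(new ItemData(itemData + "rating=" + review[1] + "#"));
                customers.add(customer);
            }
            return customers;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("customerID=").append(customerID).append(",");
            for (ItemData item : itemsBrought) {
                sb.append("review=").append(item.data).append(",");
            }
            return sb.toString();
        }
    }

    The driver, mapper, and reducer: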

    package ren.snail;
    
    
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.List;
    import java.util.Set;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import ren.snail.AmazonCustomer.ItemData;
    
     
    
    /**
     * Step 1: collect, for each customer, the set of items they have bought
     * @author Srinath Perera (hemapani@apache.org)
     */
    public class Main extends Configured implements Tool {
        public static SimpleDateFormat dateFormatter = new SimpleDateFormat("EEEE dd MMM yyyy hh:mm:ss z");
    
        public static class AMapper extends Mapper<Object, Text, Text, Text> {

            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                try {
                    // one input value = one item record; emit (customerID, customer holding this item)
                    List<AmazonCustomer> customerList = AmazonCustomer.parseAItemLine(value.toString());
                    for (AmazonCustomer customer : customerList) {
                        context.write(new Text(customer.customerID), new Text(customer.toString()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Error:" + e.getMessage());
                }
            }
        }
    
        public static class AReducer extends Reducer<Text, Text, IntWritable, Text> {

            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                AmazonCustomer customer = new AmazonCustomer();
                customer.customerID = key.toString();

                // merge every item this customer bought into a single record
                for (Text value : values) {
                    Set<ItemData> itemsBrought = new AmazonCustomer(value.toString()).itemsBrought;
                    for (ItemData itemData : itemsBrought) {
                        customer.itemsBrought.add(itemData);
                    }
                }
                // optional filter: only keep customers with more than 5 purchases
                // if (customer.itemsBrought.size() > 5)
                context.write(new IntWritable(customer.itemsBrought.size()), new Text(customer.toString()));
            }
        }
    
        public static void main(String[] args) throws Exception {
            int result = ToolRunner.run(new Configuration(), new Main(), args);
            System.exit(result);
        }
    
        @Override
        public int run(String[] arg0) throws Exception {
            Configuration configuration = getConf();
            Job job = new Job(configuration, "MostFrequentUserFinder");
            job.setJarByClass(Main.class);
            job.setMapperClass(AMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
            job.setReducerClass(AReducer.class);
            job.setInputFormatClass(TestAmazonDataFormat.class);
            FileInputFormat.addInputPath(job, new Path(arg0[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
            // return the status instead of calling System.exit here, so that
            // ToolRunner's return value in main() is meaningful
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }

    The final output looks like the following (the IntWritable key, the number of items bought, is not shown in this excerpt):

    customerID=A11NCO6YTE4BTJ,review=ASIN=0738700797#title=Candlemas: Feast of Flames#salesrank=168596#group=Book#rating=5#similar=0738700827|1567184960|1567182836|0738700525|0738700940|,

  • Related reading:
    On downloading PDF files in JS
    Learning Vue 3.0
    iOS
    bootstrap treeview
    SVN version control
    JS frameworks
    Regular expressions
    How to mirror an Android phone's screen on a PC (MX4 Pro as an example)
    MySQL Chinese character garbling
    ADO.NET database connection pool size
  • Original article: https://www.cnblogs.com/ren-jie/p/5427056.html