Environment:
Hadoop 1.x, CentOS 6.5, a simulated distributed cluster built on three virtual machines.
Data: the Amazon product co-purchasing network metadata from SNAP (may require a proxy to download from within China): http://snap.stanford.edu/data/amazon-meta.html
Goal:
Extract from the data which products each user has bought, then use those purchases together with the relationships between products to recommend products to users.
The downloaded data is organized in records like the one below. Note that on the "similar:" line the first token is a count followed by the ASINs of co-purchased products, and that "cutomer" is misspelled this way in the raw data itself:
Id: 1
ASIN: 0827229534
title: Patterns of Preaching: A Sermon Sampler
group: Book
salesrank: 396585
similar: 5 0804215715 156101074X 0687023955 0687074231 082721619X
categories: 2
|Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]
|Clergy[12360]|Preaching[12368]
|Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]
|Clergy[12360]|Sermons[12370]
reviews: total: 2 downloaded: 2 avg rating: 5
2000-7-28 cutomer: A2JW67OY8U6HHK rating: 5 votes: 10 helpful: 9
2003-12-14 cutomer: A2VE83MZF98ITY rating: 5 votes: 6 helpful: 5
Approach:
The whole program breaks down into two steps: 1. extract which products each user has bought; 2. using the data produced in step one, combine the user's degree of interest with the associations between products to generate recommendations.
This post covers the first step.
The main difficulty in this step is writing the custom input format.
1. Custom input format
For data shaped like the record above, we need a custom input format to extract it, registered in the driver like this:
job.setInputFormatClass(TestAmazonDataFormat.class);
So how do we write a custom input format?
We first need to understand how files are handled in HDFS. A file is divided into blocks when it is stored in HDFS, and MapReduce further carves the input into splits. To operate on the data, we therefore need the split's file information (file name, path, start offset, length, which nodes hold it, and so on).
Passing in the file information:
package ren.snail;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Obtain the file information and hand it to the record reader.
public class TestAmazonDataFormat extends FileInputFormat<Text, Text> {
    TestAmazonDataReader datareader;

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext attempt)
            throws IOException, InterruptedException {
        datareader = new TestAmazonDataReader();
        datareader.initialize(inputSplit, attempt); // pass in the file information
        return datareader;
    }
}
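One caveat worth adding (my note, not from the original post): the reader below always opens the file at offset 0 and uses a blank line as the record delimiter, so if HDFS produced more than one split for a file, every mapper would re-read the same file from the beginning. A minimal guard, using the standard FileInputFormat hook, is to declare the format unsplittable:

// Sketch: add to TestAmazonDataFormat so each mapper gets one whole file.
@Override
protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context,
                              org.apache.hadoop.fs.Path file) {
    // Records span multiple lines and the reader starts at offset 0,
    // so never let the framework split this file.
    return false;
}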
Reading the file:
package ren.snail;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * @author Srinath Perera (hemapani@apache.org)
 */
public class TestAmazonDataReader extends RecordReader<Text, Text> {
    // "cutomer" is misspelled this way in the raw data, so the regex must match it.
    private static Pattern pattern1 = Pattern.compile(
            "\\s+([^\\s]+)\\s+cutomer:\\s+([^\\s]+)\\s+rating:\\s+([^\\s]+)\\s+votes:\\s+([^\\s]+)\\s+helpful:\\s+([^\\s]+).*");
    private BufferedReader reader;
    private int count = 0;
    private Text key;
    private Text value;
    private StringBuffer currentLineData = new StringBuffer();
    String line = null;

    public TestAmazonDataReader() {
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext attempt)
            throws IOException, InterruptedException {
        Path path = ((FileSplit) inputSplit).getPath();
        // Note: build the FileSystem from the path's URI. The default FileSystem may be
        // the local file:/// one, while the path obtained here is an hdfs:// path,
        // so the conversion below is needed.
        FileSystem fs = FileSystem.get(URI.create(path.toString()), attempt.getConfiguration());
        FSDataInputStream fsStream = fs.open(path);
        reader = new BufferedReader(new InputStreamReader(fsStream), 1024 * 100);
        // Skip the file header by reading up to the first "Id:" line. This consumes
        // that line, so the first record loses its Id field; the ASIN key used
        // downstream is unaffected.
        while ((line = reader.readLine()) != null) {
            if (line.startsWith("Id:")) {
                break;
            }
        }
    }

    // Build the next key (the ASIN) and value (the serialized record).
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        currentLineData = new StringBuffer();
        count++;
        boolean readingreview = false;
        while ((line = reader.readLine()) != null) {
            if (line.trim().length() == 0) {
                // A blank line terminates the current record.
                value = new Text(currentLineData.toString());
                return true;
            } else {
                if (readingreview) {
                    Matcher matcher = pattern1.matcher(line);
                    if (matcher.matches()) {
                        // group(2)=customer ID, group(3)=rating, group(4)=votes, group(5)=helpful
                        currentLineData.append("review=").append(matcher.group(2)).append("|")
                                .append(matcher.group(3)).append("|")
                                .append(matcher.group(4)).append("|")
                                .append(matcher.group(5)).append("#");
                    } else {
                        System.out.println("review " + line + " does not match");
                    }
                } else {
                    int indexOf = line.indexOf(":");
                    if (indexOf > 0) {
                        String key = line.substring(0, indexOf).trim();
                        String value = line.substring(indexOf + 1).trim();
                        if (value == null || value.length() == 0) {
                            continue;
                        }
                        // "#" is our field separator, so escape it inside values.
                        if (value.indexOf("#") > 0) {
                            value = value.replaceAll("#", "_");
                        }
                        if (key.equals("ASIN") || key.equals("Id") || key.equals("title")
                                || key.equals("group") || key.equals("salesrank")) {
                            if (key.equals("ASIN")) {
                                this.key = new Text(value);
                            }
                            currentLineData.append(key).append("=").append(value.replaceAll(",", "")).append("#");
                        } else if (key.equals("similar")) {
                            String[] tokens = value.split("\\s+");
                            // The first token is the count, so we skip it.
                            if (tokens.length >= 2) {
                                currentLineData.append(key).append("=");
                                for (int i = 1; i < tokens.length; i++) {
                                    currentLineData.append(tokens[i].trim()).append("|");
                                }
                                currentLineData.append("#");
                            }
                        } else if (key.equals("reviews")) {
                            readingreview = true;
                        }
                    }
                }
            }
        }
        // Flush the last record if the file does not end with a blank line.
        if (currentLineData.length() > 0) {
            value = new Text(currentLineData.toString());
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Returns the record count rather than a true 0..1 fraction;
        // enough to show liveness.
        return count;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
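Tracing the reader against the sample record at the top of this post (my own walkthrough, not output captured from a run), the first key/value pair it emits should look roughly like this; the Id field is absent only because initialize() consumes the first "Id:" line while skipping the file header:

key:   0827229534
value: ASIN=0827229534#title=Patterns of Preaching: A Sermon Sampler#group=Book#salesrank=396585#similar=0804215715|156101074X|0687023955|0687074231|082721619X|#review=A2JW67OY8U6HHK|5|10|9#review=A2VE83MZF98ITY|5|6|5#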
Map and Reduce
The Map code relies on an AmazonCustomer class with methods for parsing the Amazon metadata, which I won't include here; it simply parses the value string handed over by the input format. A sketch of what it might look like follows, and after that the full driver with the Mapper and Reducer.
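Since that class is omitted, here is a minimal sketch inferred purely from the value format produced by TestAmazonDataReader and the calls made in Main below. The names AmazonCustomer, parseAItemLine, itemsBrought, and ItemData come from the original code, but the field layout and serialization format are my assumptions, not the author's actual implementation:

package ren.snail;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reconstruction of the omitted class.
public class AmazonCustomer {
    public String customerID;
    public Set<ItemData> itemsBrought = new HashSet<ItemData>();

    public static class ItemData {
        // The item's serialized fields, e.g. "ASIN=...#title=...#rating=5"
        public String data;

        public ItemData(String data) { this.data = data; }

        @Override
        public boolean equals(Object o) {
            return o instanceof ItemData && ((ItemData) o).data.equals(data);
        }

        @Override
        public int hashCode() { return data.hashCode(); }
    }

    public AmazonCustomer() { }

    // Round-trips the output of toString(): "customerID=XXX,item,item,..."
    // (safe to split on "," because the reader stripped commas from values).
    public AmazonCustomer(String serialized) {
        String[] parts = serialized.split(",");
        customerID = parts[0].substring("customerID=".length());
        for (int i = 1; i < parts.length; i++) {
            if (parts[i].length() > 0) {
                itemsBrought.add(new ItemData(parts[i]));
            }
        }
    }

    // One AmazonCustomer per reviewer found in an item record.
    public static List<AmazonCustomer> parseAItemLine(String line) {
        List<AmazonCustomer> customers = new ArrayList<AmazonCustomer>();
        StringBuffer itemFields = new StringBuffer();
        for (String field : line.split("#")) {
            if (field.startsWith("review=")) {
                String[] tokens = field.substring("review=".length()).split("\\|");
                AmazonCustomer customer = new AmazonCustomer();
                customer.customerID = tokens[0]; // reviewer's customer ID
                customer.itemsBrought.add(new ItemData(itemFields + "rating=" + tokens[1]));
                customers.add(customer);
            } else {
                itemFields.append(field).append("#"); // ASIN=, title=, group=, salesrank=, similar=
            }
        }
        return customers;
    }

    @Override
    public String toString() {
        StringBuffer sb = new StringBuffer("customerID=").append(customerID).append(",");
        for (ItemData item : itemsBrought) {
            sb.append(item.data).append(",");
        }
        return sb.toString();
    }
}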
package ren.snail;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import ren.snail.AmazonCustomer.ItemData;

/**
 * For each customer, collect all the items they have bought.
 * @author Srinath Perera (hemapani@apache.org)
 */
public class Main extends Configured implements Tool {
    public static SimpleDateFormat dateFormatter = new SimpleDateFormat("EEEE dd MMM yyyy hh:mm:ss z");

    public static class AMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            try {
                // One input record per item; emit (customerID, serialized customer) per reviewer.
                List<AmazonCustomer> customerList = AmazonCustomer.parseAItemLine(value.toString());
                for (AmazonCustomer customer : customerList) {
                    context.write(new Text(customer.customerID), new Text(customer.toString()));
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("Error:" + e.getMessage());
            }
        }
    }

    public static class AReducer extends Reducer<Text, Text, IntWritable, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Merge all items bought by this customer.
            AmazonCustomer customer = new AmazonCustomer();
            customer.customerID = key.toString();
            for (Text value : values) {
                Set<ItemData> itemsBrought = new AmazonCustomer(value.toString()).itemsBrought;
                for (ItemData itemData : itemsBrought) {
                    customer.itemsBrought.add(itemData);
                }
            }
            // if (customer.itemsBrought.size() > 5) {
            context.write(new IntWritable(customer.itemsBrought.size()), new Text(customer.toString()));
            // }
        }
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new Main(), args);
        System.exit(result);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration configuration = getConf();
        Job job = new Job(configuration, "MostFrequentUserFinder");
        job.setJarByClass(Main.class);
        job.setMapperClass(AMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        // No combiner: AReducer changes the key type (Text -> IntWritable),
        // so it cannot be reused as a combiner.
        job.setReducerClass(AReducer.class);
        // Plug in the custom input format defined above.
        job.setInputFormatClass(TestAmazonDataFormat.class);
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
The final output looks like this:
customerID=A11NCO6YTE4BTJ,review=ASIN=0738700797#title=Candlemas: Feast of Flames#salesrank=168596#group=Book#rating=5#similar=0738700827|1567184960|1567182836|0738700525|0738700940|,