  • Collaborative Filtering Algorithm (Tianchi Competition Problem)

    I. Classification of recommendation algorithms:

      1. By the data they use:

      • Collaborative filtering: UserCF, ItemCF, ModelCF (ItemCF is the approach used in this post; see the sketch after this list)
      • Content-based recommendation: user content attributes and item content attributes
      • Social filtering: based on the user's social network relationships
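
      The item-based variant (ItemCF) is what the MapReduce pipeline below implements: a user's predicted interest in a candidate item is the sum, over the items the user already interacted with, of the user's rating for that item times how often the two items co-occur. A minimal in-memory sketch of that idea (plain Java; the map-based data structures and the class name ItemCFSketch are illustrative assumptions, not part of the pipeline):

import java.util.HashMap;
import java.util.Map;

// Minimal ItemCF scoring sketch: recommendation(user, j) = sum_i rating(user, i) * cooccurrence(i, j).
public class ItemCFSketch {
    public static Map<String, Double> recommend(Map<String, Double> userRatings,
                                                 Map<String, Map<String, Double>> cooccurrence) {
        Map<String, Double> scores = new HashMap<>();
        for (Map.Entry<String, Double> rated : userRatings.entrySet()) {            // items the user rated
            Map<String, Double> row = cooccurrence.getOrDefault(rated.getKey(), new HashMap<>());
            for (Map.Entry<String, Double> co : row.entrySet()) {                   // items co-occurring with them
                scores.merge(co.getKey(), rated.getValue() * co.getValue(), Double::sum);
            }
        }
        return scores; // candidate item -> recommendation value (bought items still need filtering)
    }
}

      The MapReduce jobs below compute the same rating-times-co-occurrence product at scale, filter out items the user already bought, and keep the top ten candidates per user.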

      2. Case study: Tianchi Big Data Competition

        The following data fields are made available:

    | Field       | Description                              | Extraction notes                                                                                    |
    |-------------|------------------------------------------|-----------------------------------------------------------------------------------------------------|
    | user_id     | User identifier                          | Sampled & field-encrypted                                                                             |
    | Time        | Time of the action                       | Day-level precision & year hidden                                                                     |
    | action_type | Type of the user's action on the brand   | Four behaviors: click, purchase, favorite, add-to-cart (click: 0, purchase: 1, favorite: 2, cart: 3) |
    | brand_id    | Numeric brand ID                         | Sampled & field-encrypted                                                                             |

     

         The data provided covers tens of millions of Tmall users, tens of thousands of Tmall brands, and four months of behavior records.
         The training data lives in the table t_alibaba_bigdata_user_brand_total_1 on the Tianchi cluster, with the fields user_id, brand_id, type, and visit_datetime.

         3. The four user action types (Type) map to the following codes:
         click: 0; purchase: 1; favorite: 2; cart: 3
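
         Looking ahead to the scoring step, the rating-matrix job (Step02 below) weights the four behaviors from 1.0 to 4.0 rather than using the raw codes above. A small sketch of that mapping, assuming the deduplicated records carry the textual labels ("click", "collect", "cart", "alipay") that Step02's reducer actually matches on (the helper class ActionWeight is a hypothetical addition):

// Hypothetical helper mirroring the weights used in the Step02 reducer:
// click -> 1.0, collect (favorite) -> 2.0, cart -> 3.0, alipay (purchase) -> 4.0.
public class ActionWeight {
    public static double of(String actionType) {
        switch (actionType) {
            case "click":   return 1.0;
            case "collect": return 2.0;
            case "cart":    return 3.0;
            case "alipay":  return 4.0;
            default: throw new IllegalArgumentException("unknown action type: " + actionType);
        }
    }
}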

    II. Implementation approach and code

    1. Deduplicate the raw data

package com.oracle.www.TianChi_compition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Deduplicate the raw data and drop the header row.
 */
public class Step01 {
    static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Byte offset 0 is the header line; skip it.
            if (key.get() > 0) {
                context.write(value, NullWritable.get());
            }
        }
    }

    static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> value,
                Reducer<Text, NullWritable, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Identical records are grouped under one key, so writing the key once deduplicates them.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args)
            throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Step01.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Delete the output directory if it already exists.
        Path outPath = new Path("hdfs://192.168.9.13:8020/deweight");
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.9.13:8020/TianmaoData"));
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }
}

    2. Build the co-occurrence matrix for all items

package com.oracle.www.TianChi_compition;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Build the item co-occurrence matrix.
 * Map:    emit <item1-item2, 1> for every pair of items the same user touched.
 * Reduce: sum <item1-item2, 1, 1, 1...> into the co-occurrence count.
 */
public class Step03 {

    static class MyMapper extends Mapper<Text, Text, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable();

        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            ArrayList<String> itemList = new ArrayList<>();
            String line = value.toString();
            String[] datas = line.split("\t");
            for (String data : datas) { // collect every item this user interacted with
                String[] item_mark = data.split(":");
                itemList.add(item_mark[0]);
            }

            // Emit every ordered pair (including an item paired with itself) with a count of 1.
            for (int i = 0; i < itemList.size(); i++) {
                for (int j = 0; j < itemList.size(); j++) {
                    k.set(itemList.get(i) + "-" + itemList.get(j));
                    v.set(1);
                    context.write(k, v);
                }
            }
        }
    }

    static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> value,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : value) {
                sum += val.get();
            }
            k.set(key.toString());
            v.set(sum);
            context.write(k, v);
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);

            job.setJarByClass(Step03.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            // Delete the output directory if it already exists.
            Path outPath = new Path("hdfs://192.168.9.13:8020/implyCount");
            FileSystem fs = outPath.getFileSystem(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true); // true: delete recursively even if the directory is not empty
            }
            FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/gradeMarking"));
            FileOutputFormat.setOutputPath(job, outPath);
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

     

    3. Weight matrix (a rating matrix derived from the different actions a user performed on the same item)

package com.oracle.www.TianChi_compition;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Build the rating (weight) matrix.
 * Map:    split each record and emit <user, item + ":" + action>.
 * Reduce: aggregate into <user, item1 + ":" + score \t item2 + ":" + score ...>.
 */
public class Step02 {
    static Text userId = new Text();
    static Text shopping_operate = new Text();

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] datas = line.split("\t");
            userId.set(datas[1]);
            shopping_operate.set(datas[0] + ":" + datas[2]);
            context.write(userId, shopping_operate);
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();
        double click = 0;
        double collect = 0;
        double cart = 0;
        double alipay = 0;

        @Override
        protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // shoppingOperate_counter: <item, <action, count>>
            HashMap<String, HashMap<String, Integer>> shoppingOperate_counter = new HashMap<>();
            String[] temp_str = null;
            String shoppingName = null;
            String shoppingOperate = null;
            HashMap<String, Integer> operate_counter = null; // inner map: action -> number of times it occurred
            for (Text val : value) {
                temp_str = val.toString().split(":");
                shoppingName = temp_str[0];
                shoppingOperate = temp_str[1];
                if (!shoppingOperate_counter.containsKey(shoppingName)) { // first time we see this item: initialize
                    operate_counter = new HashMap<>();
                    operate_counter.put(shoppingOperate, 1);
                    shoppingOperate_counter.put(shoppingName, operate_counter);
                } else { // the item is already present
                    operate_counter = shoppingOperate_counter.get(shoppingName);
                    if (!operate_counter.containsKey(shoppingOperate)) { // item present, action not seen yet
                        operate_counter.put(shoppingOperate, 1);
                    } else {
                        operate_counter.put(shoppingOperate, operate_counter.get(shoppingOperate) + 1);
                    }
                }
            }
            // Walk shoppingOperate_counter and compute the score for each item.
            Iterator<String> iter = shoppingOperate_counter.keySet().iterator();
            StringBuffer shopping_marking = new StringBuffer();
            while (iter.hasNext()) {
                click = 0;
                collect = 0;
                cart = 0;
                alipay = 0;
                shoppingName = iter.next();
                operate_counter = shoppingOperate_counter.get(shoppingName);
                Iterator<String> operateIter = operate_counter.keySet().iterator();
                int counter = 0; // number of distinct action types the user performed on this item
                while (operateIter.hasNext()) {
                    counter++;
                    shoppingOperate = operateIter.next();
                    if ("click".equals(shoppingOperate)) {
                        click = operate_counter.get(shoppingOperate);
                    } else if ("collect".equals(shoppingOperate)) {
                        collect = operate_counter.get(shoppingOperate);
                    } else if ("cart".equals(shoppingOperate)) {
                        cart = operate_counter.get(shoppingOperate);
                    } else {
                        alipay = operate_counter.get(shoppingOperate);
                    }
                }
                // Weighted score: click x1.0, collect x2.0, cart x3.0, alipay (purchase) x4.0,
                // each averaged over the number of distinct action types.
                double sum = click / counter * 1.0 + collect / counter * 2.0 + cart / counter * 3.0
                        + alipay / counter * 4.0;
                shopping_marking.append(shoppingName + ":" + sum + "\t");
            }
            v.set(shopping_marking.toString());
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(Step02.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Write the rating matrix to /gradeMarking (read by Step03 and Step04);
            // writing back to the input directory /deweight would delete the input.
            Path outPath = new Path("hdfs://192.168.9.13:8020/gradeMarking");
            FileSystem fs = outPath.getFileSystem(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
            FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/deweight"));
            FileOutputFormat.setOutputPath(job, outPath);
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
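
    To make the scoring concrete, a worked example with hypothetical counts: suppose a user clicked a brand 4 times and paid for it once. Only two action types occurred, so counter = 2 and the score is 4/2 × 1.0 + 0/2 × 2.0 + 0/2 × 3.0 + 1/2 × 4.0 = 4.0; that value is what the reducer writes into the rating matrix as item:score.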

     

    4. Multiply the two matrices to obtain the three-dimensional matrix (one partial product per user-item pair and co-occurring item)

package com.oracle.www.TianChi_compition;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Multiply the rating matrix by the co-occurrence matrix,
 * emitting one partial product <user:item, score * coOccurrenceCount> at a time.
 */
public class Step04 {
    static class MyMapper extends Mapper<Text, Text, Text, Text> {
        String parentName = null;
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void setup(Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            // Remember which input directory this split came from.
            FileSplit fs = (FileSplit) context.getInputSplit();
            parentName = fs.getPath().getParent().getName();
        }

        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] datas = null;
            // Branch on the input directory.
            if (parentName.equals("gradeMarking")) { // rating matrix: key = user, value = item:score list
                datas = line.split("\t");
                for (String data : datas) {
                    String[] item_mark = data.split(":");
                    k.set(item_mark[0]);
                    v.set(key.toString() + ":" + item_mark[1]);
                    context.write(k, v);
                }
            } else { // co-occurrence matrix: key = item1-item2, value = count
                datas = key.toString().split("-");
                k.set(datas[1]);
                v.set(datas[0] + ":" + line);
                context.write(k, v);
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        Text k = new Text();
        DoubleWritable v = new DoubleWritable();
        // Input per key: <itemX, user1:score1, user2:score2, ..., item1:count1, item2:count2, ...>
        // (each count is the number of times the two items co-occurred)

        @Override
        protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            HashMap<String, Double> user_mark = new HashMap<>();
            HashMap<String, Double> item_counter = new HashMap<>();
            // Sort the values into user:score and item:count maps.
            String[] datas = null;
            for (Text val : value) {
                datas = val.toString().split(":");
                if (datas[0].startsWith("u")) { // user ids start with "u"
                    user_mark.put(datas[0], Double.parseDouble(datas[1]));
                } else {
                    item_counter.put(datas[0], Double.parseDouble(datas[1]));
                }
            }

            // Cross the two maps and emit each partial product.
            String userName = null;
            double userMark = 0.0;
            String itemName = null;
            double iterCounter = 0;
            for (Entry<String, Double> entry1 : user_mark.entrySet()) {
                userName = entry1.getKey();
                userMark = entry1.getValue();
                for (Entry<String, Double> entry2 : item_counter.entrySet()) {
                    itemName = entry2.getKey();
                    iterCounter = entry2.getValue();
                    k.set(userName + ":" + itemName);
                    v.set(userMark * iterCounter);
                    context.write(k, v);
                }
            }
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);

            job.setJarByClass(Step04.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            // Delete the output directory if it already exists.
            Path outPath = new Path("hdfs://192.168.9.13:8020/mark&implyCount_multiply");
            FileSystem fs = outPath.getFileSystem(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true); // true: delete recursively even if the directory is not empty
            }
            FileInputFormat.setInputPaths(job, new Path[] { new Path("hdfs://192.168.9.13:8020/gradeMarking"),
                    new Path("hdfs://192.168.9.13:8020/implyCount") });
            FileOutputFormat.setOutputPath(job, outPath);
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

    5. Sum the three-dimensional matrix to get every user's recommendation value for every item (a two-dimensional matrix)

package com.oracle.www.TianChi_compition;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Filter out items the user has already bought, then sum the partial products.
 */
public class Step05 {
    static class MyMapper extends Mapper<Text, Text, Text, Text> {
        // boughtList records which users bought which items. A map cannot be used here:
        // the same user may have bought several items, and the same item may have been
        // bought by many users.
        ArrayList<String> boughtList = new ArrayList<>();
        BufferedReader br = null;

        // setup() initializes boughtList from the distributed-cache copy of the deduplicated data.
        @Override
        protected void setup(Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            br = new BufferedReader(new FileReader("part-r-00000"));
            String line = null;
            String[] datas = null;
            while ((line = br.readLine()) != null) {
                datas = line.split("\t");
                if ("alipay".equals(datas[2])) { // the user actually purchased this item
                    boughtList.add(datas[1] + ":" + datas[0]);
                }
            }
            br.close();
        }

        // map() drops items the user has already bought so they are not recommended.
        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Only forward the candidate to the reducer if the user has not bought it.
            if (!boughtList.contains(key.toString())) {
                context.write(key, value);
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Sum all partial products for this user:item pair.
            double rank = 0.0;
            for (Text val : value) {
                rank += Double.parseDouble(val.toString());
            }
            k.set(key.toString().split(":")[0]);
            v.set(key.toString().split(":")[1] + ":" + rank);
            context.write(k, v);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Step05.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Ship the deduplicated purchase records to every mapper via the distributed cache.
        job.addCacheFile(new URI("hdfs://192.168.9.13:8020/deweight/part-r-00000"));

        Path outPath = new Path("hdfs://192.168.9.13:8020/shoppingRecommend");
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/mark&implyCount_multiply"));
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }
}

    6. Sort by recommendation value in descending order (keep the ten items with the highest weights).

package com.oracle.www.TianChi_compition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Sort the candidates and pick the top ten items to recommend to each user.
 */
public class Step06 {
    // Split each line and send <user, Sort(item, weight)> to the reducer.
    static class MyMapper extends Mapper<Text, Text, Text, Sort> {
        Sort sort = null;

        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, Text, Sort>.Context context)
                throws IOException, InterruptedException {
            sort = new Sort(value.toString().split(":")[0], Double.parseDouble(value.toString().split(":")[1]));
            context.write(key, sort);
        }
    }

    // The reducer sorts one user's candidate items by weight and concatenates the top ten.
    static class MyReducer extends Reducer<Text, Sort, Text, Text> {
        ArrayList<Sort> list = new ArrayList<>();
        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Sort> value, Reducer<Text, Sort, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            list.clear();
            // When a custom Writable is iterated on the reduce side, Hadoop reuses one object,
            // so each element must be copied (here with BeanUtils.copyProperties(dest, orig))
            // into a fresh instance before being stored in the list.
            for (Sort sort : value) {
                Sort tempSort = new Sort();
                try {
                    BeanUtils.copyProperties(tempSort, sort);
                    list.add(tempSort);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }

            Collections.sort(list);
            for (int i = 0; i < list.size() && i < 10; i++) {
                sb.append(list.get(i));
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }

    static public class Sort implements WritableComparable<Sort> {
        private String shoppingName;
        private double shoppingRank;

        public Sort() {
        }

        public Sort(String shoppingName, double shoppingRank) {
            this.shoppingName = shoppingName;
            this.shoppingRank = shoppingRank;
        }

        public String getShoppingName() {
            return shoppingName;
        }

        public void setShoppingName(String shoppingName) {
            this.shoppingName = shoppingName;
        }

        public double getShoppingRank() {
            return shoppingRank;
        }

        public void setShoppingRank(double shoppingRank) {
            this.shoppingRank = shoppingRank;
        }

        @Override
        public String toString() {
            return shoppingName + ":" + shoppingRank + "\t";
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeDouble(shoppingRank);
            out.writeUTF(shoppingName);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.shoppingRank = in.readDouble();
            this.shoppingName = in.readUTF();
        }

        @Override
        public int compareTo(Sort o) {
            // Descending order by rank.
            if (this.getShoppingRank() < o.getShoppingRank()) {
                return 1;
            } else if (this.getShoppingRank() > o.getShoppingRank()) {
                return -1;
            }
            return 0;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);

            job.setJarByClass(Step06.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Sort.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            Path outPath = new Path("hdfs://192.168.9.13:8020/ShoppingRecommend_Sort");
            FileSystem fs = outPath.getFileSystem(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }

            FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.9.13:8020/shoppingRecommend"));
            FileOutputFormat.setOutputPath(job, outPath);

            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
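
    The six classes are independent main() programs, and each one reads the HDFS output of an earlier one, so the required execution order is Step01 (dedup) → Step02 (rating matrix) → Step03 (co-occurrence) → Step04 (multiply) → Step05 (filter and sum) → Step06 (sort). A minimal driver sketch that runs them back to back (the Pipeline class is a hypothetical addition, not part of the original code):

package com.oracle.www.TianChi_compition;

// Hypothetical driver: runs the six steps in dependency order.
// Each step's main() deletes its own output directory and blocks on waitForCompletion(),
// so calling them sequentially is enough for this sketch.
public class Pipeline {
    public static void main(String[] args) throws Exception {
        Step01.main(args); // deduplicate raw data            -> /deweight
        Step02.main(args); // build the rating matrix         -> /gradeMarking
        Step03.main(args); // build the co-occurrence matrix  -> /implyCount
        Step04.main(args); // multiply the two matrices       -> /mark&implyCount_multiply
        Step05.main(args); // filter bought items and sum     -> /shoppingRecommend
        Step06.main(args); // sort, keep the top 10 per user  -> /ShoppingRecommend_Sort
    }
}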

     

     

  • Original article: https://www.cnblogs.com/le-ping/p/7783925.html