  • Hadoop Learning Path (26): Using the MapReduce API (Part 3)

     

    Movie ratings case study

    Data and requirements

    Data format

    movies.dat (3884 records)

    1::Toy Story (1995)::Animation|Children's|Comedy
    2::Jumanji (1995)::Adventure|Children's|Fantasy
    3::Grumpier Old Men (1995)::Comedy|Romance
    4::Waiting to Exhale (1995)::Comedy|Drama
    5::Father of the Bride Part II (1995)::Comedy
    6::Heat (1995)::Action|Crime|Thriller
    7::Sabrina (1995)::Comedy|Romance
    8::Tom and Huck (1995)::Adventure|Children's
    9::Sudden Death (1995)::Action
    10::GoldenEye (1995)::Action|Adventure|Thriller

    users.dat (6041 records)

    1::F::1::10::48067
    2::M::56::16::70072
    3::M::25::15::55117
    4::M::45::7::02460
    5::M::25::20::55455
    6::F::50::9::55117
    7::M::35::1::06810
    8::M::25::12::11413
    9::M::25::17::61614
    10::F::35::1::95370

    ratings.dat (1000210 records)

    1::1193::5::978300760
    1::661::3::978302109
    1::914::3::978301968
    1::3408::4::978300275
    1::2355::5::978824291
    1::1197::3::978302268
    1::1287::5::978302039
    1::2804::5::978300719
    1::594::4::978302268
    1::919::4::978301368

    Field descriptions

    1. users.dat record format: 2::M::56::16::70072
    Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
    Meaning: user ID, gender, age, occupation, zip code

    2. movies.dat record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
    Fields: MovieID BigInt, Title String, Genres String
    Meaning: movie ID, movie title, movie genres

    3. ratings.dat record format: 1::1193::5::978300760
    Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
    Meaning: user ID, movie ID, rating, rating timestamp

    Combined, the three files provide these fields:
    user ID, movie ID, rating, rating timestamp, gender, age, occupation, zip code, movie title, movie genres
    userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
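
    Each record format above maps directly onto a split on the "::" separator. The following is a minimal sketch of that parsing step for a ratings.dat line; the class name RatingRecord and its field names are hypothetical and do not appear in the original code.

```java
/** Hypothetical holder for one ratings.dat line: UserID::MovieID::Rating::Timestamp. */
final class RatingRecord {
    private static final String SEPARATOR = "::";

    final long userId;
    final long movieId;
    final double rating;
    final String timestamp;

    private RatingRecord(long userId, long movieId, double rating, String timestamp) {
        this.userId = userId;
        this.movieId = movieId;
        this.rating = rating;
        this.timestamp = timestamp;
    }

    /** Parses one line; rejects lines that do not have exactly four fields. */
    static RatingRecord parse(String line) {
        String[] fields = line.split(SEPARATOR);
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "Expected 4 fields but got " + fields.length + ": " + line);
        }
        return new RatingRecord(
                Long.parseLong(fields[0]),
                Long.parseLong(fields[1]),
                Double.parseDouble(fields[2]),
                fields[3]);
    }

    public static void main(String[] args) {
        RatingRecord r = RatingRecord.parse("1::1193::5::978300760");
        System.out.println(r.userId + " " + r.movieId + " " + r.rating + " " + r.timestamp);
    }
}
```

    Validating the field count up front keeps a malformed line from surfacing later as an ArrayIndexOutOfBoundsException inside a mapper.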

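    Requirements

    (1) Find the 10 most-rated movies and their rating counts (movie title, rating count)
    (2) Find the 10 highest-rated movies among male and among female users separately (gender, movie title, rating)
    (3) For movieid = 2116, find the average rating per age bracket (there are only 7 age values, so group by those 7) (age bracket, rating)
    (4) For the woman who rates the most movies, find the average rating of the 10 movies she rated highest (person, movie title, rating)
    (5) Find the 10 best movies from the year that has the most good movies (rating >= 4.0)
    (6) Among movies released in 1997, find the 10 highest-rated Comedy movies
    (7) Find the 5 highest-rated movies in each genre (genre, movie title, average rating)
    (8) Find the highest-rated genre for each year (year, genre, rating)
    (9) Find the highest-rated movie for each region and store the result in HDFS (region, movie title, rating)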

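    Implementation

    1. Find the 10 most-rated movies and their rating counts (movie title, rating count)

    Analysis: this task involves two files, ratings.dat and movies.dat, whose sizes differ greatly. A map-side join fits here: load the smaller file (movies.dat) into memory first, then join each ratings.dat record against it in the mapper.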
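
    The idea behind the map-side join can be shown without Hadoop. The sketch below is a plain-Java illustration, not the job itself: the class MapJoinSketch and its method names are made up for this example. It loads the small table into a HashMap and then streams the large table against it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of a map-side join (hypothetical helper, no Hadoop involved). */
final class MapJoinSketch {

    /** Builds movieId -> title from movies.dat lines (the small table). */
    static Map<String, String> loadMovies(List<String> movieLines) {
        Map<String, String> titles = new HashMap<>();
        for (String line : movieLines) {
            String[] fields = line.split("::");
            titles.put(fields[0], fields[1]);
        }
        return titles;
    }

    /** Streams ratings.dat lines (the large table) and counts ratings per title. */
    static Map<String, Integer> countRatings(Map<String, String> titles, List<String> ratingLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : ratingLines) {
            String movieId = line.split("::")[1];
            String title = titles.get(movieId);
            if (title == null) {
                continue; // rating refers to a movie missing from the small table
            }
            counts.merge(title, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> titles = loadMovies(Arrays.asList(
                "1::Toy Story (1995)::Animation|Children's|Comedy",
                "2::Jumanji (1995)::Adventure|Children's|Fantasy"));
        Map<String, Integer> counts = countRatings(titles, Arrays.asList(
                "1::1::5::978300760",
                "2::1::4::978302109",
                "3::2::3::978301968"));
        System.out.println(counts);
    }
}
```

    In the MapReduce version, loadMovies corresponds to the mapper's setup() and the loop body of countRatings is split between map() (the lookup) and reduce() (the count).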

    MovieMR1_1.java

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MovieMR1_1 {

        public static void main(String[] args) throws Exception {

            // Fall back to default HDFS paths when fewer than four arguments are given
            if (args.length < 4) {
                args = new String[4];
                args[0] = "/movie/input/";
                args[1] = "/movie/output/";
                args[2] = "/movie/cache/movies.dat";
                args[3] = "/movie/output_last/";
            }

            Configuration conf1 = new Configuration();
            conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
            System.setProperty("HADOOP_USER_NAME", "hadoop");
            FileSystem fs1 = FileSystem.get(conf1);

            Job job1 = Job.getInstance(conf1);
            job1.setJarByClass(MovieMR1_1.class);

            job1.setMapperClass(MoviesMapJoinRatingsMapper1.class);
            job1.setReducerClass(MovieMR1Reducer1.class);

            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(IntWritable.class);

            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(IntWritable.class);

            // Cache a plain file into the working directory of each task node
            URI uri = new URI("hdfs://hadoop1:9000" + args[2]);
            System.out.println(uri);
            job1.addCacheFile(uri);

            Path inputPath1 = new Path(args[0]);
            Path outputPath1 = new Path(args[1]);
            // Delete a stale output directory so the job can be rerun
            if (fs1.exists(outputPath1)) {
                fs1.delete(outputPath1, true);
            }
            FileInputFormat.setInputPaths(job1, inputPath1);
            FileOutputFormat.setOutputPath(job1, outputPath1);

            boolean isDone = job1.waitForCompletion(true);
            System.exit(isDone ? 0 : 1);
        }

        public static class MoviesMapJoinRatingsMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

            // Holds the movies.dat data loaded into memory: movie ID -> "title\tgenres"
            private static Map<String, String> movieMap = new HashMap<>();
            // Output key: movie title
            Text outKey = new Text();
            // Output value: the rating (the reducer only counts how many values arrive)
            IntWritable outValue = new IntWritable();

            /**
             * movies.dat:    1::Toy Story (1995)::Animation|Children's|Comedy
             *
             * Loads the small table (movies.dat) into memory before any map() call.
             */
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {

                // getLocalCacheFiles() is deprecated in newer Hadoop releases but still available
                Path[] localCacheFiles = context.getLocalCacheFiles();
                String strPath = localCacheFiles[0].toUri().toString();

                // try-with-resources closes the reader even if parsing fails
                try (BufferedReader br = new BufferedReader(new FileReader(strPath))) {
                    String readLine;
                    while ((readLine = br.readLine()) != null) {
                        String[] split = readLine.split("::");
                        String movieId = split[0];
                        String movieName = split[1];
                        String movieType = split[2];

                        movieMap.put(movieId, movieName + "\t" + movieType);
                    }
                }
            }

            /**
             * movies.dat:    1 :: Toy Story (1995) :: Animation|Children's|Comedy
             *                movie ID, movie title, movie genres
             *
             * ratings.dat:   1 :: 1193 :: 5 :: 978300760
             *                user ID, movie ID, rating, rating timestamp
             *
             * value: one line read from ratings.dat
             */
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                String[] split = value.toString().split("::");
                String movieId = split[1];
                String movieRate = split[2];

                // Look up the movie title and genres in memory by movie ID
                String movieNameAndType = movieMap.get(movieId);
                if (movieNameAndType == null) {
                    // No matching movie in the cached table: skip this rating instead of failing
                    return;
                }
                String movieName = movieNameAndType.split("\t")[0];

                outKey.set(movieName);
                outValue.set(Integer.parseInt(movieRate));

                context.write(outKey, outValue);
            }
        }

        public static class MovieMR1Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {

            // Output value: number of ratings the movie received
            IntWritable outValue = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {

                // Count how many ratings this movie received
                int count = 0;
                for (IntWritable value : values) {
                    count++;
                }

                outValue.set(count);
                context.write(key, outValue);
            }
        }
    }

    MovieMR1_2.java

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MovieMR1_2 {

        public static void main(String[] args) throws Exception {
            if (args.length < 2) {
                args = new String[2];
                args[0] = "/movie/output/";
                args[1] = "/movie/output_last/";
            }

            Configuration conf1 = new Configuration();
            conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
            System.setProperty("HADOOP_USER_NAME", "hadoop");
            FileSystem fs1 = FileSystem.get(conf1);

            Job job = Job.getInstance(conf1);
            job.setJarByClass(MovieMR1_2.class);

            job.setMapperClass(MoviesMapJoinRatingsMapper2.class);
            job.setReducerClass(MovieMR1Reducer2.class);
            // A global top 10 only works with a single reducer (the default), so make it explicit
            job.setNumReduceTasks(1);

            job.setMapOutputKeyClass(MovieRating.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(MovieRating.class);
            job.setOutputValueClass(NullWritable.class);

            Path inputPath1 = new Path(args[0]);
            Path outputPath1 = new Path(args[1]);
            if (fs1.exists(outputPath1)) {
                fs1.delete(outputPath1, true);
            }
            // Sort the output of the first job in descending order
            FileInputFormat.setInputPaths(job, inputPath1);
            FileOutputFormat.setOutputPath(job, outputPath1);

            boolean isDone = job.waitForCompletion(true);
            System.exit(isDone ? 0 : 1);
        }

        // The output key is the custom MovieRating object, which sorts in descending order of count
        public static class MoviesMapJoinRatingsMapper2 extends Mapper<LongWritable, Text, MovieRating, NullWritable> {

            MovieRating outKey = new MovieRating();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Input line example: 'Night Mother (1986)<TAB>70
                String[] split = value.toString().split("\t");

                outKey.setCount(Integer.parseInt(split[1]));
                outKey.setMovieName(split[0]);

                context.write(outKey, NullWritable.get());
            }
        }

        // Keys arrive already sorted, so simply emit the first 10 movies
        public static class MovieMR1Reducer2 extends Reducer<MovieRating, NullWritable, MovieRating, NullWritable> {

            // Records seen so far; deliberately kept across reduce() calls
            int count = 0;

            @Override
            protected void reduce(MovieRating key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {

                // Movies with equal counts share one group; the key object is refreshed on each iteration
                for (NullWritable value : values) {
                    count++;
                    if (count > 10) {
                        return;
                    }
                    context.write(key, value);
                }
            }
        }
    }
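
    The second job sorts every (movie, count) pair just to keep ten of them. As an alternative that the original post does not use, a bounded min-heap can select the top N in a single pass. The sketch below is plain Java with a hypothetical class name, TopNSketch, and is only meant to illustrate the technique.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical top-N selection with a bounded min-heap (not part of the original jobs). */
final class TopNSketch {

    /** Returns the n entries with the largest counts, highest count first. */
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        // Min-heap ordered by count: the head is always the weakest entry kept so far
        PriorityQueue<Map.Entry<String, Integer>> heap =
                new PriorityQueue<>(Map.Entry.<String, Integer>comparingByValue());
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            heap.offer(entry);
            if (heap.size() > n) {
                heap.poll(); // evict the smallest count
            }
        }
        List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
        result.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("Heat (1995)", 5);
        counts.put("Sabrina (1995)", 3);
        counts.put("Toy Story (1995)", 9);
        counts.put("Sudden Death (1995)", 1);
        for (Map.Entry<String, Integer> e : topN(counts, 2)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

    The heap never holds more than n + 1 entries, so memory stays constant however many movies there are; the counts in main are invented sample values.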

    MovieRating.java

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class MovieRating implements WritableComparable<MovieRating> {
        private String movieName;
        private int count;

        public String getMovieName() {
            return movieName;
        }
        public void setMovieName(String movieName) {
            this.movieName = movieName;
        }
        public int getCount() {
            return count;
        }
        public void setCount(int count) {
            this.count = count;
        }

        // Hadoop needs a no-argument constructor to deserialize the object
        public MovieRating() {}

        public MovieRating(String movieName, int count) {
            super();
            this.movieName = movieName;
            this.count = count;
        }

        @Override
        public String toString() {
            return movieName + "\t" + count;
        }
        // readFields must read the fields in the same order that write emits them
        @Override
        public void readFields(DataInput in) throws IOException {
            movieName = in.readUTF();
            count = in.readInt();
        }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(movieName);
            out.writeInt(count);
        }
        // Descending order of count
        @Override
        public int compareTo(MovieRating o) {
            return Integer.compare(o.count, this.count);
        }
    }
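
    The write and readFields methods of a Writable must agree on field order, because the serialized form carries no field names. The sketch below shows the same round trip with only java.io streams; the class WritableRoundTripSketch is hypothetical and stands in for what Hadoop does when it ships a MovieRating between map and reduce.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical round trip mirroring MovieRating.write / readFields with plain java.io. */
final class WritableRoundTripSketch {

    /** Serializes the two fields in the order name, then count. */
    static byte[] write(String movieName, int count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(movieName);
            out.writeInt(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Deserializes in the same order and formats the result like MovieRating.toString(). */
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String movieName = in.readUTF();
            int count = in.readInt();
            return movieName + "\t" + count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("Heat (1995)", 42)));
    }
}
```

    Swapping the two read calls would make readUTF interpret the wrong bytes as a length prefix, which is why the order in readFields has to mirror write exactly.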

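    2. Find the 10 highest-rated movies among male and among female users separately (gender, movie title, rating)

    Analysis: this task needs a join across all three tables. Load the two small tables into memory first, then join them against the ratings.

    Joining the three tables

    MoviesThreeTableJoin.java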

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Joins the three tables.
     */
    public class MoviesThreeTableJoin {

        public static void main(String[] args) throws Exception {

            if (args.length < 4) {
                args = new String[4];
                args[0] = "/movie/input/";
                args[1] = "/movie/output2/";
                args[2] = "/movie/cache/movies.dat";
                args[3] = "/movie/cache/users.dat";
            }

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://hadoop1:9000/");
            System.setProperty("HADOOP_USER_NAME", "hadoop");
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);

            job.setJarByClass(MoviesThreeTableJoin.class);
            job.setMapperClass(ThreeTableMapper.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            // Ship both small tables to every task node
            URI uriUsers = new URI("hdfs://hadoop1:9000" + args[3]);
            URI uriMovies = new URI("hdfs://hadoop1:9000" + args[2]);
            job.addCacheFile(uriUsers);
            job.addCacheFile(uriMovies);

            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);

            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }

            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);

            boolean isDone = job.waitForCompletion(true);
            System.exit(isDone ? 0 : 1);
        }

        public static class ThreeTableMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

            // In-memory caches of movies.dat and users.dat
            private Map<String, String> moviesMap = new HashMap<>();
            private Map<String, String> usersMap = new HashMap<>();

            Text outKey = new Text();

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {

                Path[] paths = context.getLocalCacheFiles();

                for (Path path : paths) {
                    String name = path.toUri().getPath();
                    if (name.contains("movies.dat")) {
                        // movies.dat line: 2::Jumanji (1995)::Adventure|Children's|Fantasy
                        // (movie ID, movie title, movie genres)
                        // Key: movie ID; value: the remaining fields
                        try (BufferedReader br = new BufferedReader(new FileReader(name))) {
                            String moviesLine;
                            while ((moviesLine = br.readLine()) != null) {
                                String[] split = moviesLine.split("::");
                                moviesMap.put(split[0], split[1] + "::" + split[2]);
                            }
                        }
                    } else if (name.contains("users.dat")) {
                        // users.dat line: 2::M::56::16::70072
                        // (user ID, gender, age, occupation, zip code)
                        // Key: user ID; value: the remaining fields
                        try (BufferedReader br = new BufferedReader(new FileReader(name))) {
                            String usersLine;
                            while ((usersLine = br.readLine()) != null) {
                                String[] split = usersLine.split("::");
                                usersMap.put(split[0], split[1] + "::" + split[2] + "::" + split[3] + "::" + split[4]);
                            }
                        }
                    }
                }
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                // One line of ratings.dat: user ID, movie ID, rating, rating timestamp
                String[] ratings = value.toString().split("::");
                // Look up the remaining movie and user fields by movie ID and user ID
                String movies = moviesMap.get(ratings[1]);
                String users = usersMap.get(ratings[0]);

                // Concatenate the fields of all three tables
                String threeTables = value.toString() + "::" + movies + "::" + users;
                outKey.set(threeTables);

                context.write(outKey, NullWritable.get());
            }
        }
    }

    The data after joining the three tables looks like this:

    1000::1023::5::975041651::Winnie the Pooh and the Blustery Day (1968)::Animation|Children's::F::25::6::90027
    1000::1029::3::975041859::Dumbo (1941)::Animation|Children's|Musical::F::25::6::90027
    1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
    1000::1104::5::975042421::Streetcar Named Desire, A (1951)::Drama::F::25::6::90027
    1000::110::5::975040841::Braveheart (1995)::Action|Drama|War::F::25::6::90027
    1000::1196::3::975040841::Star Wars: Episode V - The Empire Strikes Back (1980)::Action|Adventure|Drama|Sci-Fi|War::F::25::6::90027
    1000::1198::5::975040841::Raiders of the Lost Ark (1981)::Action|Adventure::F::25::6::90027
    1000::1200::4::975041125::Aliens (1986)::Action|Sci-Fi|Thriller|War::F::25::6::90027
    1000::1201::5::975041025::Good, The Bad and The Ugly, The (1966)::Action|Western::F::25::6::90027
    1000::1210::5::975040629::Star Wars: Episode VI - Return of the Jedi (1983)::Action|Adventure|Romance|Sci-Fi|War::F::25::6::90027

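    Field descriptions

    1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027

    Index  Field             Example
    0      user ID           1000
    1      movie ID          1036
    2      rating            4
    3      rating timestamp  975040964
    4      movie title       Die Hard (1988)
    5      movie genres      Action|Thriller
    6      gender            F
    7      age               25
    8      occupation        6
    9      zip code          90027

    To find the 10 highest-rated movies among male and among female users separately (gender, movie title, rating):

    1. Group by gender and movie title: use movie title + gender as the key and the rating as the value, and compute the average rating for each key.

    2. Wrap gender + movie title + rating in an object, group by gender, and sort by rating in descending order to output the top 10.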
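
    The two steps above can be sketched in plain Java before turning them into two chained jobs. The class TopRatedByGenderSketch and the sample lines in main are hypothetical; only the field indexes (2 = rating, 4 = movie title, 6 = gender) come from the joined record layout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory version of the two-step plan (average, then top N per gender). */
final class TopRatedByGenderSketch {

    /** Step 1: average rating per "title<TAB>gender" key. */
    static Map<String, Double> averageByTitleAndGender(List<String> joinedLines) {
        Map<String, double[]> sumAndCount = new HashMap<>();
        for (String line : joinedLines) {
            String[] f = line.split("::");
            double[] acc = sumAndCount.computeIfAbsent(f[4] + "\t" + f[6], k -> new double[2]);
            acc[0] += Double.parseDouble(f[2]); // running sum of ratings
            acc[1] += 1;                        // number of ratings
        }
        Map<String, Double> averages = new HashMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    /** Step 2: per gender, titles ordered by average rating descending, cut to n. */
    static Map<String, List<String>> topNPerGender(Map<String, Double> averages, int n) {
        Map<String, List<Map.Entry<String, Double>>> byGender = new TreeMap<>();
        for (Map.Entry<String, Double> e : averages.entrySet()) {
            String gender = e.getKey().split("\t")[1];
            byGender.computeIfAbsent(gender, g -> new ArrayList<>()).add(e);
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<Map.Entry<String, Double>>> group : byGender.entrySet()) {
            List<Map.Entry<String, Double>> entries = group.getValue();
            entries.sort((a, b) -> Double.compare(b.getValue(), a.getValue()));
            List<String> titles = new ArrayList<>();
            for (int i = 0; i < Math.min(n, entries.size()); i++) {
                titles.add(entries.get(i).getKey().split("\t")[0]);
            }
            result.put(group.getKey(), titles);
        }
        return result;
    }

    public static void main(String[] args) {
        // Invented sample records in the joined format
        List<String> lines = Arrays.asList(
                "1::10::5::t::A (1990)::Drama::F::25::6::90027",
                "2::10::3::t::A (1990)::Drama::F::25::6::90027",
                "3::11::5::t::B (1991)::Drama::F::25::6::90027",
                "4::10::2::t::A (1990)::Drama::M::25::6::90027");
        System.out.println(topNPerGender(averageByTitleAndGender(lines), 1));
    }
}
```

    Note that ranking by a plain average lets a movie with a single 5-star rating outrank widely rated ones; the original jobs share that property.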

    Business logic: MoviesDemo2.java

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MoviesDemo2 {

        public static void main(String[] args) throws Exception {

            Configuration conf1 = new Configuration();
            Configuration conf2 = new Configuration();
            FileSystem fs1 = FileSystem.get(conf1);
            FileSystem fs2 = FileSystem.get(conf2);
            Job job1 = Job.getInstance(conf1);
            Job job2 = Job.getInstance(conf2);

            job1.setJarByClass(MoviesDemo2.class);
            job2.setJarByClass(MoviesDemo2.class);
            job1.setMapperClass(MoviesDemo2Mapper1.class);
            job2.setMapperClass(MoviesDemo2Mapper2.class);
            job1.setReducerClass(MoviesDemo2Reducer1.class);
            job2.setReducerClass(MoviesDemo2Reducer2.class);

            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(DoubleWritable.class);

            job2.setOutputKeyClass(MoviesSexBean.class);
            job2.setOutputValueClass(NullWritable.class);

            // Group job2's reducer input by gender only
            job2.setGroupingComparatorClass(MoviesSexGC.class);

            // Backslashes must be escaped in Java string literals
            Path inputPath1 = new Path("D:\\MR\\hw\\movie\\output3he1");
            Path outputPath1 = new Path("D:\\MR\\hw\\movie\\output2_1");
            Path inputPath2 = new Path("D:\\MR\\hw\\movie\\output2_1");
            Path outputPath2 = new Path("D:\\MR\\hw\\movie\\output2_end");

            if (fs1.exists(outputPath1)) {
                fs1.delete(outputPath1, true);
            }
            if (fs2.exists(outputPath2)) {
                fs2.delete(outputPath2, true);
            }

            FileInputFormat.setInputPaths(job1, inputPath1);
            FileOutputFormat.setOutputPath(job1, outputPath1);

            FileInputFormat.setInputPaths(job2, inputPath2);
            FileOutputFormat.setOutputPath(job2, outputPath2);

            JobControl control = new JobControl("MoviesDemo2");

            ControlledJob aJob = new ControlledJob(job1.getConfiguration());
            ControlledJob bJob = new ControlledJob(job2.getConfiguration());

            // job2 starts only after job1 has finished
            bJob.addDependingJob(aJob);

            control.addJob(aJob);
            control.addJob(bJob);

            Thread thread = new Thread(control);
            thread.start();

            while (!control.allFinished()) {
                Thread.sleep(1000);
            }
            // Report failure through the exit code if any job in the chain failed
            boolean success = control.getFailedJobList().isEmpty();
            control.stop();
            System.exit(success ? 0 : 1);
        }

        /**
         * Input: the output file of the three-table join.
         * Emits movie title + gender as the key and the rating as the value.
         *
         * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
         *
         * user ID::movie ID::rating::rating timestamp::movie title::movie genres::gender::age::occupation::zip code
         */
        public static class MoviesDemo2Mapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable> {

            Text outKey = new Text();
            DoubleWritable outValue = new DoubleWritable();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                String[] split = value.toString().split("::");
                String strKey = split[4] + "\t" + split[6];
                String strValue = split[2];

                outKey.set(strKey);
                outValue.set(Double.parseDouble(strValue));

                context.write(outKey, outValue);
            }
        }

        /**
         * Computes the average rating for each movie title + gender key.
         */
        public static class MoviesDemo2Reducer1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

            DoubleWritable outValue = new DoubleWritable();

            @Override
            protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                    throws IOException, InterruptedException {

                int count = 0;
                double sum = 0;
                for (DoubleWritable value : values) {
                    count++;
                    sum += value.get();
                }
                double avg = sum / count;

                outValue.set(avg);
                context.write(key, outValue);
            }
        }

        /**
         * Wraps movie title + gender + rating in an object that is grouped by gender
         * and sorted by rating in descending order.
         */
        public static class MoviesDemo2Mapper2 extends Mapper<LongWritable, Text, MoviesSexBean, NullWritable> {

            MoviesSexBean outKey = new MoviesSexBean();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                // Input line: movie title<TAB>gender<TAB>average rating
                String[] split = value.toString().split("\t");
                outKey.setMovieName(split[0]);
                outKey.setSex(split[1]);
                outKey.setScore(Double.parseDouble(split[2]));

                context.write(outKey, NullWritable.get());
            }
        }

        /**
         * Emits the 10 highest-rated movies for each gender.
         */
        public static class MoviesDemo2Reducer2 extends Reducer<MoviesSexBean, NullWritable, MoviesSexBean, NullWritable> {

            @Override
            protected void reduce(MoviesSexBean key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {

                // One call per gender; the key object is refreshed as the values are iterated
                int count = 0;
                for (NullWritable nvl : values) {
                    count++;
                    context.write(key, NullWritable.get());
                    if (count == 10) {
                        return;
                    }
                }
            }
        }
    }

    Bean: MoviesSexBean.java

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class MoviesSexBean implements WritableComparable<MoviesSexBean> {

        private String movieName;
        private String sex;
        private double score;

        public MoviesSexBean() {
            super();
        }
        public MoviesSexBean(String movieName, String sex, double score) {
            super();
            this.movieName = movieName;
            this.sex = sex;
            this.score = score;
        }
        public String getMovieName() {
            return movieName;
        }
        public void setMovieName(String movieName) {
            this.movieName = movieName;
        }
        public String getSex() {
            return sex;
        }
        public void setSex(String sex) {
            this.sex = sex;
        }
        public double getScore() {
            return score;
        }
        public void setScore(double score) {
            this.score = score;
        }
        @Override
        public String toString() {
            return movieName + "\t" + sex + "\t" + score;
        }
        @Override
        public void readFields(DataInput in) throws IOException {
            movieName = in.readUTF();
            sex = in.readUTF();
            score = in.readDouble();
        }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(movieName);
            out.writeUTF(sex);
            out.writeDouble(score);
        }
        // Sort by gender in descending order, then by score in descending order
        @Override
        public int compareTo(MoviesSexBean o) {
            int result = o.getSex().compareTo(this.getSex());
            if (result != 0) {
                return result;
            }
            return Double.compare(o.getScore(), this.getScore());
        }
    }

    Grouping comparator: MoviesSexGC.java

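    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class MoviesSexGC extends WritableComparator {

        public MoviesSexGC() {
            // true: let the comparator create MoviesSexBean instances for deserialization
            super(MoviesSexBean.class, true);
        }

        // Records with the same gender belong to the same reduce group
        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {

            MoviesSexBean msb1 = (MoviesSexBean) a;
            MoviesSexBean msb2 = (MoviesSexBean) b;

            return msb1.getSex().compareTo(msb2.getSex());
        }
    }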
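
    The sort order in MoviesSexBean.compareTo and the grouping in MoviesSexGC work together: records are first sorted by (gender, score), and then each run of consecutive records with the same gender becomes one reduce group. The plain-Java sketch below imitates that behaviour; SecondarySortSketch and Row are hypothetical names, and the sample rows in main are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical imitation of sort comparator + grouping comparator without Hadoop. */
final class SecondarySortSketch {

    static final class Row {
        final String title;
        final String sex;
        final double score;

        Row(String title, String sex, double score) {
            this.title = title;
            this.sex = sex;
            this.score = score;
        }
    }

    /** Same order as MoviesSexBean.compareTo: gender descending, then score descending. */
    static final Comparator<Row> SORT_ORDER = (a, b) -> {
        int bySex = b.sex.compareTo(a.sex);
        return bySex != 0 ? bySex : Double.compare(b.score, a.score);
    };

    /** Sorts the rows, then keeps the first n rows of every run that shares a gender. */
    static List<Row> topNPerGroup(List<Row> rows, int n) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(SORT_ORDER);
        List<Row> out = new ArrayList<>();
        String currentSex = null;
        int emitted = 0;
        for (Row row : sorted) {
            if (!row.sex.equals(currentSex)) {
                // Gender changed: this is where a new reduce() call would begin
                currentSex = row.sex;
                emitted = 0;
            }
            if (emitted < n) {
                out.add(row);
                emitted++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("A", "F", 4.0), new Row("B", "F", 5.0),
                new Row("C", "M", 3.0), new Row("D", "M", 4.5),
                new Row("E", "F", 1.0));
        for (Row row : topNPerGroup(rows, 2)) {
            System.out.println(row.title + "\t" + row.sex + "\t" + row.score);
        }
    }
}
```

    Grouping only works because the sort puts all records of one gender next to each other; a grouping field that is not the leading sort field would split a gender across several groups.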

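    3. For movieid = 2116, find the average rating per age bracket (there are only 7 age values, so group by those 7) (age bracket, rating)

    This job works on the three-table join output produced in requirement 2.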
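
    The computation is a filter followed by a grouped average. A minimal plain-Java sketch, assuming the joined record layout shown earlier (index 1 = movie ID, 2 = rating, 7 = age); the class AverageByAgeSketch and the sample lines, including the movie title, are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory version of "average rating per age bracket for one movie". */
final class AverageByAgeSketch {

    /** Returns age bracket -> average rating for the given movie, ordered by age. */
    static Map<Integer, Double> averageByAge(List<String> joinedLines, int movieId) {
        Map<Integer, double[]> sumAndCount = new TreeMap<>();
        for (String line : joinedLines) {
            String[] f = line.split("::");
            if (Integer.parseInt(f[1]) != movieId) {
                continue; // the mapper drops every other movie in the same way
            }
            double[] acc = sumAndCount.computeIfAbsent(Integer.parseInt(f[7]), k -> new double[2]);
            acc[0] += Double.parseDouble(f[2]); // running sum of ratings
            acc[1] += 1;                        // number of ratings
        }
        Map<Integer, Double> averages = new TreeMap<>();
        for (Map.Entry<Integer, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1::2116::4::t::Sample Movie (1984)::Drama::F::18::6::90027",
                "2::2116::2::t::Sample Movie (1984)::Drama::M::18::6::90027",
                "3::2116::5::t::Sample Movie (1984)::Drama::M::25::6::90027",
                "4::1036::1::t::Die Hard (1988)::Action|Thriller::F::18::6::90027");
        System.out.println(averageByAge(lines, 2116));
    }
}
```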

     1 public class MovieDemo3 {
     2 
     3     public static void main(String[] args) throws Exception {
     4         
     5         Configuration conf = new Configuration();
     6         FileSystem fs = FileSystem.get(conf);
     7         Job job = Job.getInstance(conf);
     8         
     9         job.setJarByClass(MovieDemo3.class);
    10         job.setMapperClass(MovieDemo3Mapper.class);
    11         job.setReducerClass(MovieDemo3Reducer.class);
    12         
    13         job.setOutputKeyClass(Text.class);
    14         job.setOutputValueClass(DoubleWritable.class);
    15         
    16         Path inputPath = new Path("D:\MR\hw\movie\3he1");
    17         Path outputPath = new Path("D:\MR\hw\movie\outpu3");
    18         
    19         if(fs.exists(outputPath)) {
    20             fs.delete(outputPath,true);
    21         }
    22         
    23         FileInputFormat.setInputPaths(job, inputPath);
    24         FileOutputFormat.setOutputPath(job, outputPath);
    25         
    26         boolean isDone = job.waitForCompletion(true);
    27         System.exit(isDone ? 0 : 1);
    28         
    29     }
    30     
    31     
    /**
     * Sample joined record:
     * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
     *
     * Fields (index after splitting on "::"):
     * 0 user ID, 1 movie ID, 2 rating, 3 rating timestamp, 4 movie title,
     * 5 genres, 6 gender, 7 age, 8 occupation, 9 zip code
     *
     * key:   movie ID + movie title + age group
     * value: rating
     * Only records for movieid = 2116 are emitted, so the reducer sees one key per age group.
     */
    public static class MovieDemo3Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        // Reused across map() calls to avoid allocating objects per record.
        Text outKey = new Text();
        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("::");
            int movieID = Integer.parseInt(split[1]);

            if (movieID == 2116) {
                String strKey = split[1] + "\t" + split[4] + "\t" + split[7];
                String strValue = split[2];

                outKey.set(strKey);
                outValue.set(Double.parseDouble(strValue));

                context.write(outKey, outValue);
            }

        }
    }

    /**
     * Computes the average rating for each key emitted by the mapper.
     */
    public static class MovieDemo3Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            double sum = 0;

            for (DoubleWritable value : values) {
                count++;
                // Read the double directly instead of round-tripping through a String.
                sum += value.get();
            }

            // reduce() is only called for keys with at least one value, so count > 0.
            double avg = sum / count;

            outValue.set(avg);

            context.write(key, outValue);

        }

    }

}
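
    To check the job's output on a handful of rows without a Hadoop runtime, the same filter-and-average logic can be sketched in plain Java. This is only an illustration: the class name `AgeGroupAverage`, the method `averageByAge`, and the sample rows are made up here and are not part of the original post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not from the original post: mirrors the mapper's
// filter and the reducer's average in memory.
class AgeGroupAverage {

    /** Returns age group -> average rating for the given movie ID. */
    static Map<String, Double> averageByAge(List<String> records, String movieId) {
        // age group -> {sum of ratings, number of ratings}
        Map<String, double[]> acc = new TreeMap<>();
        for (String record : records) {
            String[] f = record.split("::");
            if (!f[1].equals(movieId)) {
                continue; // same filter as the mapper
            }
            double[] a = acc.computeIfAbsent(f[7], k -> new double[2]);
            a[0] += Double.parseDouble(f[2]);
            a[1] += 1;
        }
        Map<String, Double> result = new TreeMap<>();
        acc.forEach((age, a) -> result.put(age, a[0] / a[1]));
        return result;
    }

    public static void main(String[] args) {
        // Made-up rows in the joined layout (title and genres are placeholders).
        List<String> sample = Arrays.asList(
                "1::2116::4::0::Example Movie::Drama::F::25::6::00000",
                "2::2116::5::0::Example Movie::Drama::M::25::6::00000",
                "3::2116::3::0::Example Movie::Drama::M::35::6::00000",
                "4::1::5::0::Other Movie::Comedy::M::35::6::00000");
        averageByAge(sample, "2116")
                .forEach((age, avg) -> System.out.println(age + "\t" + avg));
    }
}
```

    Comparing this in-memory result with the job's output for a small input file is a quick way to confirm that the field indices (1 for movie ID, 2 for rating, 7 for age) are the right ones.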

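    4. Find the average rating of the 10 movies rated highest by the woman who watches the most movies, i.e. has submitted the most ratings (person, movie title, rating)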

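    Joined record (fields separated by "::"):

    1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027

    Index   Field               Example
    0       user ID             1000
    1       movie ID            1036
    2       rating              4
    3       rating timestamp    975040964
    4       movie title         Die Hard (1988)
    5       genres              Action|Thriller
    6       gender              F
    7       age                 25
    8       occupation          6
    9       zip code            90027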
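
    The index positions above are what every mapper in this section relies on. As a sketch of how the layout maps to typed fields, the record can be parsed once into a small value class. The class `RatingRecord` and its field names are hypothetical and chosen for illustration only; the original code indexes the split array directly.

```java
// Hypothetical value class, not from the original post: gives names and
// types to the ten "::"-separated fields of a joined record.
class RatingRecord {
    final String userId;
    final String movieId;
    final double rating;
    final long timestamp;
    final String title;
    final String[] genres;
    final String gender;
    final int age;
    final String occupation;
    final String zipCode; // kept as text so leading zeros survive

    private RatingRecord(String[] f) {
        userId = f[0];
        movieId = f[1];
        rating = Double.parseDouble(f[2]);
        timestamp = Long.parseLong(f[3]);
        title = f[4];
        genres = f[5].split("\\|"); // genres are separated by '|'
        gender = f[6];
        age = Integer.parseInt(f[7]);
        occupation = f[8];
        zipCode = f[9];
    }

    /** Parses one joined line; rejects lines that do not have exactly 10 fields. */
    static RatingRecord parse(String line) {
        String[] f = line.split("::");
        if (f.length != 10) {
            throw new IllegalArgumentException("expected 10 fields, got " + f.length);
        }
        return new RatingRecord(f);
    }

    public static void main(String[] args) {
        RatingRecord r = parse(
                "1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027");
        System.out.println(r.userId + " rated \"" + r.title + "\" " + r.rating);
    }
}
```

    Rejecting lines with an unexpected field count makes a malformed input row fail with a clear message rather than an `ArrayIndexOutOfBoundsException` deep inside a map task.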

    (1) Find the ID of the woman who has submitted the most ratings

      MoviesDemo4_1.java

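    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MoviesDemo4 {

        public static void main(String[] args) throws Exception {

            // Job 1: count the ratings submitted by each female user.
            Configuration conf1 = new Configuration();
            FileSystem fs1 = FileSystem.get(conf1);
            Job job1 = Job.getInstance(conf1);

            job1.setJarByClass(MoviesDemo4.class);
            job1.setMapperClass(MoviesDemo4Mapper1.class);
            job1.setReducerClass(MoviesDemo4Reducer1.class);

            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(Text.class);
            job1.setOutputKeyClass(Text.class);
            // MoviesDemo4Reducer1 emits IntWritable counts, not DoubleWritable.
            job1.setOutputValueClass(IntWritable.class);

            // Job 2: sort the counts in descending order and keep the first entry.
            Configuration conf2 = new Configuration();
            FileSystem fs2 = FileSystem.get(conf2);
            Job job2 = Job.getInstance(conf2);

            job2.setJarByClass(MoviesDemo4.class);
            job2.setMapperClass(MoviesDemo4Mapper2.class);
            job2.setReducerClass(MoviesDemo4Reducer2.class);

            job2.setMapOutputKeyClass(Moviegoers.class);
            job2.setMapOutputValueClass(NullWritable.class);
            job2.setOutputKeyClass(Moviegoers.class);
            job2.setOutputValueClass(NullWritable.class);
            // "Keep the first entry" is only a global maximum with a single reduce task.
            job2.setNumReduceTasks(1);

            // Backslashes must be escaped inside Java string literals.
            Path inputPath1 = new Path("D:\\MR\\hw\\movie\\3he1");
            Path outputPath1 = new Path("D:\\MR\\hw\\movie\\outpu4_1");

            if (fs1.exists(outputPath1)) {
                fs1.delete(outputPath1, true);
            }

            FileInputFormat.setInputPaths(job1, inputPath1);
            FileOutputFormat.setOutputPath(job1, outputPath1);

            // Job 2 reads the output directory of job 1.
            Path inputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_1");
            Path outputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_2");

            if (fs2.exists(outputPath2)) {
                fs2.delete(outputPath2, true);
            }

            FileInputFormat.setInputPaths(job2, inputPath2);
            FileOutputFormat.setOutputPath(job2, outputPath2);

            JobControl control = new JobControl("MoviesDemo4");

            ControlledJob ajob = new ControlledJob(job1.getConfiguration());
            ControlledJob bjob = new ControlledJob(job2.getConfiguration());

            // Job 2 must not start until job 1 has finished.
            bjob.addDependingJob(ajob);

            control.addJob(ajob);
            control.addJob(bjob);

            Thread thread = new Thread(control);
            thread.start();

            while (!control.allFinished()) {
                Thread.sleep(1000); // sleep is static; call it on the class
            }

            // Stop the JobControl thread and report a non-zero exit code if any job failed.
            boolean failed = !control.getFailedJobList().isEmpty();
            control.stop();
            System.exit(failed ? 1 : 0);
        }
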
        /**
         * Sample joined record:
         * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
         *
         * Fields (index after splitting on "::"):
         * 0 user ID, 1 movie ID, 2 rating, 3 rating timestamp, 4 movie title,
         * 5 genres, 6 gender, 7 age, 8 occupation, 9 zip code
         *
         * key:   user ID
         * value: movie title + rating
         * Only records of female users (gender "F") are emitted.
         */
        public static class MoviesDemo4Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

            Text outKey = new Text();
            Text outValue = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                String[] split = value.toString().split("::");

                String strKey = split[0];
                String strValue = split[4] + "\t" + split[2];

                if (split[6].equals("F")) {
                    outKey.set(strKey);
                    outValue.set(strValue);
                    context.write(outKey, outValue);
                }

            }

        }

        // Counts the total number of ratings submitted by each female user.
        public static class MoviesDemo4Reducer1 extends Reducer<Text, Text, Text, IntWritable> {

            IntWritable outValue = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {

                int count = 0;
                for (Text value : values) {
                    count++;
                }
                outValue.set(count);
                context.write(key, outValue);
            }

        }

        // Re-keys the output of the first job as Moviegoers so that the shuffle sorts it.
        // The descending order comes from Moviegoers.compareTo, which is not listed here.
        public static class MoviesDemo4Mapper2 extends Mapper<LongWritable, Text, Moviegoers, NullWritable> {

            Moviegoers outKey = new Moviegoers();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {

                // Job 1 wrote "userID<TAB>count" lines.
                String[] split = value.toString().split("\t");

                outKey.setName(split[0]);
                outKey.setCount(Integer.parseInt(split[1]));
                context.write(outKey, NullWritable.get());
            }

        }

        // After sorting, keeps only the first entry: the ID and rating count of the
        // woman with the most ratings.
        public static class MoviesDemo4Reducer2 extends Reducer<Moviegoers, NullWritable, Moviegoers, NullWritable> {

            // Shared across reduce() calls, so only the very first record is written.
            int count = 0;

            @Override
            protected void reduce(Moviegoers key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {

                for (NullWritable nvl : values) {
                    count++;
                    if (count > 1) {
                        return;
                    }
                    context.write(key, nvl);
                }

            }

        }

    }
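
    The second job uses a `Moviegoers` key with `setName` and `setCount`, but that class is not listed in this section. The following is a minimal sketch of what such a key could look like, not the author's implementation. It assumes two fields (`name`, `count`) and a descending order by count. To stay runnable without Hadoop on the classpath it only implements `Comparable`; inside the job the class would have to declare `implements WritableComparable<Moviegoers>`, whose `write(DataOutput)` and `readFields(DataInput)` methods have the signatures shown here.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the Moviegoers key; field names and ordering are assumptions.
class Moviegoers implements Comparable<Moviegoers> {
    private String name = "";
    private int count;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // Same signature as Writable.write: serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(count);
    }

    // Same signature as Writable.readFields: read the fields back in the same order.
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        count = in.readInt();
    }

    // Larger counts sort first; ties are broken by name so distinct users
    // are never merged into one reduce group.
    @Override
    public int compareTo(Moviegoers other) {
        int byCount = Integer.compare(other.count, this.count);
        return byCount != 0 ? byCount : name.compareTo(other.name);
    }

    // Text-based output formats write the key via toString().
    @Override
    public String toString() {
        return name + "\t" + count;
    }

    public static void main(String[] args) {
        // Made-up user IDs and counts, only to show the resulting order.
        List<Moviegoers> list = new ArrayList<>();
        String[] names = {"u1", "u2", "u3"};
        int[] counts = {12, 40, 7};
        for (int i = 0; i < names.length; i++) {
            Moviegoers m = new Moviegoers();
            m.setName(names[i]);
            m.setCount(counts[i]);
            list.add(m);
        }
        Collections.sort(list);
        list.forEach(System.out::println);
    }
}
```

    With an ordering like this, the first key that reaches `MoviesDemo4Reducer2` is the user with the highest count, which is what the "keep the first entry" reducer depends on.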

    (2)

  • Original article: https://www.cnblogs.com/qingyunzong/p/8619607.html