zoukankan      html  css  js  c++  java
  • 19-hadoop-fof好友推荐

    好友推荐案例需要两个 job：第一个 job 找出所有“好友的好友”（fof）关系，并计算每对用户的共同好友数（关系度）；第二个 job 根据这些关系为每个用户生成推荐列表。

    1, fof关系类

    package com.wenbronk.friend;
    
    import org.apache.hadoop.io.Text;
    
    /**
     * Friend-of-friend (FOF) pair key.
     *
     * <p>The two user names are stored in lexicographic order, separated by a tab,
     * so that the pair (a, b) and the pair (b, a) yield the same key and therefore
     * end up in the same reduce group.
     *
     * @author root
     */
    public class Fof extends Text {

        /** Creates an empty key (used when the framework instantiates the type). */
        public Fof() {
            super();
        }

        /**
         * Creates the key for a pair of users; the argument order does not matter.
         *
         * @param a one user name
         * @param b the other user name
         */
        public Fof(String a, String b) {
            super(getFof(a, b));
        }

        /**
         * Builds the canonical text for a pair: the name that sorts first
         * (per {@link String#compareTo}) comes first, then a tab, then the other name.
         *
         * @param a one user name
         * @param b the other user name
         * @return the two names joined by a tab, smaller name first
         */
        public static String getFof(String a, String b) {
            boolean aComesFirst = a.compareTo(b) < 0;
            String head = aComesFirst ? a : b;
            String tail = aComesFirst ? b : a;
            return head + "\t" + tail;
        }
    }

    2, user类

    package com.wenbronk.friend;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * A user name paired with a common-friend count.
     *
     * <p>Used as both the map-output key and the map-output value of the
     * recommendation job. Keys are sorted by name and then by count, and grouped
     * by name only.
     */
    public class User implements WritableComparable<User> {

        private String uname;
        // Number of common friends (field/accessor spelling kept for compatibility).
        private int friedsCount;

        public String getUname() {
            return uname;
        }

        public void setUname(String uname) {
            this.uname = uname;
        }

        public int getFriedsCount() {
            return friedsCount;
        }

        public void setFriedsCount(int friedsCount) {
            this.friedsCount = friedsCount;
        }

        /** No-arg constructor for reflective instantiation before {@link #readFields}. */
        public User() {
            super();
        }

        /**
         * @param uname       user name
         * @param friedsCount number of common friends
         */
        public User(String uname, int friedsCount) {
            super();
            this.uname = uname;
            this.friedsCount = friedsCount;
        }

        /** Deserializes the fields in the same order {@link #write} emits them. */
        @Override
        public void readFields(DataInput arg0) throws IOException {
            this.uname = arg0.readUTF();
            this.friedsCount = arg0.readInt();
        }

        /** Serializes the name (modified UTF-8) followed by the count. */
        @Override
        public void write(DataOutput arg0) throws IOException {
            arg0.writeUTF(uname);
            arg0.writeInt(friedsCount);
        }

        /** Orders by name, then by count ascending. */
        @Override
        public int compareTo(User o) {
            int result = this.uname.compareTo(o.getUname());
            if (result == 0) {
                return Integer.compare(this.friedsCount, o.getFriedsCount());
            }
            return result;
        }

        /** Equal when both name and count match, consistent with {@link #compareTo}. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof User)) {
                return false;
            }
            User other = (User) obj;
            boolean sameName = (uname == null) ? other.uname == null : uname.equals(other.uname);
            return sameName && friedsCount == other.friedsCount;
        }

        /**
         * Hashes the name only. The default partitioner routes map output by
         * key.hashCode(), and reduce groups are formed by name alone, so every
         * record of one user must hash identically regardless of its count.
         * (Object's identity hash would scatter a user across reducers.)
         */
        @Override
        public int hashCode() {
            return (uname == null) ? 0 : uname.hashCode();
        }
    }

    3, 排序比较器 (FofSort)

    package com.wenbronk.friend;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Sort comparator for the recommendation job: orders {@link User} keys by
     * user name, and users with the same name by common-friend count (ascending).
     *
     * @author root
     */
    public class FofSort extends WritableComparator {

        public FofSort() {
            // createInstances = true so compare() receives deserialized User objects
            super(User.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            User left = (User) a;
            User right = (User) b;

            int byName = left.getUname().compareTo(right.getUname());
            return byName != 0
                    ? byName
                    : Integer.compare(left.getFriedsCount(), right.getFriedsCount());
        }
    }

    4, 分组比较器 (FofGroup)

    package com.wenbronk.friend;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Grouping comparator for the recommendation job: all keys that share a user
     * name are fed to a single reduce() call, whatever their count.
     *
     * @author root
     */
    public class FofGroup extends WritableComparator {

        public FofGroup() {
            // createInstances = true so compare() receives deserialized User objects
            super(User.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            String leftName = ((User) a).getUname();
            String rightName = ((User) b).getUname();
            return leftName.compareTo(rightName);
        }
    }

    5, 作业驱动类 (RunJob)

    package com.wenbronk.friend;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 1个mapreduce找到所有的fof关系 第二个mapreduce执行排序
     * 
     * @author root
     */
    public class RunJob {
    
        public static void main(String[] args) throws IOException {
            Configuration configuration = new Configuration();
    //        configuration.set("mapred.jar", "C:/Users/wenbr/Desktop/fof.jar");
            
            // 本地运行
            configuration.set("fs.default", "hdfs://wenbronk.hdfs.com:8020    ");
            configuration.set("yarn.resourcemanager", "hdfs://192.168.208.106");
            
            
            if (runFindFof(configuration)) {
                // 根据foffind进行排序
                run2(configuration);
            }
            
        }
    
        /**
         * 找到所有的fof关系
         * @throws IOException 
         */
        private static boolean runFindFof(Configuration conf) throws IOException {
            try {
                FileSystem fs = FileSystem.get(conf);
                Job job = Job.getInstance(conf);
                job.setJobName("friend");
                
                job.setJarByClass(RunJob.class);
                job.setMapperClass(FofMapper.class);
                job.setReducerClass(FofReduce.class);
                job.setMapOutputKeyClass(Fof.class);
                job.setMapOutputValueClass(IntWritable.class);
                
    //            job.setJar("C:/Users/wenbr/Desktop/friend.jar");
                
                job.setInputFormatClass(KeyValueTextInputFormat.class);
    //            FileInputFormat.addInputPath(job, new Path("/usr/friend.txt"));
                FileInputFormat.addInputPath(job, new Path("E:\sxt\1-MapReduce\data\friend.txt"));
                
                Path path = new Path("/root/usr/fof/f1");
                if (fs.exists(path)) {
                    fs.delete(path, true);
                }
                FileOutputFormat.setOutputPath(job, path);            
                return job.waitForCompletion(true);
            }catch(Exception e) {
                e.printStackTrace();
            }
            return false;
        }
        static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {
            @Override
            protected void map(Text key, Text value, Mapper<Text, Text, Fof, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // super.map(key, value, context);
                String user = key.toString();
                String[] frieds = StringUtils.split(value.toString(), '	');
    
                for (int i = 0; i < frieds.length; i++) {
                    String f1 = frieds[i];
                    // 去掉是直接好友的, 按组输出, 如果组中有value=0 的, 整组数据舍弃
                    context.write(new Fof(user, f1), new IntWritable(0));
                    for (int j = i + 1; j < frieds.length; j++) {
                        String f2 = frieds[j];
    
                        Fof fof = new Fof(f1, f2);
                        context.write(fof, new IntWritable(1));
                    }
                }
            }
        }
        static class FofReduce extends Reducer<Fof, IntWritable, Fof, IntWritable> {
            @Override
            protected void reduce(Fof arg0, Iterable<IntWritable> arg1,
                    Reducer<Fof, IntWritable, Fof, IntWritable>.Context arg2) throws IOException, InterruptedException {
                boolean flag = false;
                int sum = 0;
                for (IntWritable count : arg1) {
                    // 值有0的, 整组数据舍弃
                    if (count.get() == 0) {
                        flag = true;
                        break;
                    } else {
                        sum += count.get();
                    }
                }
                
                if (!flag) {
                    arg2.write(arg0, new IntWritable(sum));
                }
            }
        }
        
        /**
         * 向用户推荐好友
         * @param config
         */
        public static void run2(Configuration config) {
            try {
                FileSystem fileSystem = FileSystem.get(config);
                Job job = Job.getInstance(config);
                
                job.setJobName("fof2");
                
                job.setMapperClass(SortMapper.class);
                job.setReducerClass(SortReduce.class);
                job.setSortComparatorClass(FofSort.class);
                job.setGroupingComparatorClass(FofGroup.class);
                
                job.setMapOutputKeyClass(User.class);
                job.setMapOutputValueClass(User.class);
                
                job.setInputFormatClass(KeyValueTextInputFormat.class);
                
                // 设置MR执行的输入文件
                FileInputFormat.addInputPath(job, new Path("/usr/output/f1"));
                
                // 设置输出文件, 文件不可存在
                Path path = new Path("/root/usr/fof/f2");
                if (fileSystem.exists(path)) {
                    fileSystem.delete(path, true);
                }
                
                FileOutputFormat.setOutputPath(job, path);
                
                boolean f = job.waitForCompletion(true);
                if (f) {
                    System.out.println("job, 成功执行");
                }
                
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        static class SortMapper extends Mapper<Text, Text, User, User> {
            @Override
            protected void map(Text key, Text value, Mapper<Text, Text, User, User>.Context context)
                    throws IOException, InterruptedException {
                String[] args = StringUtils.split(value.toString(), '	');
                String other = args[0];
                int friendsCount = Integer.parseInt(args[1]);
                // 输出两次, 同时给fof两个用户推荐好友
                context.write(new User(key.toString(), friendsCount), new User(other, friendsCount));
                context.write(new User(other, friendsCount), new User(key.toString(), friendsCount));
            }
        }
        static class SortReduce extends Reducer<User, User, Text, Text>{
            @Override
            protected void reduce(User arg0, Iterable<User> arg1, Reducer<User, User, Text, Text>.Context arg2)
                    throws IOException, InterruptedException {
                String uname = arg0.getUname();
                StringBuilder stringBuilder = new StringBuilder();
                for (User user : arg1) {
                    stringBuilder.append(user.getUname() + ": " + user.getFriedsCount());
                    stringBuilder.append(", ");
                }
                arg2.write(new Text(uname), new Text(stringBuilder.toString()));
            }
        }
    
    }

    初始数据（每行第一个字段是用户，其后是该用户的直接好友；字段之间必须用制表符分隔，因为作业使用 KeyValueTextInputFormat 并按制表符拆分好友列表）

    小明    老王    如花    林志玲
    老王    小明    凤姐
    如花    小明    李刚    凤姐
    林志玲    小明    李刚    凤姐    郭美美
    李刚    如花    凤姐    林志玲
    郭美美    凤姐    林志玲
    凤姐    如花    老王    林志玲    郭美美

    系列来自尚学堂视频

  • 相关阅读:
    RabbitMQ 高可用集群搭建
    Ubuntu16.04 安装RabbitMQ
    surging+CentOS7+docker+rancher2.0 菜鸟部署运行笔记
    查看进程使用swap的状态
    查看磁盘信息命令汇总
    复制一批文件,每个文件名包含日期
    小妙招:yum 夯住了怎么办?
    测试并发数
    centos7安装python3
    使用rsync需要注意的一些问题
  • 原文地址:https://www.cnblogs.com/wenbronk/p/7308716.html
Copyright © 2011-2022 走看看