  • Analyzing per-author article counts and total read counts on Baidu Baijia with MapReduce

    The task is to read records from a file, remove duplicates, compute the number of articles published and the total read count for each author, and save the result to another file. This is implemented as two chained MapReduce jobs: a deduplication job followed by an aggregation job.

    First, create a FlowBean writable:

    package flow;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    
    public class FlowBean implements WritableComparable<FlowBean>{
        
        
        private String userName;
        private long numerRead;
        private long numberArticle;
        
        //During deserialization, reflection needs to call a no-arg constructor, so one is defined explicitly
        public FlowBean(){}
        
        //A parameterized constructor is added for convenient initialization
        public FlowBean(String userName, long numerRead, long numberArticle) {
            this.userName = userName;
            this.numerRead = numerRead;
            this.numberArticle = numberArticle;
        }
    
        public String getUserName() {
            return userName;
        }
    
        public void setUserName(String userName) {
            this.userName = userName;
        }
    
        public long getNumerRead() {
            return numerRead;
        }
    
        public void setNumerRead(long numerRead) {
            this.numerRead = numerRead;
        }
    
        public long getNumberArticle() {
            return numberArticle;
        }
    
        public void setNumberArticle(long numberArticle) {
            this.numberArticle = numberArticle;
        }
    
    
        
        
        //Serialize the object's fields to the output stream
        @Override
        public void write(DataOutput out) throws IOException {
    
            out.writeUTF(userName);
            out.writeLong(numerRead);
            out.writeLong(numberArticle);
            
        }
    
        
        //Deserialize the object's fields from the input stream
        //Fields must be read in exactly the same order they were written
        @Override
        public void readFields(DataInput in) throws IOException {
    
            userName = in.readUTF();
            numerRead = in.readLong();
            numberArticle = in.readLong();
            
        }
        @Override
        public String toString() {
    
            return userName + "\t" + numerRead + "\t" + numberArticle;
        }
    
        @Override
        public int compareTo(FlowBean o) {
            //Sort by read count in descending order
            return numerRead > o.getNumerRead() ? -1 : 1;
        }
        
    
    }
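
    Hadoop calls the write/readFields pair above whenever a FlowBean needs to be shipped between nodes. As a quick local sanity check (a minimal sketch that is not part of the original post; the class name and sample values are made up), the round trip can be exercised with plain Java streams:

    package flow;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    
    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws Exception {
            //Serialize a bean into an in-memory byte array
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new FlowBean("someAuthor", 1024, 3).write(new DataOutputStream(bytes));
    
            //Deserialize it back through the no-arg constructor, the way Hadoop does
            FlowBean copy = new FlowBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(copy);   //userName, total reads, article count, tab-separated
        }
    }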

    Then create the mapper, reducer, and driver:

    package flow;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class FlowSumRunner extends Configured implements Tool{
    
        //Job 1: deduplication. Each input line becomes the map output key,
        //so identical lines collapse to a single key in the reduce phase.
        public static class RemoveDupMapper extends Mapper<Object, Text, Text, NullWritable> {

            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                context.write(value, NullWritable.get());
            }
        }

        public static class RemoveDupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

            public void reduce(Text key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }
         
        /**
         * FlowBean is a custom data type. For it to be transferred between Hadoop
         * nodes, it must follow Hadoop's serialization mechanism, i.e. implement
         * the corresponding Hadoop serialization interface.
         * @author duanhaitao@itcast.cn
         *
         */
         public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    
             @Override
             protected void map(LongWritable key, Text value,Context context)
                     throws IOException, InterruptedException {
    
                //Get one line of input
                 String line = value.toString();
                //Split the line into fields
                 String[] fields = StringUtils.split(line, "  ");
                 
                //Pick out the fields we need: author name and read count
                 String username = fields[1];
                 long userRead = Long.parseLong(fields[2]);
             
                //Wrap the data in a key/value pair and emit it; each record counts as one article
                 context.write(new Text(username), new FlowBean(username,userRead,1));
                 System.out.println(username + " " + userRead );
             }
             
             
         }
    
    
        public static class FlowSumReducer extends Reducer<Text, FlowBean, FlowBean, Text>{
        
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,Context context)
                throws IOException, InterruptedException {
    
            //Accumulate the total read count and article count for this author
            long numberRead_counter = 0;
            long numberArticle_counter = 0;
            
            for(FlowBean bean : values){
                
                numberRead_counter += bean.getNumerRead();
                numberArticle_counter += bean.getNumberArticle();
                
            }
            
            
            context.write(new FlowBean(key.toString(), numberRead_counter, numberArticle_counter), new Text());
        }
    
    }
    
        
        @Override
        public int run(String[] args) throws Exception {
            
            //Use the configuration supplied by ToolRunner
            Configuration conf = getConf();
            
            Job job = Job.getInstance(conf, "RemoveDup");
            job.setJarByClass(FlowSumRunner.class);
            job.setMapperClass(RemoveDupMapper.class);
            job.setCombinerClass(RemoveDupReducer.class);
            job.setReducerClass(RemoveDupReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path("/Users/lihu/Desktop/crawle/tap.txt"));
            FileOutputFormat.setOutputPath(job, new Path("/Users/lihu/Desktop/crawle/quchong"));
            
            Job job1 = Job.getInstance(conf);
            
            job1.setJarByClass(FlowSumRunner.class);
            
            job1.setMapperClass(FlowSumMapper.class);
            job1.setReducerClass(FlowSumReducer.class);
            
            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(FlowBean.class);
            
            //Output types must match what FlowSumReducer actually emits: FlowBean key, Text value
            job1.setOutputKeyClass(FlowBean.class);
            job1.setOutputValueClass(Text.class);
            
            FileInputFormat.setInputPaths(job1, new Path("/Users/lihu/Desktop/crawle/quchong"));
            FileOutputFormat.setOutputPath(job1, new Path("/Users/lihu/Desktop/crawle/logs"));
            //Submit the two jobs in sequence and wait for each to complete
            if (job.waitForCompletion(true)) {
                return job1.waitForCompletion(true)?0:1;
            }
            
            return 1;
        }
        
        
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
            System.exit(res);
        }
    
    }
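
    In the second job, FlowSumMapper assumes that each deduplicated line is whitespace-separated, with the author name in the second column (fields[1]) and the read count in the third (fields[2]). The exact format of the crawled Baijia data is not shown in the post, so the sample below is purely illustrative. Each output line is FlowBean.toString() followed by the empty Text value, i.e. author name, total reads, and article count separated by tabs:

    illustrative deduplicated input (quchong), assumed columns: id, author, reads
    1001 someAuthor 2048
    1002 someAuthor 1024
    1003 otherAuthor 512
    
    resulting output (logs/part-r-00000)
    someAuthor     3072    2
    otherAuthor    512     1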
  • Original post: https://www.cnblogs.com/sunyaxue/p/6362696.html