zoukankan      html  css  js  c++  java
  • 大数据作业之利用MapReduce实现简单的数据操作

    Map/Reduce编程作业

     

     

    现有student.txt和student_score.txt两个文件。将这两个文件上传到HDFS上,使用Map/Reduce框架完成下面的题目。

    student.txt

     

    2016001,王毅
    2016002,张小明
    2016003,李学彭
    2016004,王东
    2016005,王笑笑
    

     

     student_score.txt

    2016001,操作系统,60
    2016001,数据库,88
    2016001,大数据概论,85
    2016002,操作系统,91
    2016002,大数据概论,91
    2016003,大数据概论,56
    2016003,操作系统,88
    2016004,数据库,90
    2016004,大数据概论,82
    2016004,操作系统,78
    2016005,操作系统,69
    2016005,大数据概论,70
    2016005,数据库,89
    

    1)将student.txt与student_score.txt连接,输出学号、姓名、课程、分数字段。

    2)统计每个同学的平均成绩,显示学号、姓名和平均成绩,并按照成绩高低降序排序。

    3)统计每门课的最高分、最低分和平均分。

    问题一:

            StudentScore1.java

    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * MapReduce job that joins the student file with the score file on the student id.
     *
     * <p>Both files are read from the same input directory. The mapper tells them apart
     * by the number of comma-separated fields on each line and tags every record with
     * the table it came from; the reducer then attaches the student's name to each of
     * that student's score records and writes one
     * {@code sid,name,course,score} line per score.
     */
    public class StudentScore1 {

    	/** Tag stored in {@link SC#getTable()} for records that carry a student name. */
    	private static final String STUDENT_TABLE = "Student";

    	/** Tag stored in {@link SC#getTable()} for records that carry a course score. */
    	private static final String SCORE_TABLE = "Student_score";

    	/** Counter group used to report input lines that could not be interpreted. */
    	private static final String COUNTER_GROUP = "StudentScore1";

    	/**
    	 * Configures and submits the join job, then exits with 0 on success and 1 on failure.
    	 *
    	 * @param args command-line arguments (not used)
    	 */
    	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "StudentScore1");
    		job.setJarByClass(StudentScore1.class);

    		job.setMapperClass(ScoreMapper.class);
    		// The map output types differ from the final output types, so declare them explicitly.
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(SC.class);

    		job.setReducerClass(ScoreReduce.class);
    		// Final output types.
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);

    		// Input directory.
    		FileInputFormat.addInputPath(job, new Path("/StudentInput"));
    		// Output directory.
    		FileOutputFormat.setOutputPath(job, new Path("/Output1"));

    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}

    	/**
    	 * Parses one input line into an {@link SC} record keyed by student id.
    	 *
    	 * <p>Expected line formats:
    	 * <pre>
    	 * student file: 2016001,王毅
    	 * score file:   2016001,操作系统,60
    	 * </pre>
    	 */
    	public static class ScoreMapper extends Mapper<Object, Text, Text, SC> {

    		@Override
    		protected void map(Object key, Text value, Mapper<Object, Text, Text, SC>.Context context)
    				throws IOException, InterruptedException {
    			String[] words = value.toString().split(",");
    			if (words.length < 2) {
    				// Blank or truncated line: there is no second field to read, so count it
    				// and move on instead of failing with ArrayIndexOutOfBoundsException.
    				context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
    				return;
    			}

    			String sid = words[0];
    			SC sc = new SC();
    			sc.setSid(sid);
    			if (words.length == 2) {
    				// Two fields: a student record (id, name).
    				sc.setName(words[1]);
    				sc.setTable(STUDENT_TABLE);
    			} else {
    				// Three (or more) fields: a score record (id, course, score).
    				sc.setCourse(words[1]);
    				sc.setScore(Integer.parseInt(words[2]));
    				sc.setTable(SCORE_TABLE);
    			}
    			context.write(new Text(sid), sc);
    		}
    	}

    	/**
    	 * Receives all records of one student id, remembers the name from the student
    	 * record and emits every score record with that name filled in.
    	 */
    	public static class ScoreReduce extends Reducer<Text, SC, Text, NullWritable> {

    		@Override
    		protected void reduce(Text key, Iterable<SC> values,
    				Reducer<Text, SC, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    			List<SC> scores = new ArrayList<SC>();
    			String name = "";
    			for (SC value : values) {
    				if (STUDENT_TABLE.equals(value.getTable())) {
    					// The student record only contributes the name.
    					name = value.getName();
    				} else {
    					// Hadoop reuses the value instance between iterations, so buffer a copy.
    					scores.add(copyOf(value));
    				}
    			}

    			// The name may arrive after some of the scores, so output happens only
    			// once the whole group has been read.
    			for (SC score : scores) {
    				score.setName(name);
    				context.write(new Text(score.toString()), NullWritable.get());
    			}
    		}

    		/**
    		 * Returns an independent copy of {@code source}.
    		 *
    		 * @throws IOException if the bean copy fails; the task fails rather than
    		 *         silently dropping the score record from the output
    		 */
    		private static SC copyOf(SC source) throws IOException {
    			SC copy = new SC();
    			try {
    				BeanUtils.copyProperties(copy, source);
    			} catch (IllegalAccessException e) {
    				throw new IOException("Failed to copy score record of student " + source.getSid(), e);
    			} catch (InvocationTargetException e) {
    				throw new IOException("Failed to copy score record of student " + source.getSid(), e);
    			}
    			return copy;
    		}
    	}
    }
    

             SC.java

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    /**
     * Hadoop {@link Writable} holding either a student record (id and name) or a
     * score record (id, course and score). The {@code table} property tells which
     * of the two a given instance represents.
     *
     * <p>String properties are never {@code null}: they default to the empty string
     * and setters turn {@code null} into the empty string, because
     * {@link DataOutput#writeUTF(String)} cannot serialize {@code null}.
     */
    public class SC implements Writable {

    	private String name = "";
    	private String sid = "";
    	private String course = "";
    	private String table = "";
    	private int score = 0;

    	/** Returns the student name, or the empty string when not set. */
    	public String getName() {
    		return name;
    	}

    	/** Sets the student name; {@code null} is stored as the empty string. */
    	public void setName(String name) {
    		this.name = nullToEmpty(name);
    	}

    	/** Returns the student id, or the empty string when not set. */
    	public String getSid() {
    		return sid;
    	}

    	/** Sets the student id; {@code null} is stored as the empty string. */
    	public void setSid(String sid) {
    		this.sid = nullToEmpty(sid);
    	}

    	/** Returns the course name, or the empty string when not set. */
    	public String getCourse() {
    		return course;
    	}

    	/** Sets the course name; {@code null} is stored as the empty string. */
    	public void setCourse(String course) {
    		this.course = nullToEmpty(course);
    	}

    	/** Returns the tag naming the source table of this record, or the empty string when not set. */
    	public String getTable() {
    		return table;
    	}

    	/** Sets the source-table tag; {@code null} is stored as the empty string. */
    	public void setTable(String table) {
    		this.table = nullToEmpty(table);
    	}

    	/** Returns the score; 0 when not set. */
    	public int getScore() {
    		return score;
    	}

    	/** Sets the score. */
    	public void setScore(int score) {
    		this.score = score;
    	}

    	/** Formats the record as {@code sid,name,course,score}; the table tag is not included. */
    	@Override
    	public String toString() {
    		return sid + "," + name + "," + course + "," + score;
    	}

    	/** Reads the fields in exactly the order {@link #write(DataOutput)} emits them. */
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.sid = in.readUTF();
    		this.name = in.readUTF();
    		this.course = in.readUTF();
    		this.table = in.readUTF();
    		this.score = in.readInt();
    	}

    	/** Writes sid, name, course, table and score, in that order. */
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(sid);
    		out.writeUTF(name);
    		out.writeUTF(course);
    		out.writeUTF(table);
    		out.writeInt(score);
    	}

    	/** Maps {@code null} to the empty string so serialization never sees a null field. */
    	private static String nullToEmpty(String value) {
    		return value == null ? "" : value;
    	}
    }
    

    结果:

    2016001,王毅,操作系统,60
    2016001,王毅,数据库,88
    2016001,王毅,大数据概论,85
    2016002,张小明,操作系统,91
    2016002,张小明,大数据概论,91
    2016003,李学彭,操作系统,88
    2016003,李学彭,大数据概论,56
    2016004,王东,大数据概论,82
    2016004,王东,操作系统,78
    2016004,王东,数据库,90
    2016005,王笑笑,数据库,89
    2016005,王笑笑,操作系统,69
    2016005,王笑笑,大数据概论,70
    

    问题二:

             Average2.java

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Average2 {
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		Configuration conf=new Configuration();
    		Job job=Job.getInstance(conf,"Average2");
    		
    		job.setJarByClass(Average2.class);
    		job.setMapperClass(Average2Mapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(DoubleWritable.class);
    		
    		
    		job.setReducerClass(Average2Reduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(DoubleWritable.class);
    		
    		FileInputFormat.addInputPath(job, new Path("/Output1"));
    		FileOutputFormat.setOutputPath(job, new Path("/Output2"));
    		System.exit(job.waitForCompletion(true)?0:1);
    
    	}
     
         public static class Average2Mapper extends Mapper<Object,Text,Text,DoubleWritable>{
    		@Override
    		protected void map(Object key, Text value, Mapper<Object, Text, Text, DoubleWritable>.Context context)
    				throws IOException, InterruptedException {
    			//分割
    			String[] words=value.toString().split(",");
    			//keybuf=[2016001,王毅,]
    			StringBuffer keybuf=new StringBuffer();
    			keybuf.append(words[0]).append(",").append(words[1]).append(",");
    			//score用来记录成绩
    			Double score=Double.parseDouble(words[3]);
    			context.write(new Text(keybuf.toString()), new DoubleWritable(score));
    		}
         }
         
        public static class Average2Reduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
            //new Comparetor<Double> 的方法  倒叙(从高到低)排序       	
        	private TreeMap<Double, String> treeMap=new TreeMap<Double, String>(new Comparator<Double>() {
    			@Override
    			public int compare(Double x, Double y) {
    				return y.compareTo(x);
    			}
    		});
        	 
    		@Override
    		protected void reduce(Text key, Iterable<DoubleWritable> values,
    				Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
    				throws IOException, InterruptedException {
    			//reduce的操作对象是[key,<value1,value2...>]
    			Double sumscore=0.0;
    			int num=0;
    			for(DoubleWritable value:values) {
    				num++;
    				sumscore=sumscore+value.get();
    			}
    			Double avg= sumscore/num;
    			//得到的结果先不输出,到treepMap里面先排个序
    			treeMap.put(avg, key.toString());
    		}
                    //输出
    		protected void cleanup(Context context) throws IOException, InterruptedException {
    			for(Double key:treeMap.keySet()) {
    				context.write(new Text(treeMap.get(key)), new DoubleWritable(key));
    			}
    		}
        	 
         }
    }
    

    结果:

    2016002,张小明,	91.0
    2016004,王东,	83.33333333333333
    2016001,王毅,	77.66666666666667
    2016005,王笑笑,	76.0
    2016003,李学彭,	72.0
    

    问题三:

             Course3.java

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * MapReduce job that reports the highest score, the lowest score and the average
     * score of every course.
     *
     * <p>The input is the {@code sid,name,course,score} output of the join job; each
     * output line is the course name followed by {@code max,min,average}.
     */
    public class Course3 {

    	/** Counter group used to report input lines that could not be interpreted. */
    	private static final String COUNTER_GROUP = "Course3";

    	/**
    	 * Configures and submits the job, then exits with 0 on success and 1 on failure.
    	 *
    	 * @param args command-line arguments (not used)
    	 */
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf, "Course3");

    		job.setJarByClass(Course3.class);
    		job.setMapperClass(Course3Mapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);

    		job.setReducerClass(Course3Reduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);

    		FileInputFormat.addInputPath(job, new Path("/Output1"));
    		FileOutputFormat.setOutputPath(job, new Path("/Output3"));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}

    	/** Emits {@code (course, score)} for every {@code sid,name,course,score} line. */
    	public static class Course3Mapper extends Mapper<Object, Text, Text, IntWritable> {

    		@Override
    		protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			String[] words = value.toString().split(",");
    			if (words.length < 4) {
    				// Blank or truncated line: there is no score field to read.
    				context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
    				return;
    			}
    			int score = Integer.parseInt(words[3]);
    			// key = course, value = one student's score in that course
    			context.write(new Text(words[2]), new IntWritable(score));
    		}
    	}

    	/** Folds all scores of one course into a {@code max,min,average} line. */
    	public static class Course3Reduce extends Reducer<Text, IntWritable, Text, Text> {

    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values,
    				Reducer<Text, IntWritable, Text, Text>.Context context) throws IOException, InterruptedException {
    			// Start from the extremes of int so that the result is correct for any
    			// score, not only for scores between 0 and 100.
    			int max = Integer.MIN_VALUE;
    			int min = Integer.MAX_VALUE;
    			long sum = 0;
    			int count = 0; // number of scores recorded for this course
    			for (IntWritable value : values) {
    				int score = value.get();
    				count++;
    				if (score > max) {
    					max = score;
    				}
    				if (score < min) {
    					min = score;
    				}
    				sum += score;
    			}
    			double average = (double) sum / count;
    			String summary = max + "," + min + "," + average;
    			context.write(key, new Text(summary));
    		}
    	}
    }
    

    结果:

    大数据概论	91,56,76.8
    操作系统	91,60,77.2
    数据库	90,88,89.0
    
  • 相关阅读:
    Java 学习笔记之 线程interrupted方法
    Java 学习笔记之 线程interrupt方法
    定义函数
    调用函数
    pass语句
    循环
    条件语句
    别样赋值
    import语句
    字典方法
  • 原文地址:https://www.cnblogs.com/msq2000/p/11801074.html
Copyright © 2011-2022 走看看