zoukankan      html  css  js  c++  java
  • Hadoop实战-MapReduce之分组(group-by)统计(七)

    1、数据准备

    使用MapReduce按name分组,计算age.txt中每个name的min最小值、max最大值以及count的总和
    name,min,max,count
    Mike,35,20,1
    Mike,5,15,2
    Mike,20,13,1
    Steven,40,20,10
    Ken,28,68,1
    Ken,14,198,10
    Cindy,32,31,100

    2、预期结果
    Mike 5 20 4
    Steven 40 20 10
    Ken 14 198 11
    Cindy 32 31 100

    3、需要加入自定义输出类型MinMaxCountTuple

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class MinMaxCountTuple implements Writable {
    	private int min;
    	private int max;
    	private int count;
    
    	public int getMin() {
    		return min;
    	}
    
    	public void setMin(int min) {
    		this.min = min;
    	}
    
    	public int getMax() {
    		return max;
    	}
    
    	public void setMax(int max) {
    		this.max = max;
    	}
    
    	public int getCount() {
    		return count;
    	}
    
    	public void setCount(int count) {
    		this.count = count;
    	}
    
    	
    	public void readFields(DataInput in) throws IOException {
    		min = in.readInt();
    		max = in.readInt();
    		count = in.readInt();
    	}
    
    	public void write(DataOutput out) throws IOException {
    		out.writeInt(min);
    		out.writeInt(max);
    		out.writeInt(count);
    	}
    
    	@Override
    	public String toString() {
    		return min + "	" + max + "	" + count;
    	}
    }
    

     4、MapReduce编程

    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    /**
     * MapReduce job that groups comma-separated {@code name,min,max,count}
     * records by name and emits, per name, the smallest min, the largest max
     * and the sum of the counts.
     */
    public class Age {
    	/** Counter group used to report records the mapper could not parse. */
    	private static final String COUNTER_GROUP = "Age";
    	/** Counter name for records the mapper skipped as malformed. */
    	private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    	/**
    	 * Parses each whitespace-separated token of an input value as a
    	 * {@code name,min,max,count} record and emits it keyed by name.
    	 */
    	public static class AgeMap extends
    			Mapper<Object, Text, Text, MinMaxCountTuple> {

    		// Reused across calls to avoid allocating per record.
    		private final Text userName = new Text();
    		private final MinMaxCountTuple outTuple = new MinMaxCountTuple();

    		/**
    		 * Emits one (name, tuple) pair per well-formed record in {@code value}.
    		 * Records with fewer than four comma-separated fields, or with a
    		 * non-integer min/max/count (for example a header line), are skipped
    		 * and counted instead of failing the task.
    		 *
    		 * @param key input key; not used
    		 * @param value text containing one or more whitespace-separated records
    		 * @param context context used to emit output and update counters
    		 */
    		@Override
    		public void map(Object key, Text value, Context context)
    				throws IOException, InterruptedException {
    			StringTokenizer itr = new StringTokenizer(value.toString());
    			while (itr.hasMoreTokens()) {
    				String[] splits = itr.nextToken().split(",");
    				if (splits.length < 4) {
    					context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER)
    							.increment(1);
    					continue;
    				}
    				int min;
    				int max;
    				int count;
    				try {
    					// Parse into locals first so a failure part-way through
    					// never leaves outTuple half-updated.
    					min = Integer.parseInt(splits[1]);
    					max = Integer.parseInt(splits[2]);
    					count = Integer.parseInt(splits[3]);
    				} catch (NumberFormatException ignored) {
    					// Not a data record (e.g. a header row): count and skip.
    					context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER)
    							.increment(1);
    					continue;
    				}
    				outTuple.setMin(min);
    				outTuple.setMax(max);
    				outTuple.setCount(count);
    				userName.set(splits[0]);
    				context.write(userName, outTuple);
    			}
    		}
    	}

    	/**
    	 * Folds all tuples of one name into a single tuple: smallest min,
    	 * largest max, summed count. The driver also registers this class as the
    	 * combiner.
    	 */
    	public static class AgeReduce extends
    			Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
    		// Reused across calls; every field is reset at the start of reduce().
    		private final MinMaxCountTuple result = new MinMaxCountTuple();

    		/**
    		 * Writes one aggregated tuple for {@code key}.
    		 *
    		 * @param key the name the values belong to
    		 * @param values tuples to aggregate
    		 * @param context context used to emit the aggregated tuple
    		 */
    		@Override
    		public void reduce(Text key, Iterable<MinMaxCountTuple> values,
    				Context context) throws IOException, InterruptedException {
    			int sum = 0;
    			// Seed with the extreme opposite bounds so the first value always
    			// wins; seeding max with 0 would be wrong for all-negative input.
    			result.setMax(Integer.MIN_VALUE);
    			result.setMin(Integer.MAX_VALUE);
    			for (MinMaxCountTuple tmp : values) {
    				if (tmp.getMin() < result.getMin()) {
    					result.setMin(tmp.getMin());
    				}
    				if (tmp.getMax() > result.getMax()) {
    					result.setMax(tmp.getMax());
    				}
    				sum += tmp.getCount();
    			}
    			result.setCount(sum);
    			context.write(key, result);
    		}
    	}

    	/**
    	 * Configures and runs the job. Expects exactly two remaining arguments
    	 * after generic Hadoop options: the input path and the output path.
    	 * Exits with 2 on bad usage, 0 on job success and 1 on job failure.
    	 *
    	 * @param args command-line arguments
    	 * @throws Exception if job setup or execution fails
    	 */
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		String[] otherArgs = new GenericOptionsParser(conf, args)
    				.getRemainingArgs();
    		if (otherArgs.length != 2) {
    			System.err.println("Usage: Age <in> <out>");
    			System.exit(2);
    		}
    		// Job.getInstance replaces the deprecated Job(Configuration, String)
    		// constructor.
    		Job job = Job.getInstance(conf, "Age Min Max Count");
    		job.setJarByClass(Age.class);
    		job.setMapperClass(AgeMap.class);
    		job.setCombinerClass(AgeReduce.class);
    		job.setReducerClass(AgeReduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(MinMaxCountTuple.class);
    		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }
    
  • 相关阅读:
    20135203齐岳 信息安全系统设计基础第二周学习总结
    信息安全系统设计基础第一周学习总结
    实验五 cmp传输与加解密
    实验三 敏捷开发与XP实践(改)
    Anaconda添加镜像和删除镜像
    srvany.exe和instsrv.exe打包exe为windows服务趟的坑
    python笔记
    haproxy实现socket5代理
    nginx实现的一些实用性配置,持续更新中
    利用Func封装“方法重试”功能
  • 原文地址:https://www.cnblogs.com/qq27271609/p/6822880.html
Copyright © 2011-2022 走看看