  • MapReduce programming: maximum, minimum, average, count, median, and standard deviation

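    The most basic MapReduce example is probably WordCount, and after that most courses move on to computing maximum and minimum values. The textbook used in my class is 《MapReduce编程与设计模式》. Its first chapter introduces WordCount, followed by maximum, minimum, average, and standard deviation. The data comes from comments on the Stack Overflow site and includes the comment time, the commenting user's ID, and the comment text. The input is an .xml file, so each line read by the mapper must first be converted into a map of key-value pairs: transformXmlToMap(value.toString());
    The input looks like the following. These rows are made up: only the comment time and user ID were altered, and the comment text was pasted in unchanged.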

    <row Id="1" PostId="35314" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2018-09-06T08:07:10.730" UserId="1" /> 
    <row Id="1" PostId="35315" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2007-09-06T08:05:33.730" UserId="1" />
    <row Id="1" PostId="35316" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-09-06T08:07:10.730" UserId="1" /> 
    <row Id="1" PostId="35317" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-08-06T08:07:26.730" UserId="1" /> 
    <row Id="2" PostId="35318" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-05-06T08:11:10.730" UserId="1" /> 
    <row Id="2" PostId="35319" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-09-06T08:12:10.730" UserId="1" /> 
    <row Id="2" PostId="35320" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-06-06T08:03:10.730" UserId="1" /> 
    <row Id="2" PostId="35321" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-09-06T08:07:10.880" UserId="1" /> 
    <row Id="2" PostId="35322" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2016-09-06T08:07:39.730" UserId="1" /> 
    <row Id="2" PostId="35323" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-03-06T08:07:10.730" UserId="1" /> 
    <row Id="3" PostId="35324" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2007-09-06T08:00:22.730" UserId="1" /> 
    
    

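    The textbook does not show the internal implementation of this function, but it is a basic utility that you can write yourself. Its purpose is to extract the attributes from each row and store them in a map.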

    public static final String[] REDIS_INSTANCES = { "p0", "p1", "p2", "p3",
    			"p4", "p6" };
    
    	// This helper function parses the stackoverflow into a Map for us.
    	public static Map<String, String> transformXmlToMap(String xml) {
    		Map<String, String> map = new HashMap<String, String>();
    		try {
    			String[] tokens = xml.trim().substring(5, xml.trim().length() - 3)
    					.split("\"");
    
    			for (int i = 0; i < tokens.length - 1; i += 2) {
    				String key = tokens[i].trim();
    				String val = tokens[i + 1];
    
    				map.put(key.substring(0, key.length() - 1), val);
    			}
    		} catch (StringIndexOutOfBoundsException e) {
    			System.err.println(xml);
    		}
    
    		return map;
    	}
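
    To see what the helper produces, here is a minimal standalone sketch that runs the same splitting logic on one row. The class name XmlRowDemo and the shortened Text value are assumptions made for illustration; the parsing steps follow the helper above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; only the parsing logic mirrors the helper above.
final class XmlRowDemo {
    // Drop the leading "<row " and the trailing " />", then split on double
    // quotes so that attribute names and attribute values alternate.
    public static Map<String, String> transformXmlToMap(String xml) {
        Map<String, String> map = new HashMap<>();
        String trimmed = xml.trim();
        String[] tokens = trimmed.substring(5, trimmed.length() - 3).split("\"");
        for (int i = 0; i < tokens.length - 1; i += 2) {
            String key = tokens[i].trim();
            // The key token still ends with '=', so strip the last character.
            map.put(key.substring(0, key.length() - 1), tokens[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        String row = "<row Id=\"1\" PostId=\"35314\" Score=\"39\" Text=\"hello\" "
                + "CreationDate=\"2018-09-06T08:07:10.730\" UserId=\"1\" />";
        Map<String, String> parsed = transformXmlToMap(row);
        System.out.println(parsed.get("CreationDate"));
        System.out.println(parsed.get("UserId"));
    }
}
```

    This approach relies on every attribute value being wrapped in double quotes and containing no raw double quote of its own, which holds for the sample rows shown earlier.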
    

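    Next comes the minimum/maximum class:
    This code is taken entirely from the textbook. For the average and standard deviation calculations that follow, the textbook omits the complete code for reasons of space and gives only the core mapper and reducer. The driver (main class) can still be written from the template that the min/max example provides.

    First, the min/max class has three fields. The code finds, across each user's comments, the earliest comment time (the minimum) and the latest comment time (the maximum), and count holds the number of comments by that user.
    Note also that the class overrides toString, which defines the output format.

    Second, sorting is built into MapReduce. Whether or not the job needs sorted output, the framework sorts the map output by key during the shuffle between the map and reduce phases, so each reducer receives its keys in sorted order with all values for a key grouped together. Sorted, grouped input benefits most processing, and this is simply how MapReduce works.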
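
    The grouping and the min/max/count fold can be tried without a cluster. The sketch below is plain Java, not Hadoop code: a TreeMap stands in for the framework's sort-and-group step, and the class name ShuffleSketch and the tiny (userId, timestamp) pairs are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of "group by key, then reduce".
final class ShuffleSketch {
    // Folds one user's timestamps into {min, max, count}, as the reducer does.
    public static long[] minMaxCount(List<Long> timestamps) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long t : timestamps) {
            min = Math.min(min, t);
            max = Math.max(max, t);
        }
        return new long[] { min, max, timestamps.size() };
    }

    // Groups (userId, timestamp) pairs by key. TreeMap keeps the keys sorted,
    // standing in for the framework's shuffle-and-sort step.
    public static TreeMap<String, List<Long>> group(String[][] pairs) {
        TreeMap<String, List<Long>> grouped = new TreeMap<>();
        for (String[] p : pairs) {
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add(Long.parseLong(p[1]));
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Unsorted "map output": user "2" appears before user "1".
        String[][] mapOutput = { {"2", "50"}, {"1", "30"}, {"2", "10"}, {"1", "70"}, {"1", "20"} };
        for (Map.Entry<String, List<Long>> e : group(mapOutput).entrySet()) {
            long[] r = minMaxCount(e.getValue());
            System.out.println(e.getKey() + "\t" + r[0] + "\t" + r[1] + "\t" + r[2]);
        }
    }
}
```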

    package mapreduce_2019;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    import org.apache.hadoop.fs.Path;
    //import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    //import org.apache.hadoop.mapreduce.Mapper.Context;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MinMaxCountTuple implements Writable{
    	private Date min=new Date();// earliest comment time
    	private Date max=new Date();// latest comment time
    	private long count = 0;// number of comments
    	
    	private final static SimpleDateFormat frmt =new SimpleDateFormat(
    		"yyyy-MM-dd'T'HH:mm:ss.SSS");
    	
    	public Date getMin(){
    		return min;
    	}
    
    	public void setMin(Date min){
    		this.min=min;
    	}
    	
    	public Date getMax(){
    		return max;
    	}
    	
    	public void setMax(Date max){
    		this.max=max;
    	}
    	
    	public long getCount(){
    		return count;
    	}
    	
    	public void setCount(long count){
    		this.count=count;
    	}
    	
    	public void readFields(DataInput in) throws IOException{
    		min=new Date(in.readLong());
    		max=new Date(in.readLong());
    		count=in.readLong();
    	}
    	
    	public void write(DataOutput out) throws IOException {
    		out.writeLong(min.getTime());
    		out.writeLong(max.getTime());
    		out.writeLong(count);
    	}
    	
    	public String toString(){
    		return frmt.format(min)+"	"+frmt.format(max)+"	"+count;
    	}
    	
    	public static final String[] REDIS_INSTANCES = { "p0", "p1", "p2", "p3",
    			"p4", "p6" };
    
    	// This helper function parses the stackoverflow into a Map for us.
    	public static Map<String, String> transformXmlToMap(String xml) {
    		Map<String, String> map = new HashMap<String, String>();
    		try {
    			String[] tokens = xml.trim().substring(5, xml.trim().length() - 3)
    					.split("\"");
    
    			for (int i = 0; i < tokens.length - 1; i += 2) {
    				String key = tokens[i].trim();
    				String val = tokens[i + 1];
    
    				map.put(key.substring(0, key.length() - 1), val);
    			}
    		} catch (StringIndexOutOfBoundsException e) {
    			System.err.println(xml);
    		}
    
    		return map;
    	}
    	
    	public static class MinMaxCountMapper extends Mapper <Object,Text,Text,MinMaxCountTuple>{
    		private Text outUserId = new Text();
    		private MinMaxCountTuple outTuple = new MinMaxCountTuple();
    		
    		private final static SimpleDateFormat frmt= new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
    		
    		public void map(Object key,Text value,Context context) 
    				throws IOException,InterruptedException{
    			
    			Map <String,String> parsed=transformXmlToMap(value.toString());
    			
    			String strDate = parsed.get("CreationDate");
    			String userId = parsed.get("UserId");
    			
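    			// Skip rows that lack a usable date or user ID; passing null on
    			// would cause a NullPointerException when the tuple is written
    			if (strDate == null || userId == null) {
    				return;
    			}
    			Date creationDate;
    			try {
    				creationDate = frmt.parse(strDate);
    			} catch (ParseException e) {
    				System.err.println("Unparseable date: " + strDate);
    				return;
    			}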
    			
    			outTuple.setMin(creationDate);
    			outTuple.setMax(creationDate);
    			
    			outTuple.setCount(1);
    			
    			outUserId.set(userId);
    			
    			context.write(outUserId, outTuple);
    		}
    	}
    	
    	public static class MinMaxCountReducer 
    	extends Reducer<Text,MinMaxCountTuple,Text,MinMaxCountTuple>{
    		private MinMaxCountTuple result = new MinMaxCountTuple();
    		
    		public void reduce(Text key,Iterable <MinMaxCountTuple> values,Context context) throws IOException,InterruptedException{
    			result.setMin(null);
    			result.setMax(null);
    			result.setCount(0);
    			long sum=0;// long, to match the type of the count field
    			
    			for(MinMaxCountTuple val:values){
    				if(result.getMin()==null||val.getMin().compareTo(result.getMin())<0){
    					result.setMin(val.getMin());
    				}
    				if(result.getMax()==null||val.getMax().compareTo(result.getMax())>0){
    					result.setMax(val.getMax());
    				}
    				sum+=val.getCount();
    			}
    			result.setCount(sum);
    			context.write(key, result);
    			
    		}
    	}
    	
    	
    /*============================================================================================================*/	
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		 
    //      FileUtil.deleteDir("output");
    //      String[] otherargs = new String[]{"hdfs://master:9000/input/Comments.xml", "hdfs://master:9000/output1"};
    
    //      if (otherargs.length != 2) {
    //          System.err.println("Usage: mergesort <in> <out>");
    //          System.exit(2);
    //      }
    
          Job job = Job.getInstance();
          job.setJarByClass(MinMaxCountTuple.class);
          job.setMapperClass(MinMaxCountMapper.class);
          job.setReducerClass(MinMaxCountReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(MinMaxCountTuple.class);
          FileInputFormat.addInputPath(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
    	
    

    This program takes two arguments: the input file path and the output path.

    Export it as a .jar and run it against HDFS.
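
    Before deploying, the write/readFields pair can be checked locally, because both methods only need java.io's DataOutput and DataInput. The sketch below uses a hypothetical stand-in class, TupleRoundTrip, that copies the three fields and the field order of MinMaxCountTuple but leaves out the Hadoop Writable interface so that it runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Date;

// Hypothetical stand-in for MinMaxCountTuple without the Writable interface.
final class TupleRoundTrip {
    public Date min = new Date();
    public Date max = new Date();
    public long count = 0;

    // Fields are written in a fixed order: min, max, count.
    public void write(DataOutput out) throws IOException {
        out.writeLong(min.getTime());
        out.writeLong(max.getTime());
        out.writeLong(count);
    }

    // Fields must be read back in exactly the order they were written.
    public void readFields(DataInput in) throws IOException {
        min = new Date(in.readLong());
        max = new Date(in.readLong());
        count = in.readLong();
    }

    // Serializes a tuple to bytes and deserializes it into a fresh instance.
    public static TupleRoundTrip roundTrip(TupleRoundTrip original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            TupleRoundTrip copy = new TupleRoundTrip();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TupleRoundTrip t = new TupleRoundTrip();
        t.min = new Date(1000L);
        t.max = new Date(5000L);
        t.count = 3;
        TupleRoundTrip copy = roundTrip(t);
        System.out.println(copy.min.getTime() + " " + copy.max.getTime() + " " + copy.count);
    }
}
```

    If the copy differs from the original, the usual cause is that readFields reads the fields in a different order than write emits them.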


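    Next is the code for average and count. Only the core mapper and reducer come from the textbook.
    The main class and main function were added by following the example above.
    Note that the average here is the average length of the comment text.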
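
    The tuple carries a count next to the average because an average of averages is generally wrong: partial averages have to be weighted by how many values each one covers. The sketch below shows that merge on two made-up partial results. The class AverageMerge and the numbers are assumptions for illustration; the arithmetic is the same count-weighted sum that the reducer below performs.

```java
// Hypothetical helper; each float[] holds {count, average}.
final class AverageMerge {
    // Merges two partial results, weighting each average by its count.
    public static float[] merge(float[] a, float[] b) {
        float count = a[0] + b[0];
        float sum = a[0] * a[1] + b[0] * b[1];
        return new float[] { count, sum / count };
    }

    public static void main(String[] args) {
        float[] first = { 2f, 10f };  // two comments averaging 10 characters
        float[] second = { 1f, 40f }; // one comment of 40 characters
        float[] merged = merge(first, second);
        // The weighted result differs from the naive (10 + 40) / 2.
        System.out.println("count=" + merged[0] + " average=" + merged[1]);
        System.out.println("naive average of averages=" + (first[1] + second[1]) / 2);
    }
}
```

    Because the merged result has the same (count, average) shape as its inputs, partial results can be merged again in any grouping, which is the property a combiner would need.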

    package mapreduce_2019;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class CountAverageTuple implements Writable{
    	private float count =0;// number of comments
    	private float average=0;// average comment length
    	public float getCount(){
    		return count;
    	}
    	
    	public void setCount(float count2){
    		this.count=count2;
    	}
    	
    	public float getAverage(){
    		return average;
    	}
    	
    	public void setAverage(float f){
    		this.average=f;
    	}
    	
    	public void readFields(DataInput in) throws IOException{
    		count=in.readFloat();
    		average=in.readFloat();
    	}
    	
    	public void write(DataOutput out) throws IOException {
    		out.writeFloat(count);
    		out.writeFloat(average);
    	}
    
    	public String toString(){// override toString to define the output format
    		return count+"	"+average;
    	}
    	
    	public static final String[] REDIS_INSTANCES = { "p0", "p1", "p2", "p3",
    			"p4", "p6" };
    	
    	public static Map<String, String> transformXmlToMap(String xml) {
    		Map<String, String> map = new HashMap<String, String>();
    		try {
    			String[] tokens = xml.trim().substring(5, xml.trim().length() - 3).split("\"");
    			for (int i = 0; i < tokens.length - 1; i += 2) {
    				String key = tokens[i].trim();
    				String val = tokens[i + 1];
    				map.put(key.substring(0, key.length() - 1), val);
    			}
    		} catch (StringIndexOutOfBoundsException e) {
    			System.err.println(xml);
    		}
    		return map;
    	}
    	
    	public static class AverageMapper extends Mapper <Object, Text, IntWritable, CountAverageTuple> {
    		private IntWritable outHour = new IntWritable();
    		private CountAverageTuple outCountAverage = new CountAverageTuple();
    		private final static SimpleDateFormat frmt = new SimpleDateFormat ("yyyy-MM-dd'T'HH:mm:ss.SSS");
    		
    		public void map (Object key, Text value, Context context) 
    				throws IOException, InterruptedException {
    			
    			Map <String, String> parsed = transformXmlToMap (value.toString());
    			
    			String strDate = parsed.get("CreationDate");
    			String text = parsed.get("Text");
    			
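    			// Skip rows without a usable date or comment text
    			if (strDate == null || text == null) {
    				return;
    			}
    			Date creationDate;
    			try {
    				creationDate = frmt.parse(strDate);
    			} catch (ParseException e) {
    				System.err.println("Unparseable date: " + strDate);
    				return;
    			}
    			
    			outHour.set(creationDate.getHours());// hour of day (0-23) is the output key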
    			
    			outCountAverage.setCount(1);
    			
    			outCountAverage.setAverage (text.length());
    			
    			context.write(outHour, outCountAverage);
    		}
    	}
    
        public static class AverageReducer 
        extends Reducer <IntWritable, CountAverageTuple,IntWritable, CountAverageTuple> {
    	    private CountAverageTuple result = new CountAverageTuple();
    	    
    	    public void reduce (IntWritable key, Iterable<CountAverageTuple> values, Context context) throws IOException, InterruptedException {
    		    float sum = 0;
    		    float count = 0;
    		    for (CountAverageTuple val : values) {
    			    sum+=val.getCount() * val.getAverage();
    			    count +=val.getCount();
    		    }
    		    
    		    result.setCount (count) ;
    		    result.setAverage (sum / count);// count-weighted average
    		    context.write(key,result);
    	    }
        }
        
        /*============================================================================================================*/	
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		 
    //      FileUtil.deleteDir("output");
          String[] otherargs = new String[]{"hdfs://master:9000/input/Comments.xml", "hdfs://master:9000/output2"};
    
          if (otherargs.length != 2) {
              System.err.println("Usage: CountAverageTuple <in> <out>");
              System.exit(2);
          }
    
          Job job = Job.getInstance();
          job.setJarByClass(CountAverageTuple.class);
          job.setMapperClass(AverageMapper.class);
          job.setReducerClass(AverageReducer.class);
          job.setOutputKeyClass(IntWritable.class);
          job.setOutputValueClass(CountAverageTuple.class);
          FileInputFormat.addInputPath(job, new Path(otherargs[0]));
          FileOutputFormat.setOutputPath(job, new Path(otherargs[1]));
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
    
    
    
    

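    Last come the standard deviation and the median. The code below is the plainest version, with no memory optimization: the reducer buffers every comment length for a key in a list and sorts it. An optimized variant would store the values as key-value pairs instead. Since one user has many comments, keeping a value together with the number of times it occurs saves a great deal of space. Rather than walking through every comment, the counts make it possible to decide quickly whether the median falls under a given key, so large blocks of data can be skipped early on.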
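
    As a rough illustration of that key-value idea, the sketch below computes the median and the sample standard deviation from a sorted map of comment length to occurrence count, without expanding the counts into a list. It is plain Java rather than a reducer. The class CountedMedian, its method names, and the choice of TreeMap are assumptions for this example, not the textbook's implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: statistics over (length -> count) pairs.
final class CountedMedian {
    // Walks the keys in ascending order and stops once the running count
    // passes the middle position(s) of the implied sorted sequence.
    public static float median(TreeMap<Integer, Long> lengthCounts) {
        long total = 0;
        for (long c : lengthCounts.values()) {
            total += c;
        }
        if (total == 0) {
            throw new IllegalArgumentException("no values");
        }
        long lower = (total - 1) / 2; // 0-based index of the lower middle element
        long upper = total / 2;       // 0-based index of the upper middle element
        long seen = 0;
        Integer lowerValue = null;
        for (Map.Entry<Integer, Long> e : lengthCounts.entrySet()) {
            seen += e.getValue();     // this key covers indices below 'seen'
            if (lowerValue == null && lower < seen) {
                lowerValue = e.getKey();
            }
            if (upper < seen) {
                return (lowerValue + e.getKey()) / 2.0f;
            }
        }
        throw new IllegalStateException("unreachable for a non-empty map");
    }

    // Sample standard deviation (divides by n - 1), weighting each length by its count.
    public static float sampleStdDev(TreeMap<Integer, Long> lengthCounts) {
        double total = 0;
        double sum = 0;
        for (Map.Entry<Integer, Long> e : lengthCounts.entrySet()) {
            total += e.getValue();
            sum += (double) e.getKey() * e.getValue();
        }
        double mean = sum / total;
        double sumOfSquares = 0;
        for (Map.Entry<Integer, Long> e : lengthCounts.entrySet()) {
            double d = e.getKey() - mean;
            sumOfSquares += d * d * e.getValue();
        }
        return (float) Math.sqrt(sumOfSquares / (total - 1));
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> counts = new TreeMap<>();
        counts.put(10, 2L); // two comments of length 10
        counts.put(20, 1L);
        counts.put(30, 1L);
        System.out.println(median(counts) + "\t" + sampleStdDev(counts));
    }
}
```

    Memory use here grows with the number of distinct lengths rather than with the number of comments, which is the saving the paragraph above describes.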

    package mapreduce_2019;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class MedianStdDevTuple implements Writable{
    	private float median =0;// median comment length
    	private float stddev =0;// standard deviation of comment length
    	
    	private final static SimpleDateFormat frmt =new SimpleDateFormat(
    			"yyyy-MM-dd'T'HH:mm:ss.SSS");
    	
    	public float getMedian(){
    		return median;
    	}
    	
    	public void setMedian(float median){
    		this.median=median;
    	}
    	
    	public float getStdDev(){
    		return stddev;
    	}
    	
    	public void setStdDev(float f){
    		this.stddev=f;
    	}
    	
    	public String toString(){// override toString to define the output format
    		return median+"	"+stddev;
    	}
    	
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		// Read the fields in the same order that write() emits them
    		median = in.readFloat();
    		stddev = in.readFloat();
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeFloat(median);
    		out.writeFloat(stddev);
    	}
    	
    	public static final String[] REDIS_INSTANCES = { "p0", "p1", "p2", "p3",
    			"p4", "p6" };
    	
    	public static Map<String, String> transformXmlToMap(String xml) {
    		Map<String, String> map = new HashMap<String, String>();
    		try {
    			String[] tokens = xml.trim().substring(5, xml.trim().length() - 3).split("\"");
    			for (int i = 0; i < tokens.length - 1; i += 2) {
    				String key = tokens[i].trim();
    				String val = tokens[i + 1];
    				map.put(key.substring(0, key.length() - 1), val);
    			}
    		} catch (StringIndexOutOfBoundsException e) {
    			System.err.println(xml);
    		}
    		return map;
    	}
    	
    	public static class MedianStdDevMapper extends Mapper <Object,Text,IntWritable,IntWritable>{
    		private IntWritable outHour = new IntWritable();
    		private IntWritable outCommentLength = new IntWritable();
    		private final static SimpleDateFormat frmt= new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
    		public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
    			Map <String,String> parsed=transformXmlToMap(value.toString());
    			
    			String strDate = parsed.get("CreationDate");
    			String text = parsed.get("Text");
    			
    			// Skip rows without a usable date or comment text
    			if (strDate == null || text == null) {
    				return;
    			}
    			Date creationDate;
    			try {
    				creationDate = frmt.parse(strDate);
    			} catch (ParseException e) {
    				System.err.println("Unparseable date: " + strDate);
    				return;
    			}
    			
    			outHour.set(creationDate.getHours());
    			
    			outCommentLength.set(text.length());
    			
    			context.write(outHour,outCommentLength);
    		}
    	}
    	
    	
    	public static class MedianStdDevReducer extends Reducer <IntWritable,IntWritable,IntWritable,MedianStdDevTuple>{
    		private MedianStdDevTuple result = new MedianStdDevTuple();
    		private ArrayList <Float> commentLengths = new ArrayList <Float>();
    		
    		public void reduce (IntWritable key,Iterable <IntWritable> values,Context context) throws IOException,InterruptedException{
    			float sum= 0 ;
    			float count  = 0;
    			commentLengths.clear();
    			result.setStdDev(0);
    			for(IntWritable val:values){
    				commentLengths.add((float) val.get());
    				sum+=val.get();
    				++count;
    			}
    			
    			Collections.sort(commentLengths);// sort the comment lengths in ascending order
    			
    			if(count % 2==0){// even count: average the two middle values
    				result.setMedian((commentLengths.get((int)count/2-1)+commentLengths.get((int)count/2))/2.0f);
    			}
    			else{
    				result.setMedian(commentLengths.get((int)count/2));
    			}
    			float mean=sum/count;
    			float sumOfSquares = 0.0f;
    			for(Float f: commentLengths){// sum of squared deviations from the mean
    				sumOfSquares+=(f-mean)*(f-mean);
    			}
    			result.setStdDev((float)Math.sqrt(sumOfSquares/(count-1)));
    			context.write(key, result);
    		}
    	}
    	
        /*============================================================================================================*/	
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		String[] otherargs = new String[]{"hdfs://master:9000/input/Comments.xml", "hdfs://master:9000/output3"};
    	    if (otherargs.length != 2) {
              System.err.println("Usage: MedianStdDevTuple <in> <out>");
              System.exit(2);
    	    }
    	     
    	    
    	   Job job = Job.getInstance();
           job.setJarByClass(MedianStdDevTuple.class);
           job.setMapperClass(MedianStdDevMapper.class);
           job.setReducerClass(MedianStdDevReducer.class);
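           // The map output (hour, comment length) differs from the final output type
           job.setMapOutputKeyClass(IntWritable.class);
           job.setMapOutputValueClass(IntWritable.class);
           job.setOutputKeyClass(IntWritable.class);
           job.setOutputValueClass(MedianStdDevTuple.class);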
           FileInputFormat.addInputPath(job, new Path(otherargs[0]));
           FileOutputFormat.setOutputPath(job, new Path(otherargs[1]));
           System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }
    
    
    
  • Original article: https://www.cnblogs.com/kuronekonano/p/11135654.html