zoukankan      html  css  js  c++  java
  • 034 二次排序

    一:准备

    1.二测排序

      其中1说明了自定义类型

      2与3说明了shuffle阶段的分区与分组,同时说明了程序的写法。

      

    2.RawComparator class

      

    3.二次排序的要点

      组合key,key是一个组合的字段,自定义数据类型

        实现WritableComparable

      保证原来的分区不变,自定义分区规则

        继承Patitioner

      保证原来的分组不变,自定义分组规则

        继承RawComparator

    4.输入的数据

      

    5.需求

      平时的只有一次排序,就是第一个会排序,但是输出的结果中第二个没有排序处理。

      现在希望。在第一个key排序之后,后面的key也可以排序出来。

      

    二:第一次排序

    3.输出第一排序的程序

      MAPPER--------------

      

      REDUCER------------

      

    4.结果

      

    三:二次排序

    5.map和reduce程序

      

    6.自定义类型的程序

      需要实现接口WritableComparable

      输入String,int。

      

    7.自定义分组比较器

      需要实现RawComparator

      两个函数都是相同的意思,都是在返回first的比较结果。

      

    8.定义分区规则

      继承Patitioner

      

    9.运行结果

      

    四:优化点

      例如分区就属于优化,但是这里说的是正负数的优化。

      

    五:重新整理

    1.项目结构

      

    2.程序代码

    RealSecondSort.class

    package com.senior.sort;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.senior.network.WebPvCount;
    import com.senior.network.WebPvCount.WebPvCountMapper;
    import com.senior.network.WebPvCount.WebPvCountReducer;
    
    public class RealSecondSort extends Configured implements Tool{
    	//Mapper
    	public static class SortMapper extends Mapper<LongWritable,Text,PariWritable,IntWritable>{
    		private PariWritable mapoutkey=new PariWritable();
    		private IntWritable mapoutvalue=new IntWritable();
    		@Override
    		protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
    			String valueStr=value.toString();
    			String strs[]=valueStr.split(",");
    			mapoutkey.set(strs[0],Integer.valueOf(strs[1]));
    			mapoutvalue.set(Integer.valueOf(strs[1]));
    			context.write(mapoutkey, mapoutvalue);
    		
    		}
    		
    	}
    	
    	//Reducer
    	public static class SortReducer extends Reducer<PariWritable,IntWritable,Text,IntWritable>{
    		private Text outkey=new Text();
    		@Override
    		protected void reduce(PariWritable key, Iterable<IntWritable> value,Context context)throws IOException, InterruptedException {
    			for(IntWritable str : value){
    				outkey.set(key.getFirst());
    				context.write(outkey, str);
    			}
    		}
    		
    	}
    	
    	//Driver
    	public int run(String[] args) throws Exception {
    		Configuration conf=this.getConf();
    		Job job=Job.getInstance(conf,this.getClass().getSimpleName());
    		job.setJarByClass(RealSecondSort.class);
    		//input
    		Path inpath=new Path(args[0]);
    		FileInputFormat.addInputPath(job, inpath);
    		
    		//output
    		Path outpath=new Path(args[1]);
    		FileOutputFormat.setOutputPath(job, outpath);
    		
    		//map
    		job.setMapperClass(SortMapper.class);
    		job.setMapOutputKeyClass(PariWritable.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		
    		//shuffle*************************************
    		job.setPartitionerClass(PartitionNum.class);
    		job.setGroupingComparatorClass(GroupingComparator.class);	
    		
    		//shuffle*************************************
    		
    		//reduce
    		job.setReducerClass(SortReducer.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		
    		//submit
    		boolean isSucess=job.waitForCompletion(true);
    		return isSucess?0:1;
    
    	}
    	
    	//Main
    	public static void main(String[] args) throws Exception{
    		Configuration conf=new Configuration();
    		args=new String[]{
    				"hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/inputSortData",
    				"hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/outputSortData2"
    		};
    		int status=ToolRunner.run(new RealSecondSort(), args);
    		System.exit(status);
    	}
    
    }
    

     

    PariWritable.java

    这个地方使用的接口可以看看下面的说明。

    package com.senior.sort;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class PariWritable implements WritableComparable<PariWritable>{
    	private String first;
    	private Integer second;
    	public PariWritable(){}
    	public PariWritable(String first,Integer second){
    		set(first,second);
    	}
    	//set get
    	public String getFirst() {
    		return first;
    	}
    	public void setFirst(String first) {
    		this.first = first;
    	}
    	public Integer getSecond() {
    		return second-Integer.MAX_VALUE;
    	}
    	public void setSecond(Integer second) {
    		this.second = second+Integer.MAX_VALUE;
    	}
    	public void set(String first, Integer second) {
    		this.first=first;
    		this.second=second;	
    	}
    	//
    	public void readFields(DataInput input) throws IOException {
    		this.first=input.readUTF();
    		this.second=input.readInt();		
    	}
    	
    	public void write(DataOutput output) throws IOException {
    		output.writeUTF(first);
    		output.writeInt(second);
    		
    	}
    	public int compareTo(PariWritable o) {
    		int comp=this.first.compareTo(o.getFirst());
    		if(0!=comp){
    			return comp;
    		}
    		return Integer.valueOf(getSecond()).compareTo(Integer.valueOf(o.getSecond()));
    	}
    	@Override
    	public String toString() {
    		return "PariWritable [first=" + first + ", second=" + second + "]";
    	}
    	
    }
    

      

    PartitionNum.java

    package com.senior.sort;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class PartitionNum extends Partitioner<PariWritable, IntWritable> {
    
    	@Override
    	public int getPartition(PariWritable key, IntWritable value, int num) {
    		
    		return (key.getFirst().hashCode()&Integer.MAX_VALUE)%num;
    	}
    
    }
    

      

    GroupingComparator.java

    关于程序中的一点仔细看下面的一个部分,就可以很好的理解了。

    package com.senior.sort;
    
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;
    
    public class GroupingComparator implements RawComparator<PariWritable> {
    
    	public int compare(PariWritable o1, PariWritable o2) {
    		return o1.getFirst().compareTo(o2.getFirst());
    	}
    
    	public int compare(byte[] b1, int arg1, int l1, byte[] b2, int arg4,int l2) {
    		
    		return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4);
    	}
    
    }
    

      

    3.效果

      

    六:Hadoop的序列化

    1.说明

      在上面的程序中使用到了序列化,在整理的过程中对这一块进行整理一番。

      

    2.序列化的功能  

      对于需要保存和处理大规模数据的Hadoop来说,其序列化机制要达到以下目的:
    • 排列紧凑:尽量减少带宽,加快数据交换速度
    • 处理快速:进程间通信需要大量的数据交互,使用大量的序列化机制,必须减少序列化和反序列的开支
    • 跨语言:可以支持不同语言间的数据交互啊,如C++
    • 可扩展:当系统协议升级,类定义发生变化,序列化机制需要支持这些升级和变化

    3.Writable

      为了支持以上特性,引用了Writable接口。和说明性Serializable接口不一样,它要求实现两个方法。
      这个功能已经在前面的章节中说明过。
    public interface Writable {  
      void write(DataOutput out) throws IOException;  
      void readFields(DataInput in) throws IOException;  
    }  
    

      

    4.其他接口

      A。WritableComparable:它不仅提供序列化功能,而且还提供比较的功能。这种比较式基于反序列后的对象成员的值,速度较慢。
      B。RawComparator:由于MapReduce十分依赖基于键的比较排序(自定义键还需要重写hashCode和equals方法),因此提供了一个优化接口 RawComparator。该接口允许直接比较数据流中的记录,无需把数据流反序列化为对象,这样避免了新建对象的额外开销。RawComparator定义如下,compare方法可以从每个字节数组b1和b2中读取给定起始位置(s1和s2)以及长度l1和l2的一个整数直接进行比较。
      这个上面已经说明过。
    public interface RawComparator<T> extends Comparator<T> {  
      
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);  
      
    }  
      C。WritableComparator: 是 RawComparator 的一个通用实现,提供两个功能:提供了一个 RawComparator的comparea()的默认实现,该默认实现只是反序列化了键然后再比较,没有什么性能优势。其次、充当了 RawComaprator 实例的一个工厂方法。
      当我们要实现自定key排序时(自定义分组),需要指定自己的排序规则。
      如需要以StartEndDate为键且以开始时间分组,则需要自定义分组器:
    class MyGrouper implements RawComparator<StartEndDate> {  
        @Override  
        public int compare(StartEndDate o1, StartEndDate o2) {  
            return (int)(o1.getStartDate().getTime()- o2.getEndDate().getTime());  
        }  
        @Override  
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
            int compareBytes = WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);  
            return compareBytes;  
        }  
           
    }  
    

      

      

  • 相关阅读:
    Linux 中 eclipse 的tomcat端口号被占用
    JDBC的常用API
    eclipse Alt+/ 无法提示代码
    javaweb项目开发错误代码
    PSP总结报告
    20181204-1 每周例行报告
    对团队成员公开感谢博客
    20181127-2 每周例行报告
    20181120-1 每周例行报告
    20181113-2 每周例行报告
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6016873.html
Copyright © 2011-2022 走看看