  • Hadoop 2.x Advanced Applications: Secondary Sort and MapReduce-Side Join

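    I. Understanding the secondary sort example

    1. Requirements (sort by the first field, then by the second field among records that share the same first field)
        Unsorted raw data       Sorted data
            a,1                     a,1
            b,1                     a,2
            a,2       [sort]        a,100
            b,6        ===>         b,-3
            c,2                     b,-2
            b,-2                    b,1
            a,100                   b,6
            b,-3                    c,-7
            c,-7                    c,2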
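
    The required ordering can be checked in plain Java before any MapReduce code is written. The sketch below is only an illustration: the class name TwoFieldSortDemo and the method sortLines are made up for this example and are not part of the job that follows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class TwoFieldSortDemo {

    /** Sorts "first,second" lines by the first field (as text), then by the second field (as an int). */
    static List<String> sortLines(List<String> lines) {
        List<String> sorted = new ArrayList<>(lines);
        sorted.sort(Comparator
                .comparing((String line) -> line.split(",")[0])
                .thenComparingInt(line -> Integer.parseInt(line.split(",")[1])));
        return sorted;
    }

    public static void main(String[] args) {
        // The unsorted sample data from the table above
        List<String> raw = Arrays.asList(
                "a,1", "b,1", "a,2", "b,6", "c,2", "b,-2", "a,100", "b,-3", "c,-7");
        // Prints a,1 a,2 a,100 b,-3 b,-2 b,1 b,6 c,-7 c,2 (one per line)
        sortLines(raw).forEach(System.out::println);
    }
}
```

    The second field is compared as a number, which is why a,100 sorts after a,2 and b,-3 sorts before b,-2.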
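    2. The MapReduce process
         1> Input data is read by the input format and passed to map().
         2> map() filters the records step by step until only the data we need remains.
         3> Custom counters can be added inside the filtering logic.
         4> The filtered records are written to the context and enter the shuffle phase.
         5> A large part of the shuffle work happens on the map side.
         6> During the shuffle, each record goes through the default partitioner (HashPartitioner), whose rule is
            (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The default number of reduce tasks is 1, so
            reduce writes a single output file. In my tests, a custom partitioner that is configured without also
            raising the number of reduce tasks is never used; the job still follows the default path.
         7> After partitioning, the records are sorted first by partition and then by key within each partition. We can
            define a custom key type that carries its own ordering. For a secondary sort, define a composite key that
            compares on the first field and, when the first fields are equal, on the second field. After the sort comes
            grouping, which by default is also done by key. A custom grouping rule implements RawComparator.
         8> The sorted map outputs are combined with a merge sort before they reach reduce. Partitioning decides which
            reduce task a record goes to, and the number of reduce tasks decides how many output files are written;
            grouping decides which consecutive keys are handed to a single reduce() call. As I understand it, grouping
            is also a kind of optimization of the shuffle phase.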
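
    The partition rule in step 6 is plain integer arithmetic, so it can be tried outside Hadoop. This is a minimal sketch under that assumption: HashPartitionDemo and partitionFor are hypothetical names, and String keys stand in for the real map output keys.

```java
class HashPartitionDemo {

    /** The same arithmetic as the default hash partitioning rule quoted above. */
    static int partitionFor(Object key, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE clears the sign bit, so a negative
        // hashCode() can never produce a negative partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c"};
        for (int reducers : new int[] {1, 3}) {
            for (String key : keys) {
                System.out.println(key + " with " + reducers + " reduce task(s) -> partition "
                        + partitionFor(key, reducers));
            }
        }
    }
}
```

    With one reduce task every key maps to partition 0, which is why a single output file is produced; with three reduce tasks the keys a, b and c map to partitions 1, 2 and 0.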
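     3. The secondary sort
         1> As the data above shows, we can define a custom data type that holds the first and second fields, together
            with a comparison rule that orders keys by the first field and then by the second. The custom type must
            implement WritableComparable (or, equivalently, both Writable and Comparable); use whichever is more convenient.
         2> Next, partitioning. This example produces a single sorted file, so no custom partitioner is needed, and with
            one reduce task a custom partitioner would not be used anyway. A custom partitioner must extend Partitioner
            (extend, not implement), and we override the partitioning rule ourselves.
         3> Then grouping. Grouping is worth having as an optimization: we group by the first field of the custom data
            type. The grouping class implements RawComparator.
         4> Finally, consider what else needs attention, depending on the input: its size, whether every field is always
            present, field lengths, field types, whether to use a combiner or a custom compression codec, negative values,
            and so on. Since the comparator already compares on the second field directly, I see no need for the
            trick of adding a large number to the value and subtracting it again.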
      Result:
           Input      After map()    After shuffle          After reduce()
           a,1        a#1,1          a#1 [1,2,100]          a   1
           b,1        b#1,1          b#-3 [-3,-2,1,6]       a   2
           a,2        a#2,2          c#-7 [-7,2]            a   100
           b,6        b#6,6                                 b   -3
           c,2        c#2,2                                 b   -2
           b,-2       b#-2,-2                               b   1
           a,100      a#100,100                             b   6
           b,-3       b#-3,-3                               c   -7
           c,-7       c#-7,-7                               c   2
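
    The "After shuffle" column of the table above can be reproduced with a small simulation: sort the composite keys by both fields, then group neighbouring keys by the first field only. This is a sketch in plain Java, not Hadoop code; Pair and sortAndGroup are hypothetical stand-ins for the composite key and for the sort-plus-grouping step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ShuffleGroupingDemo {

    /** Hypothetical stand-in for the composite key (first field + second field). */
    static final class Pair {
        final String first;
        final int second;

        Pair(String first, int second) {
            this.first = first;
            this.second = second;
        }
    }

    /** Sorts by (first, second), then collects the second fields of keys that share a first field. */
    static Map<String, List<Integer>> sortAndGroup(List<Pair> mapOutput) {
        List<Pair> sorted = new ArrayList<>(mapOutput);
        sorted.sort(Comparator.comparing((Pair p) -> p.first).thenComparingInt(p -> p.second));
        // LinkedHashMap keeps the groups in the order in which the sorted keys arrive
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (Pair p : sorted) {
            groups.computeIfAbsent(p.first, k -> new ArrayList<>()).add(p.second);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Pair> mapOutput = Arrays.asList(
                new Pair("a", 1), new Pair("b", 1), new Pair("a", 2), new Pair("b", 6),
                new Pair("c", 2), new Pair("b", -2), new Pair("a", 100), new Pair("b", -3),
                new Pair("c", -7));
        // Prints {a=[1, 2, 100], b=[-3, -2, 1, 6], c=[-7, 2]}
        System.out.println(sortAndGroup(mapOutput));
    }
}
```

    Because the values inside each group are already in order when they arrive, the reducer can write them out as they come without sorting anything itself.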
    

    II. Secondary sort example code

    SSortMr.java ## main class
    ============
    package com.bigdata_senior.SSortMr;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SSortMr {
    
    	//Mapper Class
    	private static class SSortMapper 
    	    extends Mapper<LongWritable, Text, SecondaryWritable, LongWritable>{
    		private SecondaryWritable mapOutKey = new SecondaryWritable();
    		private LongWritable mapOutValue = new LongWritable();
    		@Override
    		public void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			
    			String lineValue = value.toString();
    			String [] strValue = lineValue.split(",");
    			mapOutKey.set(strValue[0],Integer.valueOf(strValue[1]));
    			mapOutValue.set(Integer.valueOf(strValue[1]));
    			context.write(mapOutKey, mapOutValue);
    			System.out.println("key-->"+mapOutKey+" value-->"+mapOutValue);
    		}
    	}
    	
    	//Reduce Class
    	private static class SSortReduce 
    	    extends Reducer<SecondaryWritable, LongWritable, Text, LongWritable>{
    		private Text reduceOutKey = new Text(); 
    		@Override
    		public void reduce(SecondaryWritable key, Iterable<LongWritable> values,Context context)
    				throws IOException, InterruptedException {
    		
    			for(LongWritable value : values){
    				// Emit only the first field so the output matches the result table (a 1, a 2, a 100, ...)
    				reduceOutKey.set(key.getFirst());
    				context.write(reduceOutKey, value);
    			}
    		}
    	}
    	
    	//Driver
    	public int run(String[] args) throws Exception {
    		
    		Configuration configuration = new Configuration();
    		Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
    		job.setJarByClass(this.getClass());
    		//job.setNumReduceTasks(3);
    		
    		//input
    		Path inPath = new Path(args[0]);
    		FileInputFormat.addInputPath(job,inPath);
    		
    		//output
    		Path outPath = new Path(args[1]);
    		FileOutputFormat.setOutputPath(job, outPath);
    		
    		//mapper
    		job.setMapperClass(SSortMapper.class);
    		job.setMapOutputKeyClass(SecondaryWritable.class);
    		job.setMapOutputValueClass(LongWritable.class);
    		
    		//partitioner
    		//job.setPartitionerClass(SecondaryPartionerCLass.class);
    		
    		//group
    		job.setGroupingComparatorClass(SecondaryGroupClass.class);
    		
    		//Reduce
    		job.setReducerClass(SSortReduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(LongWritable.class);
    		
    		//submit job
    		boolean isSuccess = job.waitForCompletion(true);
    		
    		return isSuccess ? 0 : 1;
    	}
    	
    	public static void main(String[] args) throws Exception {
    		
    		args = new String[]{
    			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/input",
    			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/output13"
    		};
    		//run job
    		int status = new SSortMr().run(args);
    		System.exit(status);
    	}
    }
    
    SecondaryWritable.java ## custom data type
    ======================
    package com.bigdata_senior.SSortMr;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class SecondaryWritable implements WritableComparable<SecondaryWritable> {
    	
    	private String first;
    	private int second;
    	
    	public SecondaryWritable() {}
    	
    	public SecondaryWritable(String first,int second){
    		this.set(first, second);
    	}
    	
    	public void set(String first,int second){
    		this.first = first;
    		this.second = second;
    	}
    	
    	public String getFirst() {
    		return first;
    	}
    
    	public void setFirst(String first) {
    		this.first = first;
    	}
    
    	public int getSecond() {
    		return second ;
    	}
    
    	public void setSecond(int second) {
    		this.second = second ;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		
    		out.writeUTF(this.first);
    		out.writeInt(this.second);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		
    		this.first = in.readUTF();
    		this.second = in.readInt();
    	}
    
    	@Override
    	public int compareTo(SecondaryWritable o) {
    		
    		int comp = this.first.compareTo(o.first);
    		if(0 != comp){
    			return comp;
    		}
    		return Integer.compare(this.second, o.second);
    	}
    
    	@Override
    	public String toString() {
    		return first + "#" + second;
    	}
    
    	@Override
    	public int hashCode() {
    		final int prime = 31;
    		int result = 1;
    		result = prime * result + ((first == null) ? 0 : first.hashCode());
    		result = prime * result + second;
    		return result;
    	}
    
    	@Override
    	public boolean equals(Object obj) {
    		if (this == obj)
    			return true;
    		if (obj == null)
    			return false;
    		if (getClass() != obj.getClass())
    			return false;
    		SecondaryWritable other = (SecondaryWritable) obj;
    		if (first == null) {
    			if (other.first != null)
    				return false;
    		} else if (!first.equals(other.first))
    			return false;
    		if (second != other.second)
    			return false;
    		return true;
    	}
    }
    
    SecondaryPartionerCLass.java ## custom partitioning rule (commented out in the driver, not used)
    ============================
    package com.bigdata_senior.SSortMr;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class SecondaryPartionerCLass extends Partitioner<SecondaryWritable, LongWritable> {
    
    	@Override
    	public int getPartition(SecondaryWritable key, LongWritable value,
    			int numPartitions) {
    		return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    	}
    }
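
    Why partition on the first field alone? If the job ran with several reduce tasks, hashing the whole composite key could send a#1 and a#2 to different reducers. The sketch below illustrates this with the same arithmetic; the class FirstFieldPartitionDemo, its method names, and the choice of 3 reduce tasks are assumptions made for the example (compositeHash copies the hashCode() of SecondaryWritable shown earlier).

```java
class FirstFieldPartitionDemo {

    /** Mirrors the hashCode() of the composite key: prime 31 over the first and second field. */
    static int compositeHash(String first, int second) {
        int result = 1;
        result = 31 * result + first.hashCode();
        result = 31 * result + second;
        return result;
    }

    /** The partition arithmetic used by the partitioner above. */
    static int partition(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int reducers = 3; // assumed value; the job above runs with the default of 1

        // Hashing the whole composite key: a#1 and a#2 land in different partitions.
        System.out.println(partition(compositeHash("a", 1), reducers)); // prints 0
        System.out.println(partition(compositeHash("a", 2), reducers)); // prints 1

        // Hashing only the first field: every "a" record lands in the same partition.
        System.out.println(partition("a".hashCode(), reducers));        // prints 1
    }
}
```

    Records that must be grouped together in one reduce() call have to reach the same reduce task first, so the partitioner and the grouping rule should look at the same field.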
    
    SecondaryGroupClass.java ## custom grouping rule
    ========================
    package com.bigdata_senior.SSortMr;
    
    import java.util.Arrays;
    
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;
    
    public class SecondaryGroupClass implements RawComparator<SecondaryWritable> {
    
    	@Override
    	public int compare(SecondaryWritable o1, SecondaryWritable o2) {
    		System.out.println("o1: "+o1.toString()+" o2: "+o2.toString());
    		return o1.getFirst().compareTo(o2.getFirst());
    	}
    
    	@Override
    	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    		System.out.println("b1: "+Arrays.toString(b1)+" b2: "+Arrays.toString(b2));
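    		// Each key starts at offset s1/s2 in its buffer; skip the trailing 4-byte int so only the first field is compared
    		return WritableComparator.compareBytes(b1, s1, l1-4, b2, s2, l2-4);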
    	}
    }
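
    The l1-4 in the raw comparison follows from how the key is serialized: write() in SecondaryWritable calls writeUTF and then writeInt, so the last 4 bytes of every serialized key are the second field. The sketch below shows that layout with plain java.io classes; KeyBytesDemo, serialize and sameFirstField are hypothetical names, and each key is serialized into its own array here, whereas Hadoop passes a shared buffer plus an offset and a length.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

class KeyBytesDemo {

    /** Serializes a (first, second) pair in the same order as write() above: writeUTF, then writeInt. */
    static byte[] serialize(String first, int second) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(first);   // 2-byte length prefix followed by the encoded characters
            out.writeInt(second);  // always 4 bytes, big-endian
        }
        return bytes.toByteArray();
    }

    /** True when two serialized keys agree on everything except the trailing 4-byte int. */
    static boolean sameFirstField(byte[] k1, byte[] k2) {
        return Arrays.equals(
                Arrays.copyOfRange(k1, 0, k1.length - 4),
                Arrays.copyOfRange(k2, 0, k2.length - 4));
    }

    public static void main(String[] args) throws IOException {
        byte[] a1 = serialize("a", 1);
        byte[] a100 = serialize("a", 100);
        byte[] b1 = serialize("b", 1);
        System.out.println(Arrays.toString(a1));      // prints [0, 1, 97, 0, 0, 0, 1]
        System.out.println(sameFirstField(a1, a100)); // prints true
        System.out.println(sameFirstField(a1, b1));   // prints false
    }
}
```

    Comparing the bytes directly avoids deserializing both keys for every comparison, which is the reason a grouping class works on the raw form.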
    
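    An alternative: ## fine for small data, but very resource-hungry on large data, because each reduce() call buffers and sorts all values of a key in memory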
    SSortMr2.java 
    =============
    package com.bigdata_senior.SSortMr2;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SSortMr2 {
    
    	//Mapper Class
    	private static class SSortMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    		private Text mapOutKey = new Text();
    		private LongWritable mapOutValue = new LongWritable();
    		@Override
    		public void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			
    			String lineValue = value.toString();
    			String [] strValue = lineValue.split(",");
    			mapOutKey.set(strValue[0]);
    			mapOutValue.set(Integer.valueOf(strValue[1]));
    			context.write(mapOutKey, mapOutValue);
    			System.out.println("key-->"+mapOutKey+" value-->"+mapOutValue);
    		}
    	}
    	
    	//Reduce Class
    	private static class SSortReduce extends Reducer<Text, LongWritable, Text, Long>{
    		@Override
    		public void reduce(Text key, Iterable<LongWritable> values,Context context)
    				throws IOException, InterruptedException {
    			
    			List<Long> longList = new ArrayList<Long>();
    			for(LongWritable value: values){
    				longList.add(value.get());
    			}
    			Collections.sort(longList);
    			for(Long value : longList){
    				System.out.println("key--> "+key+" value--> "+value);
    				context.write(key, value);
    			}
    		}
    	}
    	
    	//Driver
    	public int run(String[] args) throws Exception {
    	
    		Configuration configuration = new Configuration();
    		Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
    		job.setJarByClass(this.getClass());
    		
    		//input
    		Path inPath = new Path(args[0]);
    		FileInputFormat.addInputPath(job,inPath);
    		
    		//output
    		Path outPath = new Path(args[1]);
    		FileOutputFormat.setOutputPath(job, outPath);
    		
    		//mapper
    		job.setMapperClass(SSortMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(LongWritable.class);
    		
    		//Reduce
    		job.setReducerClass(SSortReduce.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Long.class);
    		
    		//submit job
    		boolean isSuccess = job.waitForCompletion(true);
    		
    		return isSuccess ? 0 : 1;
    	}
    	
    	public static void main(String[] args) throws Exception {
    		
    		args = new String[]{
    			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/input",
    			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/output22"
    		};
    		//run job
    		int status = new SSortMr2().run(args);
    		System.exit(status);
    	}
    }
    

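    III. A brief look at MapReduce joins

    1. join (combining)
    2. That is, combining the records of two or more data sources into one output.
    3. Having learned Hive, I feel that joins in raw MapReduce are no longer a priority, because in MapReduce:
        1> the number of tables to join is not known in advance
        2> the work is tedious and the filtering cases are varied, so some may be overlooked
        3> resource consumption is heavy
    4. A MapReduce join roughly works like this: both tables are read in, and because their records are mixed together, a custom data type tags each record with the table it came from;
       reduce() then picks the records of each table apart by tag and writes the joined result. There are many other ways to handle a join, for example loading one table into a collection in setup().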
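
    The setup() variant mentioned in point 4 can be sketched in plain Java: the smaller table is loaded into a map once, and every record of the other table is then joined by a lookup. Everything here is an assumption for illustration: the class MapSideJoinDemo, its method names, and the sample rows are made up, and the record layouts (cid,cname,phone for customers and cid,name,price,date for orders) follow the field counts used in the example code of the next section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapSideJoinDemo {

    /** Plays the role of setup(): load the small customer table into an in-memory map keyed by cid. */
    static Map<Long, String> loadCustomers(List<String> customerLines) {
        Map<Long, String> customers = new HashMap<>();
        for (String line : customerLines) {
            String[] f = line.split(",");
            if (f.length == 3) {
                customers.put(Long.valueOf(f[0]), f[1] + "," + f[2]);
            }
        }
        return customers;
    }

    /** Plays the role of map(): look up the customer of each order line and emit the joined row. */
    static List<String> join(Map<Long, String> customers, List<String> orderLines) {
        List<String> joined = new ArrayList<>();
        for (String line : orderLines) {
            String[] f = line.split(",");
            if (f.length != 4) {
                continue; // skip malformed rows
            }
            String customer = customers.get(Long.valueOf(f[0]));
            if (customer != null) { // inner join: orders without a matching customer are dropped
                joined.add(f[0] + "," + customer + "," + f[1] + "," + f[2] + "," + f[3]);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Made-up sample rows, not taken from the original data set
        List<String> customers = Arrays.asList("1,alice,13800000000", "2,bob,13900000000");
        List<String> orders = Arrays.asList(
                "1,book,30,2016-10-01", "1,pen,5,2016-10-02", "3,lamp,80,2016-10-03");
        // Prints the two orders of customer 1; the order for unknown customer 3 is dropped
        join(loadCustomers(customers), orders).forEach(System.out::println);
    }
}
```

    This approach only fits when one table is small enough to sit in memory on every map task; in exchange, the join needs no shuffle at all.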
    

    IV. MapReduce join example code

    JoinMr.java ## main class
    ===========
    package com.bigdata_senior.joinMr;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class JoinMr {
    
    	//Mapper Class
    	private static class WordCountMapper extends 
    		Mapper<LongWritable, Text, LongWritable, JoinWritable>{
    
    		private LongWritable mapoutputkey = new LongWritable();
    		private JoinWritable mapoutputvalue = new JoinWritable();
    		
    		@Override
    		protected void setup(Context context) throws IOException,
    				InterruptedException {
    		}
    
    		@Override
    		public void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			
    			String lineValue = value.toString();
    			String [] strValue = lineValue.split(",");
    			
    			int length = strValue.length;
    			if(3 != length && 4 != length){
    				return;
    			}
    			
    			//get cid
    			Long cid = Long.valueOf(strValue[0]);
    			//get cname
    			String cname = strValue[1];
    			//set customer
    			if(3 == length){
    				String phone = strValue[2];
    				mapoutputkey.set(cid);
    				mapoutputvalue.set("customer", cname + "," + phone);
    			}
    			
    			//set order
    			if(4 == length){
    				String price = strValue[2];
    				String date = strValue[3];
    				mapoutputkey.set(cid);
    				mapoutputvalue.set("order", cname +","+price +","+ date);
    			}
    			context.write(mapoutputkey, mapoutputvalue);
    		}
    	}
    	
    	//Reduce Class
    	private static class WordCountReduce extends 
    		Reducer<LongWritable, JoinWritable, NullWritable, Text>{
    
    		private Text outputValue = new Text(); 
    		@Override
    		public void reduce(LongWritable key, Iterable<JoinWritable> values,Context context)
    				throws IOException, InterruptedException {
    		
    			String customerInfo = null;
    			List<String> orderList = new ArrayList<String>();
    			for(JoinWritable value : values){
    				if("customer".equals(value.getTag())){
    					customerInfo = value.getData();
    					System.out.println(customerInfo);
    				}else if("order".equals(value.getTag())){
    					orderList.add(value.getData());
    				}
    			}
    			for(String order: orderList){
    				outputValue.set(key.get()+","+customerInfo+","+order);
    				context.write(NullWritable.get(), outputValue);
    			}
    		}
    	}
    	
    	//Driver
    	public int run(String[] args) throws Exception {
    		
    		Configuration configuration = new Configuration();
    		Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
    		job.setJarByClass(this.getClass());
    		
    		//input
    		Path inPath = new Path(args[0]);
    		FileInputFormat.addInputPath(job,inPath);
    		
    		//output
    		Path outPath = new Path(args[1]);
    		FileOutputFormat.setOutputPath(job, outPath);
    		
    		//mapper
    		job.setMapperClass(WordCountMapper.class);
    		job.setMapOutputKeyClass(LongWritable.class);
    		job.setMapOutputValueClass(JoinWritable.class);
    		
    		//Reduce
    		job.setReducerClass(WordCountReduce.class);
    		job.setOutputKeyClass(NullWritable.class);
    		job.setOutputValueClass(Text.class);
    		
    		//submit job
    		boolean isSuccess = job.waitForCompletion(true);
    		
    		return isSuccess ? 0 : 1;
    	}
    	
    	public static void main(String[] args) throws Exception {
    		
    		args = new String[]{
    			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/join/input",
    			"hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/join/output2"
    		};
    		//run job
    		int status = new JoinMr().run(args);
    		System.exit(status);
    	}
    }
    
    JoinWritable.java ## custom data type
    =================
    package com.bigdata_senior.joinMr;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    
    public class JoinWritable implements Writable {
    
    	private String tag;
    	private String data;
    	
    	public JoinWritable(){}
    	
    	public JoinWritable(String tag,String data){
    		this.set(tag, data);
    	}
    	public void set(String tag,String data){
    		this.setTag(tag);
    		this.setData(data);
    	}
    	
    	public String getTag() {
    		return tag;
    	}
    
    	public void setTag(String tag) {
    		this.tag = tag;
    	}
    
    	public String getData() {
    		return data;
    	}
    
    	public void setData(String data) {
    		this.data = data;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    
    		out.writeUTF(this.getTag());
    		out.writeUTF(this.getData());
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.setTag(in.readUTF());
    		this.setData(in.readUTF());
    	}
    
    	@Override
    	public int hashCode() {
    		final int prime = 31;
    		int result = 1;
    		result = prime * result + ((data == null) ? 0 : data.hashCode());
    		result = prime * result + ((tag == null) ? 0 : tag.hashCode());
    		return result;
    	}
    
    	@Override
    	public boolean equals(Object obj) {
    		if (this == obj)
    			return true;
    		if (obj == null)
    			return false;
    		if (getClass() != obj.getClass())
    			return false;
    		JoinWritable other = (JoinWritable) obj;
    		if (data == null) {
    			if (other.data != null)
    				return false;
    		} else if (!data.equals(other.data))
    			return false;
    		if (tag == null) {
    			if (other.tag != null)
    				return false;
    		} else if (!tag.equals(other.tag))
    			return false;
    		return true;
    	}
    
    	@Override
    	public String toString() {
    		return tag + "," +data;
    	}
    }
  • Original article: https://www.cnblogs.com/eRrsr/p/6009000.html