  • MapReduce secondary sort

    1 Secondary Sort

    1.1 Approach

    A secondary sort orders records that share the same value in the first field by a second field.
    For example, an e-commerce platform records the amount of every order each user places. The task is to sort the order amounts belonging to each user, and the account names in the output must be sorted as well.

    Account  Order amount
    hadoop@apache 200
    hive@apache 550
    yarn@apache 580
    hive@apache 159
    hadoop@apache 300
    hive@apache 258
    hadoop@apache 300
    yarn@apache 100
    hadoop@apache 150
    yarn@apache 560
    yarn@apache 260

    Result after the secondary sort:

    Account  Order amount
    hadoop@apache 150
    hadoop@apache 200
    hadoop@apache 300
    hadoop@apache 300
    hive@apache 159
    hive@apache 258
    hive@apache 550
    yarn@apache 100
    yarn@apache 260
    yarn@apache 560
    yarn@apache 580

    The implementation uses a custom key that sorts on the two fields, account and then order amount; a custom partitioner and a custom grouping comparator that partition and group by account only; and a custom sort comparator used by the merge sorts on both the map side and the reduce side.

    Hadoop's default string serialization, java.io.DataOutputStream.writeUTF(), uses a "modified UTF-8" encoding, so the serialized byte stream cannot be compared directly in a RawComparator.
    The implementation works around this by writing the raw bytes of the "account" field directly, preceded by their length. The byte stream a RawComparator receives is then exactly the byte stream we wrote; during deserialization, the length is used to read the "account" field back out.
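
    To make the byte layout concrete, here is a minimal, self-contained sketch (not part of the job; the class name and sample values are illustrative only). It shows the [4-byte length][UTF-8 account bytes][8-byte amount] layout that the raw comparators below rely on:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.Charset;
    
    public class KeyLayoutDemo {
    	public static void main(String[] args) throws IOException {
    		ByteArrayOutputStream bos = new ByteArrayOutputStream();
    		DataOutputStream out = new DataOutputStream(bos);
    
    		byte[] account = "hadoop@apache".getBytes(Charset.forName("UTF-8"));
    		out.writeInt(account.length);	// 4-byte big-endian length prefix
    		out.write(account);		// raw UTF-8 bytes, comparable byte by byte
    		out.writeDouble(200.0);		// 8-byte order amount
    
    		byte[] key = bos.toByteArray();
    		// a RawComparator can recover the fields without deserializing the key
    		int len = ((key[0] & 0xff) << 24) | ((key[1] & 0xff) << 16)
    				| ((key[2] & 0xff) << 8) | (key[3] & 0xff);
    		String parsed = new String(key, 4, len, Charset.forName("UTF-8"));
    		System.out.println(len + " -> " + parsed);	// prints: 13 -> hadoop@apache
    	}
    }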

    1.2 Implementation

    Program code

    package com.hadoop;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.nio.charset.Charset;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class SecondarySortMapReduce extends Configured implements Tool {
    
    	/**
    	 * A cost record: an account and an order amount
    	 * @author Ivan
    	 *
    	 */
    	public static class CostBean implements WritableComparable<CostBean> {
    		private String account;
    		private double cost;
    		
    		public void set(String account, double cost) {
    			this.account = account;
    			this.cost = cost;
    		}
    		
    		public String getAccount() {
    			return account;
    		}
    		
    		public double getCost() {
    			return cost;
    		}
    		
    		@Override
    		public void write(DataOutput out) throws IOException {
    			byte[] buffer = account.getBytes(Charset.forName("UTF-8"));
    			
    			out.writeInt(buffer.length);				// length of the account bytes. out.writeUTF() uses a modified UTF-8 that must be decoded with DataInput.readUTF(), so it is not used here
    			out.write(buffer);
    			out.writeDouble(cost);
    		}
    
    		@Override
    		public void readFields(DataInput in) throws IOException {
    			int accountLength = in.readInt();
    			byte[] bytes = new byte[accountLength];
    			in.readFully(bytes);
    			
    			account = new String(bytes, Charset.forName("UTF-8"));
    			cost = in.readDouble();
    		}
    
    		@Override
    		public int compareTo(CostBean o) {
    			if (account.equals(o.account)) {		// same account: compare the amounts
    				return cost == o.cost ? 0 : (cost > o.cost ? 1 : -1);
    			}
    			
    			return account.compareTo(o.account);
    		}
    		
    		@Override
    		public String toString() {
    			return account + "\t" + cost;
    		}
    	}
    	
    	/**
    	 * Comparator used for sorting on the map side and the reduce side: if the accounts are equal, compare the amounts
    	 * @author Ivan
    	 *
    	 */
    	public static class CostBeanComparator extends WritableComparator {
    		@Override
    		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    			int accountLength1 = readInt(b1, s1);  
    			int accountLength2 = readInt(b2, s2);
    			
    			int result = compareBytes(b1, s1 + 4, accountLength1, b2, s2 + 4, accountLength2);
    			if (result == 0) {	// same account: compare the amounts
    				double thisValue = readDouble(b1, s1 + 4 + accountLength1);
    				double thatValue = readDouble(b2, s2 + 4 + accountLength2);
    			    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
    			} else {			
    				return result;
    			}
    		}
    	}
    	
    	/**
    	 * Partitioner used when the map side spills to disk
    	 * @author Ivan
    	 *
    	 */
    	public static class CostBeanPartitioner extends Partitioner<CostBean, DoubleWritable> {
    		
    		/**
    		 * Partition by account; mask the hash so the partition number stays non-negative
    		 */
    		@Override
    		public int getPartition(CostBean key, DoubleWritable value, int numPartitions) {
    			return (key.account.hashCode() & Integer.MAX_VALUE) % numPartitions;
    		}
    	}
    	
    	/**
    	 * Grouping comparator used on the reduce side: groups by the account field, i.e. records with the same account form one group
    	 * @author Ivan
    	 *
    	 */
    	public static class GroupComparator extends WritableComparator {
    		@Override
    		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    			// compare only the account bytes and ignore the trailing amount,
    			// so all records of one account form a single group
    			int accountLength1 = readInt(b1, s1);
    			int accountLength2 = readInt(b2, s2);
    			
    			return compareBytes(b1, s1 + 4, accountLength1, b2, s2 + 4, accountLength2);
    		}
    	}
    	
    	/**
    	 * Mapper class
    	 * @author Ivan
    	 *
    	 */
    	public static class SecondarySortMapper extends Mapper<LongWritable, Text, CostBean, DoubleWritable> {
    		private final CostBean outputKey = new CostBean();
    		private final DoubleWritable outputValue = new DoubleWritable();
    		
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String[] data = value.toString().split("\t");
    			
    			double cost = Double.parseDouble(data[1]);
    			outputKey.set(data[0].trim(), cost);
    			outputValue.set(cost);			
    
    			context.write(outputKey, outputValue);
    		}
    	}
    	
    	public static class SecondarySortReducer extends Reducer<CostBean, DoubleWritable, Text, DoubleWritable> {
    		private final Text outputKey = new Text();
    		private final DoubleWritable outputValue = new DoubleWritable();
    		@Override
    		protected void reduce(CostBean key, Iterable<DoubleWritable> values, Context context)
    				throws IOException, InterruptedException {
    			outputKey.set(key.getAccount());
    			
    			for (DoubleWritable v : values) {
    				outputValue.set(v.get());
    				context.write(outputKey, outputValue);
    			}
    		}
    	}
    	
    	public int run(String[] args) throws Exception {
    		Configuration conf = getConf();
    		Job job = Job.getInstance(conf, SecondarySortMapReduce.class.getSimpleName());
    		job.setJarByClass(SecondarySortMapReduce.class);
    		
    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		
    		// map settings
    		job.setMapperClass(SecondarySortMapper.class);
    		job.setMapOutputKeyClass(CostBean.class);
    		job.setMapOutputValueClass(DoubleWritable.class);
    		
    		// partition settings
    		job.setPartitionerClass(CostBeanPartitioner.class);
    		
    		// sorting		
    		job.setSortComparatorClass(CostBeanComparator.class);
    		
    		// grouping
    		job.setGroupingComparatorClass(GroupComparator.class);
    		
    		// reduce settings
    		job.setReducerClass(SecondarySortReducer.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(DoubleWritable.class);
    		
    		boolean res = job.waitForCompletion(true);
    		
    		return res ? 0 : 1;
    	}
    	
    	/**
    	 * @param args
    	 * @throws Exception 
    	 */
    	public static void main(String[] args) throws Exception {
    		if (args.length < 2) {
    			throw new IllegalArgumentException("Usage: <inpath> <outpath>");
    		}
    		
    		System.exit(ToolRunner.run(new Configuration(), new SecondarySortMapReduce(), args));
    	}
    }
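
    Note the division of labor between the two comparators: the sort comparator orders keys by account and then by amount, while the grouping comparator looks only at the account bytes, so each reduce() call receives all amounts of one account, already in ascending order.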
    

    1.3 Test

    Runtime environment
    • Operating system: CentOS 6.4
    • Hadoop: Apache Hadoop 2.5.0

    Using the example above as the test data:

    Account  Amount
    hadoop@apache 200
    hive@apache 550
    yarn@apache 580
    hive@apache 159
    hadoop@apache 300
    hive@apache 258
    hadoop@apache 300
    yarn@apache 100
    hadoop@apache 150
    yarn@apache 560
    yarn@apache 260
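
    Assuming the input file has been uploaded to HDFS and the classes have been packaged into a jar (the jar name and paths below are placeholders), the job is launched in the usual ToolRunner style:

    hadoop jar secondary-sort.jar com.hadoop.SecondarySortMapReduce /input/cost.txt /output/secondarysort

    The part files under the output directory then contain the sorted result shown in section 1.1.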
