zoukankan      html  css  js  c++  java
  • MapReduce-join连接

    join连接

    MapReduce能够执行大型数据集间的连接(join)操作。连接操作的具体实现技术取决于数据集的规模及分区方式
    连接操作如果由mapper执行,则称为“map端连接”;如果由reducer执行,则称为“reduce端连接”。

    Map端连接

    在两个大规模输入数据集之间的map端连接会在数据到达map函数之前就执行连接操作。为达到该目的,各map的输入数据必须先分区并且以特定方式排序。各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。
    Map端连接操作可以连接多个作业的输出,只要这些作业的reducer数量相同、键相同并且输出文件是不可切分的(例如,小于一个HDFS块,或gzip压缩)。

    Reduce端连接

    由于reduce端连接并不要求输入数据集符合特定结构,因而reduce端连接比map端连接更为常用。但是,由于两个数据集均需经过MapReduce的shuffle过程,所以reduce端连接的效率往往要低一些。基本思路是mapper为各个记录标记源,并且使用连接件作为map输出键,使键相同的记录放在同一reducer中。
    需要使用以下技术

    1.多输入

    数据集的输入源往往有多中格式,因此可以使用MultipleInputs类来方便地解析和标注各个源。

    2.辅助排序

    reducer将从两个源中选出键相同的记录且并不介意这些记录是否已排好序。此外,为了更好的执行连接操作,先将某一个源的数据传输到reducer会非常重要。

    举个例子

    现有气象站文件及气象数据文件,需要将两个文件进行关联

    气象站文件内容如下

    00001,北京
    00002,天津
    00003,山东

    气象数据文件内容如下

    00001,20180101,15
    00001,20180102,16
    00002,20180101,25
    00002,20180102,26
    00003,20180101,35
    00003,20180102,36
    

     要求:输出气象站ID 气象站名称及气象数据

    代码如下

    1.JoinRecordWithStationName类
    package com.zhen.mapreduce.join;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    
    /**
     * @author FengZhen
     * @date 2018年9月16日
     * 
     */
    public class JoinRecordWithStationName extends Configured implements Tool{
    
    	/**
    	 * 在reduce端连接中,标记气象站记录的mapper
    	 * @author FengZhen
    	 *	00001,北京
    		00002,天津
    		00003,山东
    	 */
    	static class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text>{
    		private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextPair, Text>.Context context)
    				throws IOException, InterruptedException {
    			if (parser.parse(value.toString())) {
    				context.write(new TextPair(parser.getStationId(), "0"), new Text(parser.getStationName()));
    			}
    		}
    	}
    	
    	/**
    	 * 在reduce端连接中标记天气记录的mapper
    	 * @author FengZhen
    	 *	00001,20180101,15
    		00001,20180102,16
    		00002,20180101,25
    		00002,20180102,26
    		00003,20180101,35
    		00003,20180102,36
    	 */
    	static class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    		private NcdcRecordParser parser = new NcdcRecordParser();
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextPair, Text>.Context context)
    				throws IOException, InterruptedException {
    			parser.parse(value.toString());
    			context.write(new TextPair(parser.getStationId(), "1"), value);
    		}
    	}
    	
    	/**
    	 * reducer知道自己会先接收气象站记录。因此从中抽取出值,并将其作为后续每条输出记录的一部分写到输出文件。
    	 * @author FengZhen
    	 *
    	 */
    	static class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
    		@Override
    		protected void reduce(TextPair key, Iterable<Text> values, Reducer<TextPair, Text, Text, Text>.Context context)
    				throws IOException, InterruptedException {
    			Iterator<Text> iterator = values.iterator();
    			//取气象站名
    			Text stationName = new Text(iterator.next());
    			while (iterator.hasNext()) {
    				Text record = iterator.next();
    				Text outValue = new Text(stationName.toString() + "	" + record.toString());
    				context.write(key.getFirst(), outValue);
    			}
    		}
    	}
    	
    	static class KeyPartitioner extends Partitioner<TextPair, Text>{
    		@Override
    		public int getPartition(TextPair key, Text value, int numPartitions) {
    			return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    		}
    	}
    	
    	public int run(String[] args) throws Exception {
    		Job job = Job.getInstance(getConf());
    		job.setJobName("JoinRecordWithStationName");
    		job.setJarByClass(JoinRecordWithStationName.class);
    
    		Path ncdcInputPath = new Path(args[0]);
    		Path stationInputPath = new Path(args[1]);
    		Path outputPath = new Path(args[2]);
    		
    		MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class);
    		MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class);
    		FileOutputFormat.setOutputPath(job, outputPath);
    		
    		job.setPartitionerClass(KeyPartitioner.class);
    		job.setGroupingComparatorClass(TextPair.FirstComparator.class);
    		
    		job.setMapOutputKeyClass(TextPair.class);
    		
    		job.setReducerClass(JoinReducer.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    	public static void main(String[] args)  {
    		String[] params = new String[] {
    				"hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/input/record",
    				"hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/input/station",
    				"hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/output"};
    		int exitCode = 0;
    		try {
    			exitCode = ToolRunner.run(new JoinRecordWithStationName(), params);
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.exit(exitCode);
    	}
    	
    }
    

    2.NcdcRecordParser类

    package com.zhen.mapreduce.join;
    
    import java.io.Serializable;
    
    /**
     * @author FengZhen
     * @date 2018年9月9日
     * 解析天气数据
     */
    public class NcdcRecordParser implements Serializable{
    
    	private static final long serialVersionUID = 1L;
    
    	/**
    	 * 气象台ID
    	 */
    	private String stationId;
    	/**
    	 * 时间
    	 */
    	private long timeStamp;
    	/**
    	 * 气温
    	 */
    	private Integer temperature;
    	
    	/**
    	 * 解析
    	 * @param value
    	 */
    	public void parse(String value) {
    		String[] values = value.split(",");
    		if (values.length >= 3) {
    			stationId = values[0];
    			timeStamp = Long.parseLong(values[1]);
    			temperature = Integer.valueOf(values[2]);
    		}
    	}
    	
    	/**
    	 * 校验是否合格
    	 * @return
    	 */
    	public boolean isValidTemperature() {
    		return null != temperature;
    	}
    
    	public String getStationId() {
    		return stationId;
    	}
    
    	public void setStationId(String stationId) {
    		this.stationId = stationId;
    	}
    
    	public long getTimeStamp() {
    		return timeStamp;
    	}
    
    	public void setTimeStamp(long timeStamp) {
    		this.timeStamp = timeStamp;
    	}
    
    	public Integer getTemperature() {
    		return temperature;
    	}
    
    	public void setTemperature(Integer temperature) {
    		this.temperature = temperature;
    	}
    	
    }
    

     3.NcdcStationMetadataParser类

    package com.zhen.mapreduce.join;
    
    import java.io.Serializable;
    
    /**
     * @author FengZhen
     * @date 2018年9月9日
     * 解析气象台数据
     */
    public class NcdcStationMetadataParser implements Serializable{
    
    	private static final long serialVersionUID = 1L;
    
    	/**
    	 * 气象台ID
    	 */
    	private String stationId;
    	/**
    	 * 气象台名称
    	 */
    	private String stationName;
    	
    	/**
    	 * 解析
    	 * @param value
    	 */
    	public boolean parse(String value) {
    		String[] values = value.split(",");
    		if (values.length >= 2) {
    			stationId = values[0];
    			stationName = values[1];
    			return true;
    		}
    		return false;
    	}
    
    	public String getStationId() {
    		return stationId;
    	}
    
    	public void setStationId(String stationId) {
    		this.stationId = stationId;
    	}
    
    	public String getStationName() {
    		return stationName;
    	}
    
    	public void setStationName(String stationName) {
    		this.stationName = stationName;
    	}
    }
    

     4.TextPair类

    package com.zhen.mapreduce.join;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * @author FengZhen
     * @date 2018年9月16日
     * 
     */
    public class TextPair implements WritableComparable<TextPair>{
    
    	private Text first;
    	private Text second;
    	public TextPair() {
    		set(new Text(), new Text());
    	}
    	public TextPair(String first, String second) {
    		set(new Text(first), new Text(second));
    	}
    	public TextPair(Text first, Text second) {
    		set(first, second);
    	}
    	public void set(Text first, Text second) {
    		this.first = first;
    		this.second = second;
    	}
    	
    	public void write(DataOutput out) throws IOException {
    		first.write(out);
    		second.write(out);
    	}
    
    	public void readFields(DataInput in) throws IOException {
    		first.readFields(in);
    		second.readFields(in);
    	}
    
    	@Override
    	public int hashCode() {
    		return first.hashCode() * 163 + second.hashCode();
    	}
    	
    	@Override
    	public boolean equals(Object obj) {
    		if (obj instanceof TextPair) {
    			TextPair textPair = (TextPair) obj;
    			return first.equals(textPair.first) && second.equals(textPair.second);
    		}
    		return false;
    	}
    	
    	public int compareTo(TextPair o) {
    		int cmp = first.compareTo(o.first);
    		if (cmp != 0) {
    			return cmp;
    		}
    		return second.compareTo(o.second);
    	}
    	
    	public Text getFirst() {
    		return first;
    	}
    	public void setFirst(Text first) {
    		this.first = first;
    	}
    	public Text getSecond() {
    		return second;
    	}
    	public void setSecond(Text second) {
    		this.second = second;
    	}
    	@Override
    	public String toString() {
    		return first + "	" + second;
    	}
    	
    	/**
    	 * 比较两个int值大小
    	 * 降序
    	 * @param a
    	 * @param b
    	 * @return
    	 */
    	public static int compare(Text a, Text b) {
    		return a.compareTo(b);
    	}
    	
    	static class FirstComparator extends WritableComparator{
    		protected FirstComparator() {
    			super(TextPair.class, true);
    		}
    		@Override
    		public int compare(WritableComparable a, WritableComparable b) {
    			TextPair ip1 = (TextPair) a;
    			TextPair ip2 = (TextPair) b;
    			return TextPair.compare(ip1.getFirst(), ip2.getFirst());
    		}
    	}
    	
    }
    

     打jar包,上传并执行

    scp /Users/FengZhen/Desktop/Hadoop/file/JoinRecordWithStationName.jar root@192.168.1.124:/usr/local/test/mr
    hadoop jar JoinRecordWithStationName.jar com.zhen.mapreduce.join.JoinRecordWithStationName
    

     结果如下

    00001	北京	00001,20180102,16
    00001	北京	00001,20180101,15
    00002	天津	00002,20180102,26
    00002	天津	00002,20180101,25
    00003	山东	00003,20180102,36
    00003	山东	00003,20180101,35
    

     

  • 相关阅读:
    bzoj 1210 [HNOI2004] 邮递员 插头dp
    与非 乱搞233
    USACO JAN14 奶牛冰壶运动 凸包+判定
    bzoj 2829 计算几何
    R
    bzoj 1592 dp
    [Usaco2007 Open]Fliptile 翻格子游戏 状压dp
    拯救莫莉斯 状压dp
    大暑假集训 第一阶段总结 233
    hdu 1693 Eat the Trees 插头dp
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/9690780.html
Copyright © 2011-2022 走看看