  • MapReduce (4): Typical Programming Scenarios (2)

    I. MapJoin: a DistributedCache application

         1. Introduction to MapReduce joins

    In many real business scenarios it is common to join two datasets on some key. If both datasets are small, the join can be done entirely in memory; with large datasets, an in-memory join would clearly run out of memory (OOM). MapReduce can be used to join large datasets.
    MapReduce joins fall into two main categories: MapJoin and ReduceJoin.

    First, ReduceJoin:
    (1) In the map phase, the two datasets data1 and data2 are each read by the mapper and parsed into key-value pairs whose key is the join field and whose value is the fields to be queried, tagged with whether the record comes from data1 or data2.
    (2) In the reduce phase, a reduce task receives the records that share the same key from both data1 and data2 and performs the cross-product join on the reduce side. The most direct consequence is heavy memory consumption, which can lead to OOM. (A minimal sketch of the map-side tagging step is shown below.)
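
    The post does not implement the reduce-side join, but a minimal sketch of the map-side tagging it describes could look like the class below. This is a hypothetical example, not the article's code; it assumes ::-delimited input files whose first field is the join key.

    // Hypothetical map side of a reduce-side join: tag every record with its source
    // file so the reducer can separate data1 rows from data2 rows before joining.
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String source; // "data1" or "data2", taken from the input file name

        @Override
        protected void setup(Context context) {
            source = ((FileSplit) context.getInputSplit()).getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("::");
            // the first field of every line is assumed to be the join key
            context.write(new Text(fields[0]), new Text(source + "#" + value.toString()));
        }
    }

    In the reducer, the values for one key would be separated by their tag into two lists and combined pairwise; buffering those two lists is exactly the memory cost mentioned above.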

    Now MapJoin:
    MapJoin is suitable when one of the two datasets is small. The idea is to load that small dataset entirely into memory and index it by the join key. The large dataset then serves as the MapTask input, and every record passed to map() is matched directly against the in-memory index; the join results are written out by key. This approach uses Hadoop's DistributedCache to distribute the small dataset to every compute node: each node that runs a map task loads the data into memory and indexes it by the join key.
    (The mapper reads the large table; before reading it, the small table is loaded into memory in the setup() method.)

        2. Requirements

    Two datasets are given, movies.dat and ratings.dat, with the following sample content:

    movies.dat:
         1::Toy Story (1995)::Animation|Children's|Comedy
         2::Jumanji (1995)::Adventure|Children's|Fantasy
         3::Grumpier Old Men (1995)::Comedy|Romance
         Fields: movieid, moviename, movietype

    ratings.dat:
        1::1193::5::978300760
        1::661::3::978302109
        1::914::3::978301968

    Fields: userid, movieid, rate, timestamp

    Join the two tables so that the final output contains the following six fields:
    movieid, userid, rate, moviename, movietype, timestamp

         3. Implementation

    Step 1: Encapsulate a MovieRate class to make the records easy to sort and serialize
        

    package com.ghgj.mr.mymapjoin;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;
    public class MovieRate implements WritableComparable<MovieRate> {
        private String movieid;
        private String userid;
        private int rate;
        private String movieName;
        private String movieType;
        private long ts;

        public String getMovieid() {
            return movieid;
        }
        public void setMovieid(String movieid) {
            this.movieid = movieid;
        }
        public String getUserid() {
            return userid;
        }
        public void setUserid(String userid) {
            this.userid = userid;
        }
        public int getRate() {
            return rate;
        }
        public void setRate(int rate) {
            this.rate = rate;
        }
        public String getMovieName() {
            return movieName;
        }
        public void setMovieName(String movieName) {
            this.movieName = movieName;
        }
        public String getMovieType() {
            return movieType;
        }
        public void setMovieType(String movieType) {
            this.movieType = movieType;
        }
        public long getTs() {
            return ts;
        }
        public void setTs(long ts) {
            this.ts = ts;
        }

        public MovieRate() {
        }

        public MovieRate(String movieid, String userid, int rate, String movieName,
                String movieType, long ts) {
            this.movieid = movieid;
            this.userid = userid;
            this.rate = rate;
            this.movieName = movieName;
            this.movieType = movieType;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return movieid + "\t" + userid + "\t" + rate + "\t" + movieName
                    + "\t" + movieType + "\t" + ts;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(movieid);
            out.writeUTF(userid);
            out.writeInt(rate);
            out.writeUTF(movieName);
            out.writeUTF(movieType);
            out.writeLong(ts);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.movieid = in.readUTF();
            this.userid = in.readUTF();
            this.rate = in.readInt();
            this.movieName = in.readUTF();
            this.movieType = in.readUTF();
            this.ts = in.readLong();
        }

        @Override
        public int compareTo(MovieRate mr) {
            int it = mr.getMovieid().compareTo(this.movieid);
            if (it == 0) {
                return mr.getUserid().compareTo(this.userid);
            } else {
                return it;
            }
        }
    }
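
    As a quick local sanity check (not part of the original post), the Writable contract can be exercised directly: readFields() must consume the fields in exactly the order write() produced them, which is what the shuffle relies on when MovieRate is used as a key. A hypothetical round-trip test, assuming it sits in the same package as MovieRate:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class MovieRateRoundTrip {
        public static void main(String[] args) throws IOException {
            MovieRate original = new MovieRate("1", "10", 5, "Toy Story (1995)",
                    "Animation|Children's|Comedy", 978300760L);

            // serialize with write()
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // deserialize into a fresh instance with readFields()
            MovieRate copy = new MovieRate();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(copy); // prints the same tab-separated record
        }
    }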
    

    Step 2: Write the MapReduce program

    package com.ghgj.mr.mymapjoin;
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class MovieRatingMapJoinMR {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
            System.setProperty("HADOOP_USER_NAME", "hadoop");

            Job job = Job.getInstance(conf);
            // job.setJarByClass(MovieRatingMapJoinMR.class);
            job.setJar("/home/hadoop/mrmr.jar");
            job.setMapperClass(MovieRatingMapJoinMRMapper.class);
            job.setMapOutputKeyClass(MovieRate.class);
            job.setMapOutputValueClass(NullWritable.class);
            // job.setReducerClass(MovieRatingMapJoinMReducer.class);
            // job.setOutputKeyClass(MovieRate.class);
            // job.setOutputValueClass(NullWritable.class);
            job.setNumReduceTasks(0);

            String minInput = args[0];   // the small file (movies.dat)
            String maxInput = args[1];   // the large file (ratings.dat)
            String output = args[2];
            FileInputFormat.setInputPaths(job, new Path(maxInput));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            // distribute the small file to every map task through the distributed cache
            URI uri = new Path(minInput).toUri();
            job.addCacheFile(uri);

            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        }

        // The MovieRatingMapJoinMRMapper configured above is missing from the
        // original post; a sketch of it follows below.
    }
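
    The listing stops after main() and the MovieRatingMapJoinMRMapper it configures is not shown. The static nested class below is a minimal sketch of what that mapper could look like, not the original author's code. It assumes the cached small file is movies.dat (movieid::moviename::movietype), the map input is ratings.dat (userid::movieid::rate::timestamp), and the cached file can be opened by its plain file name in the task's working directory; the imports at the top of the listing already cover everything it uses.

    static class MovieRatingMapJoinMRMapper
            extends Mapper<LongWritable, Text, MovieRate, NullWritable> {

        // movieid -> "moviename \t movietype", built from the cached movies.dat
        private Map<String, String> movieMap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // the file registered with job.addCacheFile() is localized on each node;
            // it is assumed to be readable here by its file name
            String cacheFileName = new Path(context.getCacheFiles()[0]).getName();
            BufferedReader br = new BufferedReader(new FileReader(cacheFileName));
            String line;
            while ((line = br.readLine()) != null) {
                String[] fields = line.split("::");
                movieMap.put(fields[0], fields[1] + "\t" + fields[2]);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // ratings.dat line: userid::movieid::rate::timestamp
            String[] fields = value.toString().split("::");
            String movieInfo = movieMap.get(fields[1]);
            if (movieInfo == null) {
                return; // no matching movie in the small table
            }
            String[] movie = movieInfo.split("\t");
            MovieRate mr = new MovieRate(fields[1], fields[0],
                    Integer.parseInt(fields[2]), movie[0], movie[1],
                    Long.parseLong(fields[3]));
            context.write(mr, NullWritable.get());
        }
    }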

      

    II. Custom OutputFormat: writing data to different outputs by category

    Implementation: define a custom OutputFormat, override its RecordWriter, and override the write() method that actually writes each record.

    package com.ghgj.mr.score_outputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class MyScoreOutputFormat extends TextOutputFormat<Text, NullWritable>{
    
    	@Override
    	public RecordWriter<Text, NullWritable> getRecordWriter(
    			TaskAttemptContext job) throws IOException, InterruptedException {
    		Configuration configuration = job.getConfiguration();
    		
    		FileSystem fs = FileSystem.get(configuration);
    		Path p1 = new Path("/score1/outpu1");
    		Path p2 = new Path("/score2/outpu2");
    		
    		if(fs.exists(p1)){
    			fs.delete(p1, true);
    		}
    		if(fs.exists(p2)){
    			fs.delete(p2, true);
    		}
    		
    		FSDataOutputStream fsdout1 = fs.create(p1);
    		FSDataOutputStream fsdout2 = fs.create(p2);
    		return new MyRecordWriter(fsdout1, fsdout2);
    	}
    	
    	static class MyRecordWriter extends RecordWriter<Text, NullWritable>{
    
    		FSDataOutputStream dout1 = null;
    		FSDataOutputStream dout2 = null;
    		
    		public MyRecordWriter(FSDataOutputStream dout1, FSDataOutputStream dout2) {
    			super();
    			this.dout1 = dout1;
    			this.dout2 = dout2;
    		}
    
    		@Override
    		public void write(Text key, NullWritable value) throws IOException,
    				InterruptedException {
                // route records whose tag is "1" to the first output stream,
                // everything else to the second
                String[] strs = key.toString().split("::");
                if (strs[0].equals("1")) {
                    dout1.writeBytes(strs[1] + "\n");
                } else {
                    dout2.writeBytes(strs[1] + "\n");
                }
    		}
    
    		@Override
    		public void close(TaskAttemptContext context) throws IOException,
    				InterruptedException {
    			IOUtils.closeStream(dout2);
    			IOUtils.closeStream(dout1); 
    		}
    	}
    }
    

      

    package com.ghgj.mr.score_outputformat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class ScoreOutputFormatMR extends Configured implements Tool{
    
        // This run() method plays the role of the driver
    	@Override
    	public int run(String[] args) throws Exception {
    		
    		Configuration conf = new Configuration();
    		conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		Job job = Job.getInstance(conf);
    		
    		job.setMapperClass(ScoreOutputFormatMRMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(NullWritable.class);
    		
    		job.setNumReduceTasks(0);
    		
            // TextInputFormat is the default input component
    		job.setInputFormatClass(TextInputFormat.class);
            // TextOutputFormat is the default component for writing data out; the custom one is used instead
    //		job.setOutputFormatClass(TextOutputFormat.class);
    		job.setOutputFormatClass(MyScoreOutputFormat.class);
    		
    		FileInputFormat.setInputPaths(job, new Path("/scorefmt"));
    		Path output = new Path("/scorefmt/output");
    		FileSystem fs = FileSystem.get(conf);
    		if(fs.exists(output)){
    			fs.delete(output, true);
    		}
    		FileOutputFormat.setOutputPath(job, output);
    		
    		boolean status = job.waitForCompletion(true);
    		return status?0:1;
    	}
    
    	public static void main(String[] args) throws Exception {
    		
            int run = ToolRunner.run(new ScoreOutputFormatMR(), args);
    		System.exit(run);
    	}
    	
    	static class ScoreOutputFormatMRMapper extends Mapper<LongWritable,  Text, Text, NullWritable>{
    		@Override
    		protected void map(LongWritable key, Text value,
    				Mapper<LongWritable, Text, Text, NullWritable>.Context context)
    				throws IOException, InterruptedException {
    			
                String[] split = value.toString().split("\t");
    			if(split.length-2 >= 6){
    				context.write(new Text("1::"+value.toString()), NullWritable.get());
    			}else{
    				context.write(new Text("2::"+value.toString()), NullWritable.get());
    			}
    		}
    	}
    }
    

    III. Custom InputFormat: merging small files

          Step 1: Define a custom InputFormat

    package com.ghgj.mr.format.input;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
        // Make each small file non-splittable, so that one small file produces exactly one key-value pair
    	@Override
    	protected boolean isSplitable(JobContext context, Path file) {
    		return false;
    	}
    
    	@Override
    	public RecordReader<NullWritable, Text> createRecordReader(
    			InputSplit split, TaskAttemptContext context) throws IOException,
    			InterruptedException {
    		WholeFileRecordReader reader = new WholeFileRecordReader();
    		reader.initialize(split, context);
    		return reader;
    	}
    }
    

      Step 2: Write the custom RecordReader

    package com.ghgj.mr.format.input;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    	private FileSplit fileSplit;
    	private Configuration conf;
    	private Text value = new Text();
    	private boolean processed = false;
    
    	@Override
    	public void initialize(InputSplit split, TaskAttemptContext context)
    			throws IOException, InterruptedException {
    		this.fileSplit = (FileSplit) split;
    		this.conf = context.getConfiguration();
    	}
    
    	@Override
    	public boolean nextKeyValue() throws IOException, InterruptedException {
    		if (!processed) {
                // Allocate a byte array the size of this input split
    			byte[] contents = new byte[(int) fileSplit.getLength()];
                // Get the split's location in the file system from the FileSplit
    			Path file = fileSplit.getPath();
    			FileSystem fs = file.getFileSystem(conf);
    			FSDataInputStream in = null;
    			try {
                    // Open an input stream for the file through the FileSystem object
    				in = fs.open(file);
                    /**
                     * in is the input stream;
                     * contents is the byte array that receives the data read from the stream
                     */
    				IOUtils.readFully(in, contents, 0, contents.length);
    				
    				value.set(contents, 0, contents.length);
    				
    			} finally {
    				IOUtils.closeStream(in);
    			}
    			processed = true;
    			return true;
    		}
    		return false;
    	}
    
    	@Override
    	public NullWritable getCurrentKey() throws IOException, InterruptedException {
    		return NullWritable.get();
    	}
    
    	@Override
    	public Text getCurrentValue() throws IOException, InterruptedException {
    		return value;
    	}
    
    	@Override
    	public float getProgress() throws IOException {
    		return processed ? 1.0f : 0.0f;
    	}
    
    	@Override
    	public void close() throws IOException {
    		// do nothing
    	}
    }
    

       Step 3: Write the MapReduce program

    package com.ghgj.mr.format.input;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class SmallFilesConvertToBigMR extends Configured implements Tool {
    	
    	public static void main(String[] args) throws Exception {
    		int exitCode = ToolRunner.run(new SmallFilesConvertToBigMR(), args);
    		System.exit(exitCode);
    	}
    
    	static class SmallFilesConvertToBigMRMapper extends
    			Mapper<NullWritable, Text, Text, Text> {
    		
    		private Text filenameKey;
    		@Override
    		protected void setup(Context context) throws IOException,
    				InterruptedException {
    			InputSplit split = context.getInputSplit();
    			Path path = ((FileSplit) split).getPath();
    			filenameKey = new Text(path.toString());
    		}
    
    		@Override
    		protected void map(NullWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			context.write(filenameKey, value);
    		}
    	}
    
    	static class SmallFilesConvertToBigMRReducer extends
    			Reducer<Text, Text, NullWritable, Text> {
    		@Override
    		protected void reduce(Text filename, Iterable<Text> bytes,
    				Context context) throws IOException, InterruptedException {
    			context.write(NullWritable.get(), bytes.iterator().next());
    		}
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		Job job = Job.getInstance(conf, "combine small files to bigfile");
    		
    		job.setJarByClass(SmallFilesConvertToBigMR.class);
    		
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		job.setMapperClass(SmallFilesConvertToBigMRMapper.class);
    
    		job.setOutputKeyClass(NullWritable.class);
    		job.setOutputValueClass(Text.class);
    		job.setReducerClass(SmallFilesConvertToBigMRReducer.class);
    
            // TextInputFormat is the default input component
    //		job.setInputFormatClass(TextInputFormat.class);
            // Instead of the default input format, use the custom WholeFileInputFormat
    		job.setInputFormatClass(WholeFileInputFormat.class);
    		
    		
    		Path input = new Path("/smallfiles");
    		Path output = new Path("/bigfile");
    		FileInputFormat.setInputPaths(job, input);
    		FileSystem fs = FileSystem.get(conf);
    		if (fs.exists(output)) {
    			fs.delete(output, true);
    		}
    		FileOutputFormat.setOutputPath(job, output);
    
    		int status = job.waitForCompletion(true) ? 0 : 1;
    		return status;
    	}
    }
    

      


     

  • Original article: https://www.cnblogs.com/liuwei6/p/6735918.html