  • MultipleOutputs: old API vs. new API

  Two ways to split a job's output across multiple files: the first listing uses the old mapred API, subclassing MultipleTextOutputFormat; the second uses the new mapreduce API's MultipleOutputs class.

    package MRNB_V4;
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class MultipleOutputs extends Configured implements Tool {
    
        public static class MapClass extends MapReduceBase implements
                Mapper<LongWritable, Text, NullWritable, Text> {
    
            @Override
            public void map(LongWritable key, Text value,
                    OutputCollector<NullWritable, Text> output, Reporter reporter)
                    throws IOException {
                output.collect(NullWritable.get(), value);
            }
    
        }
    
    // MultipleTextOutputFormat extends MultipleOutputFormat; overriding
    // generateFileNameForKeyValue sorts records into separate output files.

        public static class PartitionByCountryMTOF extends
                MultipleTextOutputFormat<NullWritable, Text> { // key is NullWritable, value is Text

            @Override
            protected String generateFileNameForKeyValue(NullWritable key,
                    Text value, String filename) {
                String[] arr = value.toString().split(",", -1);
                // field 5 is quoted; skip the quote and take the two-letter country code
                String country = arr[4].substring(1, 3);
                return country + "/" + filename;
            }
        }
    
    // No reducer is used here (map-only job); the stub below is intentionally commented out.
        /*public static class Reducer extends MapReduceBase
                implements
                org.apache.hadoop.mapred.Reducer<LongWritable, Text, NullWritable, Text> {
    
            @Override
            public void reduce(LongWritable key, Iterator<Text> values,
                    OutputCollector<NullWritable, Text> output, Reporter reporter)
                    throws IOException {
                // TODO Auto-generated method stub
    
            }
    
        }
    */
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            JobConf job = new JobConf(conf,MultipleOutputs.class);
            
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            
            job.setJobName("MultipleOutputs");
            job.setMapperClass(MapClass.class);
            job.setInputFormat(TextInputFormat.class);
            job.setOutputFormat(PartitionByCountryMTOF.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            
            job.setNumReduceTasks(0);
            JobClient.runJob(job);
            return 0;
        }
        
        public static void main(String[] args) throws Exception{
            int res = ToolRunner.run(new Configuration(), new MultipleOutputs(), args);
            System.exit(res);
        }
    
    }
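
  With the old API, the file each record lands in is computed per record by
  generateFileNameForKeyValue, so the job fans records out into per-country
  directories under the output path. A sketch of the effect, assuming
  comma-separated input whose fifth field is a quoted string starting with a
  two-letter country code (the sample lines are hypothetical):

        input:  5,Ben,1976,x,"US 1st Street"    ->  <out>/US/part-00000
        input:  6,Ann,1980,y,"CA 2nd Avenue"    ->  <out>/CA/part-00000

  Here arr[4] is "US 1st Street"; substring(1, 3) skips the opening quote and
  yields US, which becomes the directory prepended to the default part-00000
  file name.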

      

    package MRNB_V4;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class TestwithMultipleOutputs extends Configured implements Tool {
    
        public static class MapClass extends
                Mapper<LongWritable, Text, Text, IntWritable> {

            private MultipleOutputs<Text, IntWritable> mos;

            @Override
            protected void setup(Context context) throws IOException,
                    InterruptedException {
                mos = new MultipleOutputs<Text, IntWritable>(context);
            }

            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] tokens = line.split("-");

                // Form 1: write to the named output "MOSInt" declared in run().
                //mos.write("MOSInt", new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1])));
                // Form 2: would also need addNamedOutput(job, "MOSText", ...) in run().
                //mos.write("MOSText", new Text(tokens[0]), tokens[2]);
                // Form 3: the extra base-path argument routes output into a specific
                // file or directory (here, a subdirectory named after tokens[0]).
                mos.write("mlj", new Text(tokens[0]), new Text(line), tokens[0] + "/");
            }

            @Override
            protected void cleanup(Context context) throws IOException,
                    InterruptedException {
                mos.close();
            }
        }
    
    	public int run(String[] args) throws Exception {
    
    		Configuration conf = getConf();
    
    		Job job = new Job(conf, "word count with MultipleOutputs");
    
    		job.setJarByClass(TestwithMultipleOutputs.class);
    
        /*Path in = new Path(args[0]);
        Path out = new Path(args[1]);*/
        // Input/output are hard-coded here; the commented lines above show the
        // args-based alternative.
        final String Input_path = "hdfs://mlj:9000/hive";
        final String Out_path = "hdfs://mlj:9000/hive_out";

        FileInputFormat.setInputPaths(job, Input_path);
        FileOutputFormat.setOutputPath(job, new Path(Out_path));
    
    		job.setMapperClass(MapClass.class);
    		job.setNumReduceTasks(0);
    		MultipleOutputs.addNamedOutput(job, "MOSInt", TextOutputFormat.class,Text.class, IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "mlj", TextOutputFormat.class,Text.class, Text.class);
    
        // Return the job's status to ToolRunner rather than calling System.exit
        // here, which made the original return statement unreachable at runtime.
        return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    	public static void main(String[] args) throws Exception {
    
    		int res = ToolRunner.run(new Configuration(),new TestwithMultipleOutputs(), args);
    		System.exit(res);
    	}
    }
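
  With the new API, each named output gets its own files next to the default
  part files, named after either the named output or the explicit base path.
  Assuming input lines of the form key-count-text split on "-" (hypothetical
  data): form 1 would write to <out>/MOSInt-m-00000, while form 3, which
  passes tokens[0] + "/" as the base output path, writes to
  <out>/<tokens[0]>/-m-00000 (the base path replaces the "part" base name).

  One gotcha: because the mapper never calls context.write, the default
  TextOutputFormat still creates empty part-m-xxxxx files. A minimal sketch of
  suppressing them with the standard LazyOutputFormat helper (an illustrative
  addition, not part of the original post):

        import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

        // In run(): LazyOutputFormat wraps TextOutputFormat and only creates
        // the underlying record writer (and hence the part file) on the first
        // actual write, so unused part files never appear.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);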
    

      
