zoukankan      html  css  js  c++  java
  • 在Maprecue中利用MultipleOutputs输出多个文件

    用户在使用Mapreduce时默认以part-*命名,

    MultipleOutputs能够将不同的键值对输出到用户自己定义的不同的文件里。

    实现过程是在调用output.write(key, new IntWritable(total), key.toString());

    方法时候第三个參数是  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 指定了输出文件的命名前缀。那么我们能够通过对不同的key使用不同的baseOutputPath来使不同key相应的value输出到不同的文件里,比方将同一天的数据输出到以该日期命名的文件里

    測试数据:ip-to-hosts.txt

    18.217.167.70	United States
    206.96.54.107	United States
    196.109.151.139	Mauritius
    174.52.58.113	United States
    142.111.216.8	Canada
    162.100.49.185	United States
    146.38.26.54	United States
    36.35.107.36	China
    95.214.95.13	Spain
    2.96.191.111	United Kingdom
    62.177.119.177	Czech Republic
    21.165.189.3	United States
    46.190.32.115	Greece
    113.173.113.29	Vietnam
    42.65.172.142	Taiwan
    197.91.198.199	South Africa
    68.165.71.27	United States
    110.119.165.104	China
    171.50.76.89	India
    171.207.52.113	Singapore
    40.174.30.170	United States
    191.170.95.175	United States
    17.81.129.101	United States
    91.212.157.202	France
    173.83.82.99	United States
    129.75.56.220	United States
    149.25.104.198	United States
    103.110.22.19	Indonesia
    204.188.117.122	United States
    138.23.10.72	United States
    172.50.15.32	United States
    85.88.38.58	Belgium
    49.15.14.6	India
    19.84.175.5	United States
    50.158.140.215	United States
    161.114.120.34	United States
    118.211.174.52	Australia
    220.98.113.71	Japan
    182.101.16.171	China
    25.45.75.194	United Kingdom
    168.16.162.99	United States
    155.60.219.154	Australia
    26.216.17.198	United States
    68.34.157.157	United States
    89.176.196.28	Czech Republic
    173.11.51.134	United States
    116.207.191.159	China
    164.210.124.152	United States
    168.17.158.38	United States
    174.24.173.11	United States
    143.64.173.176	United States
    160.164.158.125	Italy
    15.111.128.4	United States
    22.71.176.163	United States
    105.57.100.182	Morocco
    111.147.83.42	China
    137.157.65.89	Australia
    
    该文件里每行数据有两个字段 各自是ip地址和该ip地址相应的国家。以 分隔


    上代码

     public static class IPCountryReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
    
            private MultipleOutputs output;
    
            @Override
            protected void setup(Context context
            ) throws IOException, InterruptedException {
                output = new MultipleOutputs(context);
            }
    
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context
            ) throws IOException, InterruptedException {
                int total = 0;
                for(IntWritable value: values) {
                    total += value.get();
                }
               <span style="color:#FF0000;"> output.write(new Text("Output by MultipleOutputs"), NullWritable.get(), key.toString());
                output.write(key, new IntWritable(total), key.toString());</span>
    
            }
    
            @Override
            protected void cleanup(Context context
            ) throws IOException, InterruptedException {
                output.close();
            }
        }
    在reduce的setup方法中
     output = new MultipleOutputs(context);
    然后在reduce中通过该output将内容输出到不同的文件里
       private Configuration conf;
        public static final String NAME = "named_output";
    
    
        public static void main(String[] args) throws Exception {
            args =new String[] {"hdfs://caozw:9100/user/hadoop/hadooprealword","hdfs://caozw:9100/user/hadoop/hadooprealword/output"};
            ToolRunner.run(new Configuration(), new NamedCountryOutputJob(), args);
        }
    
        public int run(String[] args) throws Exception {
            if(args.length != 2) {
                System.err.println("Usage: named_output <input> <output>");
                System.exit(1);
            }
    
            Job job = new Job(conf, "IP count by country to named files");
            job.setInputFormatClass(TextInputFormat.class);
    
            job.setMapperClass(IPCountryMapper.class);
            job.setReducerClass(IPCountryReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setJarByClass(NamedCountryOutputJob.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            return job.waitForCompletion(true) ? 1 : 0;
    
        }
    
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        public Configuration getConf() {
            return conf;
        }
    
        public static class IPCountryMapper
                extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            private static final int country_pos = 1;
            private static final Pattern pattern = Pattern.compile("\t");
    
            @Override
            protected void map(LongWritable key, Text value,
                               Context context) throws IOException, InterruptedException {
                String country = pattern.split(value.toString())[country_pos];
                context.write(new Text(country), new IntWritable(1));
            }
        }

    測试结果:


  • 相关阅读:
    SQL----Scalar 函数
    SQL----Group By and Having
    SQL--合计函数(Aggregate functions):avg,count,first,last,max,min,sum
    SQL 函数
    Auto-increment 自动增长
    sql--ALTER
    sql--Drop语句
    sql--index 索引
    sql--select into,create database,create table,Constraints
    sql--union
  • 原文地址:https://www.cnblogs.com/slgkaifa/p/6732909.html
Copyright © 2011-2022 走看看