zoukankan      html  css  js  c++  java
  • hadoop 实现多文件输出

    需求

    不同的key输出到不同的文件

    txt文件

    multiple.txt

    中国;22
    美国;4342
    中国;123
    日本;44
    日本;6
    美国;55
    美国;43765
    日本;786
    日本;55

    Java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import java.io.IOException;
    
    
    public class MultipleApp {
        /**
         * map任务
         *
         * **/
        public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
            @Override
            protected void map(LongWritable key, Text value,Context context)
                    throws IOException, InterruptedException {
                String ss[]=value.toString().split(";");
                context.write(new Text(ss[0]), new Text(ss[1]));
            }
        }
    
    
        public static class PReduce extends Reducer<Text, Text, Text, Text>{
            /**
             * 设置多个文件输出
             * */
            private MultipleOutputs mos;
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                mos = new MultipleOutputs(context);//初始化mos
            }
            @Override
            protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2) throws IOException, InterruptedException {
    
                String key=arg0.toString();
                for(Text t:arg1){
                    if(key.equals("中国")){
                        mos.write("china", arg0,t);
                    } else if(key.equals("美国")){
                        mos.write("USA", arg0,t);
                    } else if(key.equals("日本")){
                        mos.write("japan", arg0,t);
                    }
                }
            }
    
            @Override
            protected void cleanup(
                    Context context)
                    throws IOException, InterruptedException {
                mos.close();//释放资源
            }
        }
    
    
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            Job job = Job.getInstance(conf);
    
            /**Job任务**/
            job.setJobName("multiple");
            job.setJarByClass(MultipleApp.class);
            job.setMapperClass(PMapper.class);
            job.setReducerClass(PReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            /**
             * 注意在初始化时需要设置输出文件的名
             * 另外名称,不支持中文名,仅支持英文字符
             *
             * **/
            MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "japan", TextOutputFormat.class, Text.class, Text.class);
    
            //添加输入路径
            FileInputFormat.addInputPath(job,new Path("C://multiple.txt"));
            //设置输出路径
            FileOutputFormat.setOutputPath(job,new Path("C://out"));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    结果

    part-r-00000为框架自动生成的空文件,可忽略

  • 相关阅读:
    CentOS7安装mysql-8
    zabbix监控规划及实施
    集群技术
    自动化脚本-配置LVS(DR模式)
    Pacemaker+ISCSI实现Apache高可用-配置
    创建集群corosync
    我的第一个python程序——猜数字
    质量报告
    新需求测试与回归测试
    冒烟测试
  • 原文地址:https://www.cnblogs.com/Alcesttt/p/11402232.html
Copyright © 2011-2022 走看看