  • MultipleOutputFormat and MultipleOutputs

    I. Introduction

    1. The old API provides org.apache.hadoop.mapred.lib.MultipleOutputFormat and org.apache.hadoop.mapred.lib.MultipleOutputs.

    MultipleOutputFormat allows writing the output data to different output files.

    MultipleOutputs creates multiple OutputCollectors. Each OutputCollector can have its own OutputFormat and types for the key/value pair. Your MapReduce program will decide what to output to each OutputCollector.

    2. The new API provides org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.

    It combines the functionality of the two old-API classes above; the new API has no MultipleOutputFormat.

      The MultipleOutputs class simplifies writing output data to multiple outputs:

      Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.

      Case two: writing data to different files whose names are provided by the user.

    The following passage is from Hadoop: The Definitive Guide (3rd edition, Early Release), p. 251:

      “In the old MapReduce API there are two classes for producing multiple outputs: MultipleOutputFormat and MultipleOutputs. In a nutshell, MultipleOutputs is more fully featured, but MultipleOutputFormat has more control over the output directory structure and file naming. MultipleOutputs in the new API combines the best features of the two multiple output classes in the old API.”

    II. Usage

     1. Writing to multiple files or multiple directories:

      The driver needs no extra changes; just add the following code to the Mapper or Reducer class:

      private MultipleOutputs<Text,IntWritable> mos;
      public void setup(Context context) throws IOException,InterruptedException {
        mos = new MultipleOutputs<Text,IntWritable>(context);
      }
      public void cleanup(Context context) throws IOException,InterruptedException {
        mos.close();
      }
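      After that, mos.write(Key key, Value value, String baseOutputPath) can be used in place of context.write(key, value).
      When mos is used in the Mapper or Reducer, the default files part-m-00* or part-r-00* are still created, but they are empty (size 0) if every record is written through mos. Also, in a job that has a reduce phase, only the records a mapper emits with context.write are passed on to the reducers; records written through mos.write go straight to the output files.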
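
The file names produced through baseOutputPath follow a fixed pattern. The sketch below is plain Java with no Hadoop dependency; the class and method names (OutputNameSketch, uniqueFile) are hypothetical, and the code only imitates the naming convention of Hadoop's file-based output formats (base name, then -m- or -r- for a map or reduce task, then a zero-padded five-digit partition number), assuming no file extension is appended.

```java
import java.util.Locale;

// Plain-Java illustration of the output file naming pattern.
// Hypothetical helper: this is not Hadoop code, it only mirrors the convention.
class OutputNameSketch {

    // Builds "<baseOutputPath>-<m|r>-<partition padded to 5 digits>".
    static String uniqueFile(String baseOutputPath, boolean mapTask, int partition) {
        String taskType = mapTask ? "m" : "r";
        return baseOutputPath + "-" + taskType + "-"
                + String.format(Locale.ROOT, "%05d", partition);
    }

    public static void main(String[] args) {
        // A plain base name becomes the file-name prefix.
        System.out.println(uniqueFile("MOSInt", true, 0));   // MOSInt-m-00000
        // A base path ending in "/" creates a subdirectory.
        System.out.println(uniqueFile("abc/", true, 0));     // abc/-m-00000
        // The same pattern with "r" applies on the reduce side.
        System.out.println(uniqueFile("h", false, 0));       // h-r-00000
    }
}
```

With a base path that ends in "/", such as "abc/", the records end up in a directory named abc, in a file named -m-00000. Supplying a prefix after the slash (for example "abc/part") would give a more conventional file name.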

     2. Writing output in multiple formats:

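    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class TestwithMultipleOutputs extends Configured implements Tool {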

      public static class MapClass extends Mapper<LongWritable,Text,Text,IntWritable> {

        private MultipleOutputs<Text,IntWritable> mos;

        protected void setup(Context context) throws IOException,InterruptedException {
          mos = new MultipleOutputs<Text,IntWritable>(context);
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
          String line = value.toString();
          String[] tokens = line.split("-");

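          // (1) Named output "MOSInt": key is the first token, value is the second token as an int
          mos.write("MOSInt", new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1])));
          // (2) Named output "MOSText": the value is wrapped in Text to match the declared value class
          mos.write("MOSText", new Text(tokens[0]), new Text(tokens[2]));
          // (3) A named output can also be directed to a specific file or directory via baseOutputPath
          mos.write("MOSText", new Text(tokens[0]), new Text(line), tokens[0] + "/");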
        }

        protected void cleanup(Context context) throws IOException,InterruptedException {
          mos.close();
        }

      }
      public int run(String[] args) throws Exception {

        Configuration conf = getConf();

        Job job = new Job(conf,"word count with MultipleOutputs");

        job.setJarByClass(TestwithMultipleOutputs.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);  

        MultipleOutputs.addNamedOutput(job,"MOSInt",TextOutputFormat.class,Text.class,IntWritable.class);
        MultipleOutputs.addNamedOutput(job,"MOSText",TextOutputFormat.class,Text.class,Text.class);

        // Return the status to ToolRunner instead of exiting here; main() calls System.exit.
        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
        System.exit(res);
      }

    }

    Test data:

    abc-1232-hdf
    abc-123-rtd
    ioj-234-grjth
    ntg-653-sdgfvd
    kju-876-btyun
    bhm-530-bhyt
    hfter-45642-bhgf
    bgrfg-8956-fmgh
    jnhdf-8734-adfbgf
    ntg-68763-nfhsdf
    ntg-98634-dehuy
    hfter-84567-drhuk

    Result (screenshot omitted): the output is written to /test/testMOSout
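
As a rough cross-check of the expected layout, the following dry run replays the mapper's three write calls on a few of the test lines in plain Java, without Hadoop. It is only a sketch under stated assumptions: the class MosDryRun and its methods are hypothetical, a single map task (partition 0) is assumed, and records are joined with a tab, which is the default key/value separator of TextOutputFormat.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical dry run: maps each input line to the files the mapper would write to.
class MosDryRun {

    // Assumption: one map task, so every file ends in -m-00000.
    static final String SUFFIX = "-m-00000";

    static Map<String, List<String>> route(List<String> lines) {
        Map<String, List<String>> files = new LinkedHashMap<>();
        for (String line : lines) {
            String[] t = line.split("-");
            if (t.length < 3) {
                continue; // skip malformed lines in this sketch
            }
            // (1) named output MOSInt: key and integer value
            add(files, "MOSInt" + SUFFIX, t[0] + "\t" + Integer.parseInt(t[1]));
            // (2) named output MOSText: key and third token
            add(files, "MOSText" + SUFFIX, t[0] + "\t" + t[2]);
            // (3) named output MOSText with baseOutputPath "<key>/": key and whole line
            add(files, t[0] + "/" + SUFFIX, t[0] + "\t" + line);
        }
        return files;
    }

    private static void add(Map<String, List<String>> files, String file, String record) {
        files.computeIfAbsent(file, k -> new ArrayList<>()).add(record);
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("abc-1232-hdf", "abc-123-rtd", "ntg-653-sdgfvd");
        route(sample).forEach((file, records) ->
                System.out.println(file + " -> " + records.size() + " record(s)"));
    }
}
```

Under these assumptions, each distinct first token gets its own directory, while MOSInt and MOSText each collect one record per input line.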

    A problem I ran into:

      Without mos.close(), the program throws an exception at runtime:

      12/05/21 20:12:47 WARN hdfs.DFSClient: DataStreamer Exception:

      org.apache.hadoop.ipc.RemoteException:org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on
      /test/mosreduce/_temporary/_attempt_local_0001_r_000000_0/h-r-00000 File does not exist. [Lease. Holder: DFSClient_-352105532, pendingcreates: 5]
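
One plausible reading of this exception is that MultipleOutputs keeps a writer open for every base path it has seen, and those writers are only flushed and closed by mos.close(); if the task finishes first and its temporary directory is cleaned up, the still-open files no longer exist when the client tries to complete them. The plain-Java analogy below illustrates the pattern of a per-name writer cache that must be closed. It is not Hadoop's implementation, and the class WriterCacheSketch is hypothetical.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical analogy: one lazily opened, buffered writer per output name.
class WriterCacheSketch implements AutoCloseable {

    private final Path dir;
    private final Map<String, BufferedWriter> writers = new HashMap<>();

    WriterCacheSketch(Path dir) {
        this.dir = dir;
    }

    // Opens the writer on first use and reuses it for later records.
    void write(String baseName, String record) throws IOException {
        BufferedWriter w = writers.get(baseName);
        if (w == null) {
            w = Files.newBufferedWriter(dir.resolve(baseName), StandardCharsets.UTF_8);
            writers.put(baseName, w);
        }
        w.write(record);
        w.newLine();
    }

    // Counterpart of mos.close(): flushes and closes every cached writer.
    @Override
    public void close() throws IOException {
        for (BufferedWriter w : writers.values()) {
            w.close();
        }
        writers.clear();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("mos-sketch");
        // try-with-resources guarantees close(), like calling mos.close() in cleanup().
        try (WriterCacheSketch out = new WriterCacheSketch(dir)) {
            out.write("h-r-00000", "abc\t1232");
            out.write("h-r-00000", "abc\t123");
        }
        System.out.println(Files.readAllLines(dir.resolve("h-r-00000"), StandardCharsets.UTF_8));
    }
}
```

In the Hadoop job the equivalent guarantee comes from calling mos.close() in cleanup(), as the Mapper above does.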

  • Original article: https://www.cnblogs.com/liangzh/p/2512264.html