zoukankan      html  css  js  c++  java
  • MapReduce框架原理-OutputFormat工作原理

    OutputFormat概述

    OutputFormat主要是用来指定MR程序的最终的输出数据格式 。

    默认使用的是TextOutputFormat,默认是将数据一行写一条数据,并且把数据放到指定的输出目录下,以 part-r-xxxxx数字开头。并且默认情况下有几个ReduceTask就有几个结果文件产生

    自定义OutputFormat

    自定义OutputFormat的详细流程:

    1. 定义MyOutputFormat继承FileOutputFormat<T>,泛型传入的是Reducer的输出类型
    2. 重写里面的getRecordWriter()方法,这个方法需要返回一个RecordWriter对象。

      这个方法里面定义了最终文件输出到什么地方

    3. 创建一个RecordWriter对象,继承RecordWriter<T>,重写里面的两个方法:write()、close()。其中write()方法中需要定义想要将文件输出到什么地方去,在这个方法中定义输出数据地址和输出数据格式
    4. 在Driver中通过job.setOutputFormatClass()指定我们使用的是哪个OutputFormat实现类

    注意】如果设置了分区,并且指定了ReduceTask的数量,那么根据以前所学的有多少个ReduceTask就会生成多少个结果文件,是因为默认使用的是TextOutputFormat实现类,这个实现类就是几个ReduceTask就有几个结果文件。但是如果我们自定义了OutputFormat,那么结果文件只有我们指明的地址,没有其他。

    案例实操

    案例一:存储数据到MySQL中

    需求:将手机流量数据根据总流向升序输出到MySQL数据库中

    代码:

    1. FlowOutputInformat.java
      public class FlowOutputFormat extends FileOutputFormat<FlowBean, NullWritable> {
          @Override
          public RecordWriter<FlowBean, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
              return new MyRecordWriter();
          }
      }
    2. MyRecordWriter.java
      public class MyRecordWriter extends RecordWriter<FlowBean, NullWritable> {
          /**
           * 需要在这个方法中定义输出格式、输出数据地址
           * @param flowBean:Reduce阶段输出数据Key值
           * @param nullWritable:Reduce阶段输出value值
           */
          @SneakyThrows
          @Override
          public void write(FlowBean flowBean, NullWritable nullWritable) throws IOException, InterruptedException {
              Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/sx_bigdata?serverTimezone=UTC", "root", "root");
              PreparedStatement preparedStatement = connection.prepareStatement("insert into phone_flow values (?, ?, ?, ?)");
              preparedStatement.setString(1, flowBean.getPhone());
              preparedStatement.setInt(2, flowBean.getUpFlow());
              preparedStatement.setInt(3, flowBean.getDownFlow());
              preparedStatement.setInt(4, flowBean.getSumFlow());
              int i = preparedStatement.executeUpdate();
              if (i > 0) {
                  System.out.println("添加成功!");
              } else {
                  System.out.println("添加失败!");
              }
              connection.close();
              preparedStatement.close();
          }
      
          @Override
          public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
      
          }
    3. FlowDriver.java
      job.setOutputFormatClass(FlowOutputFormat.class);

    案例二:存储数据到HDFS本地指定文件夹中

    需求:将单词计数案例结果输出到本地,其中首字母为大写字母存储在/upper.txt目录下,首字母为小写字母存储在/lower.txt目录下

    代码:

    1. MyOutputFormat.java
      public class MyOutputFormat extends FileOutputFormat<Text, LongWritable> {
          @SneakyThrows
          @Override
          public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
              return new MyRecordWriter(taskAttemptContext);
          }
      }
    2. MyRecordWriter.java
      public class MyRecordWriter extends RecordWriter<Text, LongWritable> {
          FSDataOutputStream fsDataOutputStream1;
          FSDataOutputStream fsDataOutputStream2;
          public MyRecordWriter(TaskAttemptContext taskAttemptContext) throws Exception {
              Configuration configuration = taskAttemptContext.getConfiguration();
              FileSystem fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), configuration, "root");
              Path out1 = new Path("/test/school/upper.txt");
              Path out2 = new Path("/test/school/lower.txt");
              if (fs.exists(out1)) {
                  fs.delete(out1, true);
              }
              if (fs.exists(out2)) {
                  fs.delete(out2, true);
              }
              fsDataOutputStream1 = fs.create(out1);
              fsDataOutputStream2 = fs.create(out2);
          }
      
          @Override
          public void write(Text text, LongWritable longWritable) throws IOException, InterruptedException {
              char firstWord = text.toString().charAt(0);
              String line = text + "	" + longWritable.get() + "
      ";
              if (Character.isUpperCase(firstWord)) {
                  fsDataOutputStream1.write(line.getBytes());
              } else {
                  fsDataOutputStream2.write(line.getBytes());
              }
          }
      
          @Override
          public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
              if (fsDataOutputStream1 != null) {
                  fsDataOutputStream1.close();
              }
              if (fsDataOutputStream2 != null) {
                  fsDataOutputStream2.close();
              }
          }
      }
    3. FlowDriver.java
      job.setOutputFormatClass(MyOutputFormat.class);
  • 相关阅读:
    策略模式精讲
    工厂模式精讲
    单例模式精讲
    原型模式精讲
    CoreJava学习第五课 --- 进入第二阶段:面向对象编程思想
    CoreJava学习第四课-数组
    CoreJava学习第三课
    CoreJava学习第一课
    Oracle练习题一
    JDBC第一课-简介及开发第一个JDBC程序
  • 原文地址:https://www.cnblogs.com/zyd-994264926326/p/15136571.html
Copyright © 2011-2022 走看看