zoukankan      html  css  js  c++  java
  • MapReduce的DBInputFormat使用

      1 package com.mengyao.hadoop.mapreduce;
      2 
      3 import java.io.DataInput;
      4 import java.io.DataOutput;
      5 import java.io.IOException;
      6 import java.sql.PreparedStatement;
      7 import java.sql.ResultSet;
      8 import java.sql.SQLException;
      9 
     10 import org.apache.hadoop.conf.Configuration;
     11 import org.apache.hadoop.conf.Configured;
     12 import org.apache.hadoop.fs.Path;
     13 import org.apache.hadoop.io.LongWritable;
     14 import org.apache.hadoop.io.Text;
     15 import org.apache.hadoop.io.Writable;
     16 import org.apache.hadoop.mapreduce.Job;
     17 import org.apache.hadoop.mapreduce.Mapper;
     18 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
     19 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
     20 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
     21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     22 import org.apache.hadoop.util.Tool;
     23 import org.apache.hadoop.util.ToolRunner;
     24 
     25 /**
     26  * 使用DBInputFormat类读取数据库并将结果数据写到HDFS的/mapreduces/dboutput目录下,
     27  *         /mapreduces/dboutput/_SUCCESS:_SUCCESS空文件表示作业执行成功
     28  *         /mapreduces/dboutput/part-r-00000:文件表示作业的结果,内容如下:
     29  *             0    1    New Balance 999复古跑鞋    ML999GY    ML999GY    999.0
     30  *             1    2    New Balance 999复古跑鞋    ML999BB    ML999BB    999.0
     31  *             2    3    New Balance 996复古跑鞋    MRL996DG    MRL996DG    819.0
     32  *             3    4    New Balance 996复古跑鞋    MRL996EM    MRL996EM    819.0
     33  *             4    5    New Balance 996复古跑鞋    MRL996ES    MRL996ES    819.0
     34  * 这个作业没有Reducer类,在默认的MapReduce作业中,如果输出的key,value是默认的LongWritable, Text,则Reducer类可以省略,省略不写时则默认启动一个Reducer
     35  *
     36  * 一定要记住在使用MapReduce操作数据库时一定要添加JDBC驱动jar包到Hadoop的classpath中,否则会报无法加载JDBC Driver类异常,如下:
     37  *       1、我这里添加到/usr/local/installs/hadoop/share/hadoop/mapreduce/lib/mysql-connector-java-5.1.26-bin.jar这里了,务必要重启集群使classpath生效。
     38  *       2、将JDBC驱动jar包打包到这个MapReduce作业jar包中。
     39  *
     40  * @author mengyao
     41  *
     42  */
     43 public class DBInputFormatApp extends Configured implements Tool {
     44 
     45     /**
     46      * 这个JavaBean需要实现Hadoop的序列化接口Writable和与数据库交互时的序列化接口DBWritable
     47      * 官方API中解释如下:
     48      *         public class DBInputFormat<T extends DBWritable>
     49      *             extends InputFormat<LongWritable, T> implements Configurable
     50      *             即Mapper的Key是LongWritable类型,不可改变;Value是继承自DBWritable接口的自定义JavaBean
     51      *         
     52      * @author mengyao
     53      *
     54      */
     55     static class ProductWritable implements Writable, DBWritable {
     56 
     57         private long id;            // bigint(20) NOT NULL AUTO_INCREMENT,
     58         private String name;        // varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '商品名称',
     59         private String model;        // varchar(30) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '型号',
     60         private String color;        // varchar(10) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '颜色',
     61         private double price;        // decimal(10,0) DEFAULT NULL COMMENT '售价',
     62         
     63         @Override
     64         public void write(PreparedStatement ps) throws SQLException {
     65             ps.setLong(1, id);
     66             ps.setString(2, name);
     67             ps.setString(3, model);
     68             ps.setString(4, color);
     69             ps.setDouble(5, price);
     70         }
     71 
     72         @Override
     73         public void readFields(ResultSet rs) throws SQLException {
     74             this.id = rs.getLong(1);
     75             this.name = rs.getString(2);
     76             this.model = rs.getString(3);
     77             this.color = rs.getString(4);
     78             this.price = rs.getDouble(5);
     79         }
     80 
     81         @Override
     82         public void readFields(DataInput in) throws IOException {
     83             this.id = in.readLong();
     84             this.name = in.readUTF();
     85             this.model = in.readUTF();
     86             this.color = in.readUTF();
     87             this.price = in.readDouble();
     88         }
     89 
     90         @Override
     91         public void write(DataOutput output) throws IOException {
     92             output.writeLong(id);
     93             output.writeUTF(name);
     94             output.writeUTF(model);
     95             output.writeUTF(color);
     96             output.writeDouble(price);
     97         }
     98 
     99         @Override
    100         public String toString() {
    101             return id +"\t"+ name +"\t"+ model +"\t"+ color +"\t"+ price;
    102         }
    103         
    104     }
    105     
    106     static class DBInputFormatMapper extends Mapper<LongWritable, ProductWritable, LongWritable, Text> {
    107         
    108         private LongWritable outputKey;
    109         private Text outputValue;
    110         
    111         @Override
    112         protected void setup(
    113                 Mapper<LongWritable, ProductWritable, LongWritable, Text>.Context context)
    114                 throws IOException, InterruptedException {
    115             this.outputKey = new LongWritable();
    116             this.outputValue = new Text();
    117         }
    118         
    119         @Override
    120         protected void map(LongWritable key, ProductWritable value,
    121                 Mapper<LongWritable, ProductWritable, LongWritable, Text>.Context context)
    122                 throws IOException, InterruptedException {
    123             outputKey.set(key.get());
    124             outputValue.set(value.toString());
    125             context.write(outputKey, outputValue);
    126         }
    127     }
    128     
    129     @Override
    130     public int run(String[] args) throws Exception {
    131         Configuration conf = getConf();
    132         //在创建Configuration对象时紧跟着配置当前作业需要使用的JDBC配置
    133         DBConfiguration.configureDB(
    134                 conf,
    135                 "com.mysql.jdbc.Driver",
    136                 "jdbc:mysql://192.168.1.10:3306/shops",
    137                 "root",
    138                 "123456");
    139         
    140         Job job = Job.getInstance(conf, DBInputFormatApp.class.getSimpleName());
    141         job.setJarByClass(DBInputFormatApp.class);
    142         
    143         job.setInputFormatClass(DBInputFormat.class);
    144         FileOutputFormat.setOutputPath(job, new Path(args[0]));
    145         
    146         job.setMapperClass(DBInputFormatMapper.class);
    147         job.setMapOutputKeyClass(LongWritable.class);
    148         job.setMapOutputValueClass(Text.class);
    149         
    150         job.setOutputKeyClass(LongWritable.class);
    151         job.setOutputValueClass(Text.class);
    152         //配置当前作业要查询的SQL语句和接收查询结果的JavaBean
    153         DBInputFormat.setInput(
    154                 job,
    155                 ProductWritable.class,
    156                 "SELECT `id`,`name`,`model`,`color`,`price` FROM `product`",
    157                 "SELECT COUNT(1) FROM `product`");
    158         
    159         return job.waitForCompletion(true)?0:1;
    160     }
    161      
    162     public static int createJob(String[] args) {
    163         Configuration conf = new Configuration();
    164         conf.set("dfs.datanode.socket.write.timeout", "7200000");
    165         conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
    166         conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
    167         int status = 0;
    168         
    169         try {
    170             status = ToolRunner.run(conf, new DBInputFormatApp(), args);
    171         } catch (Exception e) {
    172             e.printStackTrace();
    173         }
    174         
    175         return status;
    176     }
    177     
    178     public static void main(String[] args) {
    179         args = new String[]{"/mapreduces/dboutput"};
    180         if (args.length!=1) {
    181             System.out.println("Usage: "+DBInputFormatApp.class.getName()+" Input paramters <OUTPUT_PATH>");
    182         } else {
    183             int status = createJob(args);
    184             System.exit(status);
    185         }
    186 
    187     }
    188 
    189 }
  • 相关阅读:
    LeetCode 842. Split Array into Fibonacci Sequence
    LeetCode 1087. Brace Expansion
    LeetCode 1219. Path with Maximum Gold
    LeetCode 1079. Letter Tile Possibilities
    LeetCode 1049. Last Stone Weight II
    LeetCode 1046. Last Stone Weight
    LeetCode 1139. Largest 1-Bordered Square
    LeetCode 764. Largest Plus Sign
    LeetCode 1105. Filling Bookcase Shelves
    LeetCode 1027. Longest Arithmetic Sequence
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865565.html
Copyright © 2011-2022 走看看