zoukankan      html  css  js  c++  java
  • MapReduce的DBInputFormat使用

      1 package com.mengyao.hadoop.mapreduce;
      2 
      3 import java.io.DataInput;
      4 import java.io.DataOutput;
      5 import java.io.IOException;
      6 import java.sql.PreparedStatement;
      7 import java.sql.ResultSet;
      8 import java.sql.SQLException;
      9 
     10 import org.apache.hadoop.conf.Configuration;
     11 import org.apache.hadoop.conf.Configured;
     12 import org.apache.hadoop.fs.Path;
     13 import org.apache.hadoop.io.LongWritable;
     14 import org.apache.hadoop.io.Text;
     15 import org.apache.hadoop.io.Writable;
     16 import org.apache.hadoop.mapreduce.Job;
     17 import org.apache.hadoop.mapreduce.Mapper;
     18 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
     19 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
     20 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
     21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     22 import org.apache.hadoop.util.Tool;
     23 import org.apache.hadoop.util.ToolRunner;
     24 
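     25 /**
     26  * Reads a database table with DBInputFormat and writes the result to the HDFS directory /mapreduces/dboutput:
     27  *         /mapreduces/dboutput/_SUCCESS: an empty marker file indicating that the job succeeded
     28  *         /mapreduces/dboutput/part-r-00000: the job result, with content such as:
     29  *             0    1    New Balance 999复古跑鞋    ML999GY    ML999GY    999.0
     30  *             1    2    New Balance 999复古跑鞋    ML999BB    ML999BB    999.0
     31  *             2    3    New Balance 996复古跑鞋    MRL996DG    MRL996DG    819.0
     32  *             3    4    New Balance 996复古跑鞋    MRL996EM    MRL996EM    819.0
     33  *             4    5    New Balance 996复古跑鞋    MRL996ES    MRL996ES    819.0
     34  * This job defines no Reducer class. Because the map output types (LongWritable, Text) are also the job output types, the Reducer can be omitted; the default identity Reducer is then used and, by default, a single reduce task is started.
     35  *
     36  * When a MapReduce job accesses a database, the JDBC driver jar must be on the Hadoop classpath; otherwise the job fails because the JDBC Driver class cannot be loaded. Either of the following fixes this:
     37  *       1. Add the jar to Hadoop's lib directory (here: /usr/local/installs/hadoop/share/hadoop/mapreduce/lib/mysql-connector-java-5.1.26-bin.jar) and restart the cluster so that the classpath change takes effect.
     38  *       2. Package the JDBC driver jar inside this MapReduce job's jar.
     39  *
     40  * @author mengyao
     41  *
     42  */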
     43 public class DBInputFormatApp extends Configured implements Tool {
     44 
     45     /**
     46      * 这个JavaBean需要实现Hadoop的序列化接口Writable和与数据库交互时的序列化接口DBWritable
     47      * 官方API中解释如下:
     48      *         public class DBInputFormat<T extends DBWritable>
     49      *             extends InputFormat<LongWritable, T> implements Configurable
     50      *             即Mapper的Key是LongWritable类型,不可改变;Value是继承自DBWritable接口的自定义JavaBean
     51      *         
     52      * @author mengyao
     53      *
     54      */
     55     static class ProductWritable implements Writable, DBWritable {
     56 
     57         private long id;            // bigint(20) NOT NULL AUTO_INCREMENT,
     58         private String name;        // varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '商品名称',
     59         private String model;        // varchar(30) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '型号',
     60         private String color;        // varchar(10) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '颜色',
     61         private double price;        // decimal(10,0) DEFAULT NULL COMMENT '售价',
     62         
     63         @Override
     64         public void write(PreparedStatement ps) throws SQLException {
     65             ps.setLong(1, id);
     66             ps.setString(2, name);
     67             ps.setString(3, model);
     68             ps.setString(4, color);
     69             ps.setDouble(5, price);
     70         }
     71 
     72         @Override
     73         public void readFields(ResultSet rs) throws SQLException {
     74             this.id = rs.getLong(1);
     75             this.name = rs.getString(2);
     76             this.model = rs.getString(3);
     77             this.color = rs.getString(4);
     78             this.price = rs.getDouble(5);
     79         }
     80 
     81         @Override
     82         public void readFields(DataInput in) throws IOException {
     83             this.id = in.readLong();
     84             this.name = in.readUTF();
     85             this.model = in.readUTF();
     86             this.color = in.readUTF();
     87             this.price = in.readDouble();
     88         }
     89 
     90         @Override
     91         public void write(DataOutput output) throws IOException {
     92             output.writeLong(id);
     93             output.writeUTF(name);
     94             output.writeUTF(model);
     95             output.writeUTF(color);
     96             output.writeDouble(price);
     97         }
     98 
     99         @Override
    100         public String toString() {
    101             return id +"\t"+ name +"\t"+ model +"\t"+ color +"\t"+ price;
    102         }
    103         
    104     }
    105     
    106     static class DBInputFormatMapper extends Mapper<LongWritable, ProductWritable, LongWritable, Text> {
    107         
    108         private LongWritable outputKey;
    109         private Text outputValue;
    110         
    111         @Override
    112         protected void setup(
    113                 Mapper<LongWritable, ProductWritable, LongWritable, Text>.Context context)
    114                 throws IOException, InterruptedException {
    115             this.outputKey = new LongWritable();
    116             this.outputValue = new Text();
    117         }
    118         
    119         @Override
    120         protected void map(LongWritable key, ProductWritable value,
    121                 Mapper<LongWritable, ProductWritable, LongWritable, Text>.Context context)
    122                 throws IOException, InterruptedException {
    123             outputKey.set(key.get());
    124             outputValue.set(value.toString());
    125             context.write(outputKey, outputValue);
    126         }
    127     }
    128     
    129     @Override
    130     public int run(String[] args) throws Exception {
    131         Configuration conf = getConf();
    132         //在创建Configuration对象时紧跟着配置当前作业需要使用的JDBC配置
    133         DBConfiguration.configureDB(
    134                 conf,
    135                 "com.mysql.jdbc.Driver",
    136                 "jdbc:mysql://192.168.1.10:3306/shops",
    137                 "root",
    138                 "123456");
    139         
    140         Job job = Job.getInstance(conf, DBInputFormatApp.class.getSimpleName());
    141         job.setJarByClass(DBInputFormatApp.class);
    142         
    143         job.setInputFormatClass(DBInputFormat.class);
    144         FileOutputFormat.setOutputPath(job, new Path(args[0]));
    145         
    146         job.setMapperClass(DBInputFormatMapper.class);
    147         job.setMapOutputKeyClass(LongWritable.class);
    148         job.setMapOutputValueClass(Text.class);
    149         
    150         job.setOutputKeyClass(LongWritable.class);
    151         job.setOutputValueClass(Text.class);
    152         //配置当前作业要查询的SQL语句和接收查询结果的JavaBean
    153         DBInputFormat.setInput(
    154                 job,
    155                 ProductWritable.class,
    156                 "SELECT `id`,`name`,`model`,`color`,`price` FROM `product`",
    157                 "SELECT COUNT(1) FROM `product`");
    158         
    159         return job.waitForCompletion(true)?0:1;
    160     }
    161      
    162     public static int createJob(String[] args) {
    163         Configuration conf = new Configuration();
    164         conf.set("dfs.datanode.socket.write.timeout", "7200000");
    165         conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
    166         conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
    167         int status = 0;
    168         
    169         try {
    170             status = ToolRunner.run(conf, new DBInputFormatApp(), args);
    171         } catch (Exception e) {
    172             e.printStackTrace();
    173         }
    174         
    175         return status;
    176     }
    177     
    178     public static void main(String[] args) {
    179         args = new String[]{"/mapreduces/dboutput"};
    180         if (args.length!=1) {
    181             System.out.println("Usage: "+DBInputFormatApp.class.getName()+" Input paramters <OUTPUT_PATH>");
    182         } else {
    183             int status = createJob(args);
    184             System.exit(status);
    185         }
    186 
    187     }
    188 
    189 }
  • 相关阅读:
    angular js 自定义指令
    web api 解决跨域的问题
    angular 监听ngrepeat结束时间
    redis关闭和启动
    intellij idea快捷键
    mysql连接字符串
    crontab命令格式
    maven中scope属性的
    maven pom文件元素说明
    引入maven以外的jar包
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865565.html
Copyright © 2011-2022 走看看