zoukankan      html  css  js  c++  java
  • MapReduce的DBOutputFormat使用

    package com.mengyao.hadoop.mapreduce;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Uses {@link DBOutputFormat} to write structured data stored on HDFS (by default under
     * {@code /mapreduces/dboutput}) into a database table. The input is tab-separated text whose
     * first column is a key that is ignored, for example:
     * <pre>
     *             0    1    New Balance 999复古跑鞋    ML999GY        ML999GY        999.0
     *             1    2    New Balance 999复古跑鞋    ML999BB        ML999BB        999.0
     *             2    3    New Balance 996复古跑鞋    MRL996DG    MRL996DG    819.0
     *             3    4    New Balance 996复古跑鞋    MRL996EM    MRL996EM    819.0
     *             4    5    New Balance 996复古跑鞋    MRL996ES    MRL996ES    819.0
     * </pre>
     *
     * <p>When a MapReduce job talks to a database, the JDBC driver jar must be on Hadoop's
     * classpath, otherwise the job fails because the JDBC driver class cannot be loaded. Either:
     * <ol>
     *   <li>copy the driver jar into Hadoop's MapReduce lib directory (the author used
     *       {@code /usr/local/installs/hadoop/share/hadoop/mapreduce/lib/mysql-connector-java-5.1.26-bin.jar})
     *       and restart the cluster so the classpath takes effect; or</li>
     *   <li>package the JDBC driver jar inside this job's jar.</li>
     * </ol>
     *
     * <p>If the text written to the database is garbled, set the encoding in the JDBC URL, e.g.
     * {@code jdbc:mysql://host:3306/dbName?useUnicode=true&characterEncoding=utf8}.
     *
     * <p>Usage: {@code DBOutputFormatApp [generic options] <INPUT_PATH>}. When started with no
     * arguments at all, {@code /mapreduces/dboutput} is used as the input path.
     *
     * @author mengyao
     */
    public class DBOutputFormatApp extends Configured implements Tool {

        /** Input path used by {@link #main(String[])} when no arguments are given. */
        private static final String DEFAULT_INPUT_PATH = "/mapreduces/dboutput";

        /** Fallback JDBC settings, used only when the matching property is not already configured. */
        private static final String DEFAULT_JDBC_DRIVER = "com.mysql.jdbc.Driver";
        private static final String DEFAULT_JDBC_URL =
                "jdbc:mysql://192.168.1.104:3306/test?useUnicode=true&characterEncoding=utf8";
        private static final String DEFAULT_JDBC_USER = "root";
        private static final String DEFAULT_JDBC_PASSWORD = "123456";

        /** Exit status returned by {@link #run(String[])} when the arguments are invalid. */
        private static final int EXIT_USAGE = 2;

        /**
         * Row of the {@code product} table.
         *
         * <p>The bean implements both Hadoop's {@link Writable} (shuffle serialization) and
         * {@link DBWritable} (database serialization). {@code DBOutputFormat} is declared as
         * {@code DBOutputFormat<K extends DBWritable, V> extends OutputFormat<K, V>}, i.e. the
         * job's output key must be a {@code DBWritable}.
         *
         * @author mengyao
         */
        static class ProductWritable implements Writable, DBWritable {

            private long id;            // bigint(20) NOT NULL AUTO_INCREMENT
            private String name;        // varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL, product name
            private String model;       // varchar(30) COLLATE utf8_unicode_ci DEFAULT NULL, model
            private String color;       // varchar(10) COLLATE utf8_unicode_ci DEFAULT NULL, color
            private double price;       // decimal(10,0) DEFAULT NULL, selling price

            /**
             * Overwrites every field of this (reusable) instance.
             *
             * @param id    product id
             * @param name  product name
             * @param model product model
             * @param color product color
             * @param price selling price
             */
            public void set(long id, String name, String model,
                    String color, double price) {
                this.id = id;
                this.name = name;
                this.model = model;
                this.color = color;
                this.price = price;
            }

            /**
             * Binds the fields to statement parameters 1..5 in the order
             * id, name, model, color, price.
             */
            @Override
            public void write(PreparedStatement ps) throws SQLException {
                ps.setLong(1, id);
                ps.setString(2, name);
                ps.setString(3, model);
                ps.setString(4, color);
                ps.setDouble(5, price);
            }

            /**
             * Populates the fields from result-set columns 1..5 in the order
             * id, name, model, color, price.
             */
            @Override
            public void readFields(ResultSet rs) throws SQLException {
                this.id = rs.getLong(1);
                this.name = rs.getString(2);
                this.model = rs.getString(3);
                this.color = rs.getString(4);
                this.price = rs.getDouble(5);
            }

            /** Serializes the fields; must stay symmetric with {@link #readFields(DataInput)}. */
            @Override
            public void write(DataOutput out) throws IOException {
                out.writeLong(id);
                out.writeUTF(name);
                out.writeUTF(model);
                out.writeUTF(color);
                out.writeDouble(price);
            }

            /** Deserializes the fields in the order written by {@link #write(DataOutput)}. */
            @Override
            public void readFields(DataInput in) throws IOException {
                this.id = in.readLong();
                this.name = in.readUTF();
                this.model = in.readUTF();
                this.color = in.readUTF();
                this.price = in.readDouble();
            }

        }

        /**
         * Parses each tab-separated input line into a {@link ProductWritable} and emits it under a
         * {@link NullWritable} key.
         *
         * <p>Two counters in group {@code "exec"} are maintained: {@code "successfully"} counts the
         * records emitted by this mapper, {@code "faild"} counts the lines that were skipped because
         * they had too few columns or an unparsable number. Note that these counters describe
         * parsing in the mapper only; they do not confirm that a row was inserted into the database.
         */
        static class DBOutputFormatMapper extends Mapper<LongWritable, Text, NullWritable, ProductWritable> {

            private NullWritable outputKey;
            private ProductWritable outputValue;
            private Counter successCounter;
            private Counter failedCounter;

            /** Creates the reusable output objects and resolves the counters once per task. */
            @Override
            protected void setup(Context context)
                    throws IOException, InterruptedException {
                this.outputKey = NullWritable.get();
                this.outputValue = new ProductWritable();
                // Counter names (including the historical "faild" spelling) are kept unchanged.
                this.successCounter = context.getCounter("exec", "successfully");
                this.failedCounter = context.getCounter("exec", "faild");
            }

            /**
             * Converts one input line into a product record.
             *
             * @param key     offset of the line in the input (unused)
             * @param value   tab-separated line: key, id, name, model, color, price
             * @param context task context used to emit the record
             */
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                final String[] fields = value.toString().split("\t");
                // Column 0 is the key written by the job that exported the data, so the
                // product columns start at index 1 and at least six columns are required.
                if (fields.length <= 5) {
                    failedCounter.increment(1L);
                    return;
                }

                final long id;
                final double price;
                try {
                    id = Long.parseLong(fields[1].trim());
                    price = Double.parseDouble(fields[5].trim());
                } catch (NumberFormatException e) {
                    // A malformed row is counted and skipped instead of failing the whole task.
                    failedCounter.increment(1L);
                    return;
                }

                outputValue.set(id, fields[2], fields[3], fields[4], price);
                context.write(outputKey, outputValue);
                successCounter.increment(1L);
            }
        }

        /**
         * Swaps key and value so that the {@link ProductWritable} becomes the output key.
         *
         * <ul>
         *   <li>The input key/value are the mapper's output key/value.</li>
         *   <li>The output key must be a {@link DBWritable}, as required by {@code DBOutputFormat}.</li>
         *   <li>The output value is irrelevant; {@link NullWritable} is used as a placeholder.</li>
         * </ul>
         *
         * @author mengyao
         */
        static class DBOutputFormatReducer extends Reducer<NullWritable, ProductWritable, ProductWritable, NullWritable> {
            @Override
            protected void reduce(NullWritable key, Iterable<ProductWritable> value, Context context)
                    throws IOException, InterruptedException {
                for (ProductWritable productWritable : value) {
                    context.write(productWritable, key);
                }
            }
        }

        /**
         * Configures and runs the job.
         *
         * <p>The JDBC driver, URL, user name and password are read from the standard
         * {@link DBConfiguration} properties when they are already present in the configuration;
         * otherwise the built-in defaults of this class are used.
         *
         * @param args exactly one element: the input path
         * @return {@code 0} if the job succeeded, {@code 1} if it failed, {@code 2} on bad arguments
         */
        @Override
        public int run(String[] args) throws Exception {
            if (args == null || args.length != 1) {
                System.err.println("Usage: " + DBOutputFormatApp.class.getName()
                        + " [generic options] <INPUT_PATH>");
                return EXIT_USAGE;
            }

            Configuration conf = getConf();
            // The JDBC settings must be placed in the Configuration before the Job is created,
            // because the Job works on its own copy of the configuration.
            DBConfiguration.configureDB(
                    conf,
                    conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY, DEFAULT_JDBC_DRIVER),
                    conf.get(DBConfiguration.URL_PROPERTY, DEFAULT_JDBC_URL),
                    conf.get(DBConfiguration.USERNAME_PROPERTY, DEFAULT_JDBC_USER),
                    conf.get(DBConfiguration.PASSWORD_PROPERTY, DEFAULT_JDBC_PASSWORD));
            Job job = Job.getInstance(conf, DBOutputFormatApp.class.getSimpleName());
            job.setJarByClass(DBOutputFormatApp.class);

            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.setInputPaths(job, args[0]);

            job.setMapperClass(DBOutputFormatMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(ProductWritable.class);

            job.setPartitionerClass(HashPartitioner.class);
            job.setNumReduceTasks(1);

            job.setReducerClass(DBOutputFormatReducer.class);
            job.setOutputKeyClass(ProductWritable.class);
            job.setOutputValueClass(NullWritable.class);

            job.setOutputFormatClass(DBOutputFormat.class);
            // Target table and the columns bound by ProductWritable.write(PreparedStatement).
            DBOutputFormat.setOutput(job, "product", new String[]{"id", "name", "model", "color", "price"});

            return job.waitForCompletion(true) ? 0 : 1;
        }

        /**
         * Builds the base configuration and runs the job through {@link ToolRunner}.
         *
         * @param args command-line arguments (generic options followed by the input path)
         * @return the status returned by {@link #run(String[])}, or {@code 1} if running the job
         *         threw an exception
         */
        public static int createJob(String[] args) {
            Configuration conf = new Configuration();
            conf.set("dfs.datanode.socket.write.timeout", "7200000");
            conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
            conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");

            try {
                return ToolRunner.run(conf, new DBOutputFormatApp(), args);
            } catch (Exception e) {
                e.printStackTrace();
                // A crashed job must not be reported as success to the calling process.
                return 1;
            }
        }

        /**
         * Command-line entry point; exits the JVM with the job status.
         *
         * @param args optional generic options followed by the input path; when empty,
         *             {@code /mapreduces/dboutput} is used as the input path
         */
        public static void main(String[] args) {
            String[] jobArgs = (args == null || args.length == 0)
                    ? new String[]{DEFAULT_INPUT_PATH}
                    : args;
            System.exit(createJob(jobArgs));
        }

    }
  • 相关阅读:
    使用express+multer实现node中的图片上传
    利用H5构建地图和获取定位地点
    移动端开发基础 干货分享
    关于angularJS的一些用法
    你好 JSONP !!!!
    CentOS7使用Alien将RPM从DE转换为DEB和DEB转换为RPM包
    [Windows] visio2019破解激活
    python---九九乘法表代码
    HTTP 请求方式: GET和POST的比较
    win10WLAN没有有效的ip配置
  • 原文地址:https://www.cnblogs.com/mengyao/p/4865595.html
Copyright © 2011-2022 走看看