  • Using JobControl to Control MapReduce Jobs

    Code structure

    BeanWritable: the bean used for reading from and writing to the database

    ControlJobTest: chains and controls the two jobs with JobControl

    DBInputFormatApp: imports data from the relational database into HDFS; its Mapper is a static inner class

    DBOutputFormatApp: exports the structured data on HDFS into the relational database; its Mapper and Reducer are static inner classes

    MySQL is used as the relational database here.

    The code is as follows.

    BeanWritable.java

    /**
     * 
     */
    package com.zhen.controlJobTest;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    /**
     * JavaBean for the database records.
     * It must implement Hadoop's serialization interface Writable as well as DBWritable,
     * the serialization interface used when exchanging data with the database.
     * The official API describes DBInputFormat as follows:
     * public class DBInputFormat<T extends DBWritable>
     *   extends InputFormat<LongWritable, T> implements Configurable
     * That is, the Mapper key is fixed to LongWritable, and the value is a custom JavaBean
     * implementing the DBWritable interface.
     * 
     * @author FengZhen
     */
    public class BeanWritable implements Writable, DBWritable {
    
        private int id;
        private String name;
        private double height;
    
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
            this.height = resultSet.getDouble(3);
        }
    
        public void write(PreparedStatement preparedStatement) throws SQLException {
            preparedStatement.setInt(1, id);
            preparedStatement.setString(2, name);
            preparedStatement.setDouble(3, height);
        }
    
        public void readFields(DataInput dataInput) throws IOException {
            this.id = dataInput.readInt();
            this.name = dataInput.readUTF();
            this.height = dataInput.readDouble();
        }
    
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(id);
            dataOutput.writeUTF(name);
            dataOutput.writeDouble(height);
        }
    
        public void set(int id,String name,double height){
            this.id = id;
            this.name = name;
            this.height = height;
        }
        
        @Override
        public String toString() {
            return id + "\t" + name + "\t" + height;
        }
    
    }

    DBInputFormatApp.java

    package com.zhen.controlJobTest;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * @author FengZhen 
     * Imports MySQL data into HDFS.
     */
    public class DBInputFormatApp{
    
        /**
         * Map
         * When the Map output key is LongWritable and the value is Text, the Reducer can be omitted;
         * the default (identity) Reducer also emits LongWritable:Text.
         * */
        public static class DBInputMapper extends Mapper<LongWritable, BeanWritable, LongWritable, Text> {
    
            private LongWritable outputKey;
            private Text outputValue;
    
            @Override
            protected void setup(Mapper<LongWritable, BeanWritable, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                this.outputKey = new LongWritable();
                this.outputValue = new Text();
            }
            
            @Override
            protected void map(LongWritable key, BeanWritable value,
                    Mapper<LongWritable, BeanWritable, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                outputKey.set(key.get());
                outputValue.set(value.toString());
                context.write(outputKey, outputValue);
            }
    
        }
    }

    DBOutputFormatApp.java

    package com.zhen.controlJobTest;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    
    
    /**
     * @author FengZhen
     * Exports HDFS data into MySQL.
     * DBOutputFormat writes the structured data under an HDFS path into MySQL. The data looks
     * like the following: the first column is the key and the remaining three columns are the record fields.
     * 0    1    Enzo    180.66
     * 1    2    Din    170.666
     * 
     */
    public class DBOutputFormatApp{
        
        public static class DBOutputMapper extends Mapper<LongWritable, Text, NullWritable, BeanWritable>{
            private NullWritable outputKey;
            private BeanWritable outputValue;
    
            @Override
            protected void setup(Mapper<LongWritable, Text, NullWritable, BeanWritable>.Context context)
                    throws IOException, InterruptedException {
                this.outputKey = NullWritable.get();
                this.outputValue = new BeanWritable();
            }
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, NullWritable, BeanWritable>.Context context)
                    throws IOException, InterruptedException {
                //counter for records successfully written to the database output
                final Counter successCounter = context.getCounter("exec", "successfully");
                //counter for records that could not be written
                final Counter faildCounter = context.getCounter("exec", "faild");
                //parse the structured (tab-separated) data
                String[] fields = value.toString().split("\t");
                //the data exported by the first job contains the LongWritable key as the first column, so skip it and start at index 1
                if (fields.length > 3) {
                    int id = Integer.parseInt(fields[1]);
                    String name = fields[2];
                    double height = Double.parseDouble(fields[3]);
                    this.outputValue.set(id, name, height);
                    context.write(outputKey, outputValue);
                    //record emitted to the database output: increment the success counter
                    successCounter.increment(1L);
                }else{
                    //malformed record: increment the failure counter
                    faildCounter.increment(1L);
                }
                
            }
        }
        
        /**
         * The output key must be a type implementing DBWritable: DBOutputFormat requires its output key to be a DBWritable.
         * */
        public static class DBOutputReducer extends Reducer<NullWritable, BeanWritable, BeanWritable, NullWritable>{
            @Override
            protected void reduce(NullWritable key, Iterable<BeanWritable> values,
                    Reducer<NullWritable, BeanWritable, BeanWritable, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                for (BeanWritable beanWritable : values) {
                    context.write(beanWritable, key);
                }
            }
        }
        
        
    }

    ControlJobTest.java

    /**
     * 
     */
    package com.zhen.controlJobTest;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import com.zhen.controlJobTest.DBInputFormatApp.DBInputMapper;
    import com.zhen.controlJobTest.DBOutputFormatApp.DBOutputMapper;
    import com.zhen.controlJobTest.DBOutputFormatApp.DBOutputReducer;
    
    /**
     * @author FengZhen
     *
     */
    public class ControlJobTest {
    
        public static void main(String[] args) throws IOException {
            //First job: import from MySQL into HDFS
            Configuration configuration1 = new Configuration();
            //JDBC connection settings used by this job
            DBConfiguration.configureDB(configuration1, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/hadoop",
                    "root", "123qwe");
            Job job1 = Job.getInstance(configuration1, DBInputFormatApp.class.getSimpleName());
    
            job1.setJarByClass(DBInputFormatApp.class);
            job1.setMapperClass(DBInputMapper.class);
            job1.setMapOutputKeyClass(LongWritable.class);
            job1.setMapOutputValueClass(Text.class);
    
            job1.setOutputKeyClass(LongWritable.class);
            job1.setOutputValueClass(Text.class);
    
            //input data format for this job
            job1.setInputFormatClass(DBInputFormat.class);
            //the SQL query the job runs and the bean that receives each row; the second query is a count query used to compute splits
            DBInputFormat.setInput(
                    job1, 
                    BeanWritable.class, 
                    "select * from people", 
                    "select count(1) from people");
            
            FileOutputFormat.setOutputPath(job1, new Path(args[0]));
            
            //Second job: export from HDFS into MySQL
            
            Configuration configuration2 = new Configuration();
            //configure the database connection info right after creating the Configuration
            DBConfiguration.configureDB(configuration2, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
            Job job2 = Job.getInstance(configuration2, DBOutputFormatApp.class.getSimpleName());
            job2.setJarByClass(DBOutputFormatApp.class);
            job2.setMapperClass(DBOutputMapper.class);
            job2.setMapOutputKeyClass(NullWritable.class);
            job2.setMapOutputValueClass(BeanWritable.class);
            
            job2.setReducerClass(DBOutputReducer.class);
            job2.setOutputFormatClass(DBOutputFormat.class);
            job2.setOutputKeyClass(BeanWritable.class);
            job2.setOutputValueClass(NullWritable.class);
            
            job2.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.setInputPaths(job2, args[0]);
            //target table and column names for the database output
            DBOutputFormat.setOutput(job2, "people", new String[]{"id","name","height"});
            
            ControlledJob controlledJob1 = new ControlledJob(configuration1);
            controlledJob1.setJob(job1);
            
            ControlledJob controlledJob2 = new ControlledJob(configuration2);
            controlledJob2.setJob(job2);
    
            //if the two jobs have a dependency, this must be set
            controlledJob2.addDependingJob(controlledJob1);
            
            JobControl jobControl = new JobControl("groupName");
            jobControl.addJob(controlledJob1);
            jobControl.addJob(controlledJob2);
            //JobControl.run() loops until stop() is called, so run it on a separate thread
            //and poll allFinished() from the main thread
            Thread jobControlThread = new Thread(jobControl);
            jobControlThread.start();
            
            while (!jobControl.allFinished()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            jobControl.stop();
            System.exit(0);
            
        }
        
    }

    The MySQL table structure is as follows:

    CREATE TABLE `people` (
      `id` int(11) NOT NULL,
      `name` varchar(255) DEFAULT NULL,
      `height` double DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

    First, insert some test data; a JDBC sketch for doing this is shown below.
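
    A minimal sketch for seeding the people table with plain JDBC, assuming the two sample rows from the DBOutputFormatApp javadoc. The class name InsertTestData is hypothetical and not part of the original post; the driver class, URL, and credentials mirror the ones used in ControlJobTest.

    package com.zhen.controlJobTest;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    //Hypothetical helper: inserts a few test rows into the people table before running the jobs.
    public class InsertTestData {
        public static void main(String[] args) throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
                 PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO people (id, name, height) VALUES (?, ?, ?)")) {
                //sample rows for illustration; adjust as needed
                Object[][] rows = { {1, "Enzo", 180.66}, {2, "Din", 170.666} };
                for (Object[] row : rows) {
                    ps.setInt(1, (Integer) row[0]);
                    ps.setString(2, (String) row[1]);
                    ps.setDouble(3, (Double) row[2]);
                    ps.addBatch();
                }
                //send all inserts in a single batch
                ps.executeBatch();
            }
        }
    }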

    Then package the code into a jar, upload it to the server, and run the job:

    hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/JobControlTest.jar com.zhen.controlJobTest.ControlJobTest /user/hadoop/mapreduce/mysqlToHdfs/people

    This job contains two sub-jobs: one imports MySQL data into HDFS, and the other exports the HDFS data back into MySQL. You could also verify the chaining with a couple of simpler MapReduce jobs.

    If the two sub-jobs depend on each other, you must set

    controlledJob2.addDependingJob(controlledJob1);

    which declares that job2 depends on job1: job2 is only submitted once job1 has finished.
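
    The same pattern extends to longer chains: each ControlledJob can list several dependencies via addDependingJob, and JobControl only submits a job once all of its dependencies have succeeded. Once jobControl.allFinished() returns true, you can inspect which jobs succeeded or failed. The helper below is a small sketch of that; the class name JobControlReport is hypothetical and not part of the original code.

    package com.zhen.controlJobTest;
    
    import java.util.List;
    
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
    
    //Hypothetical helper: summarize the result of a finished JobControl group.
    public class JobControlReport {
    
        //Returns 0 if every controlled job succeeded, 1 otherwise; usable as a process exit code.
        public static int report(JobControl jobControl) {
            List<ControlledJob> failed = jobControl.getFailedJobList();
            for (ControlledJob job : failed) {
                System.err.println("Failed job: " + job.getJobName() + " - " + job.getMessage());
            }
            for (ControlledJob job : jobControl.getSuccessfulJobList()) {
                System.out.println("Succeeded: " + job.getJobName());
            }
            return failed.isEmpty() ? 0 : 1;
        }
    }

    In ControlJobTest, System.exit(JobControlReport.report(jobControl)) could replace the unconditional System.exit(0) once all jobs have finished.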
