  • Importing MySQL data into HDFS with MapReduce

    package com.zhen.mysqlToHDFS;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * @author FengZhen
     * Imports MySQL data into HDFS.
     */
    public class DBInputFormatApp extends Configured implements Tool {

        /**
         * JavaBean holding one row of the table.
         * It must implement Hadoop's serialization interface Writable as well as
         * DBWritable, the serialization interface used when exchanging data with the database.
         * The official API describes DBInputFormat as:
         * public class DBInputFormat<T extends DBWritable>
         *   extends InputFormat<LongWritable, T> implements Configurable
         * That is, the Mapper key is fixed to LongWritable, and the value is a custom
         * JavaBean implementing the DBWritable interface.
         */
        public static class BeanWritable implements Writable, DBWritable {
    
            private int id;
            private String name;
            private double height;
    
            public void readFields(ResultSet resultSet) throws SQLException {
                this.id = resultSet.getInt(1);
                this.name = resultSet.getString(2);
                this.height = resultSet.getDouble(3);
            }
    
            public void write(PreparedStatement preparedStatement) throws SQLException {
                preparedStatement.setInt(1, id);
                preparedStatement.setString(2, name);
                preparedStatement.setDouble(3, height);
            }
    
            public void readFields(DataInput dataInput) throws IOException {
                this.id = dataInput.readInt();
                this.name = dataInput.readUTF();
                this.height = dataInput.readDouble();
            }
    
            public void write(DataOutput dataOutput) throws IOException {
                dataOutput.writeInt(id);
                dataOutput.writeUTF(name);
                dataOutput.writeDouble(height);
            }
    
            @Override
            public String toString() {
                return id + "	" + name + "	" + height;
            }
    
        }
    
        /**
         * Mapper
         * When the map output key is LongWritable and the value is Text, the reducer
         * can be omitted; the default reducer likewise emits LongWritable:Text.
         */
        public static class DBInputMapper extends Mapper<LongWritable, BeanWritable, LongWritable, Text> {
    
            private LongWritable outputKey;
            private Text outputValue;
    
            @Override
            protected void setup(Mapper<LongWritable, BeanWritable, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                this.outputKey = new LongWritable();
                this.outputValue = new Text();
            }
            
            @Override
            protected void map(LongWritable key, BeanWritable value,
                    Mapper<LongWritable, BeanWritable, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {
                outputKey.set(key.get());
                outputValue.set(value.toString());
                context.write(outputKey, outputValue);
            }
    
        }
    
        public int run(String[] arg0) throws Exception {
            Configuration configuration = getConf();
        // Configure the JDBC connection settings used by this job
            DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/hadoop",
                    "root", "123qwe");
            Job job = Job.getInstance(configuration, DBInputFormatApp.class.getSimpleName());
    
            job.setJarByClass(DBInputFormatApp.class);
            job.setMapperClass(DBInputMapper.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
    
        // Configure the job's input format
            job.setInputFormatClass(DBInputFormat.class);
        // Configure the query SQL, the row-count SQL, and the bean that receives each row
            DBInputFormat.setInput(
                    job, 
                    BeanWritable.class, 
                    "select * from people", 
                    "select count(1) from people");
            
            FileOutputFormat.setOutputPath(job, new Path(arg0[0]));
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static int createJob(String[] args) {
            Configuration conf = new Configuration();
            conf.set("dfs.datanode.socket.write.timeout", "7200000");
            conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
            conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
            int status = 0;
            try {
                status = ToolRunner.run(conf, new DBInputFormatApp(), args);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return status;
        }
    
        public static void main(String[] args) {
            args = new String[] { "/user/hadoop/mapreduce/mysqlToHdfs/people" };
            int status = createJob(args);
            System.exit(status);
        }
    }
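
    Instead of passing a full query plus a count query, DBInputFormat also provides a
    table-based setInput overload that builds both statements itself. A minimal sketch of
    that call (the WHERE condition and ORDER BY column are made-up examples, not part of
    the original job) could look like this inside run():

        // Let DBInputFormat generate the SELECT and COUNT statements from the table
        // name, an optional condition, an optional ORDER BY column and the field list.
        DBInputFormat.setInput(
                job,
                BeanWritable.class,
                "people",               // table name
                "height > 1.0",         // conditions, may be null
                "id",                   // orderBy, may be null
                "id", "name", "height"  // field names
        );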

    Create a table named people in MySQL:

    CREATE TABLE `people` (
      `id` int(11) NOT NULL,
      `name` varchar(255) DEFAULT NULL,
      `height` double DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

    Insert a few rows of test data, for example with a small JDBC helper like the sketch below.
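
    A minimal JDBC sketch for loading test rows (it reuses the connection settings from
    the job configuration above; the names and heights are made-up sample values):

        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.PreparedStatement;

        public class InsertTestData {
            public static void main(String[] args) throws Exception {
                // Same JDBC URL and credentials as in DBConfiguration.configureDB.
                try (Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
                     PreparedStatement ps = conn.prepareStatement(
                             "INSERT INTO people (id, name, height) VALUES (?, ?, ?)")) {
                    Object[][] rows = { { 1, "Tom", 1.75 }, { 2, "Jerry", 1.62 }, { 3, "Lily", 1.68 } };
                    for (Object[] row : rows) {
                        ps.setInt(1, (Integer) row[0]);
                        ps.setString(2, (String) row[1]);
                        ps.setDouble(3, (Double) row[2]);
                        ps.executeUpdate();
                    }
                }
            }
        }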

    Package the MapReduce job as a jar, upload it to the Hadoop cluster, and run it:

    hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/MysqlToHDFS.jar com.zhen.mysqlToHDFS.DBInputFormatApp

    Because the HDFS output path is already hardcoded in main(), no argument needs to be passed here; specifying the job's main class is enough. If you would rather pass the path on the command line, see the sketch below.
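
    A minimal change to main() that takes the output path from the command line instead
    of hardcoding it (a sketch, not part of the original post):

        public static void main(String[] args) {
            if (args.length < 1) {
                System.err.println("Usage: DBInputFormatApp <hdfs output path>");
                System.exit(1);
            }
            System.exit(createJob(args));
        }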

    If the job fails at runtime because the MySQL driver cannot be found, like this:

    Caused by: java.lang.ClassNotFoundException: com.jdbc.mysql.Driver
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at org.apache.hadoop.mapreduce.lib.db.DBConfiguration.getConnection(DBConfiguration.java:148)
        at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.createConnection(DBInputFormat.java:198)
        ... 24 more

    Solution:

    Copy the MySQL JDBC driver jar into .../hadoop/share/hadoop/mapreduce/lib, then restart the cluster and run the job again.
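
    An alternative that avoids restarting the cluster (not from the original post, just a
    common approach) is to ship the driver jar with the job: upload it to HDFS and add it
    to the task classpath inside run(), for example:

        // Hypothetical HDFS path; upload the MySQL connector jar there first.
        job.addFileToClassPath(new Path("/user/hadoop/lib/mysql-connector-java.jar"));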

    Importing HDFS data into MySQL with MapReduce
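
    The reverse direction is not shown in this post. As a rough sketch (assuming the same
    people table and a bean that implements DBWritable.write, as above), the job
    configuration would swap the input and output sides roughly like this:

        // Sketch only: read text from HDFS and write rows to MySQL via DBOutputFormat.
        DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/mapreduce/mysqlToHdfs/people"));
        job.setOutputFormatClass(DBOutputFormat.class);
        // Table name followed by its column names.
        DBOutputFormat.setOutput(job, "people", "id", "name", "height");
        // With DBOutputFormat the output key must implement DBWritable; the value is
        // ignored, so NullWritable works.
        job.setOutputKeyClass(BeanWritable.class);
        job.setOutputValueClass(NullWritable.class);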

  • Original post: https://www.cnblogs.com/EnzoDin/p/8425016.html