  • Importing HDFS data into MySQL with MapReduce

    The code for the reverse direction, importing MySQL data into HDFS with MapReduce, is covered in a separate post.

    Below is a code example for importing HDFS data into MySQL.

    package com.zhen.mysqlToHDFS;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    /**
     * @author FengZhen
     * Import HDFS data into MySQL.
     * Uses DBOutputFormat to write structured data from an HDFS path into MySQL.
     * The input looks like the rows below: the first column is the key, the next three columns are the data.
     * 0    1    Enzo    180.66
     * 1    2    Din    170.666
     *
     */
    public class DBOutputFormatApp extends Configured implements Tool{
    
        /**
         * JavaBean.
         * It must implement Hadoop's serialization interface Writable as well as DBWritable,
         * the serialization interface used when talking to the database.
         * The official API describes it as follows:
         * public class DBInputFormat<T extends DBWritable>
         *   extends InputFormat<LongWritable, T> implements Configurable
         * That is, the Mapper key is LongWritable and cannot be changed; the value is a custom JavaBean implementing DBWritable.
         */
        public static class BeanWritable implements Writable, DBWritable {
    
            private int id;
            private String name;
            private double height;
    
            public void readFields(ResultSet resultSet) throws SQLException {
                this.id = resultSet.getInt(1);
                this.name = resultSet.getString(2);
                this.height = resultSet.getDouble(3);
            }
    
            public void write(PreparedStatement preparedStatement) throws SQLException {
                preparedStatement.setInt(1, id);
                preparedStatement.setString(2, name);
                preparedStatement.setDouble(3, height);
            }
    
            public void readFields(DataInput dataInput) throws IOException {
                this.id = dataInput.readInt();
                this.name = dataInput.readUTF();
                this.height = dataInput.readDouble();
            }
    
            public void write(DataOutput dataOutput) throws IOException {
                dataOutput.writeInt(id);
                dataOutput.writeUTF(name);
                dataOutput.writeDouble(height);
            }
    
            public void set(int id,String name,double height){
                this.id = id;
                this.name = name;
                this.height = height;
            }
            
            @Override
            public String toString() {
                return id + "\t" + name + "\t" + height;
            }
    
        }
        
        public static class DBOutputMapper extends Mapper<LongWritable, Text, NullWritable, BeanWritable>{
            private NullWritable outputKey;
            private BeanWritable outputValue;
    
            @Override
            protected void setup(Mapper<LongWritable, Text, NullWritable, BeanWritable>.Context context)
                    throws IOException, InterruptedException {
                this.outputKey = NullWritable.get();
                this.outputValue = new BeanWritable();
            }
            @Override
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, NullWritable, BeanWritable>.Context context)
                    throws IOException, InterruptedException {
                //counter for records successfully parsed and handed to the output format
                final Counter successCounter = context.getCounter("exec", "successfully");
                //counter for malformed records that could not be parsed
                final Counter failedCounter = context.getCounter("exec", "failed");
                //parse the structured data
                String[] fields = value.toString().split("\t");
                //the data exported by the MySQL-to-HDFS job includes a long key as the first column, so skip it and start at index 1
                if (fields.length > 3) {
                    int id = Integer.parseInt(fields[1]);
                    String name = fields[2];
                    double height = Double.parseDouble(fields[3]);
                    this.outputValue.set(id, name, height);
                    context.write(outputKey, outputValue);
                    //increment by 1 for each record successfully emitted
                    successCounter.increment(1L);
                } else {
                    //increment by 1 for each record that failed to parse
                    failedCounter.increment(1L);
                }
                
            }
        }
        
        /**
         * The output key must implement DBWritable: DBOutputFormat requires its output key to be a DBWritable type.
         */
        public static class DBOutputReducer extends Reducer<NullWritable, BeanWritable, BeanWritable, NullWritable>{
            @Override
            protected void reduce(NullWritable key, Iterable<BeanWritable> values,
                    Reducer<NullWritable, BeanWritable, BeanWritable, NullWritable>.Context context)
                    throws IOException, InterruptedException {
                for (BeanWritable beanWritable : values) {
                    context.write(beanWritable, key);
                }
            }
        }
        
        
        public int run(String[] args) throws Exception {
            Configuration configuration = getConf();
            //configure the JDBC connection info right after obtaining the Configuration
            DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
            Job job = Job.getInstance(configuration, DBOutputFormatApp.class.getSimpleName());
            job.setJarByClass(DBOutputFormatApp.class);
            job.setMapperClass(DBOutputMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(BeanWritable.class);
            
            job.setReducerClass(DBOutputReducer.class);
            job.setOutputFormatClass(DBOutputFormat.class);
            job.setOutputKeyClass(BeanWritable.class);
            job.setOutputValueClass(NullWritable.class);
            
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.setInputPaths(job, args[0]);
            //configure the target database table and columns for this job's output
            DBOutputFormat.setOutput(job, "people", new String[]{"id","name","height"});
            
            return job.waitForCompletion(true)?0:1;
        }
    
        public static int createJob(String[] args){
            Configuration conf = new Configuration();
            conf.set("dfs.datanode.socket.write.timeout", "7200000");
            conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
            conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
            int status = 0;
            try {
                status = ToolRunner.run(conf,new DBOutputFormatApp(), args);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return status;
        }
        
        public static void main(String[] args) {
            //fall back to a sample HDFS input path when none is supplied on the command line
            if (args.length == 0) {
                args = new String[]{"/user/hadoop/mapreduce/mysqlToHdfs/people"};
            }
            int status = createJob(args);
            System.exit(status);
        }
        
    }
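
    DBOutputFormat writes into an existing table; it does not create one. So before running the job, the people table must exist in the hadoop database. As a minimal sketch, the table could be created over JDBC with the same connection details hardcoded in run() above; this helper class and its column types are assumptions (derived from the bean's int/String/double fields), not part of the original post:

    package com.zhen.mysqlToHDFS;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    /**
     * Hypothetical one-off helper, not part of the original post:
     * creates the target table used by DBOutputFormatApp.
     */
    public class CreatePeopleTable {

        public static void main(String[] args) throws Exception {
            //same driver, URL, and credentials as DBConfiguration.configureDB in the job
            Class.forName("com.mysql.jdbc.Driver");
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
                    Statement stmt = conn.createStatement()) {
                //column types assumed from BeanWritable's int/String/double fields
                stmt.executeUpdate("CREATE TABLE IF NOT EXISTS people ("
                        + "id INT, "
                        + "name VARCHAR(255), "
                        + "height DOUBLE)");
            }
        }
    }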

    Package it into a jar, copy it to the server, and run it with the hadoop jar command:

    hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/HDFSToMysql.jar com.zhen.mysqlToHDFS.DBOutputFormatApp
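
    Note that the MySQL JDBC driver must also be on the job's classpath at runtime. If it is not already installed under the Hadoop lib directories, one common approach is to ship it with -libjars, which ToolRunner parses as a generic option. This step is an assumption, not shown in the original post, and the driver jar path below is illustrative:

    hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/HDFSToMysql.jar com.zhen.mysqlToHDFS.DBOutputFormatApp -libjars /path/to/mysql-connector-java.jar /user/hadoop/mapreduce/mysqlToHdfs/people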

    Once the job finishes, the data will appear in the MySQL table.
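
    To double-check, the table can be queried back over JDBC. Below is a minimal sketch using the same assumed connection details as above; this helper class is hypothetical, but with the sample input from the javadoc it should print the Enzo and Din rows:

    package com.zhen.mysqlToHDFS;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    /** Hypothetical verification helper: prints every row of the people table. */
    public class VerifyPeopleTable {

        public static void main(String[] args) throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery("SELECT id, name, height FROM people")) {
                while (rs.next()) {
                    //print each row in the same tab-separated layout as the HDFS input
                    System.out.println(rs.getInt("id") + "\t" + rs.getString("name") + "\t" + rs.getDouble("height"));
                }
            }
        }
    }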
