  • Learning Hadoop Step by Step (7): Connecting Hadoop to MySQL to Read from and Write to the Database

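        To let MapReduce access relational databases (MySQL, Oracle) directly, Hadoop provides two classes: DBInputFormat and DBOutputFormat. DBInputFormat reads the rows of a database table into a job (and from there into HDFS), and DBOutputFormat writes the result set produced by MapReduce into a database table.

        If running MapReduce fails with java.io.IOException: com.mysql.jdbc.Driver, the usual cause is that the program cannot find the MySQL driver JAR. The fix is to make sure every TaskTracker can find the driver JAR when it runs the MapReduce program.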

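    There are two ways to add the JAR:

    (1) Copy the JAR into ${HADOOP_HOME}/lib on every node and restart the cluster. This is the more primitive approach.

    (2) a) Upload the JAR to the cluster: hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/

           b) Before the MR program submits the job, add the statement: DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);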
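
    Before changing the cluster, it can help to confirm whether the driver class is loadable at all. The following is a minimal sketch, not part of the original program: the class name DriverCheck and its method are hypothetical, and it only inspects the classpath of the JVM it runs in (for example the client that submits the job), not the classpath of the task JVMs.

```java
// Sketch: check whether a class (here, a JDBC driver) can be loaded from the current classpath.
final class DriverCheck {
    /** Returns true if the named class can be loaded, false otherwise. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Class is missing, or present but failed to link/initialize
            return false;
        }
    }

    public static void main(String[] args) {
        String driver = "com.mysql.jdbc.Driver"; // driver class named in the error message
        System.out.println(driver + " on classpath: " + isOnClasspath(driver));
    }
}
```

    If this prints false on the client, the JAR is missing locally as well; if it prints true but the job still fails, the task nodes are the likely place to look.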

    Storing MySQL data in Hadoop HDFS

    Creating the MySQL table and initializing the data

    DROP TABLE IF EXISTS `wu_testhadoop`;
    CREATE TABLE `wu_testhadoop` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `title` varchar(255) DEFAULT NULL,
      `content` varchar(255) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Records of wu_testhadoop
    -- ----------------------------
    INSERT INTO `wu_testhadoop` VALUES ('1', '123', '122312');
    INSERT INTO `wu_testhadoop` VALUES ('2', '123', '123456');
    

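    Defining Hadoop data access

    Once the MySQL table has been created, we need to define how Hadoop accesses MySQL.

    Hadoop provides the org.apache.hadoop.io.Writable interface as a simple, efficient serialization protocol. It is built on DataInput and DataOutput.

    For database access Hadoop also provides the org.apache.hadoop.mapred.lib.db.DBWritable interface. Its write method sets values on a PreparedStatement, and its readFields method binds the column values of a row read from the database to the object's fields.

    The two interfaces are used as follows (the examples are taken from the Hadoop source code):

    Writable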

    public class MyWritable implements Writable {
        // Some data
        private int counter;
        private long timestamp;

        public void write(DataOutput out) throws IOException {
            out.writeInt(counter);
            out.writeLong(timestamp);
        }

        public void readFields(DataInput in) throws IOException {
            counter = in.readInt();
            timestamp = in.readLong();
        }

        public static MyWritable read(DataInput in) throws IOException {
            MyWritable w = new MyWritable();
            w.readFields(in);
            return w;
        }
    }
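
    The write/readFields contract can be tried out without a cluster. The sketch below is a stand-in that assumes nothing from Hadoop: CounterRecord is a hypothetical class that mirrors the two methods of the example above using only java.io, and roundTrip is a helper added for illustration. It shows that readFields must read the fields in exactly the order write wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for a Writable: same write/readFields shape, but no Hadoop dependency.
final class CounterRecord {
    int counter;
    long timestamp;

    void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    void readFields(DataInput in) throws IOException {
        counter = in.readInt();      // must mirror the order used in write()
        timestamp = in.readLong();
    }

    /** Serializes a record to bytes and reads it back into a fresh instance. */
    static CounterRecord roundTrip(CounterRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            CounterRecord copy = new CounterRecord();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            // In-memory streams should not fail; rethrow unchecked to keep callers simple
            throw new UncheckedIOException(e);
        }
    }
}
```

    Swapping the two reads in readFields would still compile but would return corrupted values, which is the usual way a hand-written Writable goes wrong.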
     


    DBWritable

    public class MyWritable implements Writable, DBWritable {
        // Some data
        private int counter;
        private long timestamp;

        // Writable#write() implementation
        public void write(DataOutput out) throws IOException {
            out.writeInt(counter);
            out.writeLong(timestamp);
        }

        // Writable#readFields() implementation
        public void readFields(DataInput in) throws IOException {
            counter = in.readInt();
            timestamp = in.readLong();
        }

        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, counter);
            statement.setLong(2, timestamp);
        }

        public void readFields(ResultSet resultSet) throws SQLException {
            counter = resultSet.getInt(1);
            timestamp = resultSet.getLong(2);
        }
    }
    

    The implementation for our database table

    package com.wyg.hadoop.mysql.bean;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.lib.db.DBWritable;
    
    public class DBRecord implements Writable, DBWritable{
    	private int id;
    	private String title;
    	private String content;
    	public int getId() {
    		return id;
    	}
    
    	public void setId(int id) {
    		this.id = id;
    	}
    
    	public String getTitle() {
    		return title;
    	}
    
    	public void setTitle(String title) {
    		this.title = title;
    	}
    
    	public String getContent() {
    		return content;
    	}
    
    	public void setContent(String content) {
    		this.content = content;
    	}
    
    	@Override
    	public void readFields(ResultSet set) throws SQLException {
    		this.id = set.getInt("id");
    		this.title = set.getString("title");
    		this.content = set.getString("content");
    	}
    
    	@Override
    	public void write(PreparedStatement pst) throws SQLException {
    		pst.setInt(1, id);
    		pst.setString(2, title);
    		pst.setString(3, content);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		this.id = in.readInt();
    		this.title = Text.readString(in);
    		this.content = Text.readString(in);
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeInt(this.id);
    		Text.writeString(out, this.title);
    		Text.writeString(out, this.content);
    	}
    
    	@Override
    	public String toString() {
    		 return this.id + " " + this.title + " " + this.content;  
    	}
    }
    


    Implementing Map/Reduce

    package com.wyg.hadoop.mysql.mapper;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    
    import com.wyg.hadoop.mysql.bean.DBRecord;
    
    @SuppressWarnings("deprecation")
    public class DBRecordMapper extends MapReduceBase implements Mapper<LongWritable, DBRecord, LongWritable, Text>{
    
    	@Override
    	public void map(LongWritable key, DBRecord value,
    			OutputCollector<LongWritable, Text> collector, Reporter reporter)
    			throws IOException {
    		collector.collect(new LongWritable(value.getId()), new Text(value.toString()));  
    	}
    	
    }
    

    Testing: connecting Hadoop to MySQL and storing the data in HDFS

    package com.wyg.hadoop.mysql.db;
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.IdentityReducer;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;
    
    import com.wyg.hadoop.mysql.bean.DBRecord;
    import com.wyg.hadoop.mysql.mapper.DBRecordMapper;
    
    public class DBAccess {
          public static void main(String[] args) throws IOException {
                 JobConf conf = new JobConf(DBAccess.class);
                 conf.setOutputKeyClass(LongWritable.class);
                 conf.setOutputValueClass(Text.class);
                 conf.setInputFormat(DBInputFormat.class);
                 Path path = new Path("hdfs://192.168.44.129:9000/user/root/dbout");
                 FileOutputFormat.setOutputPath(conf, path);
                 DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://your-ip:3306/database-name","username","password");
                 String [] fields = {"id", "title", "content"};
                 DBInputFormat.setInput(conf, DBRecord.class, "wu_testhadoop",
                            null, "id", fields);
                 conf.setMapperClass(DBRecordMapper.class);
                 conf.setReducerClass(IdentityReducer.class);
                 JobClient.runJob(conf);
          }
    }
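
    The arguments passed to DBInputFormat.setInput (table name, conditions, order-by column, field names) end up in a SELECT statement that each input split runs against the database. The sketch below is an illustration of roughly what such a query looks like, not Hadoop's actual code: SelectQuerySketch and selectQuery are hypothetical names, and the exact SQL Hadoop generates may differ between versions.

```java
// Sketch: build a SELECT for one input split from setInput-style arguments.
final class SelectQuerySketch {
    static String selectQuery(String table, String[] fields, String conditions,
                              String orderBy, long limit, long offset) {
        StringBuilder sql = new StringBuilder("SELECT ");
        sql.append(String.join(", ", fields));
        sql.append(" FROM ").append(table);
        if (conditions != null && !conditions.isEmpty()) {
            sql.append(" WHERE (").append(conditions).append(")");
        }
        if (orderBy != null && !orderBy.isEmpty()) {
            sql.append(" ORDER BY ").append(orderBy);
        }
        // Each split reads its own window of rows
        sql.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset);
        return sql.toString();
    }

    public static void main(String[] args) {
        String[] fields = {"id", "title", "content"};
        // Mirrors setInput(conf, DBRecord.class, "wu_testhadoop", null, "id", fields)
        System.out.println(selectQuery("wu_testhadoop", fields, null, "id", 2, 0));
    }
}
```

    Because every split reads a LIMIT/OFFSET window, a stable ORDER BY column such as the primary key matters: without it, rows could be read twice or skipped when several splits query the table.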

    Running the program produces the following output:

    15/08/11 16:46:18 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
    15/08/11 16:46:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    15/08/11 16:46:18 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
    15/08/11 16:46:19 INFO mapred.JobClient: Running job: job_local_0001
    15/08/11 16:46:19 INFO mapred.MapTask: numReduceTasks: 1
    15/08/11 16:46:19 INFO mapred.MapTask: io.sort.mb = 100
    15/08/11 16:46:19 INFO mapred.MapTask: data buffer = 79691776/99614720
    15/08/11 16:46:19 INFO mapred.MapTask: record buffer = 262144/327680
    15/08/11 16:46:19 INFO mapred.MapTask: Starting flush of map output
    15/08/11 16:46:19 INFO mapred.MapTask: Finished spill 0
    15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    15/08/11 16:46:19 INFO mapred.LocalJobRunner: 
    15/08/11 16:46:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
    15/08/11 16:46:19 INFO mapred.LocalJobRunner: 
    15/08/11 16:46:19 INFO mapred.Merger: Merging 1 sorted segments
    15/08/11 16:46:19 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 48 bytes
    15/08/11 16:46:19 INFO mapred.LocalJobRunner: 
    15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
    15/08/11 16:46:19 INFO mapred.LocalJobRunner: 
    15/08/11 16:46:19 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
    15/08/11 16:46:19 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.44.129:9000/user/root/dbout
    15/08/11 16:46:19 INFO mapred.LocalJobRunner: reduce > reduce
    15/08/11 16:46:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
    15/08/11 16:46:20 INFO mapred.JobClient:  map 100% reduce 100%
    15/08/11 16:46:20 INFO mapred.JobClient: Job complete: job_local_0001
    15/08/11 16:46:20 INFO mapred.JobClient: Counters: 14
    15/08/11 16:46:20 INFO mapred.JobClient:   FileSystemCounters
    15/08/11 16:46:20 INFO mapred.JobClient:     FILE_BYTES_READ=34606
    15/08/11 16:46:20 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=69844
    15/08/11 16:46:20 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30
    15/08/11 16:46:20 INFO mapred.JobClient:   Map-Reduce Framework
    15/08/11 16:46:20 INFO mapred.JobClient:     Reduce input groups=2
    15/08/11 16:46:20 INFO mapred.JobClient:     Combine output records=0
    15/08/11 16:46:20 INFO mapred.JobClient:     Map input records=2
    15/08/11 16:46:20 INFO mapred.JobClient:     Reduce shuffle bytes=0
    15/08/11 16:46:20 INFO mapred.JobClient:     Reduce output records=2
    15/08/11 16:46:20 INFO mapred.JobClient:     Spilled Records=4
    15/08/11 16:46:20 INFO mapred.JobClient:     Map output bytes=42
    15/08/11 16:46:20 INFO mapred.JobClient:     Map input bytes=2
    15/08/11 16:46:20 INFO mapred.JobClient:     Combine input records=0
    15/08/11 16:46:20 INFO mapred.JobClient:     Map output records=2
    15/08/11 16:46:20 INFO mapred.JobClient:     Reduce input records=2
    


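    You can also see that a new dbout directory has appeared in the HDFS file system. The file inside it holds the rows from the database, with the following contents: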

    1	1 123 122312
    2	2 123 123456
    


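    Importing HDFS data into MySQL

        Storing HDFS files into MySQL also needs the DBRecord class above as a helper, because all database operations go through DBInputFormat and DBOutputFormat.

        First, define the map and reduce implementations (map parses the HDFS file, and reduce takes the map output and emits it):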

    package com.wyg.hadoop.mysql.mapper;
    
    import java.io.IOException;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Iterator;
    
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import com.wyg.hadoop.mysql.bean.DBRecord;
    
    public class WriteDB {
        // Map phase: parse one line of the dbout file into a DBRecord
        public static class Map extends MapReduceBase implements
                Mapper<Object, Text, Text, DBRecord> {
            private final static DBRecord one = new DBRecord();
            private Text word = new Text();

            @Override
            public void map(Object key, Text value,
                    OutputCollector<Text, DBRecord> output, Reporter reporter)
                    throws IOException {
                // Each line looks like "<key>\t<id> <title> <content>"
                String line = value.toString();
                String[] infos = line.split(" ");
                String id = infos[0].split("\t")[1];
                one.setId(Integer.parseInt(id));
                one.setTitle(infos[1]);
                one.setContent(infos[2]);
                word.set(id);
                output.collect(word, one);
            }
    
        }
    
        public static class Reduce extends MapReduceBase implements
    		    Reducer<Text, DBRecord, DBRecord, Text> {
    		@Override
    		public void reduce(Text key, Iterator<DBRecord> values,
    				OutputCollector<DBRecord, Text> collector, Reporter reporter)
    				throws IOException {
    			DBRecord record = values.next();
    		    collector.collect(record, new Text());
    		}
    	}
    }
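
    The parsing done in the map method can be exercised on its own. The following sketch assumes the dbout line layout shown earlier (key, a tab, then id, title and content separated by single spaces); DboutLine and parse are hypothetical names introduced for illustration. Unlike the mapper above it splits with a limit, so the content may contain spaces, although the title still may not.

```java
// Sketch: parse one line of the dbout file, e.g. "1\t1 123 122312".
final class DboutLine {
    final int id;
    final String title;
    final String content;

    private DboutLine(int id, String title, String content) {
        this.id = id;
        this.title = title;
        this.content = content;
    }

    static DboutLine parse(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            throw new IllegalArgumentException("missing tab separator: " + line);
        }
        // Everything after the tab is "<id> <title> <content>"
        String[] parts = line.substring(tab + 1).split(" ", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 fields: " + line);
        }
        return new DboutLine(Integer.parseInt(parts[0]), parts[1], parts[2]);
    }

    public static void main(String[] args) {
        DboutLine r = parse("1\t1 123 122312");
        System.out.println(r.id + " | " + r.title + " | " + r.content);
    }
}
```

    A space-separated layout is fragile for free-text columns; a delimiter that cannot occur in the data would be a safer choice for the toString format of DBRecord.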

    Testing: importing data from HDFS into the database

    package com.wyg.hadoop.mysql.db;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;
    import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
    
    import com.wyg.hadoop.mysql.bean.DBRecord;
    import com.wyg.hadoop.mysql.mapper.WriteDB;
    
    public class DBInsert {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(WriteDB.class);
            // Set the input and output formats
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(DBOutputFormat.class);

            // The job does not run without these lines, even though the examples found online omit them.
            // Map output: <Text, DBRecord>
            conf.setMapOutputKeyClass(Text.class);
            conf.setMapOutputValueClass(DBRecord.class);
            // Reduce output: <DBRecord, Text>, matching WriteDB.Reduce
            conf.setOutputKeyClass(DBRecord.class);
            conf.setOutputValueClass(Text.class);
            // Set the Map and Reduce classes
            conf.setMapperClass(WriteDB.Map.class);
            conf.setReducerClass(WriteDB.Reduce.class);
            // Set the input directory
            FileInputFormat.setInputPaths(conf, new Path("hdfs://192.168.44.129:9000/user/root/dbout"));
            // Configure the database connection
            DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://database-ip:3306/database-name","username","password");
            String[] fields = {"id","title","content" };
            DBOutputFormat.setOutput(conf, "wu_testhadoop", fields);
            JobClient.runJob(conf);
        }
    
    }
    

    The test output is as follows:

    15/08/11 18:10:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
    15/08/11 18:10:15 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    15/08/11 18:10:15 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
    15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1
    15/08/11 18:10:15 INFO mapred.JobClient: Running job: job_local_0001
    15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1
    15/08/11 18:10:15 INFO mapred.MapTask: numReduceTasks: 1
    15/08/11 18:10:15 INFO mapred.MapTask: io.sort.mb = 100
    15/08/11 18:10:15 INFO mapred.MapTask: data buffer = 79691776/99614720
    15/08/11 18:10:15 INFO mapred.MapTask: record buffer = 262144/327680
    15/08/11 18:10:15 INFO mapred.MapTask: Starting flush of map output
    15/08/11 18:10:16 INFO mapred.MapTask: Finished spill 0
    15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    15/08/11 18:10:16 INFO mapred.LocalJobRunner: hdfs://192.168.44.129:9000/user/root/dbout/part-00000:0+30
    15/08/11 18:10:16 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
    15/08/11 18:10:16 INFO mapred.LocalJobRunner: 
    15/08/11 18:10:16 INFO mapred.Merger: Merging 1 sorted segments
    15/08/11 18:10:16 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 40 bytes
    15/08/11 18:10:16 INFO mapred.LocalJobRunner: 
    15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
    15/08/11 18:10:16 INFO mapred.LocalJobRunner: reduce > reduce
    15/08/11 18:10:16 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
    15/08/11 18:10:16 INFO mapred.JobClient:  map 100% reduce 100%
    15/08/11 18:10:16 INFO mapred.JobClient: Job complete: job_local_0001
    15/08/11 18:10:16 INFO mapred.JobClient: Counters: 14
    15/08/11 18:10:16 INFO mapred.JobClient:   FileSystemCounters
    15/08/11 18:10:16 INFO mapred.JobClient:     FILE_BYTES_READ=34932
    15/08/11 18:10:16 INFO mapred.JobClient:     HDFS_BYTES_READ=60
    15/08/11 18:10:16 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=70694
    15/08/11 18:10:16 INFO mapred.JobClient:   Map-Reduce Framework
    15/08/11 18:10:16 INFO mapred.JobClient:     Reduce input groups=2
    15/08/11 18:10:16 INFO mapred.JobClient:     Combine output records=0
    15/08/11 18:10:16 INFO mapred.JobClient:     Map input records=2
    15/08/11 18:10:16 INFO mapred.JobClient:     Reduce shuffle bytes=0
    15/08/11 18:10:16 INFO mapred.JobClient:     Reduce output records=2
    15/08/11 18:10:16 INFO mapred.JobClient:     Spilled Records=4
    15/08/11 18:10:16 INFO mapred.JobClient:     Map output bytes=34
    15/08/11 18:10:16 INFO mapred.JobClient:     Map input bytes=30
    15/08/11 18:10:16 INFO mapred.JobClient:     Combine input records=0
    15/08/11 18:10:16 INFO mapred.JobClient:     Map output records=2
    15/08/11 18:10:16 INFO mapred.JobClient:     Reduce input records=2
    

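    Before the test I emptied the original table; after the run you can see that two rows were added to the database.

    Running the job a second time reports an error, which is expected: the import assigns id explicitly, so the same primary key values are inserted again. If id is left out, rows can be added on every run.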
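
    To make the difference concrete, the sketch below builds the kind of parameterized INSERT that a DBOutputFormat-style writer prepares from a table name and a field list. It is an illustration only: InsertQuerySketch and insertQuery are hypothetical names, and the exact statement Hadoop generates may differ.

```java
// Sketch: build a parameterized INSERT from a table name and field list.
final class InsertQuerySketch {
    static String insertQuery(String table, String[] fields) {
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(table);
        sql.append(" (").append(String.join(",", fields)).append(")");
        sql.append(" VALUES (");
        for (int i = 0; i < fields.length; i++) {
            sql.append(i == 0 ? "?" : ",?"); // one placeholder per field
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // With id: a second run inserts the same primary keys again and fails
        System.out.println(insertQuery("wu_testhadoop",
                new String[] {"id", "title", "content"}));
        // Without id: MySQL assigns AUTO_INCREMENT values, so reruns keep adding rows
        System.out.println(insertQuery("wu_testhadoop",
                new String[] {"title", "content"}));
    }
}
```

    If the field list omits id, the write(PreparedStatement) method of DBRecord would also have to bind only title and content, in that order, since the placeholders are filled by position.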

    Source code download

    The source code has been uploaded and can be downloaded from download.csdn.net/detail/wuyinggui10000/8974585



  • Original article: https://www.cnblogs.com/gccbuaa/p/6915983.html