zoukankan      html  css  js  c++  java
  • Hadoop,MapReduce操作Mysql

    前以前帖子介绍,怎样读取文本数据源和多个数据源的合并:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html

    这一个博客介绍一下MapReduce怎样读取关系数据库的数据,选择的关系数据库为MySql,因为它是开源的软件,所以大家用的比较多。以前上学的时候就没有用过开源的软件,直接用盗版,也相当与免费,且比开源好用,例如向oracle,windows7等等。现在工作了,由于公司考虑成本的问题,所以都用成开源的,ubuntu,mysql等,本人现在支持开源,特别像hadoop这样的东西,真的太好了,不但可以使用软件,也可以读到源代码。话不说多了。

    hadoop技术推出一首曾遭到关系数据库研究者的挑衅和批评,认为MapReduce不具有关系数据库中的结构化数据存储和处理能力。为此,hadoop社区和研究人员做了多的努力,在hadoop0.19版支持MapReduce访问关系数据库,如:mysql,MySQL、PostgreSQL、Oracle 等几个数据库系统。

    1. 从Mysql读出数据

    Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db 中。DBInputFormat 在 Hadoop 应用程序中通过数据库供应商提供的 JDBC接口来与数据库进行交互,并且可以使用标准的 SQL 来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。

    1. 在使用 DBInputFormat 之前,必须将要使用的 JDBC 驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。

    2. MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理,处理的方法可参考:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html

     DBInputFormat 类中包含以下三个内置类

    1. protected class DBRecordReader implementsRecordReader<LongWritable, T>:用来从一张数据库表中读取一条条元组记录。
    2. 2.public static class NullDBWritable implements DBWritable,Writable:主要用来实现 DBWritable 接口。DBWritable接口要实现二个函数,第一是write,第二是readFileds,这二个函数都不难理解,一个是写,一个是读出所有字段。原型如下:
      public void write(PreparedStatement statement) throwsSQLException;
      public void readFields(ResultSet resultSet) throws SQLException;
    3. protected static class DBInputSplit implements InputSplit:主要用来描述输入元组集合的范围,包括 start 和 end 两个属性,start 用来表示第一条记录的索引号,end 表示最后一条记录的索引号.

    下面对怎样使用 DBInputFormat 读取数据库记录进行详细的介绍,具体步骤如下:

    1. DBConfiguration.configureDB (JobConf job, StringdriverClass, String dbUrl, String userName, String passwd)函数,配置JDBC 驱动,数据源,以及数据库访问的用户名和密码。MySQL 数据库的 JDBC 的驱动为“com.mysql.jdbc.Driver”,数据源为“jdbc:mysql://localhost/testDB”,其中testDB为访问的数据库。useName一般为“root”,passwd是你数据库的密码。

    2. DBInputFormat.setInput(JobConf job, Class<?extends DBWritable> inputClass, String tableName, String conditions,String orderBy, String... fieldNames),这个方法的参数很容易看懂,inputClass实现DBWritable接口。,string tableName表名, conditions表示查询的条件,orderby表示排序的条件,fieldNames是字段,这相当与把sql语句拆分的结果。当然也可以用sql语句进行重载。etInput(JobConf job, Class<?extends DBWritable> inputClass, String inputQuery, StringinputCountQuery)。

    3. 编写MapReduce函数,包括Mapper 类、Reducer 类、输入输出文件格式等,然后调用JobClient.runJob(conf)。

    上面讲了理论,下面举个例子:假设 MySQL 数据库中有数据库student,假设数据库中的字段有“id”,“name”,“gender","number"。

    第一步要实现DBwrite和write数据接口。代码如下:

    复制代码
            public class StudentRecord implements Writable, DBWritable{
                int id;
                String name;
                String gender;
                String number;
                @Override
            public void readFields(DataInput in) throws IOException {
                // TODO Auto-generated method stub
                this.id = in.readInt();
                this.gender = Text.readString(in);
                this.name = in.readString();
                this.number = in.readString();
            }
                @Override
            public void write(DataOutput out) throws IOException {
                // TODO Auto-generated method stub
                out.writeInt(this.id);
                Text.writeString(out,this.name);
                out.writeInt(this.gender);
                out.writeInt(this.number);
            }
                
                @Override
            public void readFields(ResultSet result) throws SQLException {
                // TODO Auto-generated method stub
                this.id = result.getInt(1);
                this.name = result.getString(2);
                this.gender = result.getString(3);
                this.number = result.getString(4);
            }
                
                @Override
            public void write(PreparedStatement stmt) throws SQLException{
                // TODO Auto-generated method stub
                stmt.setInt(1, this.id);
                stmt.setString(2, this.name);
                stmt.setString(3, this.gender);
                stmt.setString(4, this.number);
            }
                @Override
            public String toString() {
                // TODO Auto-generated method stub
                return new String(this.name + " " + this.gender + " " +this.number);
            }
    复制代码

    第二步,实现Map和Reduce类

    复制代码
        public class DBAccessMapper extends MapReduceBase implements
                Mapper<LongWritable, TeacherRecord, LongWritable, Text> {
            @Override
            public void map(LongWritable key, TeacherRecord value,
                    OutputCollector<LongWritable, Text> collector, Reporter reporter)
                    throws IOException {
                // TODO Auto-generated method stub
                new collector.collect(new LongWritable(value.id), new Text(value
                        .toString()));
            }
        }
    复制代码

    第三步:主函数的实现,函数

    复制代码
    public class DBAccessReader {
    
        public static void main(String[] args) throws IOException {
            JobConf conf = new JobConf(DBAccessReader.class);
            conf.setOutputKeyClass(LongWritable.class);
            conf.setOutputValueClass(Text.class);
            conf.setInputFormat(DBInputFormat.class);
            FileOutputFormat.setOutputPath(conf, new Path("dboutput"));
            DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
            "jdbc:mysql://localhost/school","root","123456");
            String [] fields = {"id", "name", "gender", "number"};
            DBInputFormat.setInput(conf, StudentRecord.class,"Student",null "id", fields);
            conf.setMapperClass(DBAccessMapper.class);
            conf.setReducerClass(IdentityReducer.class);
            JobClient.runJob(conf);
            }
    
    }
    复制代码

    2.写数据

     往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。

    1. DBOutFormat: 提供数据库写入接口。
    2. DBRecordWriter:提供向数据库中写入的数据记录的接口。
    3. DBConfiguration:提供数据库配置和创建链接的接口。

    DBOutFormat提供一个静态方法setOutput(job,String table,String ...filedNames);该方法的参数很容易看懂。假设要插入一个Student的数据,其代码为

    复制代码
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            JobConf conf = new JobConf();
            conf.setOutputFormat(DBOutputFormat.class);
            DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
                    "jdbc:mysql://localhost/school","root","123456");
            DBOutputFormat.setOutput(conf,"Student", 456, "liqizhou", "man", "20004154578");
            JobClient.runJob(conf); 
    复制代码

  • 相关阅读:
    Best Time to Buy and Sell Stock
    Remove Nth Node From End of List
    Unique Paths
    Swap Nodes in Pairs
    Convert Sorted Array to Binary Search Tree
    Populating Next Right Pointers in Each Node
    Maximum Subarray
    Climbing Stairs
    Unique Binary Search Trees
    Remove Duplicates from Sorted Array
  • 原文地址:https://www.cnblogs.com/bingyun84/p/4290374.html
Copyright © 2011-2022 走看看