  • Hadoop custom data types and interaction with relational databases

    Recently I had a requirement: during modeling, a small part of the data lives in Postgres, and it has to be read from Postgres into Hadoop for the modeling tests directly, rather than being exported to HDFS first.

    There are two ways to read the data from Postgres. One is Hadoop's DBInputFormat (in the Hadoop 2.4.1 jars, DBInputFormat exists in two packages, org.apache.hadoop.mapreduce.lib.db and org.apache.hadoop.mapred; the former is the newer one). The other is Postgres's CopyManager class.

    Let's start with the DBInputFormat approach.

    First, create a table in the database and insert a few rows for testing.
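
    The post does not show this step; here is a minimal JDBC sketch of what it might look like (the schema is inferred from the three fields used later, the test rows are made up, and the password is a placeholder):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    // Illustrative only: creates the test table and inserts a couple of rows.
    public class CreateTestTable {

        public static void main(String[] args) throws Exception {
            Class.forName("org.postgresql.Driver");
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://192.168.0.203/test", "hb", "password");
                 Statement st = conn.createStatement()) {
                st.executeUpdate("CREATE TABLE IF NOT EXISTS dim_160_168_call_type ("
                        + "call_type_id integer PRIMARY KEY, "
                        + "call_type varchar(64), "
                        + "remark varchar(255))");
                st.executeUpdate("INSERT INTO dim_160_168_call_type VALUES "
                        + "(1, 'local', 'test row 1'), "
                        + "(2, 'long distance', 'test row 2')");
            }
        }
    }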

    Because the rows in this table will be used as the map input Value, a custom data type is needed.

    To define a custom data type in Hadoop, implement the Writable interface. If the custom type is used as a Key, it must implement WritableComparable and provide the comparison method. Implementing WritableComparable means the data has to be deserialized for every comparison, which is cumbersome, so you can also extend WritableComparator to compare the serialized byte streams directly.
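
    For example, a custom key type could look like the minimal sketch below (IntKey is an illustrative name, not something used in this post): it implements WritableComparable for object-level comparison and registers a WritableComparator subclass so the framework can compare keys on their raw serialized bytes without deserializing them.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Illustrative key type: an int key with a raw-byte comparator.
    public class IntKey implements WritableComparable<IntKey> {

        private int value;

        public void set(int value) {
            this.value = value;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(value);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            value = in.readInt();
        }

        @Override
        public int compareTo(IntKey other) {
            return Integer.compare(value, other.value);
        }

        // Compares the serialized bytes directly, avoiding deserialization during the sort.
        public static class Comparator extends WritableComparator {

            public Comparator() {
                super(IntKey.class);
            }

            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return Integer.compare(readInt(b1, s1), readInt(b2, s2));
            }
        }

        static {
            // Register the raw comparator so Hadoop uses it for IntKey.
            WritableComparator.define(IntKey.class, new Comparator());
        }
    }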

    When configuring the DBInputFormat input, there must be a data type that implements DBWritable, so the custom Value type here implements both the DBWritable and Writable interfaces.

    package com.qldhlbs.hadoop.demo0420;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    public class PgDbWritable implements DBWritable, Writable{
    
        private Integer call_type_id;
        private String call_type;
        private String remark;
        
        public PgDbWritable() {
            
        }
        
        public PgDbWritable(Integer call_type_id, String call_type, String remark){
            
            set(call_type_id, call_type, remark);
        }
        
        public void set(Integer call_type_id, String call_type, String remark) {
            
            this.call_type_id = call_type_id;
            this.call_type = call_type;
            this.remark = remark;
        }
        
        // Read one row from the JDBC ResultSet (DBWritable)
        @Override
        public void readFields(ResultSet set) throws SQLException {
            this.call_type_id = set.getInt(1);
            this.call_type = set.getString(2);
            this.remark = set.getString(3);
        }

        // Set the PreparedStatement parameters (DBWritable)
        @Override
        public void write(PreparedStatement ps) throws SQLException {
            ps.setInt(1, this.call_type_id);
            ps.setString(2, this.call_type);
            ps.setString(3, this.remark);
        }

        // Deserialize from the byte stream (Writable)
        @Override
        public void readFields(DataInput in) throws IOException {
            this.call_type_id = in.readInt();
            this.call_type = in.readUTF();
            this.remark = in.readUTF();
        }

        // Serialize to the byte stream (Writable)
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.call_type_id);
            out.writeUTF(this.call_type);
            out.writeUTF(this.remark);
        }

        public Integer getCall_type_id() {
            return call_type_id;
        }

        public String getCall_type() {
            return call_type;
        }

        public String getRemark() {
            return remark;
        }

        @Override
        public String toString() {
            return call_type_id + " " + call_type + " " + remark;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((call_type == null) ? 0 : call_type.hashCode());
            result = prime * result + ((call_type_id == null) ? 0 : call_type_id.hashCode());
            result = prime * result + ((remark == null) ? 0 : remark.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null || getClass() != obj.getClass())
                return false;
            PgDbWritable other = (PgDbWritable) obj;
            if (call_type == null ? other.call_type != null : !call_type.equals(other.call_type))
                return false;
            if (call_type_id == null ? other.call_type_id != null : !call_type_id.equals(other.call_type_id))
                return false;
            if (remark == null ? other.remark != null : !remark.equals(other.remark))
                return false;
            return true;
        }
    }

    PgDbWritable holds the three fields corresponding to the table's columns and overrides the four key methods; what each one does is noted in the code comments. It also overrides toString, hashCode and equals.

    With the custom data type in place, the next step is to read the data from the database.

    package com.qldhlbs.hadoop.demo0420;
    
    import java.io.IOException;
    import java.sql.SQLException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    
    public class MapreducePackageDbApp {
    
        
        static class DbReadMapper extends Mapper<LongWritable, PgDbWritable, LongWritable, PgDbWritable>{
            
            @Override
            protected void map(LongWritable key, PgDbWritable value,
                    Mapper<LongWritable, PgDbWritable, LongWritable, PgDbWritable>.Context context)
                            throws IOException, InterruptedException {
                context.write(key, value);
            }
        }
        
        static class DbReadReduce extends Reducer<LongWritable, PgDbWritable, LongWritable, PgDbWritable>{
            
            @Override
            protected void reduce(LongWritable key, Iterable<PgDbWritable> values,
                    Reducer<LongWritable, PgDbWritable, LongWritable, PgDbWritable>.Context context) throws IOException, InterruptedException {
                for (PgDbWritable value : values) {
                    context.write(key, value);
                }
            }
        }
    
        
        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, SQLException {
            
            Configuration conf = new Configuration();
    
            DBConfiguration.configureDB(conf, "org.postgresql.Driver", "jdbc:postgresql://192.168.0.203/test", "hb", "xxx");
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(MapreducePackageDbApp.class);
            job.setJobName(MapreducePackageDbApp.class.getSimpleName());
            
            DistributedCache.addFileToClassPath(new Path("hdfs://192.168.0.201:49000/user/qldhlbs/lib/postgresql-9.3-1101.jdbc3.jar"), conf);
        
            String[] fields = {"call_type_id", "call_type", "remark"};
    
            
            DBInputFormat<PgDbWritable> in = new DBInputFormat<PgDbWritable>();
            in.setConf(conf);
    
            // Configure DBInputFormat: job, input DBWritable class, table name, query conditions, ORDER BY clause, array of table fields
            DBInputFormat.setInput(job, PgDbWritable.class, "dim_160_168_call_type", null, null, fields);
            
            job.setMapperClass(DbReadMapper.class);
            // The reducer can be left unset; Hadoop then uses the default Reducer, which (as its source shows) simply re-emits the map output
            job.setReducerClass(DbReadReduce.class);
            
            job.setOutputKeyClass(LongWritable.class);
            //job.setOutputValueClass(Text.class);
            job.setOutputValueClass(PgDbWritable.class);
            
            job.setInputFormatClass(DBInputFormat.class);
        
            FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.201:49000/user/qldhlbs/db5"));
                
            boolean isSuccess = job.waitForCompletion(true);
            System.exit(isSuccess ? 0 : 1);
        }
    
    }

    Since this is only a demo, the map function simply emits what it reads. When no reducer is set, Hadoop's default Reducer just writes out the map output as-is, so the reduce function here could have been omitted as well.
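
    For reference, the default reduce() in org.apache.hadoop.mapreduce.Reducer is just an identity pass-through, which is why leaving the reducer unset re-emits the map output unchanged (paraphrased from the Hadoop 2.x source):

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
            throws IOException, InterruptedException {
        // Every value is written back out with its key untouched.
        for (VALUEIN value : values) {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }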

    A few things are worth noting here. First, all of the imports come from the mapreduce packages rather than the mapred ones; mixing the two causes errors. Second, a copy of the Postgres driver jar must be uploaded to HDFS: create a directory with hadoop fs -mkdir /user/qldhlbs/lib, then upload the jar with hadoop fs -copyFromLocal postgresql-9.3-1101.jdbc3.jar /user/qldhlbs/lib. In the code, DistributedCache.addFileToClassPath(new Path("hdfs://192.168.0.201:49000/user/qldhlbs/lib/postgresql-9.3-1101.jdbc3.jar"), conf) then adds that jar to the classpath. Third, configure the DBConfiguration; its parameters are, in order, the Configuration, the database driver class, the database URL, the username and the password. After the DBConfiguration is configured, the following two lines are also needed:

    DBInputFormat<PgDbWritable> in = new DBInputFormat<PgDbWritable>();

    in.setConf(conf);

    The setConf() call must not be forgotten. At first I did not call it to pass conf to DBInputFormat and kept getting a NullPointerException. Debugging showed that no connection was being obtained, even though DBConfiguration was able to get one; digging further, DBInputFormat had never received a DBConfiguration object at all, so it could not obtain a connection. Reading the hadoop-mapreduce-client-core-2.4.1 source finally resolved the problem.

    public void setConf(Configuration conf)
      {
        this.dbConf = new DBConfiguration(conf);
        try
        {
          getConnection();
    
          DatabaseMetaData dbMeta = this.connection.getMetaData();
          this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
        }
        catch (Exception ex) {
          throw new RuntimeException(ex);
        }
    
        this.tableName = this.dbConf.getInputTableName();
        this.fieldNames = this.dbConf.getInputFieldNames();
        this.conditions = this.dbConf.getInputConditions();
      }
    
    
    public Connection getConnection() {
        try {
          if (null == this.connection)
          {
            this.connection = this.dbConf.getConnection();
            this.connection.setAutoCommit(false);
            this.connection.setTransactionIsolation(8);
          }
        }
        catch (Exception e) {
          throw new RuntimeException(e);
        }
        return this.connection;
      }

    This is part of the decompiled source, and it shows that the connection is obtained from the DBConfiguration object (the literal 8 passed to setTransactionIsolation is Connection.TRANSACTION_SERIALIZABLE). The fourth point is configuring the DBInputFormat input: the parameters are the job, the input DBWritable class, the table name, the query conditions, the ORDER BY clause, and the array of table field names.

    With all of that in place, the job can be run on Hadoop.
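
    For example, after packaging the classes into a jar, the job can be launched with something like hadoop jar hadoop-db-demo.jar com.qldhlbs.hadoop.demo0420.MapreducePackageDbApp; the jar name here is only an example.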

    The output files generated in HDFS show that the data has indeed been read onto HDFS.

    Using the mapred package instead of the mapreduce package also works; the code is not shown here because it is nearly the same, the only difference being that it works even without calling setConf() to bind the conf.

     

    That was the first approach. The second approach is to use the org.postgresql.copy.CopyManager class directly.

    // Copy a table (or query) out of Postgres into an in-memory stream using CopyManager.
    // getConnection() is assumed to be a helper that returns a JDBC connection obtained
    // from the Postgres driver, which can be cast to BaseConnection.
    public ByteArrayOutputStream copyToStream(String tableOrQuery, String delimiter) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            CopyManager copyManager = new CopyManager((BaseConnection) getConnection());
            String copySql = "COPY " + tableOrQuery + " TO STDOUT";
            if (delimiter != null) {
                copySql = copySql + " WITH DELIMITER AS '" + delimiter + "'";
            }
            copyManager.copyOut(copySql, out);
            return out;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    
    
    ByteArrayOutputStream out = copyToStream(sql.toString(), ",");
    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    
    // Write an InputStream to the given HDFS path using Hadoop's IOUtils.
    // conf is assumed to be a Hadoop Configuration field pointing at the cluster.
    public void uploadFile(String hdfsPath, InputStream in) {
        try {
            FileSystem hdfs = FileSystem.get(conf);
            FSDataOutputStream out = hdfs.create(new Path(hdfsPath));
            org.apache.hadoop.io.IOUtils.copyBytes(in, out, 4096, false);
            out.sync();
            out.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    Read the stream out and write it to HDFS with Hadoop's built-in IOUtils.copyBytes() method, and that is all there is to it.
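
    For reference, here is a minimal end-to-end sketch of this second approach; the class name, password and HDFS target path are illustrative, not from the original post.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;

    public class PgCopyToHdfsSketch {

        public static void main(String[] args) throws Exception {
            // Plain JDBC connection; the Postgres driver's connection implements
            // BaseConnection, which CopyManager requires.
            Class.forName("org.postgresql.Driver");
            Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://192.168.0.203/test", "hb", "password");

            // COPY the whole table into an in-memory buffer, comma-delimited.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            CopyManager copyManager = new CopyManager((BaseConnection) conn);
            copyManager.copyOut(
                    "COPY dim_160_168_call_type TO STDOUT WITH DELIMITER AS ','", buffer);
            conn.close();

            // Write the buffered bytes straight to HDFS.
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.0.201:49000");
            FileSystem hdfs = FileSystem.get(conf);
            try (FSDataOutputStream out =
                         hdfs.create(new Path("/user/qldhlbs/db_copy/call_type.csv"))) {
                IOUtils.copyBytes(new ByteArrayInputStream(buffer.toByteArray()),
                        out, 4096, false);
            }
        }
    }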

     
