  • Exporting data from MSSQL Server to HDFS with MapReduce

      Today I wanted to run some real data through the algorithm from my earlier post 《基于信息熵的无字典分词算法》 (a dictionary-free word segmentation algorithm based on information entropy) to check its correctness, so I wrote a MapReduce program that pulls data out of a MSSQL Server 2008 database for analysis. When I deployed the job to the Hadoop cluster, it failed with a SQLException.

    [screenshot of the SQLException; the generated statement contains a LIMIT clause]

      Strange. There was no LIMIT in my SQL statement, so where did this LIMIT come from? I went digging through the source of the DBInputFormat class:

    protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
        Configuration conf) throws IOException {

      @SuppressWarnings("unchecked")
      Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
      try {
        // use database product name to determine appropriate record reader.
        if (dbProductName.startsWith("ORACLE")) {
          // use Oracle-specific db reader.
          return new OracleDBRecordReader<T>(split, inputClass,
              conf, createConnection(), getDBConf(), conditions, fieldNames,
              tableName);
        } else if (dbProductName.startsWith("MYSQL")) {
          // use MySQL-specific db reader.
          return new MySQLDBRecordReader<T>(split, inputClass,
              conf, createConnection(), getDBConf(), conditions, fieldNames,
              tableName);
        } else {
          // Generic reader.
          return new DBRecordReader<T>(split, inputClass,
              conf, createConnection(), getDBConf(), conditions, fieldNames,
              tableName);
        }
      } catch (SQLException ex) {
        throw new IOException(ex.getMessage());
      }
    }

    And getSelectQuery() in DBRecordReader:

    protected String getSelectQuery() {
      StringBuilder query = new StringBuilder();

      // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
      if(dbConf.getInputQuery() == null) {
        query.append("SELECT ");

        for (int i = 0; i < fieldNames.length; i++) {
          query.append(fieldNames[i]);
          if (i != fieldNames.length -1) {
            query.append(", ");
          }
        }

        query.append(" FROM ").append(tableName);
        query.append(" AS ").append(tableName); //in hsqldb this is necessary
        if (conditions != null && conditions.length() > 0) {
          query.append(" WHERE (").append(conditions).append(")");
        }

        String orderBy = dbConf.getInputOrderBy();
        if (orderBy != null && orderBy.length() > 0) {
          query.append(" ORDER BY ").append(orderBy);
        }
      } else {
        //PREBUILT QUERY
        query.append(dbConf.getInputQuery());
      }

      try {
        query.append(" LIMIT ").append(split.getLength()); // <-- here is the problem
        query.append(" OFFSET ").append(split.getStart());
      } catch (IOException ex) {
        // Ignore, will not throw.
      }

      return query.toString();
    }

    Finally, the cause. Hadoop only ships database-specific readers for MySQL (MySQLDBRecordReader) and Oracle (OracleDBRecordReader). Every other database falls through to the generic DBRecordReader, whose getSelectQuery() unconditionally appends LIMIT ... OFFSET ... to carve the input into splits, and SQL Server does not support that syntax.
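
    To make the failure concrete: with this job's prebuilt input query and a split of, say, 500 rows starting at row 0 (the split values here are illustrative, not taken from the actual run), the generic reader hands SQL Server something like:

        select id,source from tablename where id<1000 LIMIT 500 OFFSET 0

    SQL Server has no LIMIT clause (paging there is done with TOP, or with OFFSET ... FETCH from SQL Server 2012 onward), so the statement is rejected with a syntax error near 'LIMIT'.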

    With the cause identified, I implemented a SQL Server record reader of my own, modeled on OracleDBRecordReader. The code follows.

      The code for MSSQLDBInputFormat:

    package org.apache.hadoop.mapreduce.lib.db;

    import java.io.IOException;
    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.RecordReader;

    /**
     * @author summer
     * MICROSOFT SQL SERVER
     */
    public class MSSQLDBInputFormat<T extends DBWritable> extends DBInputFormat<T> {

        public static void setInput(Job job,
                Class<? extends DBWritable> inputClass,
                String inputQuery, String inputCountQuery, String rowId) {
            job.setInputFormatClass(MSSQLDBInputFormat.class);
            DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
            dbConf.setInputClass(inputClass);
            dbConf.setInputQuery(inputQuery);
            dbConf.setInputCountQuery(inputCountQuery);
            // the single field name doubles as the paging column for the reader
            dbConf.setInputFieldNames(new String[]{rowId});
        }

        @Override
        protected RecordReader<LongWritable, T> createDBRecordReader(
                org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split,
                Configuration conf) throws IOException {

            @SuppressWarnings("unchecked")
            Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
            try {
                return new MSSQLDBRecordReader<T>(split, inputClass,
                        conf, createConnection(), getDBConf(), conditions, fieldNames,
                        tableName);
            } catch (SQLException ex) {
                throw new IOException(ex.getMessage());
            }
        }
    }

      The code for MSSQLDBRecordReader:

    package org.apache.hadoop.mapreduce.lib.db;

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;

    /**
     * @author summer
     */
    public class MSSQLDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {

        public MSSQLDBRecordReader(DBInputFormat.DBInputSplit split,
                Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
                String cond, String[] fields, String table) throws SQLException {
            super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
        }

        @Override
        protected String getSelectQuery() {
            StringBuilder query = new StringBuilder();
            DBConfiguration dbConf = getDBConf();
            String conditions = getConditions();
            String tableName = getTableName();
            String[] fieldNames = getFieldNames();

            // SQL Server 2008 has neither LIMIT/OFFSET nor rownum, so build the
            // base query first and wrap it in TOP ... NOT IN paging below.
            if (dbConf.getInputQuery() == null) {
                query.append("SELECT ");

                for (int i = 0; i < fieldNames.length; i++) {
                    query.append(fieldNames[i]);
                    if (i != fieldNames.length - 1) {
                        query.append(", ");
                    }
                }

                query.append(" FROM ").append(tableName);
                if (conditions != null && conditions.length() > 0)
                    query.append(" WHERE ").append(conditions);
                String orderBy = dbConf.getInputOrderBy();
                if (orderBy != null && orderBy.length() > 0) {
                    query.append(" ORDER BY ").append(orderBy);
                }
            } else {
                // PREBUILT QUERY
                query.append(dbConf.getInputQuery());
            }

            try {
                DBInputFormat.DBInputSplit split = getSplit();
                if (split.getLength() > 0) {
                    String querystring = query.toString();
                    String id = fieldNames[0];
                    query = new StringBuilder();
                    // skip the first getStart() rows via NOT IN on the row-id
                    // column, then take getLength() rows with TOP
                    query.append("SELECT TOP " + split.getLength() + " * FROM ( ");
                    query.append(querystring);
                    query.append(" ) a WHERE " + id + " NOT IN (SELECT TOP ").append(split.getStart());
                    query.append(" " + id + " FROM (");
                    query.append(querystring);
                    query.append(" ) b");
                    query.append(" )");
                    System.out.println("----------------------MICROSOFT SQL SERVER QUERY STRING---------------------------");
                    System.out.println(query.toString());
                    System.out.println("----------------------MICROSOFT SQL SERVER QUERY STRING---------------------------");
                }
            } catch (IOException ex) {
                // ignore, will not throw.
            }

            return query.toString();
        }
    }
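    For the same prebuilt input query, a split of 100 rows starting at row 200 (illustrative values again) would print roughly:

        SELECT TOP 100 * FROM ( select id,source from tablename where id<1000 ) a
        WHERE id NOT IN (SELECT TOP 200 id FROM ( select id,source from tablename where id<1000 ) b )

    This is the classic SQL Server 2008 paging idiom: exclude the first getStart() rows through NOT IN on the row-id column, then take getLength() rows with TOP. One caveat: TOP without an ORDER BY does not guarantee a stable row order, so the input query itself should impose an ordering on the row-id column to keep the splits disjoint (the ORDER BY branch above only applies when the query is built from a table name).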

    The MapReduce job:

    package com.nltk.sns.mapreduce;

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.MSSQLDBInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import com.nltk.utils.ETLUtils;

    /**
     * @author summer
     */
    public class LawDataEtl {

        public static class CaseETLMapper extends
                Mapper<LongWritable, LawCaseRecord, LongWritable, Text> {

            static final int step = 6;

            LongWritable outKey = new LongWritable(1); // reusable output key
            Text value = new Text();

            @Override
            protected void map(
                    LongWritable key,
                    LawCaseRecord lawCaseRecord,
                    Mapper<LongWritable, LawCaseRecord, LongWritable, Text>.Context context)
                    throws IOException, InterruptedException {

                System.out.println("-----------------------------" + lawCaseRecord + "------------------------------");

                outKey.set(lawCaseRecord.id);
                String source = ETLUtils.format(lawCaseRecord.source);
                List<LawCaseWord> words = ETLUtils.split(lawCaseRecord.id, source, step);
                for (LawCaseWord w : words) {
                    value.set(w.toString());
                    context.write(outKey, value);
                }
            }
        }

        static final String driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
        static final String dbUrl = "jdbc:sqlserver://192.168.0.1:1433;DatabaseName=XXX";
        static final String uid = "XXX";
        static final String pwd = "XXX";
        static final String inputQuery = "select id,source from tablename where id<1000";
        static final String inputCountQuery = "select count(1) from LawDB.dbo.case_source where id<1000";
        static final String jarClassPath = "/user/lib/sqljdbc4.jar";
        static final String outputPath = "hdfs://ubuntu:9000/user/test";
        static final String rowId = "id";

        public static Job configureJob(Configuration conf) throws Exception {

            String jobName = "etlcase";
            Job job = Job.getInstance(conf, jobName);

            job.addFileToClassPath(new Path(jarClassPath));
            MSSQLDBInputFormat.setInput(job, LawCaseRecord.class, inputQuery, inputCountQuery, rowId);
            job.setJarByClass(LawDataEtl.class);

            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(CaseETLMapper.class);

            return job;
        }

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            fs.delete(new Path(outputPath), true);

            DBConfiguration.configureDB(conf, driverClass, dbUrl, uid, pwd);
            conf.set(MRJobConfig.NUM_MAPS, String.valueOf(10));
            Job job = configureJob(conf);
            System.out.println("------------------------------------------------");
            System.out.println(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
            System.out.println(conf.get(DBConfiguration.URL_PROPERTY));
            System.out.println(conf.get(DBConfiguration.USERNAME_PROPERTY));
            System.out.println(conf.get(DBConfiguration.PASSWORD_PROPERTY));
            System.out.println("------------------------------------------------");
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
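    The job also depends on two classes the post never shows, LawCaseRecord and LawCaseWord. For orientation, here is a minimal sketch of what LawCaseRecord might look like; it is my guess at the shape, not the author's actual class, and it assumes the two columns of the input query are id (bigint) and source (nvarchar), in that order:

    package com.nltk.sns.mapreduce;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    public class LawCaseRecord implements Writable, DBWritable {

        long id;
        String source;

        // populated from each row of: select id,source from ...
        @Override
        public void readFields(ResultSet rs) throws SQLException {
            id = rs.getLong(1);
            source = rs.getString(2);
        }

        @Override
        public void write(PreparedStatement ps) throws SQLException {
            ps.setLong(1, id);
            ps.setString(2, source);
        }

        // Hadoop serialization for moving the record between tasks
        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readLong();
            source = Text.readString(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(id);
            Text.writeString(out, source);
        }

        @Override
        public String toString() {
            return id + "\t" + source;
        }
    }

    The column order in readFields(ResultSet) must match the SELECT list, and the class needs its implicit no-arg constructor because DBInputFormat instantiates it by reflection.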

    The helper class:

    package com.nltk.sns;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.commons.lang.StringUtils;

    /**
     * @author summer
     */
    public class ETLUtils {

        public final static String NULL_CHAR = "";
        public final static String PUNCTUATION_REGEX = "[(\\pP)&&[^\\|\\{\\}\\#]]+";
        public final static String WHITESPACE_REGEX = "[\\p{Space}]+";

        public static String format(String s) {
            return s.replaceAll(PUNCTUATION_REGEX, NULL_CHAR).replaceAll(WHITESPACE_REGEX, NULL_CHAR);
        }

        public static List<String> split(String s, int stepN) {
            List<String> splits = new ArrayList<String>();
            if (StringUtils.isEmpty(s) || stepN < 1)
                return splits;
            int len = s.length();
            if (len <= stepN)
                splits.add(s);
            else {
                // collect every substring of length 1..stepN (character n-grams)
                for (int j = 1; j <= stepN; j++)
                    for (int i = 0; i <= len - j; i++) {
                        String key = StringUtils.mid(s, i, j);
                        if (StringUtils.isEmpty(key))
                            continue;
                        splits.add(key);
                    }
            }
            return splits;
        }

        public static void main(String[] args) {
            String s = "谢婷婷等与姜波等";
            int stepN = 2;
            List<String> splits = split(s, stepN);
            System.out.println(splits);
        }
    }
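    For the example in main, with s = "谢婷婷等与姜波等" and stepN = 2, split returns every 1- and 2-character gram in order:

        [谢, 婷, 婷, 等, 与, 姜, 波, 等, 谢婷, 婷婷, 婷等, 等与, 与姜, 姜波, 波等]

    These character n-grams are the raw material for the entropy-based segmentation the post set out to test.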

    It ran successfully.

    [screenshot of the successful job run]

    This is a rough implementation, written mainly to satisfy my own needs; adapt it as you see fit.

      Frankly, the stock DBRecordReader is not well designed. Reading DBRecordReader, MySQLDBRecordReader, and OracleDBRecordReader side by side, the base DBRecordReader is far too tightly coupled to MySQL: the MySQL-style LIMIT/OFFSET pagination lives in the base class itself. As a general rule, a database without a dedicated implementation should still run without throwing; at worst it could fall back to a single split and a single map task and skip pagination entirely, as in the sketch below.
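
    A minimal sketch of that fallback idea, assuming the job is configured for a single map task (e.g. conf.set(MRJobConfig.NUM_MAPS, "1")). The class name is hypothetical and this is not part of Hadoop; it reuses the base class's protected accessors and simply emits the base query without the LIMIT/OFFSET suffix, which any JDBC database can execute:

    package org.apache.hadoop.mapreduce.lib.db;

    import java.sql.Connection;
    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;

    /**
     * Hypothetical fallback reader: it pages nothing, so it only returns
     * correct results when the whole input is served by one split.
     */
    public class GenericFallbackDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {

        public GenericFallbackDBRecordReader(DBInputFormat.DBInputSplit split,
                Class<T> inputClass, Configuration conf, Connection conn,
                DBConfiguration dbConfig, String cond, String[] fields, String table)
                throws SQLException {
            super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
        }

        @Override
        protected String getSelectQuery() {
            DBConfiguration dbConf = getDBConf();
            StringBuilder query = new StringBuilder();

            if (dbConf.getInputQuery() == null) {
                // same base query as DBRecordReader builds, minus LIMIT/OFFSET
                String[] fieldNames = getFieldNames();
                query.append("SELECT ");
                for (int i = 0; i < fieldNames.length; i++) {
                    query.append(fieldNames[i]);
                    if (i != fieldNames.length - 1) {
                        query.append(", ");
                    }
                }
                query.append(" FROM ").append(getTableName());
                String conditions = getConditions();
                if (conditions != null && conditions.length() > 0) {
                    query.append(" WHERE (").append(conditions).append(")");
                }
                String orderBy = dbConf.getInputOrderBy();
                if (orderBy != null && orderBy.length() > 0) {
                    query.append(" ORDER BY ").append(orderBy);
                }
            } else {
                query.append(dbConf.getInputQuery());
            }
            return query.toString();
        }
    }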

  • Original post: https://www.cnblogs.com/bigdatafly/p/5018076.html