  • 5. Hadoop: reading data from MySQL and writing it to HDFS

    Contents:

    See article 1 in this series for the full table of contents.

    This walkthrough is done on Windows. On Linux, the placement of the mysql-connector-java-5.1.46.jar file matters; see the appendix at the end.
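
    The job below reads a table named bbb with columns id and name from the test database on localhost. For reference, here is a minimal JDBC sketch that creates and fills such a table; the CreateTestTable class name and the sample rows are made up for illustration, while the driver class, connection URL and root/admin credentials match the ones used by the job:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class CreateTestTable {
        public static void main(String[] args) throws Exception {
            // same driver, URL and credentials as in the MapReduce job below
            Class.forName("com.mysql.jdbc.Driver");
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test", "root", "admin")) {
                try (Statement st = conn.createStatement()) {
                    st.execute("CREATE TABLE IF NOT EXISTS bbb (id INT PRIMARY KEY, name VARCHAR(64))");
                }
                // a few sample rows so the job has something to read (illustrative data)
                try (PreparedStatement ps = conn.prepareStatement(
                        "INSERT INTO bbb (id, name) VALUES (?, ?)")) {
                    String[] names = {"zhangsan", "lisi", "wangwu"};
                    for (int i = 0; i < names.length; i++) {
                        ps.setInt(1, i + 1);
                        ps.setString(2, names[i]);
                        ps.addBatch();
                    }
                    ps.executeBatch();
                }
            }
        }
    }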

    The MapReduce program:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Iterator;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;
    import org.apache.hadoop.mapred.lib.db.DBWritable;
    
    /**
     * @author DELL_pc
     * @date 2017-06-27
     */
    public class DbaMysql {
         public static class DBAccessMapper extends MapReduceBase implements Mapper<LongWritable, StudentRecord, IntWritable, Text>
         {
            public void map(LongWritable key, StudentRecord value, OutputCollector<IntWritable, Text> output,
                    Reporter reporter) throws IOException {
                // emit the student id as the key and the whole record ("id name") as the value
                output.collect(new IntWritable(value.id), new Text(value.toString()));
            }
         }
          public static class DBAccessReduce extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text>
          {
            public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output,
                    Reporter reporter) throws IOException {
                  while (values.hasNext()) {
                      output.collect(key, values.next());
                    }
            }
          }
         public static void main(String[] args) {
             System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.6"); // required on Windows so Hadoop can find its local binaries (winutils)
            Configuration configuration=new Configuration();
            JobConf jobConf = new JobConf(configuration);
            jobConf.setJarByClass(DbaMysql.class); // ship this class's jar with the job; without it the submitter logs "No job jar file set" (see the appendix)
    
            jobConf.setOutputKeyClass(IntWritable.class);
            jobConf.setOutputValueClass(Text.class);
            jobConf.setInputFormat(DBInputFormat.class);
    
    //        String[] fields = {"id", "name"};
    //        DBInputFormat.setInput(jobConf, StudentRecord.class, "bbb", "length(name)>2", "", fields); // bbb is the table name; input variant 1: table name, WHERE conditions, ORDER BY, field list

             // input variant 2: an explicit record query plus a count query; DBInputFormat runs the
             // count query to decide how to split the input among map tasks
             DBInputFormat.setInput(jobConf, StudentRecord.class, "select id,name from bbb", "select count(*) from bbb");
    
            DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/test","root","admin");
            jobConf.setMapperClass(DBAccessMapper.class);
            jobConf.setReducerClass(DBAccessReduce.class);
            FileOutputFormat.setOutputPath(jobConf,new Path("output_mysql"));
            try {
                JobClient.runJob(jobConf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public static class StudentRecord implements Writable, DBWritable {
            int id;
            String name;
            // no-argument constructor (Hadoop instantiates the record reflectively)
            public StudentRecord() { }
            // Writable deserializes from the intermediate data stream, so the input is a DataInput
            public void readFields(DataInput in) throws IOException {
                this.id = in.readInt(); // read the next int from the stream
                this.name = Text.readString(in);
            }
            public String toString() {
                return this.id + " " + this.name;
            }
            // DBWritable writes to the database, so the output target is a PreparedStatement
            // (PreparedStatement extends Statement and executes parameterized SQL)
            public void write(PreparedStatement stmt) throws SQLException {
                stmt.setInt(1, this.id);
                stmt.setString(2, this.name);
            }
            // DBWritable reads from the database, so the input is a ResultSet
            // (a ResultSet holds the rows returned by a query, like a temporary table)
            public void readFields(ResultSet result) throws SQLException {
                this.id = result.getInt(1);
                this.name = result.getString(2);
            }
            // Writable serializes to the intermediate data stream, so the output is a DataOutput
            public void write(DataOutput out) throws IOException {
                out.writeInt(this.id);
                Text.writeString(out, this.name);
            }
        }
    }
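
    After a successful run, the result is plain text under output_mysql: the default TextOutputFormat writes one line per reduce output, i.e. the key, a tab, and the "id name" string produced by StudentRecord.toString(). A small sketch for inspecting that output from Java (the PrintJobOutput class name is made up, and the part-00000 file name assumes the default single reduce task):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PrintJobOutput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf); // local FS or HDFS, depending on the configuration
            Path part = new Path("output_mysql/part-00000"); // same path as FileOutputFormat.setOutputPath above
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(part), "UTF-8"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // e.g. "1<TAB>1 zhangsan"
                }
            }
        }
    }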

    pom.xml:

    <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.7.3</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.43</version>
            </dependency>
        </dependencies>

     

    ————————————————————————————————————————————

    Appendix:

    After the program ran successfully on Windows, I packaged it as mstest.jar (with the MySQL driver classes bundled inside the jar) and ran it on Hadoop on Linux. It failed with:

    18/07/20 00:34:07 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
    18/07/20 00:34:07 INFO mapreduce.JobSubmitter: Cleaning up the staging area file:/usr/software/hadoop/tmp/mapred/staging/root1889661768/.staging/job_local1889661768_0001
    Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
        at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:171)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)

    Even manually copying mysql-connector-java-5.1.46.jar into JAVA_HOME did not help, and neither did putting it under HADOOP_HOME.

    The directory that finally worked:

    [hadoop_home]/share/hadoop/yarn/

    After that, the job could load the MySQL driver and read from MySQL into HDFS.
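
    Copying the jar into share/hadoop/yarn/ works because everything in that directory is part of the classpath the hadoop command builds, on the submitting machine as well as on the cluster nodes, and the ClassNotFoundException above is thrown on the client during job submission. For completeness, a related technique is shipping extra jars to the task classpath through the distributed cache; note that this alone would not fix the submit-time error, since it only affects the map and reduce tasks. A sketch, with an illustrative HDFS path and helper class name:

    import java.io.IOException;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class DriverClasspathHelper {
        // Assumes the driver jar was uploaded to HDFS beforehand, e.g. to /lib/.
        // Call this on the JobConf before JobClient.runJob(); the distributed cache
        // then adds the jar to the classpath of every map and reduce task.
        public static void addMysqlDriver(JobConf jobConf) throws IOException {
            DistributedCache.addFileToClassPath(
                    new Path("/lib/mysql-connector-java-5.1.46.jar"), jobConf);
        }
    }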

    Reference: Sqoop installation, and the problem of MapReduce (reading MySQL, writing HDFS) failing to load the JDBC driver

  • Original post: https://www.cnblogs.com/xiaoliu66007/p/9337281.html