  • Submitting a job to a Hadoop cluster from Eclipse on Windows 7: writing MySQL table records to HDFS

    I'm a beginner. Over the past two days I tried to use Eclipse on Windows 7 to submit a job to a Hadoop cluster that writes MySQL table records to HDFS, and I ran into all kinds of pitfalls.

    I first found a similar blog post with code: http://www.cnblogs.com/ljhoracle/p/5272643.html

    but following it as written I could never get the job to run. Below are the steps I took, starting from that code, to make it work.

    1. To develop against Hadoop in Eclipse, I followed the usual advice and downloaded the Hadoop plugin hadoop-eclipse-plugin-2.6.4.jar,

    placed it in the plugins directory of my Eclipse installation, and restarted Eclipse, but the Hadoop/Map-Reduce perspective described online never appeared.

    A search suggested this was an Eclipse version problem (I was on Eclipse Jee Neon), so I switched to eclipse-java-mars-2-win32-x86_64.

    The official download site was slow, so here is a mirror: https://pan.baidu.com/share/link?shareid=382043580&uk=1376643476&fid=668239004584711

    Unpack it, copy the plugin into plugins, start Eclipse, and it works. Then create a new Hadoop Location.

    Note: the user name must match the user account on the cluster, and hadoopmaster must already be mapped in the Windows hosts file.
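
    For reference, that hosts entry is a single line in C:\Windows\System32\drivers\etc\hosts, for example 192.168.1.100  hadoopmaster, where the IP is only a placeholder; use your own master node's address.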

    2. Create a new project and copy the code over. Every run, however, failed with a NullPointerException:

    java.lang.NullPointerException at org.apache.hadoop.mapred.LocalDistribu...

    The job failed, and as far as I could tell the logs never even recorded the job being submitted. Searching around (there are several related posts online), I learned that submitting from Windows to a Linux cluster requires some extra cross-platform configuration; see the excerpt below.
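
    These are the cross-platform settings I ended up with, excerpted from the full listing at the end of this post; the hostname and port are those of my cluster and will differ on yours.

        // Excerpt from main(): submit from a Windows client to the Linux/YARN cluster
        JobConf conf = new JobConf();
        conf.setBoolean("mapreduce.app-submission.cross-platform", true);
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.address", "hadoopmaster:8032");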

    3. On the next run the job failed with a ClassNotFoundException. The fix I found (again via related posts online) was to export the project as a jar and point the job at it with conf.setJar(...); see the excerpt below.
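
    The relevant line, again excerpted from the full listing below (the path is simply where I exported the project jar on my machine):

        // Ship the exported project jar so the cluster can find our job classes
        conf.setJar("E:/hadoop/testJar/hadoop.jar");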

    4. After exporting the jar and running again, the job completed, but the file written to HDFS was garbled. Changing the Eclipse workspace encoding to UTF-8 fixed it. The full code follows.

    package cn.com.anuo.dt;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.net.URI;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.IdentityReducer;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;
    import org.apache.hadoop.mapred.lib.db.DBWritable;
    
    import cn.com.anuo.dt.DBAccess.DBRecord.DBRecordMapper;
    
    @SuppressWarnings("deprecation")
    public class DBAccess {
        
        public static class DBRecord implements Writable, DBWritable {
    
            private int id;
            private String name;
            private String email;
            
            public int getId() {
                return id;
            }
    
            public void setId(int id) {
                this.id = id;
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getEmail() {
                return email;
            }
    
            public void setEmail(String email) {
                this.email = email;
            }
    
            @Override
            public String toString() {
                 return this.id + " " + this.name + " " + this.email;  
            }
    
            public void readFields(ResultSet resultSet) throws SQLException {
                this.id = resultSet.getInt("id");
                this.name= resultSet.getString("name");
                this.email= resultSet.getString("email");
                
            }
    
            public void write(PreparedStatement statement) throws SQLException {
                statement.setInt(1, id);
                statement.setString(2, name);
                statement.setString(3, email);
                
            }
    
            public void readFields(DataInput in) throws IOException {
                this.id = in.readInt();
                this.name= Text.readString(in);
                this.email= Text.readString(in);
                
            }
    
            public void write(DataOutput out) throws IOException {
                out.writeInt(this.id);
                Text.writeString(out, this.name);
                Text.writeString(out, this.email);
                
            }
            
            public static class DBRecordMapper extends MapReduceBase implements Mapper<LongWritable, cn.com.anuo.dt.DBAccess.DBRecord, LongWritable, Text> {

                // Emit the record id as the key and the record's text form as the value
                public void map(LongWritable key, cn.com.anuo.dt.DBAccess.DBRecord record, OutputCollector<LongWritable, Text> output, Reporter reporter)
                        throws IOException {
                    output.collect(new LongWritable(record.getId()), new Text(record.toString()));
                }
    
            }
        }
    
        public static void main(String[] args) {
            JobConf conf = new JobConf();
        // Point the job at the Hadoop cluster
        conf.set("yarn.resourcemanager.hostname", "hadoopmaster");
        // Set the submitting user
        conf.set("hadoop.job.user","hadoop");
        // Enable cross-platform submission (Windows client, Linux cluster)
        conf.setBoolean("mapreduce.app-submission.cross-platform", true);
        // NameNode / default file system
        conf.set("fs.defaultFS", "hdfs://hadoopmaster:9000/home/hadoop");
        // Run on the YARN framework
        conf.set("mapreduce.framework.name", "yarn");
        // ResourceManager address
        conf.set("yarn.resourcemanager.address", "hadoopmaster:8032");
        // ResourceManager scheduler address
        conf.set("yarn.resourcemanager.scheduler.address", "hadoopmaster:8030");
        // Fix for the ClassNotFoundException: ship the exported project jar
        conf.setJar("E:/hadoop/testJar/hadoop.jar");
            try {
                FileSystem fileSystem = FileSystem.get(
                        URI.create("hdfs://hadoopmaster:9000/"), conf);
            // Add the MySQL JDBC driver (already uploaded to HDFS) to the job classpath
                DistributedCache
                .addFileToClassPath(
                        new Path(
                                "hdfs://hadoopmaster:9000/lib/mysql-connector-java-5.1.31.jar"),
                                conf, fileSystem);
                
                conf.setOutputKeyClass(LongWritable.class);
                conf.setOutputValueClass(Text.class);
                conf.setInputFormat(DBInputFormat.class);
            // Output directory on HDFS
                Path path = new Path("hdfs://hadoopmaster:9000/home/hadoop/dl/employee");
                FileOutputFormat.setOutputPath(conf, path);
                
            // JDBC driver and connection details (URL, user and password masked here)
                DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
                        "**********","**********","************");
            // Columns to read from the employee table
                String [] fields = {"id", "name", "email"};
                DBInputFormat.setInput(conf, cn.com.anuo.dt.DBAccess.DBRecord.class, "employee",
                        null, "id", fields);
            // Mapper and reducer classes
                conf.setMapperClass(DBRecordMapper.class);
                conf.setReducerClass(IdentityReducer.class);
                
            // Submit the job and wait for it to finish
                JobClient.runJob(conf);
            } catch (IOException e) {
                System.out.println(e.getMessage());
                e.printStackTrace();
            }
        }
    }
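
    If everything goes well, the job writes one line per employee row to the output directory, in the form "id<TAB>id name email" (the key is the record id, the value is DBRecord.toString()). A quick way to check, assuming the default part-file name produced by the single reducer, is to run hdfs dfs -cat /home/hadoop/dl/employee/part-00000 on the cluster.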

    Writing this down for future reference!

  • Original post: https://www.cnblogs.com/loveyixiang/p/6734514.html