zoukankan      html  css  js  c++  java
  • 一个完整的hadoop程序开发过程

    目的

    说明hadoop程序开发过程

    前提条件

    ubuntu或同类OS

    java1.6.0_45

    eclipse-indigo

    hadoop-0.20.2

    hadoop-0.20.2-eclipse-plugin.jar

    各项版本一定要匹配,否则出了问题都不知道是什么原因。

    配置

    配置Java

    详见:Ubuntu下搭建JAVA开发环境及卸载

    配置分布式Hadoop

    详见:hadoop 0.20.2伪分布式安装详解

    伪分布式与分布式有两点主要区别:

    1. 在namenode节点配置完成hadoop以后,需要用scp把hadoop复制到datanode节点,为了方便,最好全部机器的路径都是一样的,比如都在/opt/hadoop-0.20.2中。
    2. conf目录下的masters文件要把默认的localhost改成namenode节点的主机名或IP地址,Slaves文件中,要把localhost改成datanode节点的主机名或IP

     

    eclipse的hadoop插件配置

    hadoop-0.20.2-eclipse-plugin.jar是一个 eclipse中的hadoop插件。

    它的作用是实现了HDFS的可视化操作,如果没有它,就要在大量地在终端输入命令,每个命令都是以bin/hadoop dfs开头。

    如果你是新手,可能还觉得很新鲜,如果很熟悉命令的话,就会觉得很烦。新手总会变成老手,所以这个插件还是有必要的。

    下面简单说一下配置过程:

    eclipse和hadoop-eclipse-plugin这套插件的版本要求非常高,一定要高度匹配才能用。另一篇博文写了一部分对应关系:https://www.cnblogs.com/Sabre/p/10621064.html

    1.下载hadoop-0.20.2-eclipse-plugin.jar,自行搜索。官网不太容易找旧版本。

    2.把此jar放到eclipse插件目录下,一般是plugins目录

    重新启动eclipse,如果版本正确,此时在eclipse中的project exporer中应该可以看到DFS Locations项。如果没有出现,很可能是版本的问题。

     

    3.配置Hadoop所在目录。eclipse-->window菜单-->Preferences-->Hadoop Map/Reduce,右侧输入或选择你的Hadoop目录

    4.显示Map/Reduce Locations窗口。eclipse-->window菜单-->Open Perspective-->Other,选择蓝色的小象图标Map/Reduce,会在下面出黄色的小象窗口,Map/Reduce Locations

    5.配置Hadoop LocationMap/Reduce Locations中右键,New Hadoop Location,出现配置窗口,location name随便你写。下面的Map/Reduce Master框中的host,如果是分布式就用IP或主机名,不要用默认的localhost。port改成9000。DFS Master框中的Use M/R Master host默认打勾保持不变,下面的Port改成9001 。user name 一般默认中不中 ,    

     至此,eclipse的hadoop插件就配置完成了。

    编写程序

    以下的程序是从《hadoop实战》中脱胎出来的,之所以说脱胎,是因为原书中的代码缺少很多条件,不加以完善是无法运行的。这本书写得不好,感觉是为了评职称之类的事情,让学生给凑的,里面很多硬伤。之所以还在硬着头皮看下去,是因为多少还是讲了一些东西,同时也挑战一下自己,面对不那么完善的环境时,能否解决问题,而不是一味地寻找更好的教材,这是在豆瓣上写的一篇书评:https://book.douban.com/review/10071283/

     1.打开eclipse,新建java项目。右键项目,properties,Java Builder Path,Libraries,Add External JARS,找到hadoop的目录,把根目录下的几个jar包都添加进来。

    2.新建类,Score_process.java,复制粘贴以下代码:

    package pkg1;
    
    import java.net.URI;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem; 
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    public  class Score_process extends Configured implements Tool {
        
        //内部类Map
        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            
            //map方法
            public void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
                
                System.out.println("key值:" + key);
                String line = value.toString();//将输入的纯文本文件的数据转化为string
                
                //将输入的数据按行分割
                StringTokenizer tokenizerArticle = new StringTokenizer(line, "
    ");
                
                //分别对每一行进行处理
                while (tokenizerArticle.hasMoreTokens()) {
                    
                    //每行按空格划分
                    StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
                    String nameString = tokenizerLine.nextToken();
                    String scoreString = tokenizerLine.nextToken();
                    Text name = new Text(nameString); 
                    int scoreInt = Integer.parseInt(scoreString);
                    context.write(name, new IntWritable(scoreInt));//输出姓名和成绩
                }
            };
        }
        
        //内部类Reduce
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
            
            //reduce方法
            public void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context) throws java.io.IOException ,InterruptedException {
                
                int sum=0;
                int count=0;
                Iterator<IntWritable> iterator = values.iterator();
                
                while (iterator.hasNext()) {
                    sum += iterator.next().get();
                    count++;
                }
                
                int average = (int)sum/count;
                context.write(key, new IntWritable(average));
            };
            
        }    
        
        public int run(String[] args) throws Exception {
            
            Configuration configuration = getConf();
            
            //configuration.set("mapred", "Score_Process.jar");
            
            //准备环境,删除已经存在的output2目录,保证输出目录不存在**开始************
            final String uri = "hdfs://192.168.1.8:9000/";
            FileSystem fs = FileSystem.get(URI.create(uri),configuration);
            final String path = "/user/grid/output2";
            boolean exists = fs.exists(new Path(path));
            if(exists){
                fs.delete(new Path(path),true);
            }
            //准备环境,删除已经存在的output2目录,保证输出目录不存在**结束************
            
            Job job= new Job(configuration);
            
            job.setJobName("Score_process");
            job.setJarByClass(Score_process.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            job.setMapperClass(Map.class);
            
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);
            
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    //        System.out.println(new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            boolean success = job.waitForCompletion(true);    
            
            return success ? 0:1;
    
        }
        
        
        public static void main(String[] args) throws Exception {
            
            int ret = ToolRunner.run(new Score_process1(), args);
            
            System.exit(ret);
        }
    }

    以上的代码中,有不少是套路,固定的模板。

    Map是处理输入参数中给定的文本文件,处理完毕后,输出到HDFS,供reduce调用。 context.write(name, new IntWritable(scoreInt));这一句是关键。

    Reduce调用map方法的结果,reduce后,写到OS文件系统。context.write(key, new IntWritable(average));这一句是关键。

    整个run方法,需要改的只有setJobName和setJarByClass类的名字,其他的不用动。

    整个main方法,不用动。

    程序部分基本上就是这样。

     编译

    终端中输入

    javac -classpath /opt/hadoop-0.20.2/hadoop-0.20.2-core.jar -d ~/allTest/ScoreProcessFinal/class ~/workspace-indigo/test5/src/pkg1/Score_process.java

    如果没有报错,就说明编译成功。

    打包

    jar -cvf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar -C ~/allTest/ScoreProcessFinal/class .

    可以用以下命令查看包里的文件:
    jar vtf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar

    执行

    执行可以分为两种方式,一种在eclipse中,另一种在终端。

    eclipse中运行

    配置运行参数。run configurations,arguments,Program arguments:

    文本框中输入:hdfs://host-thinkpad:9000/user/grid/input2 hdfs://host-thinkpad:9000/user/grid/output2

    就是输入目录和输出目录,注意中间有个空格。

    终端中运行

    /opt/hadoop-0.20.2/bin/hadoop jar ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar pkg1.Score_process1 input2 output2

     

    这就是hadoop开发的全过程框架。

    其实在此期间发生了很多各种各样的问题,分别记录在各个博文中了。

  • 相关阅读:
    Leetcode Reverse Words in a String
    topcoder SRM 619 DIV2 GoodCompanyDivTwo
    topcoder SRM 618 DIV2 MovingRooksDiv2
    topcoder SRM 618 DIV2 WritingWords
    topcoder SRM 618 DIV2 LongWordsDiv2
    Zepto Code Rush 2014 A. Feed with Candy
    Zepto Code Rush 2014 B
    Codeforces Round #245 (Div. 2) B
    Codeforces Round #245 (Div. 2) A
    Codeforces Round #247 (Div. 2) B
  • 原文地址:https://www.cnblogs.com/Sabre/p/10624804.html
Copyright © 2011-2022 走看看