zoukankan      html  css  js  c++  java
  • 一个完整的hadoop程序开发过程

    目的

    说明hadoop程序开发过程

    前提条件

    ubuntu或同类OS

    java1.6.0_45

    eclipse-indigo

    hadoop-0.20.2

    hadoop-0.20.2-eclipse-plugin.jar

    各项版本一定要匹配,否则出了问题都不知道是什么原因。

    配置

    配置Java

    详见:Ubuntu下搭建JAVA开发环境及卸载

    配置分布式Hadoop

    详见:hadoop 0.20.2伪分布式安装详解

    伪分布式与分布式有两点主要区别:

    1. 在namenode节点配置完成hadoop以后,需要用scp把hadoop复制到datanode节点,为了方便,最好全部机器的路径都是一样的,比如都在/opt/hadoop-0.20.2中。
    2. conf目录下的masters文件要把默认的localhost改成namenode节点的主机名或IP地址,Slaves文件中,要把localhost改成datanode节点的主机名或IP

     

    eclipse的hadoop插件配置

    hadoop-0.20.2-eclipse-plugin.jar是一个 eclipse中的hadoop插件。

    它的作用是实现了HDFS的可视化操作,如果没有它,就要在大量地在终端输入命令,每个命令都是以bin/hadoop dfs开头。

    如果你是新手,可能还觉得很新鲜,如果很熟悉命令的话,就会觉得很烦。新手总会变成老手,所以这个插件还是有必要的。

    下面简单说一下配置过程:

    eclipse和hadoop-eclipse-plugin这套插件的版本要求非常高,一定要高度匹配才能用。另一篇博文写了一部分对应关系:https://www.cnblogs.com/Sabre/p/10621064.html

    1.下载hadoop-0.20.2-eclipse-plugin.jar,自行搜索。官网不太容易找旧版本。

    2.把此jar放到eclipse插件目录下,一般是plugins目录

    重新启动eclipse,如果版本正确,此时在eclipse中的project exporer中应该可以看到DFS Locations项。如果没有出现,很可能是版本的问题。

     

    3.配置Hadoop所在目录。eclipse-->window菜单-->Preferences-->Hadoop Map/Reduce,右侧输入或选择你的Hadoop目录

    4.显示Map/Reduce Locations窗口。eclipse-->window菜单-->Open Perspective-->Other,选择蓝色的小象图标Map/Reduce,会在下面出黄色的小象窗口,Map/Reduce Locations

    5.配置Hadoop LocationMap/Reduce Locations中右键,New Hadoop Location,出现配置窗口,location name随便你写。下面的Map/Reduce Master框中的host,如果是分布式就用IP或主机名,不要用默认的localhost。port改成9000。DFS Master框中的Use M/R Master host默认打勾保持不变,下面的Port改成9001 。user name 一般默认中不中 ,    

     至此,eclipse的hadoop插件就配置完成了。

    编写程序

    以下的程序是从《hadoop实战》中脱胎出来的,之所以说脱胎,是因为原书中的代码缺少很多条件,不加以完善是无法运行的。这本书写得不好,感觉是为了评职称之类的事情,让学生给凑的,里面很多硬伤。之所以还在硬着头皮看下去,是因为多少还是讲了一些东西,同时也挑战一下自己,面对不那么完善的环境时,能否解决问题,而不是一味地寻找更好的教材,这是在豆瓣上写的一篇书评:https://book.douban.com/review/10071283/

     1.打开eclipse,新建java项目。右键项目,properties,Java Builder Path,Libraries,Add External JARS,找到hadoop的目录,把根目录下的几个jar包都添加进来。

    2.新建类,Score_process.java,复制粘贴以下代码:

    package pkg1;
    
    import java.net.URI;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem; 
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    public  class Score_process extends Configured implements Tool {
        
        //内部类Map
        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            
            //map方法
            public void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
                
                System.out.println("key值:" + key);
                String line = value.toString();//将输入的纯文本文件的数据转化为string
                
                //将输入的数据按行分割
                StringTokenizer tokenizerArticle = new StringTokenizer(line, "
    ");
                
                //分别对每一行进行处理
                while (tokenizerArticle.hasMoreTokens()) {
                    
                    //每行按空格划分
                    StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
                    String nameString = tokenizerLine.nextToken();
                    String scoreString = tokenizerLine.nextToken();
                    Text name = new Text(nameString); 
                    int scoreInt = Integer.parseInt(scoreString);
                    context.write(name, new IntWritable(scoreInt));//输出姓名和成绩
                }
            };
        }
        
        //内部类Reduce
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
            
            //reduce方法
            public void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context) throws java.io.IOException ,InterruptedException {
                
                int sum=0;
                int count=0;
                Iterator<IntWritable> iterator = values.iterator();
                
                while (iterator.hasNext()) {
                    sum += iterator.next().get();
                    count++;
                }
                
                int average = (int)sum/count;
                context.write(key, new IntWritable(average));
            };
            
        }    
        
        public int run(String[] args) throws Exception {
            
            Configuration configuration = getConf();
            
            //configuration.set("mapred", "Score_Process.jar");
            
            //准备环境,删除已经存在的output2目录,保证输出目录不存在**开始************
            final String uri = "hdfs://192.168.1.8:9000/";
            FileSystem fs = FileSystem.get(URI.create(uri),configuration);
            final String path = "/user/grid/output2";
            boolean exists = fs.exists(new Path(path));
            if(exists){
                fs.delete(new Path(path),true);
            }
            //准备环境,删除已经存在的output2目录,保证输出目录不存在**结束************
            
            Job job= new Job(configuration);
            
            job.setJobName("Score_process");
            job.setJarByClass(Score_process.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            job.setMapperClass(Map.class);
            
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);
            
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    //        System.out.println(new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            boolean success = job.waitForCompletion(true);    
            
            return success ? 0:1;
    
        }
        
        
        public static void main(String[] args) throws Exception {
            
            int ret = ToolRunner.run(new Score_process1(), args);
            
            System.exit(ret);
        }
    }

    以上的代码中,有不少是套路,固定的模板。

    Map是处理输入参数中给定的文本文件,处理完毕后,输出到HDFS,供reduce调用。 context.write(name, new IntWritable(scoreInt));这一句是关键。

    Reduce调用map方法的结果,reduce后,写到OS文件系统。context.write(key, new IntWritable(average));这一句是关键。

    整个run方法,需要改的只有setJobName和setJarByClass类的名字,其他的不用动。

    整个main方法,不用动。

    程序部分基本上就是这样。

     编译

    终端中输入

    javac -classpath /opt/hadoop-0.20.2/hadoop-0.20.2-core.jar -d ~/allTest/ScoreProcessFinal/class ~/workspace-indigo/test5/src/pkg1/Score_process.java

    如果没有报错,就说明编译成功。

    打包

    jar -cvf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar -C ~/allTest/ScoreProcessFinal/class .

    可以用以下命令查看包里的文件:
    jar vtf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar

    执行

    执行可以分为两种方式,一种在eclipse中,另一种在终端。

    eclipse中运行

    配置运行参数。run configurations,arguments,Program arguments:

    文本框中输入:hdfs://host-thinkpad:9000/user/grid/input2 hdfs://host-thinkpad:9000/user/grid/output2

    就是输入目录和输出目录,注意中间有个空格。

    终端中运行

    /opt/hadoop-0.20.2/bin/hadoop jar ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar pkg1.Score_process1 input2 output2

     

    这就是hadoop开发的全过程框架。

    其实在此期间发生了很多各种各样的问题,分别记录在各个博文中了。

  • 相关阅读:
    在线教程的游戏化-20分钟做了个demo
    (转)内江师院网络专业大二学子实习快报
    学云网线下国信安教育走进川师大
    Ruby on Rails框架开发学习
    Unity3D编程学习分享
    后高考时代规划专家推荐 学云网“学历技能就业”
    Tomcat调优及JMX监控
    XenServer pool 移除server 设置master
    计算程序总行数的Python代码
    SQL Server 无法生成 FRunCM 线程。请查看 SQL Server 错误日志和 Windows 事件日志
  • 原文地址:https://www.cnblogs.com/Sabre/p/10624804.html
Copyright © 2011-2022 走看看