zoukankan      html  css  js  c++  java
  • Hadoop程序基础模板

    分布式编程相对复杂,而Hadoop本身蒙上大数据、云计算等各种面纱,让很多初学者望而却步。可事实上,Hadoop是一个很易用的分布式编程框架,经过良好封装屏蔽了很多分布式环境下的复杂问题,因此,对普通开发者来说很容易,容易到可以照葫芦画瓢。

    大多数Hadoop程序的编写可以简单的依赖于一个模板及其变种。当编写一个新的MapReduce程序时,我们通常采用一个现有的MapReduce程序,通过修改达到我们希望的功能就行了。对于写大部分的Hadoop程序来说几乎就是照葫芦画瓢。这个瓢到底是什么样子呢?还是和小讲一起看看吧。

    使用 Java 语言编写 MapReduce 非常方便,因为 Hadoop 的 API 提供了 Mapper 和 Reducer 抽象类,对开发人员来说,只需要继承这两个抽象类,然后实现抽象类里面的方法就可以了。

    有一份CSV格式专利引用数据,超过1600万行,某几行如下:
    "CITING(引用)","CITED(被引用)"
    3858241,956203
    3858241,1324234
    3858241,3398406
    3858242,1515701
    3858242,3319261
    3858242,3707004
    3858243,1324234
    2858244,1515701
    ...
    对每个专利,我们希望找到引用它的专利并合并,输出如下:
    1324234 3858243,3858241
    1515701 2858244,3858242
    3319261 3858242
    3398406 3858241
    3707004 3858242
    956203 3858241
    ...
    下边的程序就实现了一个这样的功能。很强大的功能,代码就这么少,没想到吧???

     

    下面是一个典型的Hadoop程序模板

     
    package com.dajiangtai.hadoop.junior;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Hadoop程序基础模板
     */
    public class HadoopTpl extends Configured implements Tool {
    
        public static class   MapClass  extends Mapper< Text,Text,Text,Text> {
        
            public void map(Text key, Text value, Context context) throws IOException, InterruptedException {    
                    
                context.write(value, key);
                
            }
        }
        
        
        
        public static class   ReduceClass extends Reducer< Text, Text, Text, Text> {
            public void reduce(Text key, Iterable< Text> values, Context context)  throws IOException, InterruptedException {
                String csv = "";
                for(Text val:values) {
                    if(csv.length() > 0)
                        csv += ",";
                    csv += val.toString();
                }
                
                context.write(key, new Text(csv));
            }
        }
        
        @Override
        public int   run(String[] args) throws Exception {
            Configuration conf = getConf();    //读取配置文件
            conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
            
            Job job = new Job(conf, "HadoopTpl");//新建一个任务
            job.setJarByClass(HadoopTpl.class);//主类
    
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
    
            FileSystem hdfs = out.getFileSystem(conf);
            if (hdfs.isDirectory(out)) {
                hdfs.delete(out, true);
            }
            
            FileInputFormat.setInputPaths(job, in);//文件输入
            FileOutputFormat.setOutputPath(job, out);//文件输出
            
            job.setMapperClass(MapClass.class);//Mapper
            job.setReducerClass(ReduceClass.class);//Reducer
            
            job.setInputFormatClass(KeyValueTextInputFormat.class);//文件输入格式
            job.setOutputFormatClass(TextOutputFormat.class);//文件输出格式
            job.setOutputKeyClass(Text.class);//设置作业输出值 Key 的类 
            job.setOutputValueClass(Text.class);//设置作业输出值 Value 的类 
            
            System.exit(job.waitForCompletion(true)?0:1);//等待作业完成退出    
            return 0;
        }
    
        /**
         * @param args 输入文件、输出路径,可在Eclipse的Run Configurations中配Arguments如:
         * hdfs://single.hadoop.dajiangtai.com:9000/junior/patent.txt
         * hdfs://single.hadoop.dajiangtai.com:9000/junior/patent-out/
         */
        public static void   main(String[] args) {
            try {
                int res = ToolRunner.run(new Configuration(), new HadoopTpl(), args);
                System.exit(res);
            } catch (Exception e) {
                e.printStackTrace();
            }        
        }
    }

    可以想像,一份超过1600万的数据,实现这样一个功能,如果我们自己写算法处理,效率和资源耗费很难想像。可使用Hadoop处理起来就是这么简单。是不是很强大?加紧学习吧,少年!

     
  • 相关阅读:
    ASCII码表
    arm linux 下移植busybox 的tftp
    Makefile中的路径
    Wireshark图解教程(简介、抓包、过滤器)【转载】
    在装有windows跟ubuntu的机器上重新安装windows后修复ubuntu的grub
    在linux里建立一个快捷方式,连接到另一个目录
    ubuntu 迁移部分 / 目录下的存储空间到 /home目录
    /etc/ntp.conf
    ntp 配置 autokey 功能【摘录】
    mips-openwrt-linux-gcc test_usbsw.c -o usbsw 编译问题
  • 原文地址:https://www.cnblogs.com/bob-wzb/p/5144577.html
Copyright © 2011-2022 走看看