  Developing a MapReduce program with Hadoop

    Preparation:

    1. Set HADOOP_HOME so that it points to the Hadoop installation directory.

    2. On Windows, replace the hadoop/bin directory with Windows binaries for the matching Hadoop version (search online for a prebuilt winutils/bin package of the corresponding version).

    3. If you still get an org.apache.hadoop.io.nativeio.NativeIO$Windows.access0 error, copy the hadoop.dll from that bin directory into C:\Windows\System32 (a quick sanity-check sketch follows right after this list).
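    A quick way to sanity-check these settings from Java (my own illustration, not part of the original setup) is to print the values the JVM actually sees; hadoop.home.dir is the system property Hadoop's Shell utility falls back to when HADOOP_HOME is not visible, and java.library.path is where native libraries such as hadoop.dll are searched for:

    public class HadoopEnvCheck {
        public static void main(String[] args) {
            // The HADOOP_HOME environment variable set in step 1
            System.out.println("HADOOP_HOME       = " + System.getenv("HADOOP_HOME"));
            // The system property Hadoop falls back to when the environment variable is missing
            System.out.println("hadoop.home.dir   = " + System.getProperty("hadoop.home.dir"));
            // Where the JVM looks for native libraries such as hadoop.dll
            System.out.println("java.library.path = " + System.getProperty("java.library.path"));
        }
    }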

    Required jars:

    1. common
      hadoop-2.7.3\share\hadoop\common\hadoop-common-2.7.3.jar
      everything under hadoop-2.7.3\share\hadoop\common\lib
    2. hdfs
      hadoop-2.7.3\share\hadoop\hdfs\hadoop-hdfs-2.7.3.jar
      everything under hadoop-2.7.3\share\hadoop\hdfs\lib
    3. mapreduce
      hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-app-2.7.3.jar
      hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-common-2.7.3.jar
      hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-core-2.7.3.jar
      hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-hs-2.7.3.jar
      hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-hs-plugins-2.7.3.jar
      hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-jobclient-2.7.3.jar
      hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-jobclient-2.7.3-tests.jar
      hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-shuffle-2.7.3.jar
      everything under hadoop-2.7.3\share\hadoop\mapreduce\lib
    4. yarn
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-api-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-applications-distributedshell-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-applications-unmanaged-am-launcher-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-client-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-common-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-registry-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-applicationhistoryservice-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-common-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-nodemanager-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-resourcemanager-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-sharedcachemanager-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-tests-2.7.3.jar
      hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-web-proxy-2.7.3.jar
      everything under hadoop-2.7.3\share\hadoop\yarn\lib

    Alternatively, the dependencies can be managed with Maven:

    <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
    
            <groupId>xiaol</groupId>
            <artifactId>xiaol-hadoop</artifactId>
            <version>1.0-SNAPSHOT</version>
            <description>MapReduce</description>
    
            <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <hadoop.version>2.7.3</hadoop.version>
            </properties>
            <dependencies>
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                    <version>4.12</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                    <version>${hadoop.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                    <version>${hadoop.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                    <version>${hadoop.version}</version>
                </dependency>
            </dependencies>
        </project>

    Configure Log4j by putting the following log4j.properties under src/main/resources:

    log4j.rootCategory=INFO, stdout
     
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender   
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout   
    log4j.appender.stdout.layout.ConversionPattern=[QC] %p [%t] %C.%M(%L) | %m%n

    Write the Mapper:

    package xiaol;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * The overall flow: input -> split -> map -> shuffle -> reduce -> output
     * input:   every line consists of space-separated words
     *          hello java
     *          hello python
     * split:   by default the input is read line by line; each line becomes one KV pair that is
     *          handed to the next step. K is the byte offset of the start of the line, V is the
     *          line content:
     *          K:0   V:hello java
     *          K:11  V:hello python
     *          (of course, this step can be overridden by the user)
     * map:     must be implemented by the user; this is where the business logic goes.
     *          It reads the split output, counts words, and emits KEYOUT/VALUEOUT pairs to shuffle.
     *          Here the K handed to shuffle is the word and the V is its number of occurrences:
     *          hello 1
     *          java 1
     * shuffle: the map output is a set of KV pairs; pairs with the same K are moved to the same
     *          node for reducing. When handed to reduce, the Vs that share one K are grouped into
     *          an Iterable<VALUEOUT>:
     *          hello 1,1
     *          (of course, this step can be overridden by the user)
     * reduce:  must be implemented by the user; aggregates the shuffled results.
     *          It reads the shuffle output, sums the counts, and emits KEYOUT/VALUEOUT to output:
     *          hello 2
     */
    /**
     * org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *     KEYIN    type of the key handed to map after split
     *     VALUEIN  type of the value handed to map after split
     *     KEYOUT   type of the key handed to shuffle after map
     *     VALUEOUT type of the value handed to shuffle after map
     * org.apache.hadoop.io.LongWritable    Hadoop's own wrapper for long
     * org.apache.hadoop.io.Text            Hadoop's own text type
     * org.apache.hadoop.io.IntWritable     Hadoop's own wrapper for int
     */
    public class WordMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        /**
         * Override the map method
         * protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException
         *      KEYIN       type of the key handed to map after split, i.e. the byte offset of the line
         *      VALUEIN     type of the value handed to map after split, i.e. the content of the line
         *      Context     the execution context of the whole MapReduce job
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String s = value.toString();
            String[] words = s.split(" ");  // each line holds space-separated words such as "hello java", so split it first in order to count the words
            for(String word: words){
                /**
                 * Write KEYOUT and VALUEOUT into the context as the input for the next step (shuffle).
                 *
                 * We are counting how often each word occurs in the current line, so a literal 1 is emitted here.
                 * Some people wonder: what happens if the same word appears twice on one line?
                 * That is not a problem: two occurrences of hello simply mean two "hello 1" pairs are
                 * handed to shuffle, and shuffle passes both of them on to reduce.
                 */
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
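    A side note (my own addition, not from the original post): the Mapper above creates a new Text and a new IntWritable for every word. The stock Hadoop WordCount example reuses the output objects instead, which is safe because the framework serializes the pair at the moment context.write is called. A minimal variant along those lines:

    package xiaol;

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordMapperReuse extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Reused output objects: set() overwrites the Text before every write
        private final Text outKey = new Text();
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            for (String word : value.toString().split(" ")) {
                outKey.set(word);
                context.write(outKey, ONE);   // the pair is serialized immediately, so reusing the objects is safe
            }
        }
    }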

    Write the Reducer:

    package xiaol;
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public class WordReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
    
        /**
         * Override the reduce method
         * protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException
         *      KEYIN                   type of the key handed to reduce after shuffle, i.e. the map's KEYOUT
         *      Iterable<VALUEIN>       the grouped values handed to reduce after shuffle (shuffle sends all pairs with the same K to one node, so reduce sees the Vs as a collection), i.e. the map's VALUEOUTs
         *      Context                 the execution context of the whole MapReduce job
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            long count = 0;
            for(IntWritable v : values) {
                count += v.get();
            }
            context.write(key, new LongWritable(count));
        }
    
    }
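    Optionally (my own addition, not part of the original post), a combiner can pre-aggregate the per-map counts before they are shuffled, which reduces network traffic. A combiner's output types must match the map output types (Text/IntWritable here), so the LongWritable-producing WordReducer above cannot be reused directly; a separate class is needed, roughly like this:

    package xiaol;

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * Map-side pre-aggregation: sums the 1s emitted by a single map task.
     * Input and output are both Text/IntWritable, so it can sit between map and shuffle.
     */
    public class WordCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable v : values) {
                count += v.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    Register it in the driver with job.setCombinerClass(WordCombiner.class); the final counts stay the same because addition is associative.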

    Write the driver class:

    Local run (local data source, local computation):

    package xiaol;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.util.Properties;
    
    /**
     *
     */
    public class Test {
        public static void main(String[] args) throws Exception {
            // For a local run, simply create a new Configuration; a remote run needs the cluster-related settings
            Configuration conf = new Configuration();

            Job job = Job.getInstance(conf);

            // Set the mapper and reducer classes
            job.setMapperClass(WordMapper.class);
            job.setReducerClass(WordReducer.class);

            // Set the mapper's output key and output value classes
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Set the reducer's output key and output value classes
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            FileInputFormat.setInputPaths(job, "d:/test/test.txt");
            FileOutputFormat.setOutputPath(job, new Path("d:/test/out/"));

            // Wait for completion; true means progress logs are printed
            job.waitForCompletion(true);
        }
    }
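    One thing worth knowing when re-running any of these drivers (my own note, not from the original post): FileOutputFormat refuses to start if the output directory already exists, and the job aborts with a FileAlreadyExistsException. A small sketch of deleting the previous output first, to be placed before the setOutputPath call in the driver above (it needs an extra import of org.apache.hadoop.fs.FileSystem; adjust the path to your own setup):

        // Remove the output of a previous run, otherwise job submission fails immediately
        Path outputDir = new Path("d:/test/out/");
        FileSystem fs = outputDir.getFileSystem(conf);   // resolves to the local FS or HDFS depending on the path and conf
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true);                  // true = delete recursively
        }
        FileOutputFormat.setOutputPath(job, outputDir);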

    Pull data from the remote HDFS but run the computation locally:

    package xiaol;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.util.Properties;
    
    /**
     *
     */
    public class Test {
        public static void main(String[] args) throws Exception {
            // For a local run, simply create a new Configuration; a remote run needs the cluster-related settings
            Configuration conf = new Configuration();

            Job job = Job.getInstance(conf);

            // Set the mapper and reducer classes
            job.setMapperClass(WordMapper.class);
            job.setReducerClass(WordReducer.class);

            // Set the mapper's output key and output value classes
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Set the reducer's output key and output value classes
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            FileInputFormat.setInputPaths(job, "hdfs://192.168.0.104:9000/input/input.txt");
            FileOutputFormat.setOutputPath(job, new Path("d:/test/out/"));

            // Wait for completion; true means progress logs are printed
            job.waitForCompletion(true);
        }
    }

    Run on the remote cluster:

    Preparation: package the local project into a jar (the program needs to reference it).

    The program has to tell Hadoop which jar to use for the computation; you do not need to upload it to YARN by hand, just point the program at it.

    In this example the jar sits directly in the project root directory.

    package xiaol;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.util.Properties;
    
    /**
     *
     */
    public class Test {
        public static void main(String[] args) throws Exception {
            Properties properties = System.getProperties();
            properties.setProperty("HADOOP_USER_NAME", "root");

            Configuration conf = new Configuration();
            // HDFS address
            conf.set("fs.defaultFS", "hdfs://192.168.0.104:9000/");
            // Which jar the job should be run from
            conf.set("mapreduce.job.jar", "xiaolhadoop.jar");
            // Use YARN as the computation framework
            conf.set("mapreduce.framework.name", "yarn");
            // Address of the YARN ResourceManager
            conf.set("yarn.resourcemanager.hostname", "192.168.0.104");
            // Tell Hadoop the job is submitted from Windows (cross-platform submission)
            conf.set("mapreduce.app-submission.cross-platform", "true");


            Job job = Job.getInstance(conf);

            // Set the mapper and reducer classes
            job.setMapperClass(WordMapper.class);
            job.setReducerClass(WordReducer.class);

            // Set the mapper's output key and output value classes
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Set the reducer's output key and output value classes
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            FileInputFormat.setInputPaths(job, "/input/input.txt");
            FileOutputFormat.setOutputPath(job, new Path("/out/"));

            // Wait for completion; true means progress logs are printed
            job.waitForCompletion(true);
        }
    }
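    Once the job has finished, you will usually want to check the outcome and look at the result. job.waitForCompletion(true) returns a boolean, so the drivers can end with System.exit(ok ? 0 : 1) to surface failures. The sketch below (my own addition, reusing the same host and /out/ path assumed above) reads the result file back from HDFS; with the default single reducer the output lands in part-r-00000:

    package xiaol;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadOutput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.0.104:9000/");

            FileSystem fs = FileSystem.get(conf);
            // With one reducer the word counts end up in /out/part-r-00000
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(new Path("/out/part-r-00000")), "UTF-8"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);   // e.g. "hello	2" (key and count are tab-separated)
                }
            }
        }
    }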
    Original post: https://www.cnblogs.com/413xiaol/p/10054394.html