zoukankan      html  css  js  c++  java
  • hadoop中的DistributedCache 2

    WordCount.javaHadoop的分布式缓存机制使得一个job的所有map或reduce可以访问同一份文件。在任务提交后,hadoop将由-files和-archive选项指定的文件复制到HDFS上(JobTracker的文件系统)。在任务运行前,TaskTracker从JobTracker文件系统复制文件到本地磁盘作为缓存,这样任务就可以访问这些文件。对于job来说,它并不关心文件是从哪儿来的。在使用DistributedCache时,对于本地化文件的访问,通常使用Symbolic Link来访问,这样更方便。通过 URI hdfs://namenode/test/input/file1#myfile 指定的文件在当前工作目录中被符号链接为myfile。这样job里面可直接通过myfile来访问文件,而不用关心该文件在本地的具体路径。

    示例如下:

    package org.myorg;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.StringTokenizer;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class WordCount
    {
        public static void UseDistributedCacheBySymbolicLink() throws Exception
        {
            FileReader reader = new FileReader("god.txt");
            BufferedReader br = new BufferedReader(reader);
            String s1 = null;
            while ((s1 = br.readLine()) != null)
            {
                System.out.println(s1);
            }
            br.close();
            reader.close();
        }
        

        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
        {

            public void configure(JobConf job)
            {
                System.out.println("Now, use the distributed cache and syslink");
                try {
                    UseDistributedCacheBySymbolicLink();
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }

            }

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
            {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens())
                {
                    word.set(tokenizer.nextToken());
                    output.collect(word, one);
                }
            }
        }

        public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
        {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
            {
                int sum = 0;
                while (values.hasNext())
                {
                    sum += values.next().get();
                }
                output.collect(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception
        {
            JobConf conf = new JobConf(WordCount.class);
            conf.setJobName("wordcount");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            DistributedCache.createSymlink(conf);
            String path = "/xuxm_dev_test_61_pic/in/WordCount.java";
            Path filePath = new Path(path);
            String uriWithLink = filePath.toUri().toString() + "#" + "god.txt";
            DistributedCache.addCacheFile(new URI(uriWithLink), conf);

            JobClient.runJob(conf);
        }

      程序运行的结果是在jobtracker中的task的log可以看到打印后的/xuxm_dev_test_61_pic/in/WordCount.java文件的内容。

      如果程序中要用到很多小文件,那么使用Symbolic Link将非常方便。

         请在执行前先将WordCount.java文件放到指定位置,否则就会找不到文件

  • 相关阅读:
    发现对各类项目有用的不同JavaScript的Web UI
    PowerDesigner 15.1 安装步骤详细图解及破解
    数据库设计工具PowerDesigner基础普及
    Vistual Studio 2010(VS2010)安装 MVC3.0具体方法
    pb的网络资源【转】
    powerbuider11 C/S 转换为B/S
    转:将可执行文件注册成系统windows服务
    WCF绑定类型选择(转)
    (转)找增强方法总结
    ALV简单模板1
  • 原文地址:https://www.cnblogs.com/xuxm2007/p/2094397.html
Copyright © 2011-2022 走看看