zoukankan      html  css  js  c++  java
  • hadoop中的DistributedCache 2

    WordCount.javaHadoop的分布式缓存机制使得一个job的所有map或reduce可以访问同一份文件。在任务提交后,hadoop将由-files和-archive选项指定的文件复制到HDFS上(JobTracker的文件系统)。在任务运行前,TaskTracker从JobTracker文件系统复制文件到本地磁盘作为缓存,这样任务就可以访问这些文件。对于job来说,它并不关心文件是从哪儿来的。在使用DistributedCache时,对于本地化文件的访问,通常使用Symbolic Link来访问,这样更方便。通过 URI hdfs://namenode/test/input/file1#myfile 指定的文件在当前工作目录中被符号链接为myfile。这样job里面可直接通过myfile来访问文件,而不用关心该文件在本地的具体路径。

    示例如下:

    package org.myorg;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.StringTokenizer;

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class WordCount
    {
        public static void UseDistributedCacheBySymbolicLink() throws Exception
        {
            FileReader reader = new FileReader("god.txt");
            BufferedReader br = new BufferedReader(reader);
            String s1 = null;
            while ((s1 = br.readLine()) != null)
            {
                System.out.println(s1);
            }
            br.close();
            reader.close();
        }
        

        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
        {

            public void configure(JobConf job)
            {
                System.out.println("Now, use the distributed cache and syslink");
                try {
                    UseDistributedCacheBySymbolicLink();
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }

            }

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
            {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens())
                {
                    word.set(tokenizer.nextToken());
                    output.collect(word, one);
                }
            }
        }

        public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
        {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
            {
                int sum = 0;
                while (values.hasNext())
                {
                    sum += values.next().get();
                }
                output.collect(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception
        {
            JobConf conf = new JobConf(WordCount.class);
            conf.setJobName("wordcount");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            DistributedCache.createSymlink(conf);
            String path = "/xuxm_dev_test_61_pic/in/WordCount.java";
            Path filePath = new Path(path);
            String uriWithLink = filePath.toUri().toString() + "#" + "god.txt";
            DistributedCache.addCacheFile(new URI(uriWithLink), conf);

            JobClient.runJob(conf);
        }

      程序运行的结果是在jobtracker中的task的log可以看到打印后的/xuxm_dev_test_61_pic/in/WordCount.java文件的内容。

      如果程序中要用到很多小文件,那么使用Symbolic Link将非常方便。

         请在执行前先将WordCount.java文件放到指定位置,否则就会找不到文件

  • 相关阅读:
    mysql备份还原
    java-mysql(3) 读写image
    java-mysql(2) Prepared statement
    java-mysql(1)
    jmeter报告分析工具
    浏览器下载img标签Base64图片
    Java定时器TimeTask
    js倒计时
    h5语音播放(移动端)
    Linux环境下在Tomcat上部署JavaWeb工程
  • 原文地址:https://www.cnblogs.com/xuxm2007/p/2094397.html
Copyright © 2011-2022 走看看