zoukankan      html  css  js  c++  java
  • MapReduce入门(二)合并小文件

    hadoop为什么要合并小文件?

            小文件是指文件size小于HDFS上block大小的文件。这样的文件会给hadoop的扩展性和性能带来严重问题。首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1000 0000个小文件,每个文件占用一个block,则namenode大约需要2G空间。如果存储1亿个文件,则namenode需要20G空间(见参考资料[1][4][5])。这样namenode内存容量严重制约了集群的扩展。 其次,访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个datanode跳到另一个datanode,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

    一、创建MergeSmallFileJob 类:用于实现合并小文件的任务(2M一下属于小文件) 

    package cn.itxiaobai;
    
    import com.google.common.io.Resources;
    import com.utils.CDUPUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    /**
     * 合并小文件的任务(2M一下属于小文件)
     */
    public class MergeSmallFileJob {
    
        public static class MergeSmallFileMapper extends Mapper<LongWritable,Text,Text,Text>{
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
               //将文件名作为key,内容作为value输出
               //1.获取文件名
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                String fileName = inputSplit.getPath().getName();
                //打印文件名以及与之对应的内容
                context.write(new Text(fileName),value);
            }
        }
    
        public static class MergeSmallFileReduce extends Reducer<Text,Text,Text,Text>{
            /**
             *
             * @param key:文件名
             * @param values:一个文件的所有内容
             * @param context
             * @throws IOException
             * @throws InterruptedException
             */
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                //将迭代器中的内容拼接
                Iterator<Text> iterator = values.iterator();
                //使用StringBuffer
                StringBuffer stringBuffer = new StringBuffer();
                while (iterator.hasNext()){
                    stringBuffer.append(iterator.next()).append(",");
                }
                //打印
                context.write(key,new Text(stringBuffer.toString()));
            }
        }
    
        public static class MyJob{
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration coreSiteConf = new Configuration();
                coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));
                //设置一个任务
                Job job = Job.getInstance(coreSiteConf, "my small merge big file");
                //设置job的运行类
                job.setJarByClass(MyJob.class);
    
                //设置Map和Reduce处理类
                job.setMapperClass(MergeSmallFileMapper.class);
                job.setReducerClass(MergeSmallFileReduce.class);
    
                //map输出类型
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                //设置job/reduce输出类型
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
    
                FileSystem fileSystem = FileSystem.get(coreSiteConf);
                //listFiles:可以迭代便利文件
                RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
                while (listFiles.hasNext()) {
                    LocatedFileStatus fileStatus = listFiles.next();
                    Path filesPath = fileStatus.getPath();
                    if (!fileStatus.isDirectory()) {
                        //判断大小 及格式
                        if (fileStatus.getLen() < 2 * 1014 * 1024 && filesPath.getName().contains(".txt")) {
                            //文件输入路径
                            FileInputFormat.addInputPath(job,filesPath);
                        }
                    }
                }
    
                //删除存在目录
                CDUPUtils.deleteFileName("/mymergeout");
    
                FileOutputFormat.setOutputPath(job, new Path("/mymergeout"));
                //运行任务
                boolean flag = job.waitForCompletion(true);
                if (flag){
                    System.out.println("文件读取内容如下:");
                    CDUPUtils.readContent("/mymergeout/part-r-00000");
                }else {
                    System.out.println("文件加载失败....");
                }
    
            }
        }
    }
    二、里面用到自己写的工具类CDUPUtils :用于删除已存在目录以及阅读文件内容
    package com.utils;
    
    import com.google.common.io.Resources;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.LineNumberReader;
    import java.util.ArrayList;
    
    public class CDUPUtils {
        //删除已经存在在hdfs上面的文件文件
        public static void deleteFileName(String path) throws IOException {
            //将要删除的文件
            Path fileName = new Path(path);
            Configuration entries = new Configuration();
            //解析core-site-master2.xml文件
            entries.addResource(Resources.getResource("core-site-local.xml"));
            //coreSiteConf.set(,);--在这里可以添加配置文件
            FileSystem fileSystem = FileSystem.get(entries);
            if (fileSystem.exists(fileName)){
                System.out.println(fileName+"已经存在,正在删除它...");
                boolean flag = fileSystem.delete(fileName, true);
                if (flag){
                    System.out.println(fileName+"删除成功");
                }else {
                    System.out.println(fileName+"删除失败!");
                    return;
                }
            }
            //关闭资源
            fileSystem.close();
        }
    
        //读取文件内容
        public static void readContent(String path) throws IOException {
            //将要读取的文件路径
            Path fileName = new Path(path);
            ArrayList<String> returnValue = new ArrayList<String>();
            Configuration configuration = new Configuration();
            configuration.addResource(Resources.getResource("core-site-local.xml"));
            //获取客户端系统文件
            FileSystem fileSystem = FileSystem.get(configuration);
            //open打开文件--获取文件的输入流用于读取数据
            FSDataInputStream inputStream = fileSystem.open(fileName);
            InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
            //一行一行的读取数据
            LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader);
            //定义一个字符串变量用于接收每一行的数据
            String str = null;
            //判断何时没有数据
            while ((str=lineNumberReader.readLine())!=null){
                returnValue.add(str);
            }
            //打印数据到控制台
            System.out.println("MapReduce算法操作的文件内容如下:");
            for (String read :
                    returnValue) {
                System.out.println(read);
            }
            //关闭资源
            lineNumberReader.close();
            inputStream.close();
            inputStreamReader.close();
        }
    }
    
    

     配置文件:cort-site-local.xml--------注意里面的主机IP需要填写自己的

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property>
            <name>fs.defaultFS</name>
            <value>hdfs://master2:9000</value>
        </property>
        <property>
            <name>fs.hdfs.impl</name>
            <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        </property>
    </configuration>
    
    

    pom中添加的依赖 

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.zhiyou100</groupId>
        <artifactId>mrdemo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version>
        </properties>
    
        <!--分布式计算-->
        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-mapreduce-client-common</artifactId>
                    <version>${org.apache.hadoop.version}</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                    <version>${org.apache.hadoop.version}</version>
                </dependency>
    
            <!--分布式存储-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
            </dependencies>
    </project>
    

    在本地直接运行(右击Run)测试

  • 相关阅读:
    sqlserver 分布式连接方式
    介绍27款经典的CSS框架
    一些初级问题小总结
    WCF>一个运行环境,一个服务逻辑人,一个客户
    学习wcf过程的总结
    .Net 程序员面试 C# 语言篇 (回答Scott Hanselman的问题)(转)
    wcf一
    互联网用户能力解放带来的革命
    编程实现>ASP.NET 3.5开发范例精讲精析>探讨FormView控件
    互联网未来的趋势
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305600.html
Copyright © 2011-2022 走看看