zoukankan      html  css  js  c++  java
  • hadoop 文件合并

    来自:http://blog.csdn.net/dandingyy/article/details/7490046

    众所周知,Hadoop对处理单个大文件比处理多个小文件更有效率,另外单个文件也非常占用HDFS的存储空间。所以往往要将其合并起来。

    1,getmerge

    hadoop有一个命令行工具getmerge,用于将一组HDFS上的文件复制到本地计算机以前进行合并

    参考:http://hadoop.apache.org/common/docs/r0.19.2/cn/hdfs_shell.html

    使用方法:hadoop fs -getmerge <src> <localdst> [addnl]

    接受一个源目录和一个目标文件作为输入,并且将源目录中所有的文件连接成本地目标文件。addnl是可选的,用于指定在每个文件结尾添加一个换行符。

    多嘴几句:调用文件系统(FS)Shell命令应使用 bin/hadoop fs <args>的形式。 所有的的FS shell命令使用URI路径作为参数。URI格式是scheme://authority/path

    2.putmerge

    将本地小文件合并上传到HDFS文件系统中。

    一种方法可以现在本地写一个脚本,先将一个文件合并为一个大文件,然后将整个大文件上传,这种方法占用大量的本地磁盘空间;

    另一种方法如下,在复制的过程中上传。参考:《hadoop in action》

    1. import java.io.IOException;  
    2.   
    3. import org.apache.hadoop.conf.Configuration;  
    4. import org.apache.hadoop.fs.FSDataInputStream;  
    5. import org.apache.hadoop.fs.FSDataOutputStream;  
    6. import org.apache.hadoop.fs.FileStatus;  
    7. import org.apache.hadoop.fs.FileSystem;  
    8. import org.apache.hadoop.fs.Path;  
    9. import org.apache.hadoop.io.IOUtils;  
    10.   
    11. //参数1为本地目录,参数2为HDFS上的文件  
    12. public class PutMerge {  
    13.       
    14.     public static void putMergeFunc(String LocalDir, String fsFile) throws IOException  
    15.     {  
    16.         Configuration  conf = new Configuration();  
    17.         FileSystem fs = FileSystem.get(conf);       //fs是HDFS文件系统  
    18.         FileSystem local = FileSystem.getLocal(conf);   //本地文件系统  
    19.           
    20.         Path localDir = new Path(LocalDir);  
    21.         Path HDFSFile = new Path(fsFile);  
    22.           
    23.         FileStatus[] status =  local.listStatus(localDir);  //得到输入目录  
    24.         FSDataOutputStream out = fs.create(HDFSFile);       //在HDFS上创建输出文件  
    25.           
    26.         for(FileStatus st: status)  
    27.         {  
    28.             Path temp = st.getPath();  
    29.             FSDataInputStream in = local.open(temp);  
    30.             IOUtils.copyBytes(in, out, 4096, false);    //读取in流中的内容放入out  
    31.             in.close(); //完成后,关闭当前文件输入流  
    32.         }  
    33.         out.close();  
    34.     }  
    35.     public static void main(String [] args) throws IOException  
    36.     {  
    37.         String l = "/home/kqiao/hadoop/MyHadoopCodes/putmergeFiles";  
    38.         String f = "hdfs://ubuntu:9000/user/kqiao/test/PutMergeTest";  
    39.         putMergeFunc(l,f);  
    40.     }  
    41. }  

    3.将小文件打包成SequenceFile的MapReduce任务

    来自:《hadoop权威指南》

    实现将整个文件作为一条记录处理的InputFormat:

    1. public class WholeFileInputFormat  
    2.     extends FileInputFormat<NullWritable, BytesWritable> {  
    3.     
    4.   @Override  
    5.   protected boolean isSplitable(JobContext context, Path file) {  
    6.     return false;  
    7.   }  
    8.   
    9.   @Override  
    10.   public RecordReader<NullWritable, BytesWritable> createRecordReader(  
    11.       InputSplit split, TaskAttemptContext context) throws IOException,  
    12.       InterruptedException {  
    13.     WholeFileRecordReader reader = new WholeFileRecordReader();  
    14.     reader.initialize(split, context);  
    15.     return reader;  
    16.   }  
    17. }  

    实现上面类中使用的定制的RecordReader:

    1. /实现一个定制的RecordReader,这六个方法均为继承的RecordReader要求的虚函数。  
    2. //实现的RecordReader,为自定义的InputFormat服务  
    3. public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{  
    4.   
    5.     private FileSplit fileSplit;  
    6.     private Configuration conf;  
    7.     private BytesWritable value = new BytesWritable();  
    8.     private boolean processed = false;  
    9.     @Override  
    10.     public void close() throws IOException {  
    11.         // do nothing  
    12.     }  
    13.   
    14.     @Override  
    15.     public NullWritable getCurrentKey() throws IOException,  
    16.             InterruptedException {  
    17.         return NullWritable.get();  
    18.     }  
    19.   
    20.     @Override  
    21.     public BytesWritable getCurrentValue() throws IOException,  
    22.             InterruptedException {  
    23.         return value;  
    24.     }  
    25.   
    26.     @Override  
    27.     public float getProgress() throws IOException, InterruptedException {  
    28.         return processed? 1.0f : 0.0f;  
    29.     }  
    30.   
    31.     @Override  
    32.     public void initialize(InputSplit split, TaskAttemptContext context)  
    33.             throws IOException, InterruptedException {  
    34.         this.fileSplit = (FileSplit) split;  
    35.         this.conf = context.getConfiguration();  
    36.     }  
    37.   
    38.     //process表示记录是否已经被处理过  
    39.     @Override  
    40.     public boolean nextKeyValue() throws IOException, InterruptedException {  
    41.         if (!processed) {  
    42.             byte[] contents = new byte[(int) fileSplit.getLength()];  
    43.             Path file = fileSplit.getPath();  
    44.             FileSystem fs = file.getFileSystem(conf);  
    45.             FSDataInputStream in = null;  
    46.             try {  
    47.               in = fs.open(file);  
    48.                               //将file文件中 的内容放入contents数组中。使用了IOUtils实用类的readFully方法,将in流中得内容放入  
    49.               //contents字节数组中。  
    50.               IOUtils.readFully(in, contents, 0, contents.length);  
    51.               //BytesWritable是一个可用做key或value的字节序列,而ByteWritable是单个字节。  
    52.                                 //将value的内容设置为contents的值  
    53.               value.set(contents, 0, contents.length);  
    54.             } finally {  
    55.               IOUtils.closeStream(in);  
    56.             }  
    57.             processed = true;  
    58.             return true;  
    59.           }  
    60.           return false;  
    61.     }  
    62. }  

    将小文件打包成SequenceFile:

      1. public class SmallFilesToSequenceFileConverter extends Configured implements Tool{  
      2.   
      3.     //静态内部类,作为mapper  
      4.     static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>  
      5.     {  
      6.         private Text filenameKey;  
      7.           
      8.         //setup在task开始前调用,这里主要是初始化filenamekey  
      9.         @Override  
      10.         protected void setup(Context context)  
      11.         {  
      12.             InputSplit split = context.getInputSplit();  
      13.             Path path = ((FileSplit) split).getPath();  
      14.             filenameKey = new Text(path.toString());  
      15.         }  
      16.         @Override  
      17.         public void map(NullWritable key, BytesWritable value, Context context)  
      18.                 throws IOException, InterruptedException{  
      19.             context.write(filenameKey, value);  
      20.         }  
      21.     }  
      22.   
      23.     @Override  
      24.     public int run(String[] args) throws Exception {  
      25.         Configuration conf = new Configuration();  
      26.         Job job = new Job(conf);  
      27.         job.setJobName("SmallFilesToSequenceFileConverter");  
      28.           
      29.         FileInputFormat.addInputPath(job, new Path(args[0]));  
      30.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
      31.           
      32.         //再次理解此处设置的输入输出格式。。。它表示的是一种对文件划分,索引的方法  
      33.         job.setInputFormatClass(WholeFileInputFormat.class);  
      34.         job.setOutputFormatClass(SequenceFileOutputFormat.class);  
      35.           
      36.         //此处的设置是最终输出的key/value,一定要注意!  
      37.         job.setOutputKeyClass(Text.class);  
      38.         job.setOutputValueClass(BytesWritable.class);  
      39.           
      40.         job.setMapperClass(SequenceFileMapper.class);  
      41.           
      42.         return job.waitForCompletion(true) ? 0 : 1;  
      43.     }  
      44.       
      45.     public static void main(String [] args) throws Exception  
      46.     {  
      47.         int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);  
      48.         System.exit(exitCode);  
      49.     }  
      50. }  
  • 相关阅读:
    【Azure 应用服务】在Azure App Service多实例的情况下,如何在应用中通过代码获取到实例名(Instance ID)呢?
    【Azure 应用服务】App Service For Windows 中如何设置代理实现前端静态文件和后端Java Spring Boot Jar包
    【Azure Developer】使用Azure Key Vault 的Key签名后,离线验证的一些参考资料
    【Azure Function】调试 VS Code Javascript Function本地不能运行,报错 Value cannot be null. (Parameter 'provider')问题
    【Azure 应用服务】App Service 使用Tomcat运行Java应用,如何设置前端网页缓存的相应参数呢(Xms512m Xmx1204m)?
    【Azure API 管理】APIM添加Logtoeventhub的策略后,一些相关APIM与Event Hub的问题
    【Azure API 管理】为调用APIM的请求启用Trace 调试APIM Policy的利器
    【Azure 事件中心】China Azure上是否有Kafka服务简答
    【Azure 应用服务】探索在Azure上设置禁止任何人访问App Service的默认域名(Default URL)
    【Azure 微服务】记一次错误的更新Service Fabric 证书而引发的集群崩溃而只能重建
  • 原文地址:https://www.cnblogs.com/sunxucool/p/3968628.html
Copyright © 2011-2022 走看看