zoukankan      html  css  js  c++  java
  • HDFS 的 DAO

    一、HDFS DAO 接口

    package cn.mk.dao;
    
    import java.io.FileNotFoundException;
    import java.io.IOException;
    
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    
    /**
     * Data-access contract for basic file and directory operations on an
     * HDFS-style file system. All paths are passed as strings.
     */
    public interface HDFSDao {

        /**
         * Creates a directory.
         *
         * @param path directory to create
         * @return whether the directory was created
         * @throws IOException if the file system reports an error
         */
        boolean mkDirs(String path) throws IOException;

        /**
         * Removes a path.
         *
         * @param path path to remove
         * @return whether the removal succeeded
         * @throws IOException if the file system reports an error
         */
        boolean rmr(String path) throws IOException;

        /**
         * Removes a directory.
         *
         * @param path directory to remove
         * @return whether the removal succeeded
         * @throws IOException if the file system reports an error
         */
        boolean rmdir(String path) throws IOException;

        /**
         * Renames (moves) a path.
         *
         * @param src current path
         * @param dst new path
         * @return whether the rename succeeded
         * @throws IOException if the file system reports an error
         */
        boolean rename(String src, String dst) throws IOException;

        /**
         * Lists the status entries for a path.
         *
         * @param path path to list
         * @return the status entries reported for {@code path}
         * @throws FileNotFoundException if the path cannot be found
         * @throws IOException if the file system reports an error
         */
        FileStatus[] ls(String path) throws FileNotFoundException, IOException;

        /**
         * Creates a file.
         *
         * @param file path of the file to create
         * @param content text supplied for the file
         * @return whether the file was created
         * @throws IOException if the file system reports an error
         */
        boolean createFile(String file, String content) throws IOException;

        /**
         * Copies a local file to the remote file system.
         *
         * @param local source path on the local file system
         * @param remote destination path on the remote file system
         * @throws IOException if the copy fails
         */
        void copyFile(String local, String remote) throws IOException;

        /**
         * Copies a remote file to the local file system.
         *
         * @param remote source path on the remote file system
         * @param local destination path on the local file system
         * @throws IOException if the copy fails
         */
        void download(String remote, String local) throws IOException;

        /**
         * Reads a remote file as text.
         *
         * @param remoteFile path of the file to read
         * @return the file content as a string
         * @throws IOException if the file cannot be read
         */
        String cat(String remoteFile) throws IOException;

        /**
         * Looks up the block locations of a path.
         *
         * @param path path to inspect
         * @return the block locations reported for {@code path}
         * @throws IOException if the file system reports an error
         */
        BlockLocation[] location(String path) throws IOException;

        /**
         * Creates a new file holding the given text.
         *
         * @param file path of the file to create
         * @param content text to write into the file
         * @return whether the file was created
         * @throws IOException if the file system reports an error
         */
        boolean createNewFile(String file, String content) throws IOException;
    }

    二、HDFS DAO 实现类

    package cn.mk.dao.impl;
    
    import java.io.ByteArrayOutputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    import cn.mk.dao.HDFSDao;
    
    /**
     * {@link HDFSDao} implementation that delegates to a Hadoop {@link FileSystem}.
     *
     * <p>Each instance obtains its own {@code FileSystem} through
     * {@code FileSystem.newInstance}, so closing this DAO does not close the
     * shared instance that {@code FileSystem.get} hands out to other callers.
     * Call {@link #close()} when the DAO is no longer needed.
     *
     * <p>Text written by {@link #createFile} / {@link #createNewFile} and read by
     * {@link #cat} is encoded as UTF-8.
     */
    public class HDFSDaoImpl implements HDFSDao, java.io.Closeable {

        /** File system URI used when the caller does not supply one. */
        private static final String HDFS_PATH = "hdfs://master:9000/";

        /** Charset name used for all text written to and read from files. */
        private static final String TEXT_CHARSET = "UTF-8";

        /** Buffer size, in bytes, used when copying a file into memory. */
        private static final int COPY_BUFFER_SIZE = 4096;

        private final FileSystem fileSystem;

        /**
         * Creates a DAO for the default file system URI.
         *
         * @param conf Hadoop configuration used to open the file system
         * @throws IOException if the file system cannot be opened
         */
        public HDFSDaoImpl(Configuration conf) throws IOException {
            this(HDFS_PATH, conf);
        }

        /**
         * Creates a DAO for the given file system URI.
         *
         * @param hdfs file system URI, e.g. {@code hdfs://host:port/}
         * @param conf Hadoop configuration used to open the file system
         * @throws IOException if the file system cannot be opened
         */
        public HDFSDaoImpl(String hdfs, Configuration conf) throws IOException {
            // newInstance (not get): this DAO closes the file system it holds, and
            // closing the cached instance returned by get() would break its other users.
            fileSystem = FileSystem.newInstance(URI.create(hdfs), conf);
        }

        /**
         * Creates a directory (including missing parents) unless the path already exists.
         *
         * @param path directory to create
         * @return {@code false} if the path already exists, otherwise the result of
         *         {@code FileSystem.mkdirs}
         * @throws IOException if the file system reports an error
         */
        @Override
        public boolean mkDirs(String path) throws IOException {
            Path dir = new Path(path);
            if (fileSystem.exists(dir)) {
                return false;
            }
            return fileSystem.mkdirs(dir);
        }

        /**
         * Recursively deletes a path.
         *
         * @param path path to delete
         * @return the result of {@code FileSystem.delete(path, true)}
         * @throws IOException if the file system reports an error
         */
        @Override
        public boolean rmr(String path) throws IOException {
            return fileSystem.delete(new Path(path), true);
        }

        /**
         * Deletes a directory.
         *
         * <p>NOTE(review): this deletes recursively, exactly like {@link #rmr}; it does
         * not refuse non-empty directories. Kept as-is because callers may rely on it —
         * confirm whether a non-recursive delete was intended.
         *
         * @param path directory to delete
         * @return the result of {@code FileSystem.delete(path, true)}
         * @throws IOException if the file system reports an error
         */
        @Override
        public boolean rmdir(String path) throws IOException {
            return fileSystem.delete(new Path(path), true);
        }

        /**
         * Renames (moves) a path.
         *
         * @param src current path
         * @param dst new path
         * @return the result of {@code FileSystem.rename}
         * @throws IOException if the file system reports an error
         */
        @Override
        public boolean rename(String src, String dst) throws IOException {
            return fileSystem.rename(new Path(src), new Path(dst));
        }

        /**
         * Lists the status entries for a path.
         *
         * @param path path to list
         * @return the result of {@code FileSystem.listStatus}
         * @throws FileNotFoundException if the path cannot be found
         * @throws IOException if the file system reports an error
         */
        @Override
        public FileStatus[] ls(String path) throws FileNotFoundException, IOException {
            return fileSystem.listStatus(new Path(path));
        }

        /**
         * Creates a file with the given text, leaving an existing file untouched.
         *
         * @param file path of the file to create
         * @param content text to write; {@code null} or empty creates an empty file
         * @return {@code true} if the file was created, {@code false} if it already existed
         * @throws IOException if the file cannot be created or written
         */
        @Override
        public boolean createFile(String file, String content) throws IOException {
            return writeNewFile(new Path(file), content);
        }

        /**
         * Creates a new file with the given text, leaving an existing file untouched.
         *
         * @param file path of the file to create
         * @param content text to write; {@code null} or empty creates an empty file
         * @return {@code true} if the file was created, {@code false} if it already existed
         * @throws IOException if the file cannot be created or written
         */
        @Override
        public boolean createNewFile(String file, String content) throws IOException {
            return writeNewFile(new Path(file), content);
        }

        /**
         * Copies a local file to the remote file system.
         *
         * @param local source path on the local file system
         * @param remote destination path on the remote file system
         * @throws IOException if the copy fails
         */
        @Override
        public void copyFile(String local, String remote) throws IOException {
            Path source = new Path(local);
            Path target = new Path(remote);
            fileSystem.copyFromLocalFile(source, target);
        }

        /**
         * Copies a remote file to the local file system.
         *
         * @param remote source path on the remote file system
         * @param local destination path on the local file system
         * @throws IOException if the copy fails
         */
        @Override
        public void download(String remote, String local) throws IOException {
            Path source = new Path(remote);
            Path target = new Path(local);
            fileSystem.copyToLocalFile(source, target);
        }

        /**
         * Reads a whole remote file into memory and decodes it as UTF-8 text.
         *
         * @param remoteFile path of the file to read
         * @return the decoded file content
         * @throws IOException if the file cannot be opened or read
         */
        @Override
        public String cat(String remoteFile) throws IOException {
            FSDataInputStream in = null;
            try {
                in = fileSystem.open(new Path(remoteFile));
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                // 'false': copyBytes must not close the streams; the finally block does.
                IOUtils.copyBytes(in, buffer, COPY_BUFFER_SIZE, false);
                return buffer.toString(TEXT_CHARSET);
            } finally {
                // Always release the stream, on success as well as on failure.
                IOUtils.closeStream(in);
            }
        }

        /**
         * Looks up the block locations covering the full length of a file.
         *
         * @param path path to inspect
         * @return the result of {@code FileSystem.getFileBlockLocations} for offset 0
         *         through the file's reported length
         * @throws IOException if the file status or locations cannot be obtained
         */
        @Override
        public BlockLocation[] location(String path) throws IOException {
            FileStatus status = fileSystem.getFileStatus(new Path(path));
            return fileSystem.getFileBlockLocations(status, 0, status.getLen());
        }

        /**
         * Closes the underlying file system held by this DAO.
         *
         * @throws IOException if closing fails
         */
        @Override
        public void close() throws IOException {
            fileSystem.close();
        }

        /**
         * Last-resort cleanup for instances that were never closed explicitly.
         * Prefer calling {@link #close()}.
         */
        @Override
        protected void finalize() throws Throwable {
            try {
                // fileSystem is null when the constructor itself failed.
                if (fileSystem != null) {
                    fileSystem.close();
                }
            } finally {
                super.finalize();
            }
        }

        /**
         * Creates {@code target} only if it does not exist yet and writes
         * {@code content} into it as UTF-8.
         */
        private boolean writeNewFile(Path target, String content) throws IOException {
            if (!fileSystem.createNewFile(target)) {
                return false;
            }
            if (content == null || content.length() == 0) {
                // createNewFile already left an empty file behind; nothing to write.
                return true;
            }
            byte[] bytes = content.getBytes(TEXT_CHARSET);
            FSDataOutputStream out = fileSystem.create(target);
            try {
                out.write(bytes, 0, bytes.length);
            } finally {
                out.close();
            }
            return true;
        }
    }
  • 相关阅读:
    python每日一题:使用套接字创建分布式进程
    市盈率分析
    python每日一题:分布式进程之坑点
    python每日一题:比较单线程,多线程,协程的运行效率
    python每日一题:锁知识点
    python每日一题:查找一篇文档中的人名和城市
    python之装饰器@
    python每日一题:利用字典实现超市购物程序
    【Java基础】多线程
    【Java基础】异常处理
  • 原文地址:https://www.cnblogs.com/maokun/p/7360136.html
Copyright © 2011-2022 走看看