zoukankan      html  css  js  c++  java
  • thrift实现HDFS文件操作

    thrift 文件如下


    namespace java com.pera.file.transform


    struct  File{
        1:string path ,
        2:string content,
    }

    service FileTransform {
        bool exists(1:string path),
        void mkdir(1:string path ),
        void store(1:File file),
        set<string> ls(1:string path),
        bool isDirectory(1:string path),
        string getParentFile(1:string path),
        void createNewFile(1:string path),
        void deleteFile(1:string path),
        void deleteOnExit(1:string path),
        bool isFile(1:string path),
        void rename(1:string srcPath ,2:string destPath),
        string getName(1:string path),
        i64 totalSpace(1:string path),
        i64 usedSpace(1:string path),
        string read(1:string path)

    }


    客户端:

    public class FileTransFormPair {
        
        private TTransport transport ;
        private Client  client  ;
        
        public FileTransFormPair(){}
        
        public FileTransFormPair( TTransport transport ,Client  client ){
            this.transport=transport ;
            this.client =client;
        }
        
        public TTransport getTransport() {
            return transport;
        }
        public void setTransport(TTransport transport) {
            this.transport = transport;
        }
        public Client getClient() {
            return client;
        }
        public void setClient(Client client) {
            this.client = client;
        }
    }

    public class FileTransFormClient implements Iface {

        public static  String SERVER_IP = "localhost";
        public static  int SERVER_PORT = 8090;
        public static  int TIMEOUT = 30000;
        
        public FileTransFormClient(String host,int port ,int timeout){
            SERVER_IP=host;
            SERVER_PORT=port;
            TIMEOUT=timeout;
        }
        
        private static   FileTransFormPair getClient() throws TTransportException{
            TTransport transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
            TProtocol protocol = new TBinaryProtocol(transport);
            Client  client =new Client(protocol);
            transport.open();
            return new FileTransFormPair(transport, client);
        }
        
        @Override
        public boolean exists(String path) throws TException {
            boolean result =false;
            FileTransFormPair parameter =getClient();
            result =parameter.getClient().exists(path);
            parameter.getTransport().close();
            return result;
        }


        @Override
        public void mkdir(String path) throws TException {
            FileTransFormPair parameter =getClient();
            parameter.getClient().mkdir(path);
            parameter.getTransport().close();
            
        }


        @Override
        public void store(File file) throws TException {
            FileTransFormPair parameter =getClient();
            parameter.getClient().store(file);
            parameter.getTransport().close();
            
        }


        @Override
        public Set<String> ls(String path) throws TException {
            FileTransFormPair parameter =getClient();
            Set<String> result =parameter.getClient().ls(path);
            parameter.getTransport().close();
            return result;
        }


        @Override
        public boolean isDirectory(String path) throws TException {
            boolean result =false ;
            FileTransFormPair parameter =getClient();
            result =parameter.getClient().isDirectory(path);
            parameter.getTransport().close();
            return result;
        }


        @Override
        public String getParentFile(String path) throws TException {
            FileTransFormPair parameter =getClient();
            String result =parameter.getClient().getParentFile(path);
            parameter.getTransport().close();
            return result;
        }


        @Override
        public void createNewFile(String path) throws TException {
            FileTransFormPair parameter =getClient();
            parameter.getClient().createNewFile(path);
            parameter.getTransport().close();
        }


        @Override
        public void deleteFile(String path) throws TException {
            FileTransFormPair parameter =getClient();
            parameter.getClient().deleteFile(path);
            parameter.getTransport().close();
            
        }


        @Override
        public void deleteOnExit(String path) throws TException {
            FileTransFormPair parameter =getClient();
            parameter.getClient().deleteOnExit(path);
            parameter.getTransport().close();
            
        }


        @Override
        public boolean isFile(String path) throws TException {
            FileTransFormPair parameter =getClient();
            boolean result =parameter.getClient().isFile(path);
            parameter.getTransport().close();
            return result ;
        }


        @Override
        public void rename(String srcPath, String destPath) throws TException {
            FileTransFormPair parameter =getClient();
            parameter.getClient().rename(srcPath, destPath);
            parameter.getTransport().close();
            
        }


        @Override
        public String getName(String path) throws TException {
            FileTransFormPair parameter =getClient();
            String result =parameter.getClient().getName(path);
            parameter.getTransport().close();
            return result;
        }


        @Override
        public long totalSpace(String path) throws TException {
            FileTransFormPair parameter =getClient();
            long result =parameter.getClient().totalSpace(path);
            parameter.getTransport().close();
            return result;
        }


        @Override
        public long usedSpace(String path) throws TException {
            FileTransFormPair parameter =getClient();
            long result =parameter.getClient().usedSpace(path);
            parameter.getTransport().close();
            return result;
        }

        @Override
        public String read(String path) throws TException {
            FileTransFormPair parameter =getClient();
            String result =parameter.getClient().read(path);
            parameter.getTransport().close();
            return result;
        }
    }

    服务端:

    public class ThriftServer {

        public static  int SERVER_PORT ;

        private static TServer server =null;
        
        private  static Log  LOG =LogFactory.getLog(ThriftServer.class);
        
        public void init(){
            SERVER_PORT = 9090;
            LOG.info("init the thriftServer");
        }
        
        public void start() throws TTransportException {
            TProcessor tprocessor = new FileTransform.Processor<FileTransform.Iface>(
                    new FileStoreService());
            TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
            TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(
                    serverTransport);
            ttpsArgs.processor(tprocessor);
            ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());
            server = new TThreadPoolServer(ttpsArgs);
            server.serve();
            LOG.info("start the thriftServer");
        }

        public void stop() {
            if(null!=server){
                server.stop();
                LOG.info("stop the thriftServer");
            }
        }

        public static void main(String[] args) {
            ThriftServer server =new ThriftServer();
            server.init();
            try {
                server.start();
            } catch (TTransportException e) {
                e.printStackTrace();
            }
        }

    }
    public class FileStoreService implements Iface{

        private Configuration getConfiguration(){
            Configuration  conf =new Configuration();
            Properties  pro = new Properties();
            try {
                pro.load(new FileInputStream(new File("conf/filetransform-site.properties")));
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
    //        conf.set("dfs.default.name", pro.getProperty("dfs.default.name"));
            
            return conf;
        }
        @Override
        public boolean exists(String path) throws TException {
            Configuration  conf =getConfiguration();
            FileSystem fs=null;
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }
            boolean exists =false ;
            try {
                exists  = fs.exists(new Path(path));
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            return exists;
        }

        @Override
        public void mkdir(String path) throws TException {
            Configuration  conf =getConfiguration();
            FileSystem fs=null;
            try {
                fs = FileSystem.get(conf);
                fs.mkdirs(new Path(path));
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
        }

        @Override
        public void store(com.pera.file.transform.gen.File file) throws TException {
            String path =file.getPath();
            String content =file.getContent();
            Configuration conf  =getConfiguration();
            FileSystem  fs =null;
            FSDataOutputStream out=null;
            try {
                Path p =new Path(path);
                fs =FileSystem.get(conf);
                if(!fs.exists(p.getParent())){
                    throw new TException( path+" is not exists");
                }
                out =fs.create(p);
                if(null!=content){
                    byte   [] tmp =content.getBytes();
                    out.write(tmp);
                    out.flush();
                    out.close();
                }
            } catch (Exception e) {
                throw new TException(e.getMessage());
            }finally{
                try {
                    if(null!=out){
                        out.close();
                    }
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
                try {
                    if(null!=fs){
                        fs.close();
                    }
                } catch (IOException e) {
                    throw new TException(e.getMessage());
                }
                
            }
        }

        @Override
        public Set<String> ls(String path) throws TException {
            Configuration  conf =getConfiguration();
            FileSystem fs =null;
            Set<String>  result =new HashSet<String>();
            try {
                fs=FileSystem.get(conf);
                FileStatus   [] fstatus=fs.listStatus(new Path(path));
                int length =fstatus==null?0:fstatus.length;
                for (int i = 0; i < length; i++) {
                    String url =fstatus[i].getPath().toString();
                    result.add(url);
                }
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            return result;
        }

        
        @Override
        @SuppressWarnings("deprecation")
        public boolean isDirectory(String path) throws TException {
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            boolean result =false ;
            try {
                fs =FileSystem.get(conf);
                result =fs.isDirectory(new Path(path));
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            return result;
        }

        @Override
        public String getParentFile(String path) throws TException {
            Path parentPath =new Path(path).getParent();
            return parentPath.toString();
        }

        @Override
        public void createNewFile(String path) throws TException {
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            try {
                fs =FileSystem.get(conf);
                fs.createNewFile(new Path(path));
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
        }

        @Override
        public void deleteFile(String path) throws TException {
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            try {
                fs =FileSystem.get(conf);
                fs.deleteOnExit(new Path(path));
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            
        }

        @Override
        public void deleteOnExit(String path) throws TException {
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            try {
                fs =FileSystem.get(conf);
                fs.deleteOnExit(new Path(path));
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            
        }

        @Override
        public boolean isFile(String path) throws TException {
            
            boolean isFile =false ;
            
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            try {
                fs =FileSystem.get(conf);
                isFile =fs.isFile(new Path(path));
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            return isFile;
        }

        @Override
        public void rename(String srcPath, String destPath) throws TException {
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            try {
                fs =FileSystem.get(conf);
                fs.rename(new Path(srcPath), new Path(destPath));
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            
        }

        @Override
        public String getName(String path) throws TException {
            String name  =new Path(path).getName();
            
            return name;
        }

        @Override
        public long totalSpace(String path) throws TException {
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            long length =0 ;
            try {
                fs =FileSystem.get(conf);
                length =fs.getFileStatus(new Path(path)).getLen();
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            return length;
        }

        @Override
        public long usedSpace(String path) throws TException {
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            long used =0 ;
            try {
                fs =FileSystem.get(conf);
                used =fs.getUsed();
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            return used;
        }
        
        @Override
        public String read(String path) throws TException {
            Configuration conf =getConfiguration();
            FileSystem fs =null;
            String result   ="" ;
            try {
                fs =FileSystem.get(conf);
                FSDataInputStream  in =fs.open(new Path(path));
                int length =in.available();
                byte  [] tmp = new byte  [length] ;
                in.readFully(tmp);
                result  =new String(tmp);
            } catch (IOException e) {
                throw new TException(e.getMessage());
            }finally{
                if(null!=fs){
                    try {
                        fs.close();
                    } catch (IOException e) {
                        throw new TException(e.getMessage());
                    }
                }
            }
            return result;
        }
    }


  • 相关阅读:
    带你破解时间管理的谜题
    学点产品思维(一起拿返现)
    利用MAT玩转JVM内存分析(一)
    JVM利器:Serviceability Agent介绍
    003-005:Java平台相关的面试题
    002-如何理解Java的平台独立性
    001-为什么Java能这么流行
    Redis保证事务一致性,以及常用的数据结构
    敏感词过滤服务的实现
    或许,挂掉的点总是出人意料(hw其实蛮有好感的公司)
  • 原文地址:https://www.cnblogs.com/cl1024cl/p/6205124.html
Copyright © 2011-2022 走看看