  • Accessing HDFS from Windows via Thrift

    Thrift is a cross-language remote procedure call (RPC) framework. Using Thrift together with the thriftfs contrib code from hadoop1.x, I wrote a thriftfs service for hadoop2.x that external programs can call.

    1. Preparation

    1.1 Build Boost

    Boost download: http://120.52.72.39/jaist.dl.sourceforge.net/c3pr90ntcsf0/project/boost/boost/1.60.0/boost_1_60_0.tar.gz

    Unpack Boost, run bootstrap.bat on Windows, then run the generated b2.exe.

    If several Visual Studio toolchains are installed and you want to build with a specific one, open that version's command prompt (Start -> Visual Studio -> Tools -> command prompt), cd into the unpacked Boost directory, and run b2.exe there.

    1.2 Build Thrift

    Thrift download: http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz

    After unpacking, go into the lib/cpp folder and open thrift.sln (requires Visual Studio 2010 or later), select the libthrift project, and set the Boost header include path.

    While building, remove the .h and .cpp files you do not need and add any that are missing (this depends on your environment).

    1.3 Compile the hadoopfs.thrift file

    Download the prebuilt Thrift compiler: http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.3/thrift-0.9.3.exe

    Modify the hadoopfs.thrift file shipped with hadoop1.x.

    Contents of hadoopfs.thrift:

    #!/usr/local/bin/thrift -java
    #
    # Thrift Service exported by Hadoop File System
    # Dhruba Borthakur (dhruba@gmail.com)
    #
    
    /**
     * The available types in Thrift:
     *
     *  bool        Boolean, one byte
     *  byte        Signed byte
     *  i16         Signed 16-bit integer
     *  i32         Signed 32-bit integer
     *  i64         Signed 64-bit integer
     *  double      64-bit floating point value
     *  string      String
     *  binary      Blob (byte array)
     *  map<t1,t2>  Map from one type to another
     *  list<t1>    Ordered list of one type
     *  set<t1>     Set of unique elements of one type
     *
     */
    
    namespace java org.apache.hadoop.thriftfs.api
    namespace php hadoopfs
    
    struct ThriftHandle {
      1: i64 id
    }
    
    struct Pathname {
      1: string pathname
    }
    
    struct FileStatus {
      1: string path,
      2: i64 length,
      3: bool isdir,
      4: i16 block_replication,
      5: i64 blocksize,
      6: i64 modification_time,
      7: string permission,
      8: string owner,
      9: string group
    }
    
    struct BlockLocation {
      1: list<string> hosts,         /* hostnames of datanodes */
      2: list<string> names,         /* hostname:portNumber of datanodes */
      3: i64 offset,                 /* offset of the block in the file */
      4: i64 length                  /* length of data */
    }
    
    exception MalformedInputException {
      1: string message
    }
    
    exception ThriftIOException {
       1: string message
    }
    
    service ThriftHadoopFileSystem
    {
    
      // set inactivity timeout period. The period is specified in seconds.
      // if there are no RPC calls to the HadoopThrift server for this much
      // time, then the server kills itself.
      void setInactivityTimeoutPeriod(1:i64 periodInSeconds),
    
      // close session
      void shutdown(1:i32 status),
    
      // create a file and open it for writing
      ThriftHandle create(1:Pathname path) throws (1:ThriftIOException ouch),
    
      // create a file and open it for writing
      ThriftHandle createFile(1:Pathname path, 2:i16 mode, 
                              3:bool overwrite, 4:i32 bufferSize, 
                              5:i16 block_replication, 6:i64 blocksize) 
                              throws (1:ThriftIOException ouch),
    
      // returns a handle to an existing file  for reading
      ThriftHandle open(1:Pathname path) throws (1:ThriftIOException ouch),
    
      // returns a handle to an existing file for appending to it.
      ThriftHandle append(1:Pathname path) throws (1:ThriftIOException ouch),
    
      // write a string to the open handle for the file
      bool write(1:ThriftHandle handle, 2:binary data) throws (1:ThriftIOException ouch),
    
      // read some bytes from the open handle for the file
      binary read(1:ThriftHandle handle, 2:i64 offset, 3:i32 size) throws (1:ThriftIOException ouch),
    
      // close file
      bool close(1:ThriftHandle out) throws (1:ThriftIOException ouch),
    
      // delete file(s) or directory(s)
      bool rm(1:Pathname path, 2:bool recursive) throws (1:ThriftIOException ouch),
    
      // rename file(s) or directory(s)
      bool rename(1:Pathname path, 2:Pathname dest) throws (1:ThriftIOException ouch),
    
      // create directory
      bool mkdirs(1:Pathname path) throws (1:ThriftIOException ouch),
    
      // Does this pathname exist?
      bool exists(1:Pathname path) throws (1:ThriftIOException ouch),
    
      // Returns status about the path
      FileStatus stat(1:Pathname path) throws (1:ThriftIOException ouch),
    
      // If the path is a directory, then returns the list of pathnames in that directory
      list<FileStatus> listStatus(1:Pathname path) throws (1:ThriftIOException ouch),
    
      // Set permission for this file
      void chmod(1:Pathname path, 2:i16 mode) throws (1:ThriftIOException ouch),
    
      // set the owner and group of the file.
      void chown(1:Pathname path, 2:string owner, 3:string group) throws (1:ThriftIOException ouch),
    
      // set the replication factor for all blocks of the specified file
      void setReplication(1:Pathname path, 2:i16 replication) throws (1:ThriftIOException ouch),
    
      // get the locations of the blocks of this file
      list<BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 start, 3:i64 length) throws (1:ThriftIOException ouch),
    }

    Generating the C++ and Java code

    Open a cmd prompt in the directory containing thrift.exe, copy hadoopfs.thrift into the same directory, and run:

    thrift -gen java hadoopfs.thrift

    thrift -gen cpp hadoopfs.thrift

    This produces the gen-cpp and gen-java folders containing the generated source files.
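
    To get oriented in the generated C++ code: methods whose IDL result is a struct, string/binary, or container return that result through an out-parameter (the first argument) of the generated ThriftHadoopFileSystemClient, while bool results are returned directly. The following is only a minimal sketch of that calling convention, assuming the gen-cpp output and the Thrift 0.9.3 C++ library are on the include/library paths; the host, port, and path are placeholders.

    #include "ThriftHadoopFileSystem.h"   // generated in gen-cpp
    #include "hadoopfs_types.h"
    #include <thrift/transport/TSocket.h>
    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/protocol/TBinaryProtocol.h>
    #include <boost/shared_ptr.hpp>
    #include <cstdio>

    using namespace apache::thrift;
    using namespace apache::thrift::transport;
    using namespace apache::thrift::protocol;

    int main()
    {
        boost::shared_ptr<TTransport> socket(new TSocket("127.0.0.1", 50841)); // placeholder host/port
        boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
        boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
        ThriftHadoopFileSystemClient client(protocol);

        transport->open();
        Pathname p;
        p.pathname = "/";                  // placeholder path
        bool found = client.exists(p);     // bool results are returned directly
        if (found) {
            FileStatus st;
            client.stat(st, p);            // struct results come back through the out-parameter
            printf("%s  length=%lld  isdir=%d\n",
                   st.path.c_str(), (long long)st.length, (int)st.isdir);
        }
        transport->close();
        return 0;
    }

    The same convention explains the calls made by the HdfsClient wrapper in section 3, e.g. m_Client->read(content, handle, offset, size) fills content with the bytes read.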

    2. Writing the HDFS server program (Java)

    2.1 Create a libthrift project in Eclipse. Copy the code under the unpacked Thrift directory's lib/java/src into the project's src directory, and copy the code generated in gen-java (step 1.3) into src as well.

    Unpack hadoop2.x (download: http://mirrors.cnnic.cn/apache/hadoop/common/hadoop-2.6.3/hadoop-2.6.3.tar.gz).

    Add the jar dependencies in Eclipse: under hadoop2.x/share/hadoop/, add all jars in the common, common/lib, hdfs, and hdfs/lib folders to the project's build path.

    Modify the HadoopThriftServer code shipped with hadoop1.x to the following:

    package org.apache.hadoop.thriftfs;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    // Include Generated code
    import org.apache.hadoop.thriftfs.api.Pathname;
    import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
    import org.apache.hadoop.thriftfs.api.ThriftHandle;
    import org.apache.hadoop.thriftfs.api.ThriftIOException;
    import org.apache.hadoop.util.Daemon;
    import org.apache.hadoop.util.StringUtils;
    import org.apache.thrift.TException;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadPoolServer;
    import org.apache.thrift.transport.TServerSocket;
    import org.apache.thrift.transport.TServerTransport;
    
    /**
     * ThriftHadoopFileSystem
     * A thrift wrapper around the Hadoop File System
     */
    public class HadoopThriftServer extends ThriftHadoopFileSystem {
    
      static int serverPort = 0;                    // default port
      TServer    server = null;
    
      public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
      {
    
        public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");
    
        // HDFS glue
        Configuration conf;
        FileSystem fs;
            
        // stucture that maps each Thrift object into an hadoop object
        private long nextId = new Random().nextLong();
        private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
        private Daemon inactivityThread = null;
    
        // Detect inactive session
        private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
        private static volatile long inactivityRecheckInterval = 60 * 1000;
        private static volatile boolean fsRunning = true;
        private static long now;
    
        // allow outsider to change the hadoopthrift path
        public void setOption(String key, String val) {
        }
    
        /**
         * Current system time.
         * @return current time in msec.
         */
        static long now() {
          return System.currentTimeMillis();
        }
    
        /**
        * getVersion
        *
        * @return current version of the interface.
        */
        public String getVersion() {
          return "0.1";
        }
    
        /**
         * shutdown
         *
         * cleanly closes everything and exit.
         */
        @Override
        public void shutdown(int status) {
          LOG.info("HadoopThriftServer shutting down.");
          try {
            fs.close();
          } catch (IOException e) {
            LOG.warn("Unable to close file system");
          }
          Runtime.getRuntime().exit(status);
        }
    
        /**
         * Periodically checks to see if there is inactivity
         */
        class InactivityMonitor implements Runnable {
          @Override
        public void run() {
            while (fsRunning) {
              try {
                if (now() > now + inactivityPeriod) {
                  LOG.warn("HadoopThriftServer Inactivity period of " +
                           inactivityPeriod + " expired... Stopping Server.");
                  shutdown(-1);
                }
              } catch (Exception e) {
                LOG.error(StringUtils.stringifyException(e));
              }
              try {
                Thread.sleep(inactivityRecheckInterval);
              } catch (InterruptedException ie) {
              }
            }
          }
        }
    
        /**
         * HadoopThriftServer
         *
         * Constructor for the HadoopThriftServer glue with Thrift Class.
         *
         * @param name - the name of this handler
         */
        public HadoopThriftHandler(String name) {
          conf = new Configuration();
          now = now();
          try {
            inactivityThread = new Daemon(new InactivityMonitor());
            fs = FileSystem.get(conf);
          } catch (IOException e) {
            LOG.warn("Unable to open hadoop file system...");
            Runtime.getRuntime().exit(-1);
          }
        }
    
        /**
          * printStackTrace
          *
          * Helper function to print an exception stack trace to the log and not stderr
          *
          * @param e the exception
          *
          */
        static private void printStackTrace(Exception e) {
          for(StackTraceElement s: e.getStackTrace()) {
            LOG.error(s);
          }
        }
    
        /**
         * Lookup a thrift object into a hadoop object
         */
        private synchronized Object lookup(long id) {
          return hadoopHash.get(new Long(id));
        }
    
        /**
         * Insert a thrift object into a hadoop object. Return its id.
         */
        private synchronized long insert(Object o) {
          nextId++;
          hadoopHash.put(nextId, o);
          return nextId;
        }
    
        /**
         * Delete a thrift object from the hadoop store.
         */
        private synchronized Object remove(long id) {
          return hadoopHash.remove(new Long(id));
        }
    
        /**
          * Implement the API exported by this thrift server
          */
    
        /** Set inactivity timeout period. The period is specified in seconds.
          * if there are no RPC calls to the HadoopThrift server for this much
          * time, then the server kills itself.
          */
        @Override
        public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
          inactivityPeriod = periodInSeconds * 1000; // in milli seconds
          if (inactivityRecheckInterval > inactivityPeriod ) {
            inactivityRecheckInterval = inactivityPeriod;
          }
        }
    
    
        /**
          * Create a file and open it for writing
          */
        @Override
        public ThriftHandle create(Pathname path) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("create: " + path);
            FSDataOutputStream out = fs.create(new Path(path.pathname));
            long id = insert(out);
            ThriftHandle obj = new ThriftHandle(id);
            HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
            return obj;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
          * Create a file and open it for writing, delete file if it exists
          */
        @Override
        public ThriftHandle createFile(Pathname path, 
                                       short mode,
                                       boolean  overwrite,
                                       int bufferSize,
                                       short replication,
                                       long blockSize) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("create: " + path +
                                         " permission: " + mode +
                                         " overwrite: " + overwrite +
                                         " bufferSize: " + bufferSize +
                                         " replication: " + replication +
                                         " blockSize: " + blockSize);
            FSDataOutputStream out = fs.create(new Path(path.pathname), 
                                               new FsPermission(mode),
                                               overwrite,
                                               bufferSize,
                                               replication,
                                               blockSize,
                                               null); // progress
            long id = insert(out);
            ThriftHandle obj = new ThriftHandle(id);
            HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
            return obj;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Opens an existing file and returns a handle to read it
         */
        @Override
        public ThriftHandle open(Pathname path) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("open: " + path);
            FSDataInputStream out = fs.open(new Path(path.pathname));
            long id = insert(out);
            ThriftHandle obj = new ThriftHandle(id);
            HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
            return obj;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Opens an existing file to append to it.
         */
        @Override
        public ThriftHandle append(Pathname path) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("append: " + path);
            FSDataOutputStream out = fs.append(new Path(path.pathname));
            long id = insert(out);
            ThriftHandle obj = new ThriftHandle(id);
            HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
            return obj;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * write to a file
        */
        @Override
        public boolean write(ThriftHandle tout, ByteBuffer data)
                throws ThriftIOException, TException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("write: " + tout.id);
            FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
            // Retrieve all bytes in the buffer
            byte[] bytes = new byte[data.limit()];
            // transfer bytes from this buffer into the given destination array
            data.get(bytes);
            out.write(bytes, 0, bytes.length);
            data.clear();
            HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
            return true;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        } 
    
        /**
         * read from a file
         */
        @Override
        public ByteBuffer read(ThriftHandle tout, long offset, int length)
                throws ThriftIOException, TException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("read: " + tout.id +
                                         " offset: " + offset +
                                         " length: " + length);
            FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
            if (in.getPos() != offset) {
              in.seek(offset);
            }
            byte[] tmp = new byte[length];
            int numbytes = in.read(offset, tmp, 0, length);
            HadoopThriftHandler.LOG.debug("read done: " + tout.id);
            return ByteBuffer.wrap(tmp,0,numbytes);
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Delete a file/directory
         */
        @Override
        public boolean rm(Pathname path, boolean recursive) 
                              throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("rm: " + path +
                                         " recursive: " + recursive);
            boolean ret = fs.delete(new Path(path.pathname), recursive);
            HadoopThriftHandler.LOG.debug("rm: " + path);
            return ret;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Move a file/directory
         */
        @Override
        public boolean rename(Pathname path, Pathname dest) 
                              throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("rename: " + path +
                                         " destination: " + dest);
            boolean ret = fs.rename(new Path(path.pathname), 
                                    new Path(dest.pathname));
            HadoopThriftHandler.LOG.debug("rename: " + path);
            return ret;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         *  close file
         */
         @Override
        public boolean close(ThriftHandle tout) throws ThriftIOException {
           try {
             now = now();
             HadoopThriftHandler.LOG.debug("close: " + tout.id);
             Object obj = remove(tout.id);
             if (obj instanceof FSDataOutputStream) {
               FSDataOutputStream out = (FSDataOutputStream)obj;
               out.close();
             } else if (obj instanceof FSDataInputStream) {
               FSDataInputStream in = (FSDataInputStream)obj;
               in.close();
             } else {
               throw new ThriftIOException("Unknown thrift handle.");
             }
             HadoopThriftHandler.LOG.debug("closed: " + tout.id);
             return true;
           } catch (IOException e) {
             throw new ThriftIOException(e.getMessage());
           }
         }
    
         /**
          * Create a directory
          */
        @Override
        public boolean mkdirs(Pathname path) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("mkdirs: " + path);
            boolean ret = fs.mkdirs(new Path(path.pathname));
            HadoopThriftHandler.LOG.debug("mkdirs: " + path);
            return ret;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Does this pathname exist?
         */
        @Override
        public boolean exists(Pathname path) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("exists: " + path);
            boolean ret = fs.exists(new Path(path.pathname));
            HadoopThriftHandler.LOG.debug("exists done: " + path);
            return ret;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Returns status about the specified pathname
         */
        @Override
        public org.apache.hadoop.thriftfs.api.FileStatus stat(
                                Pathname path) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("stat: " + path);
            org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
                                               new Path(path.pathname));
            HadoopThriftHandler.LOG.debug("stat done: " + path);
            return new org.apache.hadoop.thriftfs.api.FileStatus(
              stat.getPath().toString(),
              stat.getLen(),
              stat.isDir(),
              stat.getReplication(),
              stat.getBlockSize(),
              stat.getModificationTime(),
              stat.getPermission().toString(),
              stat.getOwner(),
              stat.getGroup());
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * If the specified pathname is a directory, then return the
         * list of pathnames in this directory
         */
        @Override
        public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
                                Pathname path) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("listStatus: " + path);
    
            org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
                                               new Path(path.pathname));
            HadoopThriftHandler.LOG.debug("listStatus done: " + path);
            org.apache.hadoop.thriftfs.api.FileStatus tmp;
            List<org.apache.hadoop.thriftfs.api.FileStatus> value = 
              new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
    
            for (int i = 0; i < stat.length; i++) {
              tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
                          stat[i].getPath().toString(),
                          stat[i].getLen(),
                          stat[i].isDir(),
                          stat[i].getReplication(),
                          stat[i].getBlockSize(),
                          stat[i].getModificationTime(),
                          stat[i].getPermission().toString(),
                          stat[i].getOwner(),
                          stat[i].getGroup());
              value.add(tmp);
            }
            return value;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Sets the permission of a pathname
         */
        @Override
        public void chmod(Pathname path, short mode) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("chmod: " + path + 
                                         " mode " + mode);
            fs.setPermission(new Path(path.pathname), new FsPermission(mode));
            HadoopThriftHandler.LOG.debug("chmod done: " + path);
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Sets the owner & group of a pathname
         */
        @Override
        public void chown(Pathname path, String owner, String group) 
                                                           throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("chown: " + path +
                                         " owner: " + owner +
                                         " group: " + group);
            fs.setOwner(new Path(path.pathname), owner, group);
            HadoopThriftHandler.LOG.debug("chown done: " + path);
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
    
        /**
         * Sets the replication factor of a file
         */
        @Override
        public void setReplication(Pathname path, short repl) throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("setrepl: " + path +
                                         " replication factor: " + repl);
            fs.setReplication(new Path(path.pathname), repl);
            HadoopThriftHandler.LOG.debug("setrepl done: " + path);
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
    
        }
    
        /**
         * Returns the block locations of this file
         */
        @Override
        public List<org.apache.hadoop.thriftfs.api.BlockLocation> 
                 getFileBlockLocations(Pathname path, long start, long length) 
                                             throws ThriftIOException {
          try {
            now = now();
            HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);
    
            org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
                                                     new Path(path.pathname));
    
            org.apache.hadoop.fs.BlockLocation[] stat = 
                fs.getFileBlockLocations(status, start, length);
            HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);
    
            org.apache.hadoop.thriftfs.api.BlockLocation tmp;
            List<org.apache.hadoop.thriftfs.api.BlockLocation> value = 
              new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
    
            for (int i = 0; i < stat.length; i++) {
    
              // construct the list of hostnames from the array returned
              // by HDFS
              List<String> hosts = new LinkedList<String>();
              String[] hostsHdfs = stat[i].getHosts();
              for (int j = 0; j < hostsHdfs.length; j++) {
                hosts.add(hostsHdfs[j]);
              }
    
              // construct the list of host:port from the array returned
              // by HDFS
              List<String> names = new LinkedList<String>();
              String[] namesHdfs = stat[i].getNames();
              for (int j = 0; j < namesHdfs.length; j++) {
                names.add(namesHdfs[j]);
              }
              tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
                          hosts, names, stat[i].getOffset(), stat[i].getLength());
              value.add(tmp);
            }
            return value;
          } catch (IOException e) {
            throw new ThriftIOException(e.getMessage());
          }
        }
        
      }
    
      // Bind to port. If the specified port is 0, then bind to random port.
      private ServerSocket createServerSocket(int port) throws IOException {
        try {
          ServerSocket sock = new ServerSocket();
          // Prevent 2MSL delay problem on server restarts
          sock.setReuseAddress(true);
          // Bind to listening port
          if (port == 0) {
            sock.bind(null);
            serverPort = sock.getLocalPort();
          } else {
            sock.bind(new InetSocketAddress(port));
          }
          return sock;
        } catch (IOException ioe) {
          throw new IOException("Could not create ServerSocket on port " + port + "." +
                                ioe);
        }
      }
    
      /**
       * Constrcts a server object
       */
      public HadoopThriftServer(String [] args) {
    
        if (args.length > 0) {
          serverPort = new Integer(args[0]);
        }
        try {
          ServerSocket ssock = createServerSocket(serverPort);
          TServerTransport serverTransport = new TServerSocket(ssock);
          Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
          ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
          TThreadPoolServer.Args options = new TThreadPoolServer.Args(serverTransport);
          options.minWorkerThreads(10);
          options.processor(processor);
          server = new TThreadPoolServer(options);
          System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
          HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
          System.out.flush();
    
        } catch (Exception x) {
          x.printStackTrace();
        }
      }
    
      public static void main(String [] args) {
        HadoopThriftServer me = new HadoopThriftServer(args);
        me.server.serve();
      }
    };

    The changes relative to the hadoop1.x version were highlighted in red in the original post; the key point is that Thrift's binary type is used to transfer the file contents.

    Reference: 使用Thrift传输二进制数据遇到的问题 (problems encountered when transferring binary data with Thrift)
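
    The core of that issue: in the generated C++ API a Thrift binary field is represented as std::string, so each chunk of file data must be copied into the string with an explicit length; building the string from a bare char* stops at the first NUL byte and silently truncates binary content. A small illustration (buffer and readSize stand in for a chunk read from disk):

    #include <string>
    #include <cstddef>

    // Pack a chunk of raw bytes for a Thrift `binary` argument (std::string in generated C++ code).
    std::string packChunk(const char* buffer, std::size_t readSize)
    {
        // WRONG for binary data: std::string(buffer) stops at the first '\0' byte.
        // Correct: copy exactly readSize bytes, NUL bytes included.
        std::string chunk;
        chunk.append(buffer, buffer + readSize);
        // equivalent: std::string chunk(buffer, readSize);
        return chunk;
    }

    This is exactly what put() and append() in the client below do with content.append(buffer, buffer + readSize).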

    3. Windows C++ client

    3.1 Create a project. Add Boost and the Thrift lib/cpp/src folder to the header include paths, and put the libthrift.lib built in step 1.2 in the project root (or add it to the library directories).

    3.2 Copy the code from the gen-cpp folder generated in step 1.3 into the project root and add it to the project.

    3.3 Write the thriftfs client wrapper class:

    HdfsClient.h

    #pragma once
    #include "hadoopfs_types.h"
    #include "ThriftHadoopFileSystem.h"
    #include <boost/shared_ptr.hpp>
    #include <thrift/transport/TSocket.h>
    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/protocol/TBinaryProtocol.h>
    
    using namespace ::apache::thrift;
    using namespace ::apache::thrift::protocol;
    using namespace ::apache::thrift::transport;
    using boost::shared_ptr;
    
    
    #define FILEOPEN_SUCCESS 0
    
    class HdfsClient
    {
    private:
        bool m_IsConn;
        shared_ptr<TTransport> m_Socket;
        shared_ptr<TBufferedTransport> m_Transport;
        shared_ptr<TBinaryProtocol> m_Protocol;
        shared_ptr<ThriftHadoopFileSystemClient> m_Client;
        ThriftHandle m_Handler;
    public:
        HdfsClient(void);
        ~HdfsClient(void);
    
        bool connect(const std::string server,int port);
        bool shutdown();
        bool put(const std::string& localfile,const std::string& rem_path);
        bool append(const std::string& localfile,const std::string& rem_path);
        bool get(const std::string& rem_path,const std::string& localfile);
        bool rm(const std::string& rem_path, const bool recursive=false);
        bool mv(const std::string& src_path,const std::string& dst_path);
        bool mkdirs(const std::string& rem_path);
        bool exists(const std::string& rem_path);
        void ls(std::vector<FileStatus> & result, const std::string& path);
        void chmod(const std::string& path, const int16_t mode);
        void chown(const std::string& path, const std::string& owner);
        void setReplication(const std::string& path, const int16_t replication);
        void getFileBlockLocations(std::vector<BlockLocation> & result, const std::string& path, const int64_t start, const int64_t length);
    };

    HdfsClient.cpp

    #include "StdAfx.h"
    #include "HdfsClient.h"
    #include <stdio.h>
    
    HdfsClient::HdfsClient(void)
    {
        m_IsConn = false;
    }
    
    HdfsClient::~HdfsClient(void)
    {
        if(m_IsConn)
            shutdown();
    }
    
    bool HdfsClient::connect(std::string server,int port)
    {
        m_Socket = shared_ptr<TTransport>(new TSocket(server,port));
        m_Transport = shared_ptr<TBufferedTransport>(new TBufferedTransport(m_Socket));
        m_Protocol = shared_ptr<TBinaryProtocol>(new TBinaryProtocol(m_Transport));
        m_Client = shared_ptr<ThriftHadoopFileSystemClient>(new ThriftHadoopFileSystemClient(m_Protocol));
    
        try
        {
            m_Transport->open();
            // tell the HadoopThrift server to die after 60 minutes of inactivity
            m_Client->setInactivityTimeoutPeriod(3600);
            m_IsConn = true;
        }
        catch (const ThriftIOException& ex)
        {
            printf("ERROR: %s",ex.message.c_str());
            return false;
        }
        return true;
    }
    
    
    bool HdfsClient::shutdown()
    {
        try
        {
            m_Transport->close();
            m_IsConn = false;
        }
        catch (const ThriftIOException& ex)
        {
            printf("ERROR: %s",ex.message.c_str());
            return false;
        }
        return true;
    }
    
    bool HdfsClient::put(const std::string& localfile,const std::string& rem_path)
    {
        Pathname ptname;
        ptname.pathname = rem_path;
        m_Client->create(m_Handler,ptname);//Create the specified file. Returns a handle to write data.
    
        if(m_Handler.id == 0)//error
            return false;
        else
        {
            FILE* fp = fopen(localfile.c_str(),"rb");
            if(fp == NULL) return false; // could not open the local file
    
            /*//A. Read the whole file into content:
            // move the file position pointer to the end of the file
            fseek(fp,0L,SEEK_END);
            // get the file length
            long length=ftell(fp);
            fseek(fp,0,SEEK_SET);// move the file pointer back to the beginning
            char* buffer = new char[length];
            //memset(buffer,'\0',length);
            fread(buffer,sizeof(char),length,fp);
            //write data to hdfs
            std::string content;
            content.append(buffer,buffer+length);//string
            m_Client->write(m_Handler,content);*/

            //B. Read the file in chunks and upload each chunk; a memory-mapped file could also be used to load the file before sending it to HDFS
            size_t bufferSize = 1 << 20;//1M
            size_t readSize = 0;
            char* buffer = new char[bufferSize];
            while(!feof(fp))
            {
                readSize = fread(buffer,sizeof(char),bufferSize,fp);
                //write data to hdfs
                std::string content;
                content.append(buffer,buffer+readSize);//string
                m_Client->write(m_Handler,content);
            }
            fclose(fp);
            delete[] buffer;
            return m_Client->close(m_Handler);
        }
    }
    
    bool HdfsClient::append(const std::string& localfile,const std::string& rem_path)
    {
        Pathname ptname;
        ptname.pathname = rem_path;
        m_Client->append(m_Handler,ptname);
    
        if(m_Handler.id == 0)//error
            return false;
        else
        {
            FILE* fp = fopen(localfile.c_str(),"rb");
            if(fp == NULL) return false; // could not open the local file
    
            /*//A. Read the whole file into content:
            // move the file position pointer to the end of the file
            fseek(fp,0L,SEEK_END);
            // get the file length
            long length=ftell(fp);
            fseek(fp,0,SEEK_SET);// move the file pointer back to the beginning
            char* buffer = new char[length];
            //memset(buffer,'\0',length);
            fread(buffer,sizeof(char),length,fp);
            //write data to hdfs
            std::string content;
            content.append(buffer,buffer+length);//string
            m_Client->write(m_Handler,content);*/

            //B. Read the file in chunks and upload each chunk; a memory-mapped file could also be used to load the file before sending it to HDFS
            size_t bufferSize = 1 << 20;//1M
            size_t readSize = 0;
            char* buffer = new char[bufferSize];
            while(!feof(fp))
            {
                readSize = fread(buffer,sizeof(char),bufferSize,fp);
                //write data to hdfs
                std::string content;
                content.append(buffer,buffer+readSize);//string
                m_Client->write(m_Handler,content);
            }
            fclose(fp);
            delete[] buffer;
            return m_Client->close(m_Handler);
        }
    }
    
    bool HdfsClient::get(const std::string& rem_path,const std::string& localfile)
    {
        Pathname ptname;
        ptname.__set_pathname(rem_path);
        m_Client->open(m_Handler,ptname);
    
        if(m_Handler.id == 0)//error
            return false;
        else
        {
            FileStatus rfstat;
            m_Client->stat(rfstat,ptname);
    
            int64_t offset = 0;
            int bufferSize = 1 << 20;//1M
            std::string content;
            int contentlen = 0;
            FILE* fp = fopen(localfile.c_str(),"wb+");
            if(fp == NULL) return false; // could not create the local file
    
            while(offset < rfstat.length)
            {
                m_Client->read(content,m_Handler,offset,bufferSize);
                contentlen = content.length();
                if(contentlen > 0)
                {
                    fwrite(content.c_str(),sizeof(char),contentlen,fp);//todo: can use multi thread to read and write
                    offset += contentlen;
                }
                else
                    break;
            }
            fclose(fp);
            return m_Client->close(m_Handler);
        }
    }
    
    bool HdfsClient::rm(const std::string& rem_path, const bool recursive)
    {
        Pathname ptname;
        ptname.pathname = rem_path;
        return m_Client->rm(ptname,recursive);
    }
    
    bool HdfsClient::mv(const std::string& src_path,const std::string& dst_path)
    {
        Pathname src_ptname,dst_ptname;
        src_ptname.pathname = src_path;
        dst_ptname.pathname = dst_path;
        return m_Client->rename(src_ptname,dst_ptname);
    }
    
    bool HdfsClient::mkdirs(const std::string& rem_path)
    {
        Pathname ptname;
        ptname.pathname = rem_path;
        return m_Client->mkdirs(ptname);
    }
    
    bool HdfsClient::exists(const std::string& rem_path)
    {
        Pathname ptname;
        ptname.pathname = rem_path;
        return m_Client->exists(ptname);
    }
    
    void HdfsClient::ls(std::vector<FileStatus> & result, const std::string& path)
    {
        Pathname ptname;
        ptname.pathname = path;
        m_Client->listStatus(result,ptname);
    }
    
    void HdfsClient::chmod(const std::string& path, const int16_t mode)
    {
        Pathname ptname;
        ptname.pathname = path;
        m_Client->chmod(ptname,mode);
    }
    
    void HdfsClient::chown(const std::string& path, const std::string& owner)
    {
        Pathname ptname;
        ptname.pathname = path;
    
        FileStatus rfstat;
        m_Client->stat(rfstat,ptname);
        m_Client->chown(ptname,owner,rfstat.group);
    }
    
    void HdfsClient::setReplication(const std::string& path, const int16_t replication)
    {
        Pathname ptname;
        ptname.pathname = path;
        m_Client->setReplication(ptname,replication);
    }
    
    void HdfsClient::getFileBlockLocations(std::vector<BlockLocation> & result, const std::string& path, const int64_t start, const int64_t length)
    {
        Pathname ptname;
        ptname.pathname = path;
    
        m_Client->getFileBlockLocations(result,ptname,start,length);
    }
    
    int main()
    {
        std::string host = "192.168.0.111";
        int port = 50841;
        HdfsClient hdfs;
        std::string local_file = ".\\hadoop1.1.2-thriftfs.rar";
        std::string local_file2 = ".\\test.rar";
        std::string rem_file = "hdfs://master:9000/test.txt";
        std::string rem_dir = "hdfs://master:9000/";
        hdfs.connect(host,port);
        std::vector<FileStatus> result;
        hdfs.put(local_file,rem_file);
        //hdfs.append(local_file,rem_file);
        //hdfs.rm(rem_file);
        hdfs.ls(result,rem_dir);
        for (std::vector<FileStatus>::const_iterator itr = result.begin();
            itr != result.end(); itr++)
        {
            printf("%s\t%lld\n", itr->path.c_str(), (long long)itr->length);
        }
        hdfs.get(rem_file,local_file2);
        getchar();
        return 0;
    }

    4. Testing

    4.1 Install and configure a hadoop2.x environment

    (See the many guides online for the detailed steps.)

    4.2 Write a script to start the server

    First compile the server-side Java code and package it into a jar (libthrift.jar), then place it in the libthrift folder.

    Then copy core-site.xml and hdfs-site.xml from the hadoop installation's etc/hadoop/ directory into the directory containing the script (they are needed to access HDFS). (Reference: http://blog.csdn.net/kkdelta/article/details/19908209)

    The start_thrift_server.sh script:

    #!/bin/sh
    
    CLASSPATH=
    HADOOP_DIR=/usr/hadoop-2.6.3
    
    # the hadoop common libraries
    for f in $HADOOP_DIR/share/hadoop/common/*.jar ; do
      CLASSPATH=$CLASSPATH:$f
    done
    
    # the apache libraries
    for f in $HADOOP_DIR/share/hadoop/common/lib/*.jar ; do
      CLASSPATH=$CLASSPATH:$f
    done
    
    # the hadoop hdfs libraries
    for f in $HADOOP_DIR/share/hadoop/hdfs/*.jar ; do
      CLASSPATH=$CLASSPATH:$f
    done
    
    # the hdfs dependency libraries
    for f in $HADOOP_DIR/share/hadoop/hdfs/lib/*.jar ; do
      CLASSPATH=$CLASSPATH:$f
    done
    
    # the thrift libraries
    for f in ./libthrift/*.jar ; do
      CLASSPATH=$CLASSPATH:$f
    done
    
    java -Dcom.sun.management.jmxremote -cp $CLASSPATH org.apache.hadoop.thriftfs.HadoopThriftServer $*

    Run the script and note the port number the server prints; the client needs it to connect.

    Then run the C++ client and verify that upload, download, and the other operations work correctly.
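
    For a quick sanity check of the whole round trip, a test main() along the following lines can replace the one in HdfsClient.cpp. This is only a sketch: it assumes the HdfsClient class from section 3, and the host, port, and file names are placeholders for your environment.

    #include "HdfsClient.h"
    #include <cstdio>

    int main()
    {
        HdfsClient hdfs;
        if (!hdfs.connect("192.168.0.111", 50841))   // use the port printed by start_thrift_server.sh
            return 1;

        const std::string local  = ".\\sample.bin";
        const std::string remote = "hdfs://master:9000/sample.bin";
        const std::string copy   = ".\\sample_copy.bin";

        bool ok = hdfs.put(local, remote)      // upload the local file
               && hdfs.exists(remote)          // confirm it now exists on HDFS
               && hdfs.get(remote, copy);      // download it again
        printf("round trip %s\n", ok ? "ok" : "failed");
        // compare sample.bin and sample_copy.bin byte-for-byte to verify the transfer
        return ok ? 0 : 1;
    }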

  • Original article: https://www.cnblogs.com/hikeepgoing/p/5295909.html