zoukankan      html  css  js  c++  java
  • 基于Httpfs访问HDFS的C++实现

           Httpfs是hadoop2.x中hdfs项目的内置应用,基于tomcat和Jersey,对外提供完备HDFS操作的RESTful接口,无需安装客户端,可方便实现数据交互,如从windows访问存储在hdfs上的文件。本文根据Httpfs说明文档,实现了一个基于libcurl和jsoncpp的httpfs客户端程序(C++)。

          1.准备工作

      1.1 编译jsoncpp

         jsoncpp下载地址:https://codeload.github.com/open-source-parsers/jsoncpp/zip/master

          使用VS2010打开jsoncpp解压文件夹/makefiles/msvc2010/jsoncpp.sln,选择lib_json,设置项目的属性。具体设置为:1)“常规”中将配置类型设为静态库(.lib),字符集选择“使用多字节字符集”;2)“C/C++->代码生成”中的运行库选择 /MD(release)或 /MDd(debug)。编译环境必须与我们开发的工程一致!!!


        1.2编译libcurl

         libcurl下载地址:https://curl.haxx.se/download/curl-7.47.1.tar.gz

          打开curl解压目录下的 projects\Windows\VC10\curl-all.sln,选择lib_debug和lib_release编译。vs2010引用静态库链接失败的解决方法:

          1)给工程添加依赖的库:项目->属性->链接器->输入->附加依赖项,把libcurl.lib ws2_32.lib winmm.lib wldap32.lib添加进去(注意,debug配置用libcurld.lib).

          2)加入预处理器定义:项目->属性->C/C++->预处理器->预处理器定义,把 ;BUILDING_LIBCURL;HTTP_ONLY 追加进去(注意不要丢了开头的分号)

         解决方案来自网络“vc2010使用libcurl静态库 遇到连接失败的解决方案”

        1.3设置头文件引用

         在工程路径下创建一个include目录,将libcurl和jsoncpp中的include文件夹下的文件复制到该include文件夹下,设置为vc++目录引用路径。

      

    2.代码实现

     HttpfsClient.H

    #pragma once
    #include <string>
    #include <vector>
    using namespace std;
    
    // Metadata of one directory entry as filled in by CHttpFSClient::ls().
    // Each member holds the value of the JSON field named in its comment, taken
    // from one element of the "FileStatus" array of a LISTSTATUS response.
    // The 64-bit members are spelled `long long` (the standard equivalent of the
    // MSVC-only `__int64`) so the header also builds with other compilers.
    struct FileStatus {
        long long accessTime;        // "accessTime"
        long long blocksize;         // "blockSize"
        std::string group;           // "group"
        long long length;            // "length"
        long long modificationTime;  // "modificationTime"
        std::string owner;           // "owner"
        std::string pathSuffix;      // "pathSuffix"
        std::string permission;      // "permission" (kept as text, e.g. "644")
        int replication;             // "replication"
        std::string type;            // "type" (e.g. "FILE" or "DIRECTORY")
    };
    
    class CHttpFSClient
    {
    private:
        string m_hostaddr;    //http://<HOST>:<PORT>/webhdfs/v1/
        string m_username;  //i.e. hadoop
        long m_timeout;
        long m_conntimeout;
    public:
        enum HTTP_TYPE{GET=0,PUT,POST,DEL};
    public:
        CHttpFSClient(string& hostaddr,string& username);
        ~CHttpFSClient(void);
        bool create(string& local_file,string& rem_file,bool overwrite = false);
        bool append(string& local_file,string& rem_file);
        bool mkdirs(string& path);
        bool rename(string& src,string& dst);
        bool del(string& path, bool recursive=false);
        bool read(string& rem_file,string& local_file, long offset=0, long length=0);
        bool ls(string& rem_path,vector<FileStatus>& results);
    protected:
        static size_t fileread_callback(void *ptr, size_t size, size_t nmemb, void *stream);
        static size_t filewrite_data(const char *ptr, size_t size, size_t nmemb, void *stream);
        static size_t memwrite_data(const char *contents, size_t size, size_t nmemb, string *stream);   
        static size_t header_callback(const char  *ptr, size_t size, size_t nmemb, std::string *stream);
    
    
        void showFileStatus(vector<FileStatus>& results);
    };

    HttpfsClient.cpp

    // HttpfsClient.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    #include "HttpfsClient.h"
    #include <assert.h>
    #include <stdio.h>
    #include <fcntl.h>
    #include <sys/stat.h>
    #include <curl/curl.h>
    #include <json/json.h>
    #include <iostream>
    #include <fstream>
    using namespace std;
    
    // Remembers the endpoint and user name, selects the default timeouts
    // (5184000 s overall, 120 s for connecting) and initialises libcurl's
    // global state.
    CHttpFSClient::CHttpFSClient(string& hostaddr,string& username)
        : m_hostaddr(hostaddr)
        , m_username(username)
        , m_timeout(5184000)
        , m_conntimeout(120)
    {
        // On Windows this call also initialises the Winsock layer.
        curl_global_init(CURL_GLOBAL_ALL);
    }
    
    
    // Releases the libcurl global state that the constructor set up.
    CHttpFSClient::~CHttpFSClient()
    {
        curl_global_cleanup();
    }
    
    /*
        Create and Write to a File
        @param local_file string
        @param rem_file string
        @param overwrite: true/false
        @return true/false
        
        Step 1: Submit a HTTP PUT request without automatically following redirects and without sending the file data.
        curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE                        [&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
                            [&permission=<OCTAL>][&buffersize=<INT>]"
        The request is redirected to a datanode where the file data is to be written:
    
        HTTP/1.1 307 TEMPORARY_REDIRECT
        Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE...
        Content-Length: 0
        Step 2: Submit another HTTP PUT request using the URL in the Location header with the file data to be written.
        curl -i -X PUT -T <LOCAL_FILE> "http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..."
        The client receives a 201 Created response with zero content length and the WebHDFS URI of the file in the Location header:
    
        HTTP/1.1 201 Created
        Location: webhdfs://<HOST>:<PORT>/<PATH>
        Content-Length: 0
    */
    // Creates rem_file on the server from the contents of local_file.
    //
    // Two requests are made:
    //   1. an empty PUT to <base><rem_file>?op=CREATE with redirects NOT followed;
    //      the redirect target is read back with CURLINFO_REDIRECT_URL;
    //   2. a PUT to that target which streams the local file as
    //      application/octet-stream.
    //
    // @param local_file path of the local file to upload
    // @param rem_file   remote path, appended to the configured base URL
    // @param overwrite  when true, "&overwrite=true" is added to the first request
    // @return true only when both requests succeed, the first one yields a
    //         redirect target and the upload is answered with a 2xx status;
    //         false otherwise (a diagnostic is written to stderr)
    bool CHttpFSClient::create(string& local_file,string& rem_file,bool overwrite)
    {
        string url = m_hostaddr + rem_file + "?op=CREATE&user.name=" + m_username;
        if(overwrite) url += "&overwrite=true";

        // ---- Step 1: ask where the data has to be sent ----
        CURL *curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "hdfs create first request failed: curl_easy_init returned NULL\n");
            return false;
        }

        curl_easy_setopt(curl, CURLOPT_PUT, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 0L);
        // Number of bytes to upload: none in this first request. The option
        // expects a long, so the literal must be 0L.
        curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0L);

        string strredir_url;
        bool curlerr = false;

        CURLcode res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "hdfs create first request failed: %s\n",
                curl_easy_strerror(res));
            curlerr = true;
        }
        else
        {
            char *redir_url = NULL;
            res = curl_easy_getinfo(curl, CURLINFO_REDIRECT_URL, &redir_url);
            if(res != CURLE_OK)
            {
                fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_REDIRECT_URL failed: %s\n",
                    curl_easy_strerror(res));
                curlerr = true;
            }
            else if(redir_url == NULL)
            {
                // No redirect was offered; constructing a std::string from
                // NULL would be undefined behaviour.
                fprintf(stderr, "hdfs create first request failed: no redirect URL in the response\n");
                curlerr = true;
            }
            else
            {
                // The pointer belongs to the handle, so copy it before cleanup.
                strredir_url = redir_url;
            }
        }
        curl_easy_cleanup(curl);
        if(curlerr)
            return false;

        // ---- Step 2: upload the file to the redirect target ----
        struct stat file_info;
        if(stat(local_file.c_str(), &file_info) != 0)
        {
            fprintf(stderr, "hdfs create failed: cannot stat %s\n", local_file.c_str());
            return false;
        }
        FILE *hd_src = fopen(local_file.c_str(), "rb");
        if(hd_src == NULL)
        {
            fprintf(stderr, "hdfs create failed: cannot open %s\n", local_file.c_str());
            return false;
        }

        curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "upload file to hdfs failed: curl_easy_init returned NULL\n");
            fclose(hd_src);
            return false;
        }

        struct curl_slist *headers = NULL;
        headers = curl_slist_append(headers, "Content-Type:application/octet-stream");

        long response_code = 0;

        // Use our own read function: the FILE* is read inside this module.
        curl_easy_setopt(curl, CURLOPT_READFUNCTION, CHttpFSClient::fileread_callback);
        curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
        curl_easy_setopt(curl, CURLOPT_PUT, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, strredir_url.c_str());
        curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
        curl_easy_setopt(curl, CURLOPT_READDATA, hd_src);
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        // The _LARGE variant takes a curl_off_t, hence the explicit cast.
        curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE,
            static_cast<curl_off_t>(file_info.st_size));

        res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "upload file to hdfs failed: %s\n",
                curl_easy_strerror(res));
            curlerr = true;
        }
        else
        {
            res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
            if(res != CURLE_OK)
            {
                fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n",
                    curl_easy_strerror(res));
                curlerr = true;
            }
            else if(response_code < 200 || response_code >= 300)
            {
                // A completed transfer is not a successful one: the server may
                // have answered with an HTTP error.
                fprintf(stderr, "upload file to hdfs failed: HTTP status %ld\n", response_code);
                curlerr = true;
            }
        }

        fclose(hd_src);
        curl_slist_free_all(headers);
        curl_easy_cleanup(curl);

        return !curlerr;
    }
    
    /*
        Append to a File
        @param local_file string
        @param rem_file string
        @return true/false
        
        Step 1: Submit a HTTP POST request without automatically following redirects and without sending the file data.
        curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND[&buffersize=<INT>]"
        The request is redirected to a datanode where the file data is to be appended:
    
        HTTP/1.1 307 TEMPORARY_REDIRECT
        Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND...
        Content-Length: 0
        Step 2: Submit another HTTP POST request using the URL in the Location header with the file data to be appended.
        curl -i -X POST -T <LOCAL_FILE> "http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND..."
        The client receives a response with zero content length:
    
        HTTP/1.1 200 OK
        Content-Length: 0
    */
    // Appends the contents of local_file to the existing remote file rem_file.
    //
    // Two requests are made:
    //   1. an empty POST to <base><rem_file>?op=APPEND with redirects NOT
    //      followed; the redirect target is read with CURLINFO_REDIRECT_URL;
    //   2. a POST to that target whose body is the whole local file, sent as
    //      application/octet-stream.
    //
    // The file is read in binary mode and its size is passed to libcurl
    // explicitly, so data containing NUL bytes or CR/LF pairs is sent unchanged.
    //
    // @param local_file path of the local file whose bytes are appended
    // @param rem_file   remote path, appended to the configured base URL
    // @return true only when both requests succeed and the second one is
    //         answered with HTTP 200; false otherwise (diagnostics on stderr)
    bool CHttpFSClient::append(string& local_file,string& rem_file)
    {
        string url = m_hostaddr + rem_file + "?op=APPEND&user.name=" + m_username;

        // ---- Step 1: ask where the data has to be sent ----
        CURL *curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "hdfs append first request failed: curl_easy_init returned NULL\n");
            return false;
        }

        curl_easy_setopt(curl, CURLOPT_POST, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 0L);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        // The option expects a long, so the literal must be 0L.
        curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, 0L);

        string strredir_url;
        bool curlerr = false;

        CURLcode res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "hdfs append first request failed: %s\n",
                curl_easy_strerror(res));
            curlerr = true;
        }
        else
        {
            char *redir_url = NULL;
            res = curl_easy_getinfo(curl, CURLINFO_REDIRECT_URL, &redir_url);
            if(res != CURLE_OK)
            {
                fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_REDIRECT_URL failed: %s\n",
                    curl_easy_strerror(res));
                curlerr = true;
            }
            else if(redir_url == NULL)
            {
                // No redirect was offered; constructing a std::string from
                // NULL would be undefined behaviour.
                fprintf(stderr, "hdfs append first request failed: no redirect URL in the response\n");
                curlerr = true;
            }
            else
            {
                // The pointer belongs to the handle, so copy it before cleanup.
                strredir_url = redir_url;
            }
        }
        curl_easy_cleanup(curl);
        if(curlerr)
            return false;

        // ---- Step 2: send the file contents to the redirect target ----
        // Load the whole file into memory. Binary mode is required: text mode
        // would translate line endings on Windows.
        ifstream fin(local_file.c_str(), ios::in | ios::binary);
        if(!fin)
        {
            fprintf(stderr, "hdfs append failed: cannot open %s\n", local_file.c_str());
            return false;
        }
        istreambuf_iterator<char> beg(fin), end;
        string strdata(beg, end);
        fin.close();

        curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "append file to hdfs failed: curl_easy_init returned NULL\n");
            return false;
        }

        struct curl_slist *headers = NULL;
        headers = curl_slist_append(headers, "Content-Type: application/octet-stream");

        long response_code = 0;

        curl_easy_setopt(curl, CURLOPT_POST, 1L);
        curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
        curl_easy_setopt(curl, CURLOPT_URL, strredir_url.c_str());
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        // Without an explicit size libcurl measures the body with strlen(),
        // which stops at the first NUL byte.
        curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE_LARGE,
            static_cast<curl_off_t>(strdata.size()));
        // libcurl does not copy this buffer; strdata outlives the transfer.
        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, strdata.data());

        res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "append file to hdfs failed: %s\n",
                curl_easy_strerror(res));
            curlerr = true;
        }
        else
        {
            res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
            if(res != CURLE_OK)
            {
                fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n",
                    curl_easy_strerror(res));
                curlerr = true;
            }
        }

        curl_slist_free_all(headers);
        curl_easy_cleanup(curl);
        if(curlerr)
            return false;

        return response_code == 200;
    }
    
    /*
        Make a Directory
        
        Submit a HTTP PUT request.
        curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS[&permission=<OCTAL>]"
        The client receives a response with a boolean JSON object:
    
        HTTP/1.1 200 OK
        Content-Type: application/json
        Transfer-Encoding: chunked
    
        {"boolean": true}
    */
    // Creates the remote directory `path` with a single PUT to
    // <base><path>?op=MKDIRS.
    //
    // Only the response body is collected (response headers are not mixed into
    // it), so the JSON can be parsed directly without any header-offset
    // arithmetic.
    //
    // @param path remote directory path, appended to the configured base URL
    // @return the value of the "boolean" member of the JSON response when the
    //         server answers with HTTP 200; false on any transport error,
    //         other status code, or unparsable / non-object response
    bool CHttpFSClient::mkdirs(string& path)
    {
        string url = m_hostaddr + path + "?op=MKDIRS&user.name=" + m_username;

        CURL *curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "hdfs mkdirs failed: curl_easy_init returned NULL\n");
            return false;
        }

        string response_contents;
        long response_code = 0;
        bool curlerr = false;

        curl_easy_setopt(curl, CURLOPT_PUT, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents);
        // The PUT carries no body; the option expects a long.
        curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0L);

        CURLcode res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "hdfs mkdirs failed: %s\n",
                curl_easy_strerror(res));
            curlerr = true;
        }
        else
        {
            res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
            if(res != CURLE_OK)
            {
                fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n",
                    curl_easy_strerror(res));
                curlerr = true;
            }
        }
        curl_easy_cleanup(curl);

        if(curlerr || response_code != 200)
            return false;

        Json::Reader reader;
        Json::Value root;
        if(!reader.parse(response_contents, root, false))
            return false;
        // Indexing a non-object value by name is an error in jsoncpp.
        if(!root.isObject())
            return false;

        return root["boolean"].asBool();
    }
    
    /*
    Rename a File/Directory
    Submit a HTTP PUT request. 
    curl -i -X PUT "<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"
    
    The client receives a response with a boolean JSON object:
    
    HTTP/1.1 200 OK
    Content-Type: application/json
    Transfer-Encoding: chunked
    
    {"boolean": true}
    */
    // Renames the remote file or directory `src` to `dst` with a single PUT to
    // <base><src>?op=RENAME&destination=<dst>.
    //
    // Only the response body is collected (response headers are not mixed into
    // it), so the JSON can be parsed directly.
    //
    // NOTE(review): src and dst are inserted into the URL verbatim; callers must
    // pass values that are already valid inside a URL (no '&', spaces, ...).
    //
    // @param src remote path to rename, appended to the configured base URL
    // @param dst new remote path, sent as the `destination` query parameter
    // @return the value of the "boolean" member of the JSON response when the
    //         server answers with HTTP 200; false on any transport error,
    //         other status code, or unparsable / non-object response
    bool CHttpFSClient::rename(string& src,string& dst)
    {
        string url = m_hostaddr + src + "?op=RENAME&user.name=" + m_username + "&destination=" + dst;

        CURL *curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "hdfs rename failed: curl_easy_init returned NULL\n");
            return false;
        }

        string response_contents;
        long response_code = 0;
        bool curlerr = false;

        curl_easy_setopt(curl, CURLOPT_PUT, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents);
        // The PUT carries no body; the option expects a long.
        curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0L);

        CURLcode res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "hdfs rename failed: %s\n",
                curl_easy_strerror(res));
            curlerr = true;
        }
        else
        {
            res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
            if(res != CURLE_OK)
            {
                fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n",
                    curl_easy_strerror(res));
                curlerr = true;
            }
        }
        curl_easy_cleanup(curl);

        if(curlerr || response_code != 200)
            return false;

        Json::Reader reader;
        Json::Value root;
        if(!reader.parse(response_contents, root, false))
            return false;
        // Indexing a non-object value by name is an error in jsoncpp.
        if(!root.isObject())
            return false;

        return root["boolean"].asBool();
    }
    
    /*
        Delete a File/Directory
        @param file string, the file or directory to be deleted
        @return true/false
        
        Submit a HTTP DELETE request
        curl -i -X DELETE "http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
                                      [&recursive=<true|false>]"
        The client receives a response with a boolean JSON object:
    
        HTTP/1.1 200 OK
        Content-Type: application/json
        Transfer-Encoding: chunked
    
        {"boolean": true}
    */    
    // Deletes the remote file or directory `path` with a single HTTP DELETE to
    // <base><path>?op=DELETE.
    //
    // Only the response body is collected (response headers are not mixed into
    // it), so the JSON can be parsed directly.
    //
    // @param path      remote path, appended to the configured base URL
    // @param recursive when true, "&recursive=true" is added to the request
    // @return the value of the "boolean" member of the JSON response when the
    //         server answers with HTTP 200; false on any transport error,
    //         other status code, or unparsable / non-object response
    bool CHttpFSClient::del(string& path, bool recursive)
    {
        string url = m_hostaddr + path + "?op=DELETE&user.name=" + m_username;
        if(recursive) url += "&recursive=true";

        CURL *curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "hdfs del failed: curl_easy_init returned NULL\n");
            return false;
        }

        string response_contents;
        long response_code = 0;
        bool curlerr = false;

        // libcurl has no dedicated DELETE option; override the request verb.
        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents);

        CURLcode res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "hdfs del failed: %s\n",
                curl_easy_strerror(res));
            curlerr = true;
        }
        else
        {
            res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
            if(res != CURLE_OK)
            {
                fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n",
                    curl_easy_strerror(res));
                curlerr = true;
            }
        }
        curl_easy_cleanup(curl);

        if(curlerr || response_code != 200)
            return false;

        Json::Reader reader;
        Json::Value root;
        if(!reader.parse(response_contents, root, false))
            return false;
        // Indexing a non-object value by name is an error in jsoncpp.
        if(!root.isObject())
            return false;

        return root["boolean"].asBool();
    }
    
    /*
        Open and read a remote file and write its contents to local_file
        @param rem_file   remote file to read
        @param local_file local file that receives the data
        
        Submit a HTTP GET request with automatically following redirects.
        curl -i -L "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
                            [&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
        The request is redirected to a datanode where the file data can be read:
    
        HTTP/1.1 307 TEMPORARY_REDIRECT
        Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=OPEN...
        Content-Length: 0
        The client follows the redirect to the datanode and receives the file data:
    
        HTTP/1.1 200 OK
        Content-Type: application/octet-stream
        Content-Length: 22
    
        Hello, webhdfs user!
    */
    // Downloads the remote file rem_file into local_file with an HTTP GET to
    // <base><rem_file>?op=OPEN, following redirects.
    //
    // @param rem_file   remote path, appended to the configured base URL
    // @param local_file local file that is created/truncated and filled with
    //                   the received bytes
    // @param offset     when non-zero, sent as the `offset` query parameter
    // @param length     when non-zero, sent as the `length` query parameter
    // @return true when the transfer completes without a libcurl error, the
    //         server does not answer with an HTTP error status (>= 400), and
    //         the local file is closed successfully; false otherwise
    bool CHttpFSClient::read(string& rem_file,string& local_file, long offset, long length)
    {
        // Build the URL in a std::string: a fixed-size buffer cannot hold long
        // paths. Only the short numeric suffixes go through sprintf_s.
        string url = m_hostaddr + rem_file + "?op=OPEN&user.name=" + m_username;
        char numbuf[64];
        // offset and length are independent: either may be given alone.
        if(offset != 0)
        {
            sprintf_s(numbuf, sizeof(numbuf), "&offset=%ld", offset);
            url += numbuf;
        }
        if(length != 0)
        {
            sprintf_s(numbuf, sizeof(numbuf), "&length=%ld", length);
            url += numbuf;
        }

        CURL *curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "get file from hdfs failed: curl_easy_init returned NULL\n");
            return false;
        }

        // fopen reports failure through its return value, not GetLastError().
        FILE *pagefile = fopen(local_file.c_str(), "wb");
        if(pagefile == NULL)
        {
            fprintf(stderr, "get file from hdfs failed: cannot open %s for writing\n",
                local_file.c_str());
            curl_easy_cleanup(curl);
            return false;
        }

        curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        // The request may be redirected to the node that serves the data.
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
        // Treat HTTP status >= 400 as a failure instead of saving the error
        // body into the local file and reporting success.
        curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::filewrite_data);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, pagefile);

        bool ok = true;
        CURLcode res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "get file from hdfs failed: %s\n",
                curl_easy_strerror(res));
            ok = false;
        }

        // fclose flushes buffered data; a failure here means the file is incomplete.
        if(fclose(pagefile) != 0)
        {
            fprintf(stderr, "get file from hdfs failed: cannot close %s\n",
                local_file.c_str());
            ok = false;
        }
        curl_easy_cleanup(curl);

        return ok;
    }
    
    /*
        list a directory
        @param rem_path string, the remote directory to list
        @param results  receives one FileStatus per listed entry
        @return true/false
        
        Submit a HTTP GET request.
        curl -i  "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
        The client receives a response with a FileStatuses JSON object:
    
        HTTP/1.1 200 OK
        Content-Type: application/json
        Content-Length: 427
    
        {
          "FileStatuses":
          {
            "FileStatus":
            [
              {
                "accessTime"      : 1320171722771,
                "blockSize"       : 33554432,
                "group"           : "supergroup",
                "length"          : 24930,
                "modificationTime": 1320171722771,
                "owner"           : "webuser",
                "pathSuffix"      : "a.patch",
                "permission"      : "644",
                "replication"     : 1,
                "type"            : "FILE"
              },
              {
                "accessTime"      : 0,
                "blockSize"       : 0,
                "group"           : "supergroup",
                "length"          : 0,
                "modificationTime": 1320895981256,
                "owner"           : "szetszwo",
                "pathSuffix"      : "bar",
                "permission"      : "711",
                "replication"     : 0,
                "type"            : "DIRECTORY"
              },
              ...
            ]
          }
        }
        */
    // Lists the remote path rem_path with an HTTP GET to
    // <base><rem_path>?op=LISTSTATUS and converts the JSON answer into
    // FileStatus records.
    //
    // Only the response body is collected (response headers are not mixed into
    // it), so the JSON can be parsed directly.
    //
    // @param rem_path remote path, appended to the configured base URL
    // @param results  on success, cleared and refilled with one entry per element
    //                 of FileStatuses.FileStatus; left untouched on failure
    // @return true when the server answers with HTTP 200 and the body has the
    //         expected FileStatuses/FileStatus structure; false otherwise.
    //         On success the listing is also printed via showFileStatus().
    bool CHttpFSClient::ls(string& rem_path,vector<FileStatus>& results)
    {
        string url = m_hostaddr + rem_path + "?op=LISTSTATUS&user.name=" + m_username;

        CURL *curl = curl_easy_init();
        if(curl == NULL)
        {
            fprintf(stderr, "hdfs LISTSTATUS failed: curl_easy_init returned NULL\n");
            return false;
        }

        string response_contents;
        long response_code = 0;
        bool curlerr = false;

        curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_timeout);
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_conntimeout);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents);

        CURLcode res = curl_easy_perform(curl);
        if(res != CURLE_OK)
        {
            fprintf(stderr, "hdfs LISTSTATUS failed: %s\n",
                curl_easy_strerror(res));
            curlerr = true;
        }
        else
        {
            res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
            if(res != CURLE_OK)
            {
                fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\n",
                    curl_easy_strerror(res));
                curlerr = true;
            }
        }
        curl_easy_cleanup(curl);

        if(curlerr || response_code != 200)
            return false;

        Json::Reader reader;
        Json::Value root;
        if(!reader.parse(response_contents, root, false))
            return false;

        // Check each level's type before descending: looking up a member of a
        // non-object value is an error in jsoncpp.
        if(!root.isObject() || root.empty())
            return false;
        Json::Value FileStatuses = root.get("FileStatuses", Json::nullValue);
        if(!FileStatuses.isObject())
            return false;
        Json::Value FileStatusVec = FileStatuses.get("FileStatus", Json::nullValue);
        if(!FileStatusVec.isArray())
            return false;

        results.clear();
        const int size = static_cast<int>(FileStatusVec.size());
        results.reserve(size);
        for (int i = 0; i < size; ++i)
        {
            Json::Value& entry = FileStatusVec[i];

            FileStatus fst;
            fst.accessTime = entry["accessTime"].asInt64();
            fst.blocksize = entry["blockSize"].asInt64();
            fst.group = entry["group"].asString();
            fst.length = entry["length"].asInt64();
            fst.modificationTime = entry["modificationTime"].asInt64();
            fst.owner = entry["owner"].asString();
            fst.pathSuffix = entry["pathSuffix"].asString();
            fst.permission = entry["permission"].asString();
            fst.replication = entry["replication"].asInt();
            fst.type = entry["type"].asString();

            results.push_back(fst);
        }
        showFileStatus(results);

        return true;
    }
    
    // Prints a tab-separated table (path, owner, length, replication) of the
    // given entries to stdout, preceded by a header line.
    // @param results entries to print; not modified
    void CHttpFSClient::showFileStatus(vector<FileStatus>& results)
    {
        printf("path\towner\tlength\trep\n");
        for (vector<FileStatus>::const_iterator itr = results.begin(); itr != results.end(); ++itr)
        {
            // length is a 64-bit value: it must be printed with %lld, not %ld,
            // otherwise the argument list is misread where long is 32 bits.
            printf("%s\t%s\t%lld\t%d\n",
                itr->pathSuffix.c_str(),
                itr->owner.c_str(),
                static_cast<long long>(itr->length),
                itr->replication);
        }
    }
    
    size_t CHttpFSClient::fileread_callback(void *ptr, size_t size, size_t nmemb, void *stream)
    {
      size_t retcode;
      curl_off_t nread;
    
      /* in real-world cases, this would probably get this data differently
         as this fread() stuff is exactly what the library already would do
         by default internally */
      retcode = fread(ptr, size, nmemb, (FILE *)stream);
    
      nread = (curl_off_t)retcode;
    
      fprintf(stderr, "*** We read %" CURL_FORMAT_CURL_OFF_T
              " bytes from file
    ", nread);
    
      return retcode;
    }
    
    // libcurl write callback: writes the received chunk to the FILE* passed as
    // `stream` (registered through CURLOPT_WRITEDATA).
    // @return the number of BYTES written; libcurl aborts the transfer when this
    //         differs from size*nmemb. (fwrite returns a count of `size`-sized
    //         items, so it is scaled.)
    size_t CHttpFSClient::filewrite_data(const char *ptr, size_t size, size_t nmemb, void *stream)
    {
        const size_t items = fwrite(ptr, size, nmemb, static_cast<FILE *>(stream));
        return items * size;
    }
    
    // libcurl write callback that accumulates the received chunk in the
    // std::string registered through CURLOPT_WRITEDATA.
    // @return the number of bytes consumed (always size*nmemb)
    size_t CHttpFSClient::memwrite_data(const char *contents, size_t size, size_t nmemb, string *stream)
    {
        assert(stream != NULL);
        const size_t chunk_bytes = size * nmemb;
        stream->append(contents, chunk_bytes);
        return chunk_bytes;
    }
    // Callback that appends the given bytes to the supplied std::string.
    // No request in this file installs it.
    // @return the number of bytes consumed (always size*nmemb)
    size_t CHttpFSClient::header_callback(const char  *ptr, size_t size, size_t nmemb, std::string *stream)
    {
        assert(stream != NULL);
        const size_t chunk_bytes = size * nmemb;
        stream->append(ptr, chunk_bytes);
        return chunk_bytes;
    }
    
    // Demo entry point: downloads /test.docx from a hard-coded HttpFS endpoint
    // into .\test.docx, reports a failure on stderr, then waits for a key press.
    int main(int argc, _TCHAR* argv[])
    {
        string hostaddr = "http://192.168.0.111:14000/webhdfs/v1";
        string username = "hadoop";
        CHttpFSClient httpfs(hostaddr,username);
        vector<FileStatus> results;
        // The backslash must be escaped: ".\test.docx" would contain a TAB.
        string local_file = ".\\test.docx";
        string rem_path = "/test.docx";
        //httpfs.create(local_file,rem_path);
        //httpfs.append(local_file,rem_path);
        if(!httpfs.read(rem_path,local_file))
            fprintf(stderr, "reading %s failed\n", rem_path.c_str());
        //httpfs.ls(rem_path,results);
        //httpfs.del(rem_path);

        getchar();
        return 0;
    }

    3.工程代码下载

     https://files.cnblogs.com/files/hikeepgoing/HttpfsClient.rar

  • 相关阅读:
    OpenCv 人脸识别 基础
    C++ 0x std::async 的应用
    C++ 0x 使用condition_variable 与 Mutex 同步两个线程
    Unity C# 调用 C++ DLL 并在 DLL 中调用 C# 的回调函数
    C++ 11 中的 Lambda 表达式的使用
    DirectShow 制作在Unity3D中可以设置进度的视频播放插件
    Async/Await 如何通过同步的方式实现异步
    React Native 开源项目汇总
    ES6 Promise的理解与简单实现(链接)
    深刻理解BFC(链接)
  • 原文地址:https://www.cnblogs.com/hikeepgoing/p/5294276.html
Copyright © 2011-2022 走看看