zoukankan      html  css  js  c++  java
  • Thrift辅助类,用于简化Thrift编程

    CThriftServerHelper用于服务端,CThriftClientHelper用于客户端。

    IDL定义:
    service PackageManagerService
    {
    }

    服务端使用示例:
    CThriftServerHelper<CPackageManagerHandler, PackageManagerServiceProcessor> _thrift_server_helper;
    return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads);

    客户端使用示例:
    CThriftClientHelper<PackageManagerServiceClient> thrift_client_helper(FLAGS_package_ip, FLAGS_package_port);

    thrift_client_helper.connect(); // 注意需要处理异常TTransportException/TApplicationException/TException

    #include <arpa/inet.h>
    #include <thrift/concurrency/PosixThreadFactory.h>
    #include <thrift/concurrency/ThreadManager.h>
    #include <thrift/protocol/TBinaryProtocol.h>
    #include <thrift/server/TNonblockingServer.h>
    #include <thrift/transport/TSocketPool.h>
    #include <thrift/transport/TTransportException.h>
    #include <thrift/async/TAsyncChannel.h>
    
    using namespace apache;
    
    // 用来判断thrift是否已经连接,包括两种情况:
    // 1.从未连接过,也就是还未打开过连接
    // 2.连接被对端关闭了
    inline bool thrift_not_connected(
            thrift::transport::TTransportException::TTransportExceptionType type)
    {
        return (thrift::transport::TTransportException::NOT_OPEN == type)
            || (thrift::transport::TTransportException::END_OF_FILE == type);
    }
    
    // 封装对thrift服务端的公共操作
    template <class ThriftHandler, class ServiceProcessor>
    class CThriftServerHelper
    {
    public:
        // 启动rpc服务,请注意该调用是同步阻塞的,所以需放最后调用
        bool serve(uint16_t port);
        bool serve(uint16_t port, uint8_t num_threads);
        bool serve(const std::string& ip, uint16_t port, uint8_t num_threads);
        void stop();
    
    private:
        boost::shared_ptr<ThriftHandler> _handler;
        boost::shared_ptr<thrift::TProcessor> _processor;
        boost::shared_ptr<thrift::protocol::TProtocolFactory> _protocol_factory;
        boost::shared_ptr<thrift::server::ThreadManager> _thread_manager;
        boost::shared_ptr<thrift::concurrency::PosixThreadFactory> _thread_factory;
        boost::shared_ptr<thrift::server::TServer> _server;
    };
    
    // 封装对thrift客户端的公共操作
    template <class ThriftClient>
    class CThriftClientHelper
    {
    public:
        CThriftClientHelper(const std::string& host, uint16_t port
                          , int timeout = RPC_TIMEOUT);
        ~CThriftClientHelper();
        void connect();
        void close();
    
        ThriftClient* operator ->() const
        {
            return _container_client.get();
        }
    
        ThriftClient* get()
        {
            return _container_client.get();
        }
    
    private:
        std::string _host;
        uint16_t _port;
        int _timeout;
        boost::shared_ptr<thrift::transport::TSocketPool> _sock_pool;
        boost::shared_ptr<thrift::transport::TTransport> _socket;
        boost::shared_ptr<thrift::transport::TFramedTransport> _transport;
        boost::shared_ptr<thrift::protocol::TProtocol> _protocol;
        boost::shared_ptr<ThriftClient> _container_client;
    };
    
    ///////////////////////////////////////////////////////////////////////////////
    
    template <class ThriftHandler, class ServiceProcessor>
    bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(uint16_t port)
    {
        return serve("0.0.0.0", port, 1);
    }
    
    template <class ThriftHandler, class ServiceProcessor>
    bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(uint16_t port, uint8_t num_threads)
    {
        return serve("0.0.0.0", port, num_threads);
    }
    
    template <class ThriftHandler, class ServiceProcessor>
    bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(const std::string& ip, uint16_t port, uint8_t num_threads)
    {
        try
        {
            _handler.reset(new ThriftHandler);
            _processor.reset(new ServiceProcessor(_handler));
            _protocol_factory.reset(new thrift::protocol::TBinaryProtocolFactory());
            _thread_manager = thrift::server::ThreadManager::newSimpleThreadManager(num_threads);
            _thread_factory.reset(new thrift::concurrency::PosixThreadFactory());
    
            _thread_manager->threadFactory(_thread_factory);
            _thread_manager->start();
    
            _server.reset(new thrift::server::TNonblockingServer(
                    _processor, _protocol_factory, port, _thread_manager));
    
            _server->serve();
        }
        catch (thrift::TException& tx)
        {
            LOG(ERROR) << "start thrift error: " << tx.what();
            return false;
        }
    
        LOG(INFO) << "container-thrift start";
        return true;
    }
    
    template <class ThriftHandler, class ServiceProcessor>
    void CThriftServerHelper<ThriftHandler, ServiceProcessor>::stop()
    {
        _server->stop();
    }
    
    ///////////////////////////////////////////////////////////////////////////////
    
    template <class ThriftClient>
    CThriftClientHelper<ThriftClient>::CThriftClientHelper(
            const std::string& host, uint16_t port, int timeout)
            : _host(host)
            , _port(port)
            , _timeout(timeout)
    {
        _sock_pool.reset(new thrift::transport::TSocketPool());
        _sock_pool->addServer(host, port);
        _sock_pool->setConnTimeout(timeout);
        _sock_pool->setRecvTimeout(timeout);
        _sock_pool->setSendTimeout(timeout);
        
        _socket = _sock_pool;
        _transport.reset(new thrift::transport::TFramedTransport(_socket));
        _protocol.reset(new thrift::protocol::TBinaryProtocol(_transport));
    
        _container_client.reset(new ThriftClient(_protocol));
    }
    
    template <class ThriftClient>
    CThriftClientHelper<ThriftClient>::~CThriftClientHelper()
    {
        close();
    }
    
    template <class ThriftClient>
    void CThriftClientHelper<ThriftClient>::connect()
    {
        if (!_transport->isOpen())
        {
            _transport->open();
        }
    }
    
    template <class ThriftClient>
    void CThriftClientHelper<ThriftClient>::close()
    {
        if (_transport->isOpen())
        {
            _transport->close();
        }
    }


  • 相关阅读:
    173. Binary Search Tree Iterator
    199. Binary Tree Right Side View
    230. Kth Smallest Element in a BST
    236. Lowest Common Ancestor of a Binary Tree
    337. House Robber III
    449. Serialize and Deserialize BST
    508. Most Frequent Subtree Sum
    513. Find Bottom Left Tree Value
    129. Sum Root to Leaf Numbers
    652. Find Duplicate Subtrees
  • 原文地址:https://www.cnblogs.com/aquester/p/9891605.html
Copyright © 2011-2022 走看看