zoukankan      html  css  js  c++  java
  • Thrift辅助类,用于简化Thrift编程

    CThriftServerHelper用于服务端,CThriftClientHelper用于客户端。

    IDL定义:
    service PackageManagerService
    {
    }

    服务端使用示例:
    CThriftServerHelper<CPackageManagerHandler, PackageManagerServiceProcessor> _thrift_server_helper;
    return _thrift_server_helper.serve(FLAGS_package_port, rpc_threads);

    客户端使用示例:
    CThriftClientHelper<PackageManagerServiceClient> thrift_client_helper(FLAGS_package_ip, FLAGS_package_port);

    thrift_client_helper.connect(); // 注意需要处理异常TTransportException/TApplicationException/TException

    #include <arpa/inet.h>
    #include <thrift/concurrency/PosixThreadFactory.h>
    #include <thrift/concurrency/ThreadManager.h>
    #include <thrift/protocol/TBinaryProtocol.h>
    #include <thrift/server/TNonblockingServer.h>
    #include <thrift/transport/TSocketPool.h>
    #include <thrift/transport/TTransportException.h>
    #include <thrift/async/TAsyncChannel.h>
    
    using namespace apache;
    
    // 用来判断thrift是否已经连接,包括两种情况:
    // 1.从未连接过,也就是还未打开过连接
    // 2.连接被对端关闭了
    inline bool thrift_not_connected(
            thrift::transport::TTransportException::TTransportExceptionType type)
    {
        return (thrift::transport::TTransportException::NOT_OPEN == type)
            || (thrift::transport::TTransportException::END_OF_FILE == type);
    }
    
    // 封装对thrift服务端的公共操作
    template <class ThriftHandler, class ServiceProcessor>
    class CThriftServerHelper
    {
    public:
        // 启动rpc服务,请注意该调用是同步阻塞的,所以需放最后调用
        bool serve(uint16_t port);
        bool serve(uint16_t port, uint8_t num_threads);
        bool serve(const std::string& ip, uint16_t port, uint8_t num_threads);
        void stop();
    
    private:
        boost::shared_ptr<ThriftHandler> _handler;
        boost::shared_ptr<thrift::TProcessor> _processor;
        boost::shared_ptr<thrift::protocol::TProtocolFactory> _protocol_factory;
        boost::shared_ptr<thrift::server::ThreadManager> _thread_manager;
        boost::shared_ptr<thrift::concurrency::PosixThreadFactory> _thread_factory;
        boost::shared_ptr<thrift::server::TServer> _server;
    };
    
    // 封装对thrift客户端的公共操作
    template <class ThriftClient>
    class CThriftClientHelper
    {
    public:
        CThriftClientHelper(const std::string& host, uint16_t port
                          , int timeout = RPC_TIMEOUT);
        ~CThriftClientHelper();
        void connect();
        void close();
    
        ThriftClient* operator ->() const
        {
            return _container_client.get();
        }
    
        ThriftClient* get()
        {
            return _container_client.get();
        }
    
    private:
        std::string _host;
        uint16_t _port;
        int _timeout;
        boost::shared_ptr<thrift::transport::TSocketPool> _sock_pool;
        boost::shared_ptr<thrift::transport::TTransport> _socket;
        boost::shared_ptr<thrift::transport::TFramedTransport> _transport;
        boost::shared_ptr<thrift::protocol::TProtocol> _protocol;
        boost::shared_ptr<ThriftClient> _container_client;
    };
    
    ///////////////////////////////////////////////////////////////////////////////
    
    template <class ThriftHandler, class ServiceProcessor>
    bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(uint16_t port)
    {
        return serve("0.0.0.0", port, 1);
    }
    
    template <class ThriftHandler, class ServiceProcessor>
    bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(uint16_t port, uint8_t num_threads)
    {
        return serve("0.0.0.0", port, num_threads);
    }
    
    template <class ThriftHandler, class ServiceProcessor>
    bool CThriftServerHelper<ThriftHandler, ServiceProcessor>::serve(const std::string& ip, uint16_t port, uint8_t num_threads)
    {
        try
        {
            _handler.reset(new ThriftHandler);
            _processor.reset(new ServiceProcessor(_handler));
            _protocol_factory.reset(new thrift::protocol::TBinaryProtocolFactory());
            _thread_manager = thrift::server::ThreadManager::newSimpleThreadManager(num_threads);
            _thread_factory.reset(new thrift::concurrency::PosixThreadFactory());
    
            _thread_manager->threadFactory(_thread_factory);
            _thread_manager->start();
    
            _server.reset(new thrift::server::TNonblockingServer(
                    _processor, _protocol_factory, port, _thread_manager));
    
            _server->serve();
        }
        catch (thrift::TException& tx)
        {
            LOG(ERROR) << "start thrift error: " << tx.what();
            return false;
        }
    
        LOG(INFO) << "container-thrift start";
        return true;
    }
    
    template <class ThriftHandler, class ServiceProcessor>
    void CThriftServerHelper<ThriftHandler, ServiceProcessor>::stop()
    {
        _server->stop();
    }
    
    ///////////////////////////////////////////////////////////////////////////////
    
    template <class ThriftClient>
    CThriftClientHelper<ThriftClient>::CThriftClientHelper(
            const std::string& host, uint16_t port, int timeout)
            : _host(host)
            , _port(port)
            , _timeout(timeout)
    {
        _sock_pool.reset(new thrift::transport::TSocketPool());
        _sock_pool->addServer(host, port);
        _sock_pool->setConnTimeout(timeout);
        _sock_pool->setRecvTimeout(timeout);
        _sock_pool->setSendTimeout(timeout);
        
        _socket = _sock_pool;
        _transport.reset(new thrift::transport::TFramedTransport(_socket));
        _protocol.reset(new thrift::protocol::TBinaryProtocol(_transport));
    
        _container_client.reset(new ThriftClient(_protocol));
    }
    
    template <class ThriftClient>
    CThriftClientHelper<ThriftClient>::~CThriftClientHelper()
    {
        close();
    }
    
    template <class ThriftClient>
    void CThriftClientHelper<ThriftClient>::connect()
    {
        if (!_transport->isOpen())
        {
            _transport->open();
        }
    }
    
    template <class ThriftClient>
    void CThriftClientHelper<ThriftClient>::close()
    {
        if (_transport->isOpen())
        {
            _transport->close();
        }
    }


  • 相关阅读:
    从一个故障说说Java的三个BlockingQueue
    kafka producer源码
    Mock方法介绍
    async4j 普通用法、整合spring用法
    Spring中属性文件properties的读取与使用
    异步框架asyn4j的原理
    如何从线程返回信息——轮询、回调、Callable
    jdk 1.8 Executors
    java Iterator Fail-fast机制
    java的几种对象(PO,VO,DAO,BO,POJO)解释
  • 原文地址:https://www.cnblogs.com/aquester/p/9891605.html
Copyright © 2011-2022 走看看