zoukankan      html  css  js  c++  java
  • thrift TNonblockingServer 使用

    下载 0.9.1 版本 (0.9.2需要 2.5的bison,而 RHEL6上自带bison是2.4)

     
    TNonblockingServer 时必须使用 TFramedTransport ,不能使用 TBufferedTransport,因为前者会先写入这个消息的字节数。这样非阻塞时可以预知消息的大小。
     
     
     
    1. 服务端获取请求方的IP
    网上流传最多的是扩展 
    bool TDispatchProcessor::process(boost::shared_ptr<protocol::TProtocol> in,
                           boost::shared_ptr<protocol::TProtocol> out,
                           void* connectionContext) 
    从 in 获得 transport 然后获得 TSocket 对象。但这个方法只适合阻塞式的 TSimpleServer 和 TThreadPoolServer 等,并不适合 TNonblockingServer。使用 TNonblockingServer 时,可通过扩展 void TServerEventHandler::processContext(void* serverContext, boost::shared_ptr<TTransport> transport) ,这里 transport 参数是个 TSocket 对象,而这个函数总会在调用 processor 处理请求之前被调用。
     
    static boost::thread_specific_ptr<std::string> thrift_client_ip; // thread specific
    class MyServerEventHandler : public TServerEventHandler
    {
        virtual void processContext(void* serverContext, boost::shared_ptr<TTransport> transport)
        {
            TSocket *sock = static_cast<TSocket *>(transport.get());
    
            if (sock)
            {
                //thrift_client_ip.reset(new string(sock->getPeerAddress())); // 0.9.2, 复用 TNonblockingServer::TConnection 导致 getPeerAddress() 返回脏数据, 见 https://issues.apache.org/jira/browse/THRIFT-3270
                sock->getCachedAddress(); // use this api instead
            }
        }
    };
    
    // create nonblocking server
    TNonblockingServer server(processor, protocolFactory, port, threadManager);
    boost::shared_ptr<MyServerEventHandler> eventHandler(new MyServerEventHandler());
    server.setServerEventHandler(eventHandler);
     TNonblockingServer 复用 TNonblockingServer::TConnection 对象,但调用 TSocket::setCachedAddress() 并没有清掉 peerAddress_ 和 peerHost_ ,导致脏数据。
    需要 TSocket::setCachedAddress() 函数中清除 这2个变量
    lib/cpp/src/thrift/transport/TSocket.cpp
    void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
      if (!path_.empty()) {
        return;
      }
    
      switch (addr->sa_family) {
      case AF_INET:
        if (len == sizeof(sockaddr_in)) {
          memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
        }
        break;
    
      case AF_INET6:
        if (len == sizeof(sockaddr_in6)) {
          memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
        }
        break;
      }
      peerAddress_.clear(); // ++
      peerHost_.clear(); // ++
    }
     
     
    2. 禁用监听 IPv6
    阻塞式服务需要修改 lib/cpp/src/thrift/transport/TServerSocket.cpp 的 
    TServerSocket::listen() 函数,修改 hints.ai_family = PF_INET;
    而 TNonblockingServer 需要修改 lib/cpp/src/thrift/transport/TNonblockingServer.cpp
    TNonblockingServer::createAndListenOnSocket() 函数,修改 hints.ai_family = PF_INET;
     
     
     
    3. 切换 libevent 为 libev
    由于项目中用到了 libev,而 thrift 的 TNonblockingServer 用到 libevent,而它们头文件有名字冲突。libev 做了个简单的 libevent 适配接口,但 thrift 还用到了 创建 socketpair 设置 socket close on exec 等函数,我简单的从 libevent 拷贝过来,即编译成功。 (sockepair 本来理论上可以用 libev 的 ev_async 替代的,但由于这里 thrift 直接把 TConnection 的指针通过 socket 传递,有个队列的功能在里面了,稍花时间就没做)
     
    因为前面 TNonblockingServer 复用 TConnection 和 TSocket 导致TSocket::getPeerAddress() 脏数据,需要修改 TSocket 源码,为了更好的发布,我把 TNonblockingServer 单独提到项目中,并直接禁用了复用 TConnection ,这样使用默认 thrift 库即可。
     
    修改后的 TNonblockingServer 文件
     
     
     
  • 相关阅读:
    hive中map与reduce的个数问题
    count(*) 与 count(1)的区别?
    星型模型与雪花模型有什么优缺点
    如何估计数据需要存储空间
    element-ui的table控件得到所有的表头label值
    vue-image-crop-upload通过params带参的时候key总是img
    vue通过window.addEventListener('scroll', XXXX)无法监听屏幕滚动事件
    在vue里面无法通过document.getElementById来修改ElementUI自带的标签
    Vue-Router升级导致的Uncaught (in promise)问题【转载别人的】
    node安装依赖以及使用api
  • 原文地址:https://www.cnblogs.com/JesseFang/p/4682251.html
Copyright © 2011-2022 走看看