zoukankan      html  css  js  c++  java
  • (zt)ACE高效PROACTOR编程框架一ClientHandle

    (转自:http://blog.vckbase.com/bastet/archive/2005/08/14/10865.aspx)

    1、WIN32下面用proactor可以达到几乎RAW IOCP的效率,由于封装关系,应该是差那么一点。

    客户端处理类的常规写法:
    //处理客户端连接消息
    class ClientHandler : public ACE_Service_Handler
    {
    public:
     /**构造函数
      *
      *
      */
     ClientHandler(unsigned int client_recv_buf_size=SERVER_CLIENT_RECEIVE_BUF_SIZE)
      :_read_msg_block(client_recv_buf_size),_io_count(0)
     {
     }

     
     ~ClientHandler(){}

     /**
      *初始化,因为可能要用到ClientHandler内存池,而这个池又不一定要用NEW
      */
     void init();

     /**清理函数,因为可能要用到内存池
      *
      */
     void fini();

    //检查是否超时的函数

     void check_time_out(time_t cur_time);

    public:

     /**客户端连接服务器成功后调用
      *
      * \param handle 套接字句柄
      * \param &message_block 第一次读到的数据(未用)
      */

    //由Acceptor来调用!!!
     virtual void open (ACE_HANDLE handle,ACE_Message_Block &message_block);

     /**处理网络读操作结束消息
      *
      * \param &result 读操作结果
      */
     virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);


     /**处理网络写操作结束消息
      *
      * \param &result 写操作结果
      */
     virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);

    private:

    //**生成一个网络读请求
      *
      * \param void
      * \return 0-成功,-1失败
      */
     int  initiate_read_stream  (void);

     /**生成一个写请求
      *
      * \param mb 待发送的数据
      * \param nBytes 待发送数据大小
      * \return 0-成功,-1失败
      */
     int  initiate_write_stream (ACE_Message_Block & mb, size_t nBytes );
     
     /**
      *
      * \return 检查是否可以删除,用的是一个引用计数。每一个外出IO的时候+1,每一个IO成功后-1
      */
     int check_destroy();
     
     //异步读
     ACE_Asynch_Read_Stream _rs;

     //异步写
     ACE_Asynch_Write_Stream _ws;

     //接收缓冲区只要一个就够了,因为压根就没想过要多读,我直到现在也不是很清楚为什么要多读,多读的话要考虑很多问题
     ACE_Message_Block _read_msg_block;

     //套接字句柄,这个可以不要了,因为基类就有个HANDLER在里面的。
     //ACE_HANDLE _handle;

     //一个锁,客户端反正有东东要锁的,注意,要用ACE_Recursive_Thread_Mutex而不是用ACE_Thread_Mutex,这里面是可以重入的,而且在WIN32下是直接的EnterCriticalSection,可以达到很高的效率
     ACE_Recursive_Thread_Mutex _lock;
     
     //在外IO数量,其实就是引用计数啦,没啥的。为0的时候就把这个东东关掉啦。
     long _io_count;

    //检查超时用的,过段时间没东东就CLOSE他了。

     time_t _last_net_io;

    private:

    //本来想用另外一种模型的,只用1个或者2个外出读,后来想想,反正一般内存都是足够的,就不管了。

     //ACE_Message_Block _send_msg_blocks[2];

     //ACE_Message_Block &_sending_msg_block;

     //ACE_Message_Block &_idle_msg_block;

    private:
     
    public:
    //TODO:move to prriva and use friend class!!!

    //只是为了效率更高,不用STL的LIST是因为到现在我没有可用的Node_Allocator,所以效率上会有问题。
     ClientHandler *_next;

     ClientHandler *next(){return _next;}

     void next(ClientHandler *obj){_next=obj;}

    };


    //这是具体实现,有些地方比较乱,懒得管了,锁的有些地方不对。懒得改了,反正在出错或者有瓶颈的时候再做也不迟。

    void ClientHandler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
    {
     _last_net_io=ACE_OS::time(NULL);
     int byterecved=result.bytes_transferred ();
     if ( (result.success ()) && (byterecved != 0))
     {
      //ACE_DEBUG ((LM_DEBUG,  "Receiver completed:%d\n",byterecved));

    //处理完数据
      if(handle_received_data()==true)
      {
       //ACE_DEBUG ((LM_DEBUG,  "go on reading...\n"));

    //把东东推到头部,处理粘包
       _read_msg_block.crunch();
       initiate_read_stream();
      }
     }

    //这个地方不想用ACE_Atom_op,因为反正要有一个锁,而且一般都会用锁,不管了。假如不在意的话,应该直接用ACE_Atom_Op以达到最好的效率

     {
      ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
      _io_count--;
     }
     check_destroy ();
    }

    void ClientHandler::init()
    {

    //初始化数据,并不在构造函数里做。
     _last_net_io=ACE_OS::time(NULL);
     _read_msg_block.rd_ptr(_read_msg_block.base());
     _read_msg_block.wr_ptr(_read_msg_block.base());
     this->handle(ACE_INVALID_HANDLE);
    }

    bool ClientHandler::handle_received_data()
    {

    ...........自己处理
     return true;
    }


    //==================================================================
    void ClientHandler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
    {
     //发送成功,RELEASE掉
     //这个不可能有多个RELEASE,直接XX掉
     //result.message_block ().release ();
     MsgBlockManager::get_instance().release_msg_block(&result.message_block());

     {
      ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
      _io_count--;
     }
     check_destroy ();
    }

    //bool ClientHandler::destroy ()
    //{
    // FUNC_ENTER;
    // ClientManager::get_instance().release_client_handle(this);
    // FUNC_LEAVE;
    // return false ;
    //}


    int  ClientHandler::initiate_read_stream  (void)
    {
     ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);

    //考虑到粘包的呀
     if (_rs.read (_read_msg_block, _read_msg_block.space()) == -1)
     {
      ACE_ERROR_RETURN ((LM_ERROR,"%p\n","ACE_Asynch_Read_Stream::read"),-1);
     }
     _io_count++;
     return 0;
    }

    /**生成一个写请求
    *
    * \param mb 待发送的数据
    * \param nBytes 待发送数据大小
    * \return 0-成功,-1失败
    */
    int  ClientHandler::initiate_write_stream (ACE_Message_Block & mb, size_t nBytes )
    {
     ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
     if (_ws.write (mb , nBytes ) == -1)
     {
      mb.release ();
      ACE_ERROR_RETURN((LM_ERROR,"%p\n","ACE_Asynch_Write_File::write"),-1);
     }
     _io_count++;
     return 0;
    }

    void ClientHandler::open (ACE_HANDLE handle,ACE_Message_Block &message_block)
    {
     //FUNC_ENTER;
     _last_net_io=ACE_OS::time(NULL);
     _io_count=0;
     if(_ws.open(*this,this->handle())==-1)
     {
      ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Write_Stream::open"));
     }
     else if (_rs.open (*this, this->handle()) == -1)
     {
      ACE_ERROR ((LM_ERROR,"%p\n","ACE_Asynch_Read_Stream::open"));
     }
     else
     {
      initiate_read_stream ();
     }

     check_destroy();
     //FUNC_LEAVE;
    }

    void ClientHandler::fini()
    {
    }

    void ClientHandler::check_time_out(time_t cur_time)
    {
     //ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
     //ACE_DEBUG((LM_DEBUG,"cur_time is %u,last io is %u\n",cur_time,_last_net_io));

     //检测是否已经为0了
     if(this->handle()==ACE_INVALID_HANDLE)
      return;
     if(cur_time-_last_net_io>CLIENT_TIME_OUT_SECONDS)
     {
      ACE_OS::shutdown(this->handle(),SD_BOTH);
      ACE_OS::closesocket(this->handle());
      this->handle(ACE_INVALID_HANDLE);
     }
    }

    int ClientHandler::check_destroy()
    {
     {
      ACE_Guard<ACE_Recursive_Thread_Mutex> locker (_lock);
      if (_io_count> 0)
       return 1;
     }
     ACE_OS::shutdown(this->handle(),SD_BOTH);
     ACE_OS::closesocket(this->handle());
     this->handle(ACE_INVALID_HANDLE);

    //这个地方给内存池吧。
     ClientManager::get_instance().release_client_handle(this);
     //delete this;
     return 0;
    }

  • 相关阅读:
    iperf简单说明
    计算后图像大小参数计算
    ipywidgets安装报错
    Cannot uninstall [pacakage]. It is a distutils installed project
    torch
    es-centos7安装注意细节
    jupyter 指定特定的环境
    未来方向
    深度学习过拟合处理
    归一化
  • 原文地址:https://www.cnblogs.com/gamesacer/p/977468.html
Copyright © 2011-2022 走看看