zoukankan      html  css  js  c++  java
  • ACE

    框架描述

    服务器层次:

    clip_image002

    • I/O层:对应具体的文件描述符处理,对应ACE中的handle。
    • Dispatch层:事件分发,将I/O事件分发到对应绑定的处理队列等待业务处理,对应ACE中的Event_handle。
    • 业务层:处理具体业务,包含一组线程或进程,并发处理业务。对应ACE中的ACE_Task。

    三层结构与五层网络的网络层,传输层,应用层类似对应。

    Reactor模式:

    clip_image004

    • I/O处理:ACE_Reactor使用select复用完成,将注册进去的IOhandle进行事件监听。
    • 消息队列:ACE_Task中包含一个消息队列。I/O产生事件后执行绑定的Event函数将消息插入对应的消息队列。
    • 服务进程:ACE_Task内可以构造一个线程池,获取消息队列进行业务并发处理。

    下面是Reactor代码实现,包含I/O、dispatch、反应器、和线程池:

      1 /*-----------------------------------------------------------------
      2 *  filename:    Reactor.cpp
      3 *  author:        bing
      4 *  time:        2016-06-29 15:26
      5 *  function:    using ACE Reactor implement I/O multiplex server, 
      6 *               include service thread pool.
      7 *-----------------------------------------------------------------*/
      8 #include <ace/INET_Addr.h>
      9 #include <ace/SOCK_Acceptor.h>
     10 #include <ace/SOCK_Stream.h>
     11 #include <ace/Reactor.h> 
     12 #include <ace/Log_Msg.h>
     13 #include "ace/Task.h"
     14 #include "ace/OS.h" 
     15 #include <list>
     16 
     17 #define MAX_BUFF_SIZE     1024
     18 #define LISTEN_PORT     5010
     19 #define SERVER_IP        ACE_LOCALHOST
     20 #define THREAD_NUM        10
     21 
     22 struct MsgData
     23 {
     24     ACE_HANDLE* IOHandle;
     25     int DataFlag;
     26     char Data[MAX_BUFF_SIZE];
     27     MsgData()
     28     {
     29         IOHandle = NULL;
     30         DataFlag = -1;
     31         ACE_OS::memset(Data, 0, sizeof(Data));
     32     }
     33 };
     34 
     35 class TaskThread;
     36 
     37 class ServerStream : public ACE_Event_Handler
     38 {
     39 public:
     40     ServerStream(TaskThread* pMsgQueue);
     41     ~ServerStream();
     42     ACE_SOCK_Stream& GetStream(){return m_Svr_stream;}      //给accept提供接口绑定数据通道
     43     virtual int handle_input(ACE_HANDLE fd);        //I/O触发事件后调用
     44     void close();
     45     virtual ACE_HANDLE get_handle(void) const {return m_Svr_stream.get_handle();}   //不重载需要手动将handle传入ACE_Reactor
     46 private:
     47     ACE_SOCK_Stream m_Svr_stream;
     48     TaskThread* m_MsgQueue;
     49 };
     50 
     51 std::list<ServerStream*> g_StreamPool;  //stream pool
     52 
     53 class TaskThread: public ACE_Task<ACE_MT_SYNCH>
     54 {
     55 public:
     56     virtual int svc(void)
     57     {
     58         ACE_Message_Block *Msg;// = new ACE_Message_Block();
     59         while(1)
     60         {
     61             getq(Msg);        //空闲线程阻塞
     62             
     63             ACE_Data_Block *Data_Block = Msg->data_block();
     64             MsgData *pData = reinterpret_cast <MsgData*>(Data_Block->base());
     65             if (0 == pData->DataFlag)
     66             {
     67                 std::list<ServerStream*>::iterator it;
     68                 for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
     69                 {
     70                     if (get_handle() == (*it)->get_handle())
     71                     {
     72                         g_StreamPool.erase(it);
     73                         delete *it;
     74                         break;  
     75                     }
     76                 }
     77                 return 0;
     78             }
     79             char strBuffer[MAX_BUFF_SIZE];
     80             ACE_OS::memset(strBuffer, 0, sizeof(strBuffer));
     81             ACE_OS::memcpy(strBuffer, pData->Data, sizeof(strBuffer));
     82             /*
     83                 这里接口业务代码分发数据
     84             */
     85             ACE_DEBUG((LM_INFO,"[time:%d]recevie msg:%s
    ",(int)ACE_OS::time(),strBuffer));
     86             //ACE_SOCK_Stream Stream(*(pData->IOHandle));
     87             //Stream.send("server recive data!
    ",sizeof("server recive data!"));  //响应client数据
     88             //ACE_OS::sleep(1);        //模拟业务耗时
     89             Msg->release();         //release,inclue data_block
     90             //ACE_DEBUG((LM_INFO,"thread end queue count:%d
    ",msg_queue_->message_count()));
     91         }
     92         return 0;
     93     }
     94 };
     95 typedef ACE_Singleton<TaskThread, ACE_Thread_Mutex> TaskThreadPool;
     96 
     97 ServerStream::ServerStream(TaskThread* pMsgQueue)
     98 {
     99     m_MsgQueue = pMsgQueue;
    100 }
    101 
    102 ServerStream::~ServerStream()
    103 {
    104     close();
    105 }
    106 
    107 /*------------------------------------------------------
    108 *    IO上报流数据,使用select复用上报,这里单线程处理
    109 *   原来考虑直接把IO插队列给线程池处理,但是线程池和
    110 *   这里是异步操作,线程没有处理队列这条消息ACE底层会
    111 *   一直上报这个IO插消息队列,暂时在这里做单线程revc
    112 *   考虑epoll边沿触发,一次上报处理
    113 *------------------------------------------------------*/
    114 int ServerStream::handle_input(ACE_HANDLE fd)
    115 {
    116     MsgData Message;
    117     char strBuffer[MAX_BUFF_SIZE];
    118     Message.DataFlag = m_Svr_stream.recv(strBuffer,MAX_BUFF_SIZE); //获取数据回select响应避免反复通知
    119     if (-1 == Message.DataFlag)
    120     {
    121         ACE_DEBUG((LM_INFO, ACE_TEXT("recive data error!
    ")));
    122         return -1;
    123     }
    124     else if(0 == Message.DataFlag)
    125     {
    126         close();
    127         ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!
    ")));
    128     }        
    129     ACE_Data_Block *Data_Block = new ACE_Data_Block; //线程做释放
    130     ACE_HANDLE Cli_IO = get_handle();
    131 
    132     Message.IOHandle = &Cli_IO;
    133     ACE_OS::memcpy(Message.Data,strBuffer,sizeof(strBuffer));//传的data可带length信息来适配消息大小
    134 
    135     char *p = reinterpret_cast <char*>(&Message);
    136     Data_Block->base(p,sizeof(Message));
    137     ACE_Message_Block* msg = new ACE_Message_Block(Data_Block);    
    138     m_MsgQueue->putq(msg);    //put
    139     //Data_Block->release();
    140     return 0;
    141 }
    142 
    143 void ServerStream::close()
    144 {
    145     m_Svr_stream.close();
    146     ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
    147 }
    148 
    149 class ServerAcceptor : public ACE_Event_Handler
    150 {
    151 public:
    152     ServerAcceptor(int port,char* ip);
    153     ~ServerAcceptor();
    154     bool open();
    155     virtual int handle_input(ACE_HANDLE fd);  //有client连接
    156     void close();
    157     virtual ACE_HANDLE get_handle(void) const {return m_Svr_aceept.get_handle();}
    158 private:
    159     ACE_INET_Addr m_Svr_addr;
    160     ACE_SOCK_Acceptor m_Svr_aceept;
    161 };
    162 
    163 ServerAcceptor::ServerAcceptor(int port,char* ip):m_Svr_addr(port,ip)
    164 {
    165     if (!open())    //open listen port
    166     {
    167         ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!
    ")));
    168     }
    169     else
    170     {
    171         ACE_DEBUG((LM_INFO, ACE_TEXT("open success!
    ")));
    172         TaskThreadPool::instance()->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , THREAD_NUM);//创建10个线程处理业务
    173     }
    174 }
    175 
    176 ServerAcceptor::~ServerAcceptor()
    177 {
    178     close();
    179     std::list<ServerStream*>::iterator it;
    180     for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
    181     {
    182         if (NULL != (*it))
    183         {
    184             (*it)->close();
    185             delete (*it);
    186         }
    187     }
    188 }
    189 
    190 bool ServerAcceptor::open()
    191 {
    192     if (-1 == m_Svr_aceept.open(m_Svr_addr,1))
    193     {
    194         ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to accept
    ")));
    195         m_Svr_aceept.close();
    196         return false;
    197     }
    198     return true;
    199 }
    200 
    201 int ServerAcceptor::handle_input(ACE_HANDLE fd )  
    202 {
    203     ServerStream *stream = new ServerStream(TaskThreadPool::instance());    //产生新通道
    204     if (NULL != stream)
    205     {
    206         g_StreamPool.push_back(stream);//暂时存储全局变量用于内存管理,优化可增加一个连接管理类管理连接通道
    207     }
    208     if (m_Svr_aceept.accept(stream->GetStream()) == -1)  //绑定通道
    209     {  
    210         printf("accept client fail
    ");  
    211         return -1;  
    212     }
    213     ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK);  //通道注册到ACE_Reactor
    214     ACE_DEBUG((LM_INFO,"User connect success!,ClientPool num = %d
    ",g_StreamPool.size()));
    215     return 0;
    216 }  
    217     
    218 void ServerAcceptor::close()
    219 {
    220     ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
    221     m_Svr_aceept.close();
    222 }
    223 
    224 int ACE_TMAIN()
    225 {
    226     ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);
    227     ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK);    //listen port注册到ACE_Reactor
    228     
    229     ACE_Reactor::instance()->run_reactor_event_loop();  //进入消息循环,有I/O事件回调handle_input
    230     return 0;
    231 }
    View Code

    代码实现了最简单的完整并发服务器,有部分还值得思考和优化:

    1.dispatch进行类封装

    2.回话通道的数据流管理进行类封装

    3.dispatch消息结构优化

    4.dispatch处为单线程,直接传递I/O给线程获取数据流还是获取数据流完成后给线程,如何实现两个线程同步

    5.底层I/O复用使用epoll边沿优化

    6.业务buff处理优化,进行消息类型划分,进入不同业务处理

    由于实现完整服务器代码以最简单形式实现,上述优化在实际商用代码中还需要大量封装优化考虑。

  • 相关阅读:
    团队项目 第一次作业
    20165201 课下作业第十周(选做)
    20165201 实验三敏捷开发与XP实践
    20165201 2017-2018-2 《Java程序设计》第9周学习总结
    20165201 结对编程练习_四则运算(第二周)
    20165201 2017-2018-2 《Java程序设计》第8周学习总结
    20165201 实验二面向对象程序设计
    20165326 java实验五
    20165326 课程总结
    20165326 java实验四
  • 原文地址:https://www.cnblogs.com/binchen-china/p/5632127.html
Copyright © 2011-2022 走看看