zoukankan      html  css  js  c++  java
  • ACE_linux:Reactor与Proactor两种模式的区别

    一、概念:

    Reactor与Proactor两种模式的区别。这里我们只关注read操作,因为write操作也是差不多的。下面是Reactor的做法:

    1. 某个事件处理器宣称它对某个socket上的读事件很感兴趣;

    2. 事件分离者等着这个事件的发生;

    3. 当事件发生了,事件分离器被唤醒,这负责通知先前那个事件处理器;

    4. 事件处理器收到消息,于是去那个socket上读数据了. 如果需要,它再次宣称对这个socket上的读事件感兴趣,一直重复上面的步骤;

    下面再来看看真正意义的异步模式Proactor是如何做的:

    1. 事件处理器直接投递发一个读操作(当然,操作系统必须支持这个异步操作)。这个时候,事件处理器根本不关心读事件,它只管发这么个请求,它魂牵梦萦的是这个读操作的完成事件。这个事件处理器很拽,发个命令就不管具体的事情了,只等着别人(系统)帮他搞定的时候给他回个话。

    2. 事件分离器等着这个读事件的完成(比较下与Reactor的不同);

    3. 当事件分离器默默等待完成事情到来的同时,操作系统已经在一边开始干活了,它从目标读取数据,放入用户提供的缓存区中,最后通知事件分离器,这个事情我搞完了;

    4. 事件分离器通知之前的事件处理器: 你吩咐的事情搞定了;

    5. 事件处理器这时会发现想要读的数据已经乖乖地放在他提供的缓存区中,想怎么处理都行了。如果有需要,事件处理器还像之前一样发起另外一个读操作,和上面的几个步骤一样。

     

    二、代码示例:

    ACE_Proactor::run_event_loop();  循环启动
    ACE_Proactor::end_event_loop();  循环停止

    -----------------------------------

     Reactor:

    -----------------------------------

    -----------------------------------

    Proactor:

      1 #include "ace/Proactor.h"
      2 #include "ace/Asynch_Acceptor.h"
      3 
      4 class HA_Proactive_Service : public ACE_Service_Handler
      5 {
      6 public:
      7     HA_Proactive_Service()
      8     {
      9         ACE_OS::printf("Service_Handler constructed for accepter 
    ");
     10     }
     11     ~HA_Proactive_Service ()
     12     {
     13         if (this->handle () != ACE_INVALID_HANDLE)
     14         {
     15             ACE_OS::closesocket (this->handle ());
     16         }
     17     }
     18     
     19     virtual void open (ACE_HANDLE h, ACE_Message_Block&)
     20     {
     21         //在OPEN函数中完成读写操作
     22         this->handle (h);
     23         if (this->reader_.open (*this) != 0 ||
     24                 this->writer_.open (*this) != 0   )
     25         {
     26             ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p
    "),
     27                         ACE_TEXT ("HA_Proactive_Service open")));
     28             
     29             return;
     30         }
     31         ACE_OS::printf("ready!
    ");
     32         
     33         //异步发送
     34         send_to_remote();
     35         //异步读取
     36         reveive_from_remote();
     37         
     38         // mb is now controlled by Proactor framework.
     39         return;
     40     }
     41     
     42     //异步读完成后会调用此函数
     43     virtual void handle_read_stream
     44     (const ACE_Asynch_Read_Stream::Result &result)
     45     {
     46         ACE_Message_Block &mb = result.message_block ();
     47         if (!result.success () || result.bytes_transferred () == 0)
     48         {
     49             mb.release ();
     50             
     51             return;
     52         }
     53         //else
     54         //输出读取内容
     55         ACE_OS::printf("received:%s
    ",mb.rd_ptr());
     56         mb.release();
     57         //继续下一次异步读取
     58         reveive_from_remote();
     59         
     60         return;
     61     }
     62     //异步写完成后会调用此函数
     63     virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
     64     {
     65         result.message_block ().release();
     66         ACE_OS::sleep(1);
     67         //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
     68         //handle_write_stream,所以会一直发送
     69         send_to_remote();
     70         return;
     71     }
     72     //remote 
     73     void reveive_from_remote(void)
     74     {
     75         ACE_Message_Block *mb;
     76         ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
     77         if (this->reader_.read (*mb, mb->space ()) != 0)
     78         {
     79             ACE_OS::printf("Begin read fail
    ");
     80             
     81             return;
     82         }
     83     }
     84     //把当前时间发送到远端
     85     void send_to_remote(void)
     86     {
     87         std::string book = "S: ";
     88         time_t now = ACE_OS::gettimeofday().sec();
     89         book = book+ ctime(&now);
     90         ACE_Message_Block *mb = new ACE_Message_Block(100);
     91         //获取当前时间的字符串格式
     92         mb->copy(book.c_str() );
     93         //send message to accepter
     94         if (this->writer_.write(*mb,mb->length()) !=0)
     95         {
     96             ACE_OS::printf("Begin write fail
    ");
     97             
     98             return;
     99         }
    100         else
    101         {
    102             ACE_OS::printf("sended %s
    ",mb->rd_ptr());
    103         }
    104     }
    105     
    106     // Listing 3
    107 private:
    108     ACE_Asynch_Read_Stream reader_;
    109     ACE_Asynch_Write_Stream writer_;
    110 };
    111 
    112 
    113 int main(int argc, char *argv[]) 
    114 {
    115     int port=3000;
    116     ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor;
    117     
    118     if (acceptor.open (ACE_INET_Addr (port)) == -1)
    119         return -1;
    120     
    121     while(true)
    122         ACE_Proactor::instance ()->handle_events ();
    123     
    124     return 0; 
    125 }
    Acceptor.cpp
      1 #include "ace/Proactor.h"
      2 #include "ace/Asynch_Connector.h"
      3 
      4 class HA_Proactive_Service : public ACE_Service_Handler
      5 {
      6 public:
      7     HA_Proactive_Service()
      8     {
      9         ACE_OS::printf("Service_Handler constructed for connector 
    ");
     10     }
     11     ~HA_Proactive_Service ()
     12     {
     13         if (this->handle () != ACE_INVALID_HANDLE)
     14         {
     15             ACE_OS::closesocket (this->handle ());
     16         }
     17     }
     18     
     19     virtual void open (ACE_HANDLE h, ACE_Message_Block&)
     20     {
     21         //在OPEN函数中完成读写操作
     22         this->handle (h);
     23         
     24         if (this->reader_.open (*this) != 0 ||
     25                 this->writer_.open (*this) != 0   )
     26         {
     27             ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p
    "),
     28                         ACE_TEXT ("HA_Proactive_Service open")));
     29             
     30             return;
     31         }
     32         
     33         ACE_OS::printf("connceted!
    ");
     34         //异步发送
     35         send_to_remote();
     36         //异步读取
     37         reveive_from_remote();
     38         
     39         
     40         return;
     41     }
     42     
     43     //异步读完成后会调用此函数
     44     virtual void handle_read_stream
     45     (const ACE_Asynch_Read_Stream::Result &result)
     46     {
     47         ACE_Message_Block &mb = result.message_block ();
     48         if (!result.success () || result.bytes_transferred () == 0)
     49         {
     50             mb.release ();
     51             
     52             return;
     53         }
     54         //else
     55         //输出读取内容
     56         ACE_OS::printf("received:%s
    ",mb.rd_ptr());
     57         mb.release();
     58         //继续下一次异步读取
     59         reveive_from_remote();
     60         
     61         return;
     62     }
     63     //异步写完成后会调用此函数
     64     virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
     65     {
     66         result.message_block ().release();
     67         ACE_OS::sleep(1);
     68         //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
     69         //handle_write_stream,所以会一直发送
     70         send_to_remote();
     71         return;
     72     }
     73     //remote 
     74     void reveive_from_remote(void)
     75     {
     76         ACE_Message_Block *mb;
     77         ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
     78         if (this->reader_.read (*mb, mb->space ()) != 0)
     79         {
     80             ACE_OS::printf("Begin read fail
    ");
     81             
     82             return;
     83         }
     84     }
     85     //把当前时间发送到远端
     86     void send_to_remote(void)
     87     {
     88         std::string book = "C: ";
     89         time_t now = ACE_OS::gettimeofday().sec();
     90         book = book+ ctime(&now);
     91         ACE_Message_Block *mb = new ACE_Message_Block(100);
     92         //获取当前时间的字符串格式
     93         mb->copy(book.c_str() );
     94         //send message to accepter
     95         if (this->writer_.write(*mb,mb->length()) !=0)
     96         {
     97             ACE_OS::printf("Begin write fail
    ");
     98             
     99             return;
    100         }
    101         else
    102         {
    103             ACE_OS::printf("sended %s
    ",mb->rd_ptr());
    104         }
    105     }
    106     
    107     // Listing 3
    108 private:
    109     ACE_Asynch_Read_Stream reader_;
    110     ACE_Asynch_Write_Stream writer_;
    111 };
    112 
    113 int main(int argc, char *argv[]) 
    114 {
    115     
    116     ACE_INET_Addr addr(3000,"127.0.0.1"); 
    117     
    118     ACE_Asynch_Connector<HA_Proactive_Service> connector;
    119     
    120     connector.open();
    121     if (connector.connect(addr) == -1)
    122         return -1;
    123     
    124     while(true)
    125         ACE_Proactor::instance ()->handle_events ();
    126     
    127     return 0; 
    128 }
    Connector.cpp

    ACE_Proactor::run_event_loop(); <==>
    while(true)
        ACE_Proactor::instance ()->handle_events ();

     //增加线程池

      1 #include "ace/Proactor.h"
      2 #include "ace/Asynch_Acceptor.h"
      3 #include "ace/Task_T.h"
      4 #include "ace/Thread_Semaphore.h"
      5 
      6 class Receive : public ACE_Service_Handler
      7 {
      8 public:
      9     Receive()
     10     {
     11         ACE_OS::printf("Service_Handler constructed for accepter 
    ");
     12     }
     13     ~Receive ()
     14     {
     15         if (this->handle () != ACE_INVALID_HANDLE)
     16         {
     17             ACE_OS::closesocket (this->handle ());
     18         }
     19     }
     20     
     21     virtual void open (ACE_HANDLE h, ACE_Message_Block&)
     22     {
     23         //在OPEN函数中完成读写操作
     24         this->handle (h);
     25         if (this->reader_.open (*this) != 0 ||
     26                 this->writer_.open (*this) != 0   )
     27         {
     28             ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p
    "),
     29                         ACE_TEXT ("Receive open")));
     30             
     31             return;
     32         }
     33         ACE_OS::printf("ready!
    ");
     34         
     35         //异步发送
     36         //send_to_remote();
     37         //异步读取
     38         reveive_from_remote();
     39         
     40         // mb is now controlled by Proactor framework.
     41         return;
     42     }
     43     
     44     //异步读完成后会调用此函数
     45     virtual void handle_read_stream
     46     (const ACE_Asynch_Read_Stream::Result &result)
     47     {
     48         ACE_Message_Block &mb = result.message_block ();
     49         if (!result.success () || result.bytes_transferred () == 0)
     50         {
     51             mb.release ();
     52             
     53             return;
     54         }
     55         
     56         //输出读取内容
     57         //ACE_OS::printf("received:%s
    ",mb.rd_ptr());
     58         ACE_DEBUG((LM_DEBUG,"(%t)received:%s
    ",mb.rd_ptr()));
     59         mb.release();
     60         
     61         //继续下一次异步读取
     62         reveive_from_remote();
     63         
     64         return;
     65     }
     66     //异步写完成后会调用此函数
     67     virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
     68     {
     69         result.message_block ().release();
     70         ACE_OS::sleep(1);
     71         //上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
     72         //handle_write_stream,所以会一直发送
     73         send_to_remote();
     74         return;
     75     }
     76     //remote 
     77     void reveive_from_remote(void)
     78     {
     79         ACE_Message_Block *mb;
     80         ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
     81         if (this->reader_.read (*mb, mb->space ()) != 0)
     82         {
     83             ACE_OS::printf("Begin read fail
    ");
     84             
     85             return;
     86         }
     87     }
     88     //把当前时间发送到远端
     89     void send_to_remote(void)
     90     {
     91         std::string book = "S: ";
     92         time_t now = ACE_OS::gettimeofday().sec();
     93         book = book+ ctime(&now);
     94         ACE_Message_Block *mb = new ACE_Message_Block(100);
     95         //获取当前时间的字符串格式
     96         mb->copy(book.c_str() );
     97         //send message to accepter
     98         if (this->writer_.write(*mb,mb->length()) !=0)
     99         {
    100             ACE_OS::printf("Begin write fail
    ");
    101             
    102             return;
    103         }
    104         else
    105         {
    106             ACE_OS::printf("sended %s
    ",mb->rd_ptr());
    107         }
    108     }
    109     
    110     // Listing 3
    111 private:
    112     ACE_Asynch_Read_Stream reader_;
    113     ACE_Asynch_Write_Stream writer_;
    114 };
    115 
    116 class Accepte : public ACE_Asynch_Acceptor<Receive>
    117 {
    118 public:
    119     virtual Receive* make_handler(void)
    120     {
    121         return new Receive();
    122     }
    123     
    124 };
    125 
    126 class Proactor_Task : public ACE_Task<ACE_MT_SYNCH>
    127 {
    128 public:
    129     
    130     int star(int nMax)
    131     {
    132         create_proactor();
    133         this->activate (THR_NEW_LWP, nMax);
    134         for (;nMax>0;nMax--)
    135         {
    136             sem_.acquire();
    137         }
    138         return 0;
    139     }
    140     int stop()
    141     {
    142         ACE_Proactor::end_event_loop();
    143         this->wait();
    144         return 0;
    145     }
    146     
    147     virtual int svc (void)
    148     {
    149         ACE_DEBUG((LM_INFO,ACE_TEXT("svc method is invoked!
    ")));
    150         sem_.release(1);
    151         
    152         ACE_Proactor::run_event_loop();
    153         
    154         return 0;
    155     }
    156     
    157     
    158     int create_proactor()
    159     {
    160         ACE_Proactor::instance ();
    161         
    162         return 0;
    163     }
    164     
    165     int release_proactor()
    166     {
    167         ACE_Proactor::close_singleton ();
    168         return 0;
    169     }
    170     
    171     ACE_Thread_Semaphore sem_;
    172 };
    173 
    174 int ACE_TMAIN(int ,char*[])
    175 {
    176     Proactor_Task task;
    177     task.star(3);
    178     
    179     Accepte accepte;
    180     accepte.open(ACE_INET_Addr (2222), 0, 1,ACE_DEFAULT_BACKLOG,1,ACE_Proactor::instance());
    181     
    182     //主函数退出控制
    183     {
    184         int nExit=0;
    185         while (nExit==0)
    186             scanf("%d",&nExit);
    187     }
    188     
    189     return 0;
    190 }
    191 
    192 /*
    193 int main(int argc, char *argv[]) 
    194 {
    195     int port=3000;
    196     ACE_Asynch_Acceptor<Receive> acceptor;
    197     
    198     if (acceptor.open (ACE_INET_Addr (port)) == -1)
    199         return -1;
    200         
    201     ACE_Proactor::run_event_loop();
    202     
    203     return 0; 
    204 }
    205 */
    Proactor_Task.cpp

    -----------------------------------

    GOOD LUCK!

  • 相关阅读:
    如何优雅地删除 Linux 中的垃圾文件
    session:
    cookie:
    多对多表结构设计:
    接口测试:
    oracle基本笔记整理
    oracle基本笔记整理
    oracle基本笔记整理
    2016年寒假心得
    2016年寒假心得
  • 原文地址:https://www.cnblogs.com/book-gary/p/4272862.html
Copyright © 2011-2022 走看看