zoukankan      html  css  js  c++  java
  • ACE中TASK架构简介及简单应用

    一、基础功能介绍

    1、ACE_Message_Block*,Windows消息用MSG结构表示,ACE_Task中因为不能预计各种应用中消息的类型,所以ACE_Message_Block基本上可以理解为是对一个指针的封装,这个指针指向实际的一块内存或是一个对象等等。在创建ACE_Message_Block时,可以指定是由ACE_Message_Block来管理内存(构造函数中指定一个 size_t类型的大小),还是由我们自己管理内存(构造函数中指定一个指针)。而一个ACE_Message_Block类型的指针,就是一个消息,我们通过传递它来进行逻辑的业务处理。

    其包含如下两个成员变量:

    1 /// Misc flags (e.g., DONT_DELETE and USER_FLAGS).
    2   ACE_Message_Block::Message_Flags flags_;
    3 
    4   /// Pointer To beginning of message payload.
    5   char *base_;

    其中flags_表示数据删除标志,有效取值为DONT_DELETE。对flags_标志的作用描述如下:数据指针base_的值有两种来源,一种是由应用程序传入,在这种情况下,应该将flags_设置为DONT_DELETE,告诉框架当删除ACE_Data_Block对象时,不要删除该指针,应由应用程序自己处理;第二种是该指针由框架申请和释放,应用程序无须关注。通过这样的设计可以提高数据结构在使用上的灵活性。

    2、ACE_Task::putq,事实上,到底用SendMessage还是PostMessage与ACE_Task::putq来进行类比,我很为难,PostMessage发送一个消息后立刻返回,这与通常的ACE_Task::putq行为非常类似,因为ACE_Task是运行在另外一个线程上,ACE_Task::putq只是完成将消息插入到消息队列的工作,理论上它应该立刻返回,但实际上,ACE_Task的消息队列有容量大小限制,这个限制由我们自己限定,当当前消息队列满时,ACE_Task::putq将阻塞一直到可以插入,这时候就比较类似与SendMessage,

    3、ACE_Task::getq,

    1 ACE_Message_Block * msg;
    2 while(getq(msg) != -1)    // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
    3 {
    4     // process msg here
    5 }

    4、消息处理函数,默认没有提供,svc

    二、要搭架一个基于ACE_Task的消息系统,通常要做如下的步骤:

    1、编写一个派生自ACE_Task的类,指定它的同步模式
    ACE_Task的消息队列可以由多个处理线程共享使用,所以需要提供同步模式,例如 ACE_MT_SYNCH和ACE_NULL_SYNCH分别表示基于多线程的同步和不使用同步,这个参数是ACE_Task的一个模板参数。

    其分别声明了不同的锁。

     1 /**
     2  * @class ACE_NULL_SYNCH
     3  *
     4  * @brief Implement a do nothing Synchronization wrapper that
     5  *        typedefs the @c ACE_Condition and @c ACE_Mutex to the
     6  *        @c Null* versions.
     7  */
     8 class ACE_Export ACE_NULL_SYNCH
     9 {
    10 public:
    11   typedef ACE_Null_Mutex MUTEX;
    12   typedef ACE_Null_Mutex NULL_MUTEX;
    13   typedef ACE_Null_Mutex PROCESS_MUTEX;
    14   typedef ACE_Null_Mutex RECURSIVE_MUTEX;
    15   typedef ACE_Null_Mutex RW_MUTEX;
    16   typedef ACE_Null_Condition CONDITION;
    17   typedef ACE_Null_Condition RECURSIVE_CONDITION;
    18   typedef ACE_Null_Semaphore SEMAPHORE;
    19   typedef ACE_Null_Mutex NULL_SEMAPHORE;
    20 };
    21 
    22 /**
    23  * @class ACE_MT_SYNCH
    24  *
    25  * @brief Implement a default thread safe synchronization wrapper that
    26  *        typedefs the @c ACE_Condition and @c ACE_Mutex to the
    27  * @c ACE_Condition and @c ACE_Mutex versions.
    28  *
    29  * @todo This should be a template, but SunC++ 4.0.1 complains about
    30  *       this.
    31  */
    32 class ACE_Export ACE_MT_SYNCH
    33 {
    34 public:
    35   typedef ACE_Thread_Mutex MUTEX;
    36   typedef ACE_Null_Mutex NULL_MUTEX;
    37   typedef ACE_Process_Mutex PROCESS_MUTEX;
    38   typedef ACE_Recursive_Thread_Mutex RECURSIVE_MUTEX;
    39   typedef ACE_RW_Thread_Mutex RW_MUTEX;
    40   typedef ACE_Condition_Thread_Mutex CONDITION;
    41   typedef ACE_Condition_Recursive_Thread_Mutex RECURSIVE_CONDITION;
    42   typedef ACE_Thread_Semaphore SEMAPHORE;
    43   typedef ACE_Null_Semaphore NULL_SEMAPHORE;
    44 };
    1 class My_Task : public ACE_Task<ACE_MT_SYNCH>
    2 {
    3 public:
    4     virtual int svc();
    5 }

    2、重载 ACE_Task的 svc 方法,编写消息循环相关的代码

    1 int My_Task::svc()
    2 {
    3     ACE_Message_Block * msg;
    4     while(getq(msg) != -1)    // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
    5     {
    6         // process msg here
    7     }
    8 }

    svc 方法相当与处理线程的入口方法。

    3、假设 My_Task是一个基于ACE_Task的类,创建一个唯一的My_Task实例,这个可以通过typedef ACE_Singleton<MyTask, SYNCH_METHOD> MYTASK;然后总是使用MYTASK::instance方法来获取一个My_Task的指针来完成。

    4、在适当位置(一般是程序开始的时候),让My_Task开始工作

    MYTASK::intance()->activate(
    THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , // 线程创建的属性
    n_threads = 1, // 线程的数目,即有多少处理线程
    ...)

    5、在有消息发生的时候发送消息

    1 ACE_Message_Block * msg;
    2 // fill the msg
    3 ...
    4 MYTASK::intance()->putq(msg);

    三、简单示例

    生产者消费者示例

      1 /*************************************************************************
      2     > File Name: task.cpp
      3     > Author: 
      4     > Mail: 
      5     > Created Time: Tue 10 Oct 2017 02:49:52 PM CST
      6  ************************************************************************/
      7 
      8 #include <ace/Synch.h>
      9 #include <ace/Task.h>
     10 #include <ace/Message_Block.h>
     11 
     12 char test_message[] = "test_message";
     13 #define MAX_MESSAGES 10
     14 class Counting_Test_Producer : public ACE_Task<ACE_MT_SYNCH>
     15 {
     16     public:
     17         Counting_Test_Producer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
     18         :ACE_Task<ACE_MT_SYNCH>(0,queue) {}
     19         virtual int svc (void);
     20 };
     21 
     22 int Counting_Test_Producer::svc (void)
     23 {
     24     int produced = 0;
     25     char data[256] = {0};
     26     ACE_Message_Block * b = 0;
     27 
     28     while(1)
     29     {
     30         ACE_OS::sprintf(data, "%s--%d.
    ", test_message, produced);
     31 
     32         //创建消息块
     33         ACE_NEW_NORETURN (b, ACE_Message_Block (256));
     34         if (b == 0)
     35         {
     36             break;
     37         }
     38         
     39         //将data中的数据复制到消息块中
     40         b->copy(data, 256);
     41         if (produced >= MAX_MESSAGES)
     42         {
     43             //如果是最后一个数据,那么将数据属性设置为MB_STOP
     44             b->msg_type(ACE_Message_Block::MB_STOP);
     45 
     46             //将消息块放入队列中
     47             if (this->putq(b) == -1)
     48             {
     49                 b->release();
     50                 break;
     51             }
     52             produced ++;
     53             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer put the data: %s.
    "), b->base()));
     54             break;
     55         }
     56         if (this->putq(b) == -1)
     57         {
     58             b->release();
     59             break;
     60         }
     61         produced ++;
     62 
     63         ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer put the data: %s.
    "), b->base()));
     64         ACE_OS::sleep(1);
     65     }
     66     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Producer done
    ")));
     67     return 0;
     68 }
     69 
     70 class Counting_Test_Consumer : public ACE_Task<ACE_MT_SYNCH>
     71 {
     72     public:
     73         Counting_Test_Consumer (ACE_Message_Queue<ACE_MT_SYNCH> *queue)
     74         :ACE_Task<ACE_MT_SYNCH> (0, queue){}
     75         virtual int svc(void);
     76 };
     77 
     78 int Counting_Test_Consumer::svc(void)
     79 {
     80     int consumer = 0;
     81     ACE_Message_Block *b = 0;
     82     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("in consumer svc.
    ")));
     83     ACE_OS::sleep(30);
     84     while(1)
     85     {
     86         //循环从队列中读取数据块,如果读取失败,那么退出线程
     87         if (this->getq(b) == -1)
     88         {
     89             break;
     90         }
     91         if (b->msg_type() == ACE_Message_Block::MB_STOP)
     92         {
     93             //如果消息属性是MB_STOP,那么表示其为最后一个数据
     94             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the data: %s.
    "), b->base()));
     95             ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the stop msg.
    ")));
     96             b->release();
     97             consumer++;
     98             break;
     99         }
    100         ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer get the data: %s.
    "), b->base()));
    101         b->release();
    102         consumer++;
    103         ACE_OS::sleep(5);
    104     }
    105 
    106     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Consumer done
    ")));
    107     return 0;
    108 }
    109 
    110 int ACE_MAIN(int argc, ACE_TCHAR *argv[])
    111 {
    112     //创建消息队列
    113     ACE_Message_Queue<ACE_MT_SYNCH> queue(2*1024*1024);
    114 
    115     // 创建生产者和消费者,它们使用同一个消息队列,只有这样才能实现线程间消息的传递
    116     Counting_Test_Producer producer(&queue);
    117     Counting_Test_Consumer consumer(&queue);
    118 
    119     //调用activate函数创建消费者线程
    120     if (consumer.activate(THR_NEW_LWP | THR_DETACHED | THR_INHERIT_SCHED, 1) == -1)
    121     {
    122         ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT("Consumers %p
    "), ACE_TEXT("activate")), -1);
    123     }
    124 
    125     //调用activate函数创建生产者线程
    126     if (producer.activate ( THR_NEW_LWP | THR_DETACHED | THR_INHERIT_SCHED, 1) == -1)
    127     {
    128         ACE_ERROR((LM_ERROR, ACE_TEXT("Producers %p
    "), ACE_TEXT("activate")));
    129         consumer.wait();
    130         return -1;
    131     }
    132     //调用wait函数等待线程结束
    133     ACE_Thread_Manager::instance()->wait();
    134     ACE_DEBUG((LM_DEBUG, ACE_TEXT("Ending test!
    ")));
    135     return 0;
    136 }

    问题1:

    In file included from /usr/include/ace/config-macros.h:24,
                     from /usr/include/ace/config-lite.h:24,
                     from /usr/include/ace/ACE_export.h:11,
                     from /usr/include/ace/Shared_Object.h:18,
                     from /usr/include/ace/Service_Object.h:17,
                     from /usr/include/ace/Task.h:17,
                     from task.cpp:9:
    /usr/include/ace/config.h:20:2: error: #error "_FILE_OFFSET_BITS != 64"

    编译时添加-D_FILE_OFFSET_BITS=64

  • 相关阅读:
    关于研发岗位怎样做新人培训
    打工人的出路
    箭头函数的副作用
    前端日志系统设计
    cocos creator3d开发插件
    linux+Docker+asp.netCore部署
    asp.net core API 3.1获取图片返回流文件
    asp.net Core API 获取ip地址
    asp.net core 3.1配置log4net
    Asp.net Core JsonWebToken记录
  • 原文地址:https://www.cnblogs.com/zl1991/p/7646767.html
Copyright © 2011-2022 走看看