zoukankan      html  css  js  c++  java
  • ACE-Streams架构简介及应用

    一概述

    Streams框架是管道和过滤构架模式的一种实现,主要应用于处理数据流的系统。其实现以Task框架为基础。Task框架有两个特性非常适用于Streams框架:一是Task框架可用于创建独立线程的并发环境,这适合应用于ACE Streams框架中的主动过滤器;二是Task框架有统一的数据传输结果——消息队列,这适用于Streams框架中的管道。

    二ACE_Task类

    这里主要介绍与Streams框架相关的部分。

    成员变量

     1 Task_T.h
     2 
     3 class ACE_Task : public ACE_Task_Base
     4 
     5 {
     6 
     7 /// Queue of messages on the ACE_Task..
     8 
     9   ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *msg_queue_;
    10 
    11  
    12 
    13   /// true if should delete Message_Queue, false otherwise.
    14 
    15   bool delete_msg_queue_;
    16 
    17  
    18 
    19   /// Back-pointer to the enclosing module.
    20 
    21   ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod_;
    22 
    23  
    24 
    25   /// Pointer to adjacent ACE_Task.
    26 
    27   ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *next_;
    28 
    29 }

    put函数

    在ACE_Task框架中,put函数没有实际作用,在默认情况下,该函数没有执行任何操作,仅仅返回0.但是在Streams框架中,put函数与put_next函数结合起来可以实现数据在过滤器间的传输。如果put函数将数据保存在消息队列中,通过独立的线程来处理这些消息,那么它将成为一个主动过滤器;反之,如果put函数直接对数据进行处理,然后交给下一个过滤器,那么它就是一个被动过滤器。

     1 Task.cpp
     2 
     3 /// Default ACE_Task put routine.
     4 
     5 int
     6 
     7 ACE_Task_Base::put (ACE_Message_Block *, ACE_Time_Value *)
     8 
     9 {
    10 
    11   ACE_TRACE ("ACE_Task_Base::put");
    12 
    13   return 0;
    14 
    15 }

    put_next函数

    如果在数据处理流水线有下一个过滤器,那么put_next函数用于将数据交给下一个过滤器处理。如下:

     1 Task_T.inl
     2 
     3 // Transfer msg to the next ACE_Task.
     4 
     5  
     6 
     7 template <ACE_SYNCH_DECL, class TIME_POLICY> ACE_INLINE int
     8 
     9 ACE_Task<ACE_SYNCH_USE, TIME_POLICY>::put_next (ACE_Message_Block *msg, ACE_Time_Value *tv)
    10 
    11 {
    12 
    13   ACE_TRACE ("ACE_Task<ACE_SYNCH_USE, TIME_POLICY>::put_next");
    14 
    15   return this->next_ == 0 ? -1 : this->next_->put (msg, tv);
    16 
    17 }

    next_指向的是有序过滤器的下一个过滤器。通过put_next函数,可以将数据交给下一个过滤器处理。

    Streams框架应用示例

    在这个示例中,我们将一个数据流的处理分为4步,在Streams框架中,将每个处理步骤称为一个Module:

    • Logrec_Reader:从文件中读取记录,然后交给下一个步骤。
    • Logrec_Timer:在记录尾部加上“format_data”
    • Logrec_Suffix:在记录尾部加上一个后缀——suffix
    • Logrec_Write:将记录显示在终端上

    Logrec_Reader类

    其是ACE_Task的子类,是一个主动对象类,有独立的控制线程,线程处理函数是svc。在Streams框架中,Logrec_Reader类是一个主动过滤器,代码如下:

     1 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH>
     2 {
     3 private:
     4     ifstream fin;  //标准输入流
     5 public:
     6     Logrec_Reader(ACE_TString logfile)
     7     {
     8         fin.open(logfile.c_str()); //ACE_TString.c_str() 转换为char
     9     }
    10     virtual int open (void *)
    11     {
    12         return activate();
    13     }
    14 
    15     virtual int svc()
    16     {
    17         ACE_Message_Block *mblk;
    18         int len = 0;
    19         const int LineSize = 256;
    20         char file_buf[LineSize];
    21 
    22         while(!fin.eof())
    23         {
    24             fin.getline(file_buf, LineSize);
    25             len = ACE_OS::strlen(file_buf);
    26             ACE_NEW_RETURN(mblk, ACE_Message_Block(len + 200), 0);
    27             if (file_buf[len - 1] == '
    ')
    28             {
    29                 len = len - 1;
    30             }
    31             mblk->copy(file_buf, len);
    32             // 通过put_next函数,将消息传递给下一个过滤器
    33             put_next(mblk);
    34         }
    35         //构造一个MB_STOP消息
    36         ACE_NEW_RETURN(mblk, ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0);
    37         put_next(mblk);
    38         fin.close();
    39         ACE_DEBUG((LM_DEBUG, "read svc return. 
    "));
    40         return 0;
    41     }
    42 };

    Logrec_Timer类

    也是ACE_Task的子类,但是它不是主动对象类,没有创建独立线程。其实现了put函数,这个函数被它的上一个过滤器(Logrec_Reader)调用,并且数据直接在这个函数中处理。

    这里for循环用于处理消息链表,在示例中并没有使用链表因此for循环只会执行一次。

     1 class Logrec_Timer : public ACE_Task<ACE_SYNCH>
     2 {
     3 private:
     4     void format_data(ACE_Message_Block *mblk)
     5     {
     6         char *msg = mblk->data_block()->base();
     7         strcat(msg, "format_data");
     8     }
     9 public:
    10     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
    11     {
    12         for (ACE_Message_Block *temp = mblk;
    13             temp != 0; temp = temp->cont())
    14         {
    15             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
    16             {
    17                 format_data(temp);
    18             }
    19         }
    20         return put_next(mblk);
    21     }
    22 };

    Logrec_Suffix类

    类似Logrec_Timer

     1 class Logrec_Suffix : public ACE_Task<ACE_SYNCH>
     2 {
     3 public:
     4     void suffix(ACE_Message_Block *mblk)
     5     {
     6         char *msg = mblk->data_block()->base();
     7         strcat(msg, "suffix
    ");
     8     }
     9     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
    10     {
    11         for (ACE_Message_Block *temp = mblk;
    12             temp != 0; temp = temp->cont())
    13         {
    14             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
    15             {
    16                 suffix(temp);
    17             }
    18         }
    19         return put_next(mblk);
    20     }
    21 };

    Logrec_Write类

    这里put函数由上一个过滤器(Logrec_Suffix)调用,其并没有对数据进行实际处理,只是将数据放入队列中,由线程独立处理。

     1 class Logrec_Write : public ACE_Task<ACE_SYNCH>
     2 {
     3 public:
     4     virtual int open(void*)
     5     {
     6         ACE_DEBUG((LM_DEBUG, "Logrec_Writer.
    "));
     7         return activate();
     8     }
     9 
    10     virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
    11     {
    12         return putq(mblk, to);
    13     }
    14 
    15     virtual int svc()
    16     {
    17         int stop = 0;
    18         for (ACE_Message_Block *mb; !stop && getq(mb) != -1;)
    19         {
    20             if (mb->msg_type() == ACE_Message_Block::MB_STOP)
    21             {
    22                 stop = 1;
    23             }
    24             else{
    25                 ACE_DEBUG((LM_DEBUG, "%s", mb->base()));
    26             }
    27             put_next(mb);
    28         }
    29         return 0;
    30     }
    31 };

    Main

    这里讲ACE_Module放入Streams中,ACE_Module才是真正数据处理的Module。ACE_Streams类有两个数据成员:stream_head_和stream_tail_,它们指向ACE_Module链表的首尾。对于每个Streams,其默认带有首尾两个Module,而后可以通过push将数据处理的Module放入执行链表中。每个Module包含两个Task,分别为读Task和写Task。在示例中仅注册了写Task,这些Module和Task通过next指针构成一个有序的串。

    这里注意push有顺序要求,最后push即栈顶的为先执行的Module。

     1 int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
     2 {
     3     if (argc != 2)
     4     {
     5         ACE_ERROR_RETURN((LM_ERROR, "usage: %s logfile
    ", argv[0]), 1);
     6     }
     7 
     8     ACE_TString logfile (argv[1]);
     9 
    10     ACE_Stream<ACE_SYNCH> stream;
    11 
    12 
    13     ACE_Module<ACE_MT_SYNCH> *module[4];
    14     module[0] = new ACE_Module<ACE_MT_SYNCH>("Reader", new Logrec_Reader(logfile), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
    15     module[1] = new ACE_Module<ACE_MT_SYNCH>("Formatter", new Logrec_Timer(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
    16     module[2] = new ACE_Module<ACE_MT_SYNCH>("Separator", new Logrec_Suffix(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
    17     module[3] = new ACE_Module<ACE_MT_SYNCH>("Writer", new Logrec_Write(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
    18 
    19     for ( int i = 3; i >= 0; --i )
    20     {
    21         if (stream.push(module[i]) == -1)
    22         {
    23             ACE_ERROR_RETURN((LM_ERROR, "push %s module into stream failed.
    ", module[i]->name()), 1);
    24         }
    25         ACE_DEBUG((LM_DEBUG, "push %s module into stream success. 
    ", module[i]->name()));
    26     }
    27     ACE_Thread_Manager::instance()->wait();
    28 }

    附完整代码及结果:https://github.com/ShiningZhang/ACE_Learning/tree/master/streams

      1 /*************************************************************************
      2     > File Name: logrec.cpp
      3     > Author: 
      4     > Mail: 
      5     > Created Time: Fri 13 Oct 2017 04:19:39 PM CST
      6  ************************************************************************/
      7  #include <fstream>
      8  #include <ace/Synch.h>
      9  #include <ace/Task.h>
     10  #include <ace/Message_Block.h>
     11  #include <ace/Stream.h>
     12  #include "ace/Thread_Manager.h"
     13  #include <ace/Time_Value.h>
     14  #include <ace/Module.h>
     15 
     16 using namespace std;
     17 
     18 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH>
     19 {
     20 private:
     21     ifstream fin;  //标准输入流
     22 public:
     23     Logrec_Reader(ACE_TString logfile)
     24     {
     25         fin.open(logfile.c_str()); //ACE_TString.c_str() 转换为char
     26     }
     27     virtual int open (void *)
     28     {
     29         return activate();
     30     }
     31 
     32     virtual int svc()
     33     {
     34         ACE_Message_Block *mblk;
     35         int len = 0;
     36         const int LineSize = 256;
     37         char file_buf[LineSize];
     38 
     39         while(!fin.eof())
     40         {
     41             fin.getline(file_buf, LineSize);
     42             len = ACE_OS::strlen(file_buf);
     43             ACE_NEW_RETURN(mblk, ACE_Message_Block(len + 200), 0);
     44             if (file_buf[len - 1] == '
    ')
     45             {
     46                 len = len - 1;
     47             }
     48             mblk->copy(file_buf, len);
     49             // 通过put_next函数,将消息传递给下一个过滤器
     50             put_next(mblk);
     51         }
     52         //构造一个MB_STOP消息
     53         ACE_NEW_RETURN(mblk, ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0);
     54         put_next(mblk);
     55         fin.close();
     56         ACE_DEBUG((LM_DEBUG, "read svc return. 
    "));
     57         return 0;
     58     }
     59 };
     60 
     61 class Logrec_Timer : public ACE_Task<ACE_SYNCH>
     62 {
     63 private:
     64     void format_data(ACE_Message_Block *mblk)
     65     {
     66         char *msg = mblk->data_block()->base();
     67         strcat(msg, "format_data");
     68     }
     69 public:
     70     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
     71     {
     72         for (ACE_Message_Block *temp = mblk;
     73             temp != 0; temp = temp->cont())
     74         {
     75             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
     76             {
     77                 format_data(temp);
     78             }
     79         }
     80         return put_next(mblk);
     81     }
     82 };
     83 
     84 class Logrec_Suffix : public ACE_Task<ACE_SYNCH>
     85 {
     86 public:
     87     void suffix(ACE_Message_Block *mblk)
     88     {
     89         char *msg = mblk->data_block()->base();
     90         strcat(msg, "suffix
    ");
     91     }
     92     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
     93     {
     94         for (ACE_Message_Block *temp = mblk;
     95             temp != 0; temp = temp->cont())
     96         {
     97             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
     98             {
     99                 suffix(temp);
    100             }
    101         }
    102         return put_next(mblk);
    103     }
    104 };
    105 
    106 class Logrec_Write : public ACE_Task<ACE_SYNCH>
    107 {
    108 public:
    109     virtual int open(void*)
    110     {
    111         ACE_DEBUG((LM_DEBUG, "Logrec_Writer.
    "));
    112         return activate();
    113     }
    114 
    115     virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
    116     {
    117         return putq(mblk, to);
    118     }
    119 
    120     virtual int svc()
    121     {
    122         int stop = 0;
    123         for (ACE_Message_Block *mb; !stop && getq(mb) != -1;)
    124         {
    125             if (mb->msg_type() == ACE_Message_Block::MB_STOP)
    126             {
    127                 stop = 1;
    128             }
    129             else{
    130                 ACE_DEBUG((LM_DEBUG, "%s", mb->base()));
    131             }
    132             put_next(mb);
    133         }
    134         return 0;
    135     }
    136 };
    137 
    138 int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
    139 {
    140     if (argc != 2)
    141     {
    142         ACE_ERROR_RETURN((LM_ERROR, "usage: %s logfile
    ", argv[0]), 1);
    143     }
    144 
    145     ACE_TString logfile (argv[1]);
    146 
    147     ACE_Stream<ACE_SYNCH> stream;
    148 
    149 
    150     ACE_Module<ACE_MT_SYNCH> *module[4];
    151     module[0] = new ACE_Module<ACE_MT_SYNCH>("Reader", new Logrec_Reader(logfile), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
    152     module[1] = new ACE_Module<ACE_MT_SYNCH>("Formatter", new Logrec_Timer(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
    153     module[2] = new ACE_Module<ACE_MT_SYNCH>("Separator", new Logrec_Suffix(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
    154     module[3] = new ACE_Module<ACE_MT_SYNCH>("Writer", new Logrec_Write(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
    155 
    156     for ( int i = 3; i >= 0; --i )
    157     {
    158         if (stream.push(module[i]) == -1)
    159         {
    160             ACE_ERROR_RETURN((LM_ERROR, "push %s module into stream failed.
    ", module[i]->name()), 1);
    161         }
    162         ACE_DEBUG((LM_DEBUG, "push %s module into stream success. 
    ", module[i]->name()));
    163     }
    164     ACE_Thread_Manager::instance()->wait();
    165 }

     

    test:

     

  • 相关阅读:
    MVC模式简单介绍
    Android AES加密算法及事实上现
    01背包问题
    C++继承经典样例
    [React] React Fundamentals: Using Refs to Access Components
    [React] React Fundamentals: Owner Ownee Relationship
    [React] React Fundamentals: State Basics
    [React] React Fundamentals: First Component
    [Javascript] Lodash: Refactoring Simple For Loops (_.find, _.findLast, _.filter)
    [Javascript] Create an Array concatAll method
  • 原文地址:https://www.cnblogs.com/zl1991/p/7662993.html
Copyright © 2011-2022 走看看