zoukankan      html  css  js  c++  java
  • CV学习日志:CV开发之环形缓冲器

             本文设计一种无锁环形Buffer,它的核心功能是让后端可以获取最新的传感器数据。  

             以ROS中结点概念来描述其功能就是,一个结点(发布结点)不断地获取传感器数据写到环形Buffer,另一个或多个结点去环形Buffer中取数据但要求每次取到的必须是最新传感器数据。

             于是可以按如下原则来实现这个环形Buffer:

             (1)当读操作比写操作快时读操作等待直到写操作完成后,读操作才能继续:详细而言,就是写操作正在写一个单元时,读操作也要读这个单元;此时的设计模式为,读操作只能等待,直到写操作完成对该单元的写且释放了对该单元的控制权后,读操作才能继续。代码设计上就是读返回失败,可休眠一定时间后再读,如此反复,直到读成功。

             (2)当读操作比写操作慢时,读操作将跳过中间的所有单元,直接定位到最新单元读:需要注意的是,若读操作直接使用单元内存(对大内存单元进行拷贝耗时)而不是将单元拷贝出来后才进行相关运算,则这些运算必须足够快,至少在写操作完成一个环形周期,又回到当前单元时要运算结束,否则将读不到正确的数据进行运算。可根据运算时间与写操作的频率设置一个足够长的环形Buffer,以保证数据的安全性。

             将以上所述功能及使用样例封装为CirArr、CirMat、CirMatDemo三个类。

    1.关于template<class Object, int nbuf0 = 9, int nchn0 = 9, int ndim0 = 6> class CirArr

             (1)Object是单元类型、nbuf0是Buffer长度、nchn0是Buffer能同时支持的读者数量、ndim0是单例模式的单例数。

             (2)int getState()返回Buffer当前状态,只有Inited和Deinited两个状态。

             (3)int getBuf()&getChn()&getDim():返回值意义同nbuf0&nchn0&ndim0。

             (4)bool getLatest(…):读取最新单元,返回是否读成功。

             (5)int64 lockWritten(…):移动到下一单元并锁定该单元,成功返回写序号,失败返回-1。

             (6)int64 unlockWritten(…):释放当前单元使之能被读,成功返回1,失败返回-1。

             (7)bool init():初始化Buffer,主要是激活状态和初始化数据单元,返回是否成功,子类通常需要重写/重载此函数。

             (8)bool deinit():释放Buffer,主要欠激活状态和释放数据单元,返回是否成功,子类通常需要重写/重载此函数。

             (9)使用要点:主模块负责init和deinit,写模块进行lockWritten、writeData、unlockWritten、读模块进行getLatest和readData

             (10)是否继承:核心功能已在本类实现,继承主要是重写/重载init和deinit函数,需要继承主要有两种情形:一是单元是动态类型(即需要专门的内存分配和释放),二是单元的初值有特定要求(即单元初值不能是零)。若单元为固定数据类型(即无需内存分配)且初值无特定要求(本类将所有单元置为零),则无需继承。

             (11)设计原理:基于系统中断原理,int64在同一系统中访问具有原子性,于是定义int64的readPos和writePos。对于读,只有当writePos大于readPose,读才能进行且直接跳到writePos的位置读,同时将当前writePose的值赋给readPos,这就保证了永远读到最新的。对于写,每进行一次写操作就递增writePos,且写结束后才递增,这就保证了写的过程中不会被读。

    2.关于template<class tp, int nbuf0 = 9, int nchn0 = 9, int ndim0 = 6> class CirMat

             (1)公共继承类CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>,核心是将Object具体化为cv::Mat_<tp>

             (2)重写/重载init和deinit函数以实现对基于cv::Mat的Buffer的初始化和释放

    3.关于CirMatDemo

             (1)定义三个业务类ControlThread、ReadThread、WriteThread和一个环形缓冲类CirMatUint8=CirMat<uchar,7)。

             (2)ControlThread负责初始化CirMatUint8、启动读线程、启动写线程。

             (3)WriteThread反复写CirMatUint8,每次写入有编号的图像,编号位于图像中心且编号递增。

             (4)ReadThread反复读CirMatUint8,每次读到图像后,就显示出来。

             (5)测试代码中,写速度约是读速度的10倍,CirMatUint8默认长度是7,测试结果显示读出的图像不正常,修改为更大的缓冲长度,可得到正常效果的图像。

             以下是详细代码,依赖于C++14、OpenCV4.x和Spdlog,封装为三个类:

             (1)CirArr:State+getState/getBuf/getChn/getDim+init+deinit+getLatest+lockWritten+unlockWritten+GetMe

             (2)CirMat:inherit from CirArr and rewrite/overload init&deinit&GetMe

             (3)CirMatDemo:CirMatUint8(from CirMat<uchar, 7>)+WriteThread+ReadThread+ControlThread+TestMe

      1 #include <opencv2/opencv.hpp>
      2 #include <spdlog/spdlog.h>
      3 using namespace std;
      4 using namespace cv;
      5 #ifndef ns1970
      6 #define ms1970 chrono::time_point_cast<chrono::milliseconds>(chrono::system_clock::now()).time_since_epoch().count()
      7 #define us1970 chrono::time_point_cast<chrono::microseconds>(chrono::system_clock::now()).time_since_epoch().count()
      8 #define ns1970 chrono::time_point_cast<chrono::nanoseconds>(chrono::system_clock::now()).time_since_epoch().count()
      9 #endif
     10 
     11 template<class Object, int nbuf0 = 9, int nchn0 = 9, int ndim0 = 6> class CirArr
     12 {
     13 public:
     14     static CirArr& GetMe(int dim = 0) { static CirArr us[ndim0]; return us[dim]; }
     15     void help()
     16     {
     17         spdlog::info("1.Read faster than write: read waits until timeout");
     18         spdlog::info("2.Read slower than write: read skips intermediary buffers and goes directly to the latest buffer(bufCount should be enough large in case writePos returns to readPos)");
     19     }
     20 
     21 public:
     22     enum State
     23     {
     24         Deinited,
     25         Inited
     26     };
     27 
     28 protected:
     29     State state = Deinited;
     30     const int nbuf = nbuf0;
     31     const int nchn = nchn0;
     32     const int ndim = ndim0;
     33 
     34 protected:
     35     Object objects[nbuf0];
     36     int64 readPos[nchn0] = { 0 };
     37     int64 writePos = 0;
     38 
     39 public:
     40     State getState() { return state; }
     41     int getBuf() { return nbuf; }
     42     int getChn() { return nchn; }
     43     int getDim() { return ndim; }
     44 
     45 public:
     46     virtual bool init()
     47     {
     48         if (state == Inited) { spdlog::info("zero operation"); return true; }
     49         memset(objects, 0, sizeof(objects));
     50         memset(readPos, 0, sizeof(readPos));
     51         memset(&writePos, 0, sizeof(writePos));
     52         state = Inited;
     53         return true;
     54     }
     55     virtual bool deinit()
     56     {
     57         if (state == Deinited) { spdlog::info("zero operation"); return true; }
     58         memset(objects, 0, sizeof(objects));
     59         memset(readPos, 0, sizeof(readPos));
     60         memset(&writePos, 0, sizeof(writePos));
     61         state = Deinited;
     62         return true;
     63     }
     64 
     65 public:
     66     bool getLatest(Object** object, int chnId, int msTimeout = 1000, int msSleep = 2)
     67     {
     68         if (state != Inited) { spdlog::critical("wrong state"); return false; }
     69         for (int64 t0 = ms1970; ms1970 - t0 < msTimeout;)
     70         {
     71             int64 availablePos = writePos;
     72             if (availablePos > readPos[chnId])
     73             {
     74                 int64 relativePos = availablePos % nbuf;
     75                 *object = objects + relativePos;
     76                 readPos[chnId] = availablePos;
     77                 return true;
     78             }
     79             this_thread::sleep_for(chrono::milliseconds(msSleep));
     80         }
     81         return false;
     82     }
     83     int64 lockWritten(Object** object)
     84     {
     85         if (state != Inited) { spdlog::critical("wrong state"); return -1; }
     86         int64 absolutePos = writePos;
     87         int64 relativePos = ++absolutePos % nbuf;
     88         *object = objects + relativePos;
     89         return absolutePos;
     90     }
     91     int64 unlockWritten(int64 absolutePos)
     92     {
     93         if (state != Inited) { spdlog::critical("wrong state"); return -1; }
     94         return (writePos = absolutePos);
     95     }
     96 };
     97 
     98 
     99 template<class tp, int nbuf0 = 9, int nchn0 = 9, int ndim0 = 6> class CirMat : public CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>
    100 {
    101 public://Subclass cannot access parent members without following code in GCC
    102     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::state;
    103     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::nbuf;
    104     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::nchn;
    105     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::ndim;
    106     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::objects;
    107     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::readPos;
    108     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::writePos;
    109     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::Deinited;
    110     using CirArr<Mat_<tp>, nbuf0, nchn0, ndim0>::Inited;
    111 
    112 public:
    113     static CirMat& GetMe(int dim = 0) { static CirMat us[ndim0]; return us[dim]; }
    114 
    115 public:
    116     bool init() { spdlog::critical("please call the other one"); return false; };
    117     bool init(int rows, int cols)
    118     {
    119         if (state == Inited) { spdlog::info("zero operation"); return true; }
    120         for (int k = 0; k < nbuf; ++k) { objects[k].create(rows, cols); objects[k] = 0; }
    121         memset(readPos, 0, sizeof(readPos));
    122         memset(&writePos, 0, sizeof(writePos));
    123         state = Inited;
    124         return true;
    125     }
    126     bool deinit()
    127     {
    128         if (state == Deinited) { spdlog::info("zero operation"); return true; }
    129         for (int k = 0; k < nbuf; ++k) objects[k].release();
    130         memset(readPos, 0, sizeof(readPos));
    131         memset(&writePos, 0, sizeof(writePos));
    132         state = Deinited;
    133         return true;
    134     }
    135 };
    136 
    137 
    138 class CirMatDemo
    139 {
    140 public:
    141     static void TestMe(int argc = 0, char** argv = 0)
    142     {
    143         //1.StartControl
    144         if (ControlThread::GetMe().start() != 0) spdlog::critical("ControlThread::GetMe().start failed");
    145         else spdlog::info("ControlThread::GetMe().start succeeded");
    146 
    147         //2.StopControl
    148         if (ControlThread::GetMe().stop() != 0) spdlog::critical("ControlThread::GetMe().stop failed");
    149         else spdlog::info("ControlThread::GetMe().stop succeeded");
    150     }
    151 
    152 public:
    153     using CirMatUint8 = CirMat<uchar, 7>; //write is about ten times faster than read; set nbuf as 4/6/8/10/12/14 for showing different effect
    154 
    155 public:
    156     class WriteThread
    157     {
    158     public:
    159         static WriteThread& GetMe() { static WriteThread me; return me; }
    160 
    161     public:
    162         enum State
    163         {
    164             Stopped,
    165             Started
    166         };
    167 
    168     protected:
    169         State state = Stopped;
    170 
    171     public:
    172         State getState() { return state; }
    173 
    174     public:
    175         bool start()
    176         {
    177             if (state == Started) { spdlog::info("zero operation"); return true; }
    178             if (CirMatUint8::GetMe().getState() != CirMatUint8::Inited) { spdlog::critical("CirMatUint8 not Inited"); return false; }
    179             state = Started;
    180             std::thread([this]()->void
    181                 {
    182                     int k = 0;
    183                     while (1)
    184                     {
    185                         //Exit
    186                         if (state == Stopped) break;
    187 
    188                         //GetPointer
    189                         Mat_<uchar>* mat;
    190                         int64 absolutePos = CirMatUint8::GetMe().lockWritten(&mat);
    191                         if (absolutePos < 0) { spdlog::critical("CirMatUint8::GetMe().lockWritten failed"); continue; }
    192 
    193                         //OperateData
    194                         *mat = 0;
    195                         char num[20]; sprintf(num, "WritePos: %d", ++k);
    196                         putText(*mat, num, Point(mat->cols / 5, mat->rows >> 1), FONT_HERSHEY_PLAIN, 2, Scalar(255, 255, 255), 2, LINE_8, false);
    197 
    198                         //ReleasePointer
    199                         if (CirMatUint8::GetMe().unlockWritten(absolutePos) < 0) spdlog::critical("CirMatUint8::GetMe().unlockWritten failed");
    200 
    201                         //ControlFramerate
    202                         this_thread::sleep_for(50ms);
    203                     }
    204                 }).detach();
    205 
    206                 return true;
    207         }
    208         bool stop()
    209         {
    210             if (state == Stopped) { spdlog::info("zero operation"); return true; }
    211             state = Stopped;
    212             return true;
    213         }
    214     };
    215 
    216 
    217     class ReadThread
    218     {
    219     public:
    220         static ReadThread& GetMe() { static ReadThread me; return me; }
    221 
    222     public:
    223         enum State
    224         {
    225             Stopped,
    226             Started
    227         };
    228 
    229     protected:
    230         State state = Stopped;
    231 
    232     public:
    233         State getState() { return state; }
    234 
    235     public:
    236         bool start()
    237         {
    238             if (state == Started) { spdlog::info("zero operation"); return true; }
    239             if (CirMatUint8::GetMe().getState() != CirMatUint8::Inited) { spdlog::critical("CirMatUint8 not Inited"); return false; }
    240             state = Started;
    241             std::thread([this]()->void
    242                 {
    243                     int k = 0;
    244                     while (1)
    245                     {
    246                         //Exit
    247                         if (state == Stopped) break;
    248 
    249                         //GetPointer
    250                         Mat_<uchar>* mat;
    251                         if (!CirMatUint8::GetMe().getLatest(&mat, 0, 100)) { spdlog::info("CirMatUint8::GetMe().getLatest failed"); continue; }
    252 
    253                         //OperateData
    254                         mat->rowRange(0, mat->rows / 3) = 255;
    255                         mat->rowRange(mat->rows * 2 / 3, mat->rows) = 255;
    256                         char num[20]; sprintf(num, "ReadPos: %d", ++k);
    257                         putText(*mat, num, Point(mat->cols * 0.6, mat->rows >> 1), FONT_HERSHEY_PLAIN, 2, Scalar(255, 255, 255), 2, LINE_8, false);
    258 
    259                         //ControlFramerate
    260                         this_thread::sleep_for(400ms);
    261                         cv::imshow("effect", *mat);
    262                         cv::waitKey(100);
    263                     }
    264                 }).detach();
    265 
    266                 return true;
    267         }
    268         bool stop()
    269         {
    270             if (state == Stopped) { spdlog::info("zero operation"); return true; }
    271             state = Stopped;
    272             return true;
    273         }
    274     };
    275 
    276 
    277     class ControlThread
    278     {
    279     public:
    280         static ControlThread& GetMe() { static ControlThread me; return me; }
    281 
    282     public:
    283         enum State
    284         {
    285             Stopped,
    286             BufferStarted,
    287             WriteStarted,
    288             ReadStarted,
    289             Started
    290         };
    291 
    292     protected:
    293         State state = Stopped;
    294 
    295     public:
    296         State getState() { return state; }
    297 
    298     public:
    299         bool start()
    300         {
    301             if (state > Stopped) { spdlog::info("zero operation"); return true; }
    302 
    303             //1.StartBuffer
    304             if (CirMatUint8::GetMe().init(720, 1280) == false)
    305             {
    306                 spdlog::critical("CirMatUint8::GetMe().init failed");
    307                 stop(); return false;
    308             }
    309             else state = State(state + 1);
    310 
    311             //2.StartWrite
    312             if (WriteThread::GetMe().start() == false)
    313             {
    314                 spdlog::critical("WriteThread::GetMe().start failed");
    315                 stop(); return false;
    316             }
    317             else state = State(state + 1);
    318 
    319             //3.StartRead
    320             if (ReadThread::GetMe().start() == false)
    321             {
    322                 spdlog::critical("ReadThread::GetMe().start failed");
    323                 stop(); return false;
    324             }
    325             else state = State(state + 1);
    326 
    327             //4.MainThread
    328             while (1)
    329             {
    330                 static int sec = 60;
    331                 if (sec == 0) break;
    332                 spdlog::info("This is the main thread and finish in {}s", --sec);
    333                 this_thread::sleep_for(1000ms);
    334             }
    335 
    336             return true;
    337         }
    338         bool stop()
    339         {
    340             bool ret = true;
    341 
    342             //StopRead
    343             if (state > ReadStarted)
    344             {
    345                 if (ReadThread::GetMe().stop() != 0)
    346                 {
    347                     spdlog::critical("ReadThread::GetMe().stop failed");
    348                     ret = false;
    349                 }
    350                 state = State(state - 1);
    351             }
    352 
    353             //StopRead
    354             if (state > WriteStarted)
    355             {
    356                 if (WriteThread::GetMe().stop() != 0)
    357                 {
    358                     spdlog::critical("WriteThread::GetMe().stop failed");
    359                     ret = false;
    360                 }
    361                 state = State(state - 1);
    362             }
    363 
    364             //StopBuffer
    365             if (state > BufferStarted)
    366             {
    367                 if (CirMatUint8::GetMe().deinit() != 0)
    368                 {
    369                     spdlog::critical("CirMatUint8::GetMe().deinit failed");
    370                     ret = false;
    371                 }
    372                 state = State(state - 1);
    373             }
    374 
    375             return ret;
    376         }
    377     };
    378 };
    379 
    380 int main(int argc = 0, char** argv = 0) { CirMatDemo::TestMe(argc, argv); return 0; }
    View Code

             还存在另外一种需求是,不需要获得最新的传感器数据,而要获取指定id(如时间戳)或最接近指定id的数据。

        假设环形Buffer的单元是键值对,这就相当于给键,去环形Buffer中查询该键对应的值或与该键最接近的键对应的值。

             大致实现如下,也依赖于C++14和Spdlog,封装在类CirArrEx:State+getState/getBuf/getChn/getDim+init+deinit+find+findEx+write

     1 template<class Object, int nbuf0 = 9, int nchn0 = 9, int ndim0 = 6> class CirArrEx
     2 {    
     3 public:
     4     static CirArrEx &GetMe(int dim = 0) { static CirArrEx us[ndim0]; return us[dim]; }
     5 
     6 public:
     7     enum State
     8     {
     9         Deinited,
    10         Inited
    11     };
    12 
    13 protected:
    14     State state = Deinited;
    15     const int nbuf = nbuf0;
    16     const int nchn = nchn0;
    17     const int ndim = ndim0;
    18 
    19 protected:
    20     Object objects[nbuf0];
    21     int64 readPos[nchn0] = { 0 };
    22     int64 writePos = 0;
    23 
    24 public:
    25     State getState() { return state; }
    26     int getBuf() { return nbuf; }
    27     int getChn() { return nchn; }
    28     int getDim() { return ndim; }
    29 
    30 public:
    31     virtual bool init()
    32     {
    33         if (state == Inited) { spdlog::info("zero operation"); return true; }
    34         memset(objects, 0, sizeof(objects));
    35         memset(readPos, 0, sizeof(readPos));
    36         memset(&writePos, 0, sizeof(writePos));
    37         state = Inited;
    38         return true;
    39     }
    40     virtual bool deinit()
    41     {
    42         if (state == Deinited) { spdlog::info("zero operation"); return true; }
    43         memset(objects, 0, sizeof(objects));
    44         memset(readPos, 0, sizeof(readPos));
    45         memset(&writePos, 0, sizeof(writePos));
    46         state = Deinited;
    47         return true;
    48     }
    49 
    50 public:
    51     bool find(int64 ts, Object *object)
    52     {
    53         if (state != Inited) { spdlog::critical("wrong state"); return false; }
    54         if (writePos < nbuf) false;
    55         int64 tsMinDiff = INT64_MAX;
    56         for (int64 k = (writePos + 1) % nbuf, kk = (writePos - 1) % nbuf; k != kk; k = k == nbuf ? 0 : ++k) //from oldest to latest
    57             if (objects[k].ts == ts)
    58             {
    59                 *object = objects[k];
    60                 return true;
    61             }
    62             else
    63             {
    64                 int64 tsDiff = std::abs(ts - objects[k].ts);
    65                 if (tsMinDiff > tsDiff)
    66                 {
    67                     *object = objects[k];
    68                     tsMinDiff = tsDiff;
    69                 }
    70             }
    71         return (tsMinDiff != INT64_MAX);
    72     }
    73     int findEx(int64 ts1, int64 ts2, Object *objects)
    74     {
    75         if (state != Inited) { spdlog::critical("wrong state"); return 0; }
    76         if (writePos < nbuf) true;
    77         int count = 0;
    78         for (int64 k = (writePos + 1) % nbuf, kk = (writePos - 1) % nbuf; k != kk; k = k == nbuf ? 0 : ++k) //from oldest to latest
    79             if (objects[k].ts >= ts1 && objects[k].ts <= ts2)
    80                 objects[count++] = objects[k];
    81         return count;
    82     }
    83     bool write(Object *object)
    84     {
    85         if (state != Inited) { spdlog::critical("wrong state"); return false; }
    86         objects[++writePos % nbuf] = *object;
    87         return true;
    88     }
    89 };
    View Code
  • 相关阅读:
    centos7如何将docker容器配置成开机自启动
    Linux磁盘和文件系统扩容彻底研究
    Linux 系统中用Systemd 管理系统服务
    让程序员从运维工作中解放出来
    为什么linux系统中init被systemd替换了
    网页是如何实现从剪贴板从读取图片并上传到server的
    局域网中win10作为服务器,其他机器无法连接怎么办
    docker attach 和 exec 用法区别
    怎么理解linux作业(job),与进程(process)的关系
    HashMap和ConcurrentHashMap 源码关键点解析
  • 原文地址:https://www.cnblogs.com/dzyBK/p/13829435.html
Copyright © 2011-2022 走看看