zoukankan      html  css  js  c++  java
  • ZMQ源代码分析(一)-- 基础数据结构的实现

    yqueue 和 ypipe

    zmq号称是”史上最快的消息队列”,由此可见zmq中最重要的数据结构就是队列。

    zmq的队列主要由yqueue和ypipe实现。yqueue是队列的基本操作,以下首先分析yqueue的实现。

     //  Individual memory chunk to hold N elements.
            //  Individual memory chunk to hold N elements.
            struct chunk_t
            {
                 T values [N];
                 chunk_t *prev;
                 chunk_t *next;
            };
    
            //  Back position may point to invalid memory if the queue is empty,
            //  while begin & end positions are always valid. Begin position is
            //  accessed exclusively be queue reader (front/pop), while back and
            //  end positions are accessed exclusively by queue writer (back/push).
            chunk_t *begin_chunk;
            int begin_pos;
            chunk_t *back_chunk;
            int back_pos;
            chunk_t *end_chunk;
            int end_pos;
    
            //  People are likely to produce and consume at similar rates.  In
            //  this scenario holding onto the most recently freed chunk saves
            //  us from having to call malloc/free.
            atomic_ptr_t<chunk_t> spare_chunk;

    在yqueue中有一个重要的结构体chunk_t,他是yqueue高效的关键因素。

    内存的申请和释放很浪费效率。yqueue为了避免频繁的内存操作。每次不会申请一个元素大小的内存空间。而是申请一批,这一批元素就保存在chunk_t结构体中。yqueu用三个指针和三个游标来记录chunk以及在chunk内有效的数据的索引。以以下的push操作为例,当在队列的末尾加入一个元素时,会先推断当前尾端的chunk是否还有空暇的元素。即end_pos是否等于N-1。相等则说明须要申请新的chunk_t,否则直接移动end_pos就可以。另外因为许多队列中生产和消费的速率比較一致,所以yqueue用一个spare_chunk来保存刚刚释放的chunk。这样当须要申请新的chunk时就能够直接使用spare_chunk所记录的chunk了。除了push外,yqueue还提供了pop和unpush操作,实现原理和push相似。

            //  Adds an element to the back end of the queue.
            inline void push ()
            {
                back_chunk = end_chunk;
                back_pos = end_pos;
    
                if (++end_pos != N)
                    return;
    
                chunk_t *sc = spare_chunk.xchg (NULL);
                if (sc) {
                    end_chunk->next = sc;
                    sc->prev = end_chunk;
                } else {
                    end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
                    alloc_assert (end_chunk->next);
                    end_chunk->next->prev = end_chunk;
                }
                end_chunk = end_chunk->next;
                end_pos = 0;
            }

    接下来看ypipe。ypipe继承自ypipe_base_t,ypipe_base_t抽象出了ypipe和ypipe_conflate(后面分析)的基本操作:

        template <typename T> class ypipe_base_t
        {
        public:
            virtual ~ypipe_base_t () {}
            virtual void write (const T &value_, bool incomplete_) = 0;
            virtual bool unwrite (T *value_) = 0;
            virtual bool flush () = 0;
            virtual bool check_read () = 0;
            virtual bool read (T *value_) = 0;
            virtual bool probe (bool (*fn)(const T &)) = 0;
        };

    ypipe包括了了一个yqueue队列和四个很重要的指针,以下是ypipe的成员变量定义:

           //  Allocation-efficient queue to store pipe items.
            //  Front of the queue points to the first prefetched item, back of
            //  the pipe points to last un-flushed item. Front is used only by
            //  reader thread, while back is used only by writer thread.
            yqueue_t <T, N> queue;
    
            //  Points to the first un-flushed item. This variable is used
            //  exclusively by writer thread.
            T *w;
    
            //  Points to the first un-prefetched item. This variable is used
            //  exclusively by reader thread.
            T *r;
    
            //  Points to the first item to be flushed in the future.
            T *f;
    
            //  The single point of contention between writer and reader thread.
            //  Points past the last flushed item. If it is NULL,
            //  reader is asleep. This pointer should be always accessed using
            //  atomic operations.
            atomic_ptr_t <T> c;

    这四个指针很重要,以下来看一下他们各自的作用:

            //  Initialises the pipe.
            inline ypipe_t ()
            {
                //  Insert terminator element into the queue.
                queue.push ();
    
                //  Let all the pointers to point to the terminator.
                //  (unless pipe is dead, in which case c is set to NULL).
                r = w = f = &queue.back ();
                c.set (&queue.back ());
            }

    初始化时先想队列放入一个空对象作为结束符,全部指针都指向这个结束符。

           //  Write an item to the pipe.  Don't flush it yet. If incomplete is
            //  set to true the item is assumed to be continued by items
            //  subsequently written to the pipe. Incomplete items are never
            //  flushed down the stream.
            inline void write (const T &value_, bool incomplete_)
            {
                //  Place the value to the queue, add new terminator element.
                queue.back () = value_;
                queue.push ();
    
                //  Move the "flush up to here" poiter.
                if (!incomplete_)
                    f = &queue.back ();
            }
    
            //  Pop an incomplete item from the pipe. Returns true is such
            //  item exists, false otherwise.
            inline bool unwrite (T *value_)
            {
                if (f == &queue.back ())
                    return false;
                queue.unpush ();
                *value_ = queue.back ();
                return true;
            }

    f指针指向了当前未做flush操作的第一个元素,假设是写入了一条完整消息,那f指向的就是结束符。

            //  Flush all the completed items into the pipe. Returns false if
            //  the reader thread is sleeping. In that case, caller is obliged to
            //  wake the reader up before using the pipe again.
            inline bool flush ()
            {
                //  If there are no un-flushed items, do nothing.
                if (w == f)
                    return true;
    
                //  Try to set 'c' to 'f'.
                if (c.cas (w, f) != w) {
    
                    //  Compare-and-swap was unseccessful because 'c' is NULL.
                    //  This means that the reader is asleep. Therefore we don't
                    //  care about thread-safeness and update c in non-atomic
                    //  manner. We'll return false to let the caller know
                    //  that reader is sleeping.
                    c.set (f);
                    w = f;
                    return false;
                }
    
                //  Reader is alive. Nothing special to do now. Just move
                //  the 'first un-flushed item' pointer to 'f'.
                w = f;
                return true;
            }

    flush操作比較重要,除了要把w指向f外,还要推断当前pipe的read是否是sleep状态,推断的方式是用c和w作比較,c仅仅能有两个值,要么等于w,要么为空,当c为空时说明之前的check_read操作没有读到元素。check_read返回false同一时候将c置为空。

    check_read的返回值决定了上层的操作策略。flush的返回值也表明了之前check_read操作是否返回了false。

            //  Check whether item is available for reading.
            inline bool check_read ()
            {
                //  Was the value prefetched already?

    If so, return. if (&queue.front () != r && r) return true; // There's no prefetched value, so let us prefetch more values. // Prefetching is to simply retrieve the // pointer from c in atomic fashion. If there are no // items to prefetch, set c to NULL (using compare-and-swap). r = c.cas (&queue.front (), NULL); // If there are no elements prefetched, exit. // During pipe's lifetime r should never be NULL, however, // it can happen during pipe shutdown when items // are being deallocated. if (&queue.front () == r || !r) return false; // There was at least one value prefetched. return true; } // Reads an item from the pipe. Returns false if there is no value. // available. inline bool read (T *value_) { // Try to prefetch a value. if (!check_read ()) return false; // There was at least one value prefetched. // Return it to the caller. *value_ = queue.front (); queue.pop (); return true; }

    之前提到过check_read操作,它的返回值标记了队列中是否有数据,他使用r指针来标记当前能够读到的位置,假设r指针不在front位置处,说明有元素可读。否则就用c和front对照来推断当前是否有元素,假设没有将c置为空,表明读操作处于睡眠状态。

    yqueue中指针的使用相对复杂。他们除了指向详细位置外还标记了一些状态,使用很巧妙。

    dbuffer_t 和 ypipe_conflate_t

    ypipe_conflate_t是ypipe_base_t的还有一种实现,和ypipe相比它的效率更高。可是数据是不安全的。

    它的底层使用dbuffer_t实现的。

    ypipe_conflate_t是zmq4.x版本号中新加入的一个数据结构,使用一些对数据完整性要求不高的需求,实现相对简单。这里不做详细分析。

    pipe

    pipe是zmq中保存消息的一个双向管道,他维护两个ypipe_base_t队列。一个inpipe,一个outpipe。他主要用于socket_base之间(进程内通讯)或者socket_base和session_base之间传递消息。以下是pipe中比較重要的成员变量:

            //  Underlying pipes for both directions.
            upipe_t *inpipe;
            upipe_t *outpipe;
    
            //  Can the pipe be read from / written to?

    bool in_active; bool out_active; // High watermark for the outbound pipe. int hwm; // Low watermark for the inbound pipe. int lwm; // Number of messages read and written so far. uint64_t msgs_read; uint64_t msgs_written; // Last received peer's msgs_read. The actual number in the peer // can be higher at the moment. uint64_t peers_msgs_read; // The pipe object on the other side of the pipepair. pipe_t *peer; // Sink to send events to. i_pipe_events *sink; // States of the pipe endpoint: // active: common state before any termination begins, // delimiter_received: delimiter was read from pipe before // term command was received, // waiting_fo_delimiter: term command was already received // from the peer but there are still pending messages to read, // term_ack_sent: all pending messages were already read and // all we are waiting for is ack from the peer, // term_req_sent1: 'terminate' was explicitly called by the user, // term_req_sent2: user called 'terminate' and then we've got // term command from the peer as well. enum { active, delimiter_received, waiting_for_delimiter, term_ack_sent, term_req_sent1, term_req_sent2 } state; // If true, we receive all the pending inbound messages before // terminating. If false, we terminate immediately when the peer // asks us to. bool delay; // Identity of the writer. Used uniquely by the reader side. blob_t identity; // Pipe's credential. blob_t credential; const bool conflate;

    in_active和out_active标记管道中的队列是否是活跃状态,假设队列已满或者队列为空,这两个标记则设为false,上层依据管道的状态决定是否要进行休眠或者其它操作。比方session_base假设检測到false则会把engine中相应的fd设置为reset状态。hwm和lwm是两个阈值,hwm表示当前队列已满,lwm表示当msgs_read每达到lwm时要象对面的pipe发送一条激活消息。表明已经处理了一些数据,对面的能够继续向管道内写入数据。

    消息的发送机制会在接下来的章节中分析。i_pipe_events 是一个抽象类:

        struct i_pipe_events
        {
            virtual ~i_pipe_events () {}
            virtual void read_activated (zmq::pipe_t *pipe_) = 0;
            virtual void write_activated (zmq::pipe_t *pipe_) = 0;
            virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
            virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
        };

    sink是指向上层实现i_pipe_events的类的指针(session_base或者socket_base),当队列变为激活状态时。pipe须要通过sink通知上层能够从pipe中读取数据或者写入数据了。

  • 相关阅读:
    jq判断input 复选框有没有选
    jq根据id替换修改或添加css属性
    cookie和session的关系和区别
    tp5 统一返回json格式
    tp5 上传图片(自定义图片路径)
    tp5 生成随机数
    tp5 删除图片以及文件
    tp5 上传视频方法
    tp5 跨域问题
    js替换div里的内容
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/8504775.html
Copyright © 2011-2022 走看看