zoukankan      html  css  js  c++  java
  • UDT源码剖析(十一)之SendQueue And RecvQueue

    SendQueue与RecvQueue在代码中与UDP SOCKET相关。在用户将想要发送的数据提交给Buffer之后,由Buffer将数据打包,根据拥塞控制提供的时间计算,在合适的时间提交给SendQueue进行发送。在接收到数据包之后,通过事件驱动的模式通知用户从RecvQueue中拿去数据包。
    删除了交会连接模式的代码。
    SendQueue与RecvQueue依赖于几个通用的数据结构,先列出来哈,可能会与前面的描述有重复,怕来回查找麻烦,索性全部列出来:

    CMultiplexer:每个UDP端口对应一个此对象,资源的实际持有者

    struct CMultiplexer
    {
       CSndQueue* m_pSndQueue;	// The sending queue
       CRcvQueue* m_pRcvQueue;	// The receiving queue
       CChannel* m_pChannel;	// The UDP channel for sending and receiving
       CTimer* m_pTimer;		// The timer
    
       int m_iPort;			// The UDP port number of this multiplexer
       int m_iIPversion;		// IP version
       int m_iMSS;			// Maximum Segment Size
       int m_iRefCount;		//与此资源复用器相关联的UDT实例的数量
       bool m_bReusable;		//这个资源复用器是否可以被共享
    
       int m_iID;			// multiplexer ID
    };
    

    CUnitQueue

    struct CUnit
    {
       CPacket m_Packet;		// packet
       int m_iFlag;			        // 0: free 1:occupid, 2: msg已经read,但是还没有被free, 3: msg被丢弃
    };
    
    class CUnitQueue
    {
    private:
       struct CQEntry
       {
          CUnit* m_pUnit;		// unit queue
          char* m_pBuffer;		// data buffer
          int m_iSize;		// size of each queue
    
          CQEntry* m_pNext;
       }
       *m_pQEntry,			// 指向起始的Entry队列
       *m_pCurrQueue,		// 指向当前的Entry队列
       *m_pLastQueue;		// 指向最后一个Entry队列
    
       CUnit* m_pAvailUnit;         //最近访问的Unit* 
       int m_iSize;			// 总共的Packets数量
       int m_iCount;		// 已经使用的Packets数量
       int m_iMSS;			// unit buffer size
       int m_iIPversion;		// IP version
    };
    
    • 初始化:int CUnitQueue::init(int size, int mss, int version)
    CUnitQueue::CUnitQueue():
    m_pQEntry(NULL),
    m_pCurrQueue(NULL),
    m_pLastQueue(NULL),
    m_iSize(0),
    m_iCount(0),
    m_iMSS(),
    m_iIPversion()
    {
    }
    
    int CUnitQueue::init(int size, int mss, int version)
    {
       CQEntry* tempq = NULL;
       CUnit* tempu = NULL;
       char* tempb = NULL;
    
       try
       {
          tempq = new CQEntry;  //初始化需要管理的内存
          tempu = new CUnit [size];
          tempb = new char [size * mss];
       }
       catch (...)
       {
          delete tempq;     //出现异常后退出
          delete [] tempu;
          delete [] tempb;
    
          return -1;
       }
    
       for (int i = 0; i < size; ++ i)
       {
          tempu[i].m_iFlag = 0; //0表示当前CUint是空闲的
          tempu[i].m_Packet.m_pcData = tempb + i * mss; //初始化packet中的数据字段,只有使用权限
       }
       tempq->m_pUnit = tempu;  //初始化队列
       tempq->m_pBuffer = tempb;    //初始化队列
       tempq->m_iSize = size;   //初始化大小
    
       m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq; //初始化整体的队列
       m_pQEntry->m_pNext = m_pQEntry;  //初始化下一个指向自己
    
       m_pAvailUnit = m_pCurrQueue->m_pUnit;    //指向首个可用的CUnit
    
       m_iSize = size;  //整体的packet个数,CUint大小为0
       m_iMSS = mss;    
       m_iIPversion = version;
    
       return 0;
    }
    
    • 销毁:CUnitQueue::~CUnitQueue()
    CUnitQueue::~CUnitQueue()
    {
       CQEntry* p = m_pQEntry;  //有size条队列,获取首条队列
    
       while (p != NULL)    //释放队列等的信息
       {
          delete [] p->m_pUnit;
          delete [] p->m_pBuffer;    //这个Buffer是一个CQEntry中的首部,指向这个CQEntry中所使用Buffer的起始位置,在分配的时候是一整块分配的,删除的时候也是一整块的删除
    
          CQEntry* q = p;
          if (p == m_pLastQueue)
             p = NULL;
          else
             p = p->m_pNext;
          delete q;
       }
    }
    
    • 增加队列长度:int CUnitQueue::increase()
    int CUnitQueue::increase()
    {
       int real_count = 0;
       CQEntry* p = m_pQEntry;  //获取首条队列的指针
       while (p != NULL)
       {
          CUnit* u = p->m_pUnit;    //获取首条队列的首个CUnit
          for (CUnit* end = u + p->m_iSize; u != end; ++ u)
             if (u->m_iFlag != 0)   //如果队列中的Cunit状态不是free
                ++ real_count;  //真实的,使用中的CUnit数量++
    
          if (p == m_pLastQueue)
             p = NULL;
          else
             p = p->m_pNext;
       }
       m_iCount = real_count;   //更新使用状况
       if (double(m_iCount) / m_iSize < 0.9)    //已经使用的数量/总数量 < 0.9
          return -1; //就是说还有剩余的空间,不进行扩张
    
       CQEntry* tempq = NULL;   //创建一条新的队列,链接到末尾
       CUnit* tempu = NULL;
       char* tempb = NULL;
    
       // all queues have the same size
       int size = m_pQEntry->m_iSize;
    
       try
       {
          tempq = new CQEntry;
          tempu = new CUnit [size];
          tempb = new char [size * m_iMSS];
       }
       catch (...)
       {
          delete tempq;
          delete [] tempu;
          delete [] tempb;
    
          return -1;
       }
    
       for (int i = 0; i < size; ++ i)
       {
          tempu[i].m_iFlag = 0;
          tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
       }
       tempq->m_pUnit = tempu;
       tempq->m_pBuffer = tempb;
       tempq->m_iSize = size;
    
       m_pLastQueue->m_pNext = tempq;   //链接到末尾
       m_pLastQueue = tempq;
       m_pLastQueue->m_pNext = m_pQEntry;   //这是一条环形链表
    
       m_iSize += size; //增加总共的数量
    
       return 0;
    }
    
    • 获取下一个可用的CUnit:CUnit* CUnitQueue::getNextAvailUnit()
    CUnit* CUnitQueue::getNextAvailUnit()
    {
       if (m_iCount * 10 > m_iSize * 9) //如果已经使用的比例超过0.9,就增加数量
          increase();
    
       if (m_iCount >= m_iSize) //如果已经使用的数量超过总数量,就返回
          return NULL;
    
       CQEntry* entrance = m_pCurrQueue;    //获取当前队列
    
       do
       {
          for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
             if (m_pAvailUnit->m_iFlag == 0)    //在当前的队列中找到第一个free的CUnit
                return m_pAvailUnit;
    
          if (m_pCurrQueue->m_pUnit->m_iFlag == 0)  //如果当前队列的第一个CUnit为free
          {
             m_pAvailUnit = m_pCurrQueue->m_pUnit;  //更新avail CUnit并返回
             return m_pAvailUnit;
          }
    
          m_pCurrQueue = m_pCurrQueue->m_pNext; //availCUint一直存在于CurrentQuene
          m_pAvailUnit = m_pCurrQueue->m_pUnit;
       } while (m_pCurrQueue != entrance);
    
       increase();  //完了立马判断是否需要增加数量
    
       return NULL;
    }
    

    CSndUList

    struct CSNode
    {
       CUDT* m_pUDT;		        // 指向CUDT*的指针
       uint64_t m_llTimeStamp;     // 堆化时排序的时间戳
    
       int m_iHeapLoc;		        // 堆的层次,-1意味着暂时不存在与当前堆中
    };
    
    class CSndUList
    {
    private:
       CSNode** m_pHeap;			                // 堆化数组
       int m_iArrayLength;			                // 堆数组长度
       int m_iLastEntry;			                        // 最近一次发送的位置
       udt_pthread_mutex_t m_ListLock;               
       udt_pthread_mutex_t* m_pWindowLock;     
       udt_pthread_cond_t* m_pWindowCond;         
       CTimer* m_pTimer;
    };
    
    • 初始化:CSndUList::CSndUList()
    CSndUList::CSndUList():
    m_pHeap(NULL),
    m_iArrayLength(4096),    //将队列长度预设为4096
    m_iLastEntry(-1),
    m_ListLock(),
    m_pWindowLock(NULL),
    m_pWindowCond(NULL),
    m_pTimer(NULL)
    {
       m_pHeap = new CSNode*[m_iArrayLength];   //创建length条指针队列
    
       #ifndef WINDOWS
          pthread_mutex_init(&m_ListLock, NULL);
       #else
          m_ListLock = CreateMutex(NULL, false, NULL);
       #endif
    }
    
    • 销毁:CSndUList::~CSndUList()
    CSndUList::~CSndUList()
    {
       delete [] m_pHeap;   //销毁指针队列
    
       #ifndef WINDOWS
          pthread_mutex_destroy(&m_ListLock);
       #else
          CloseHandle(m_ListLock);
       #endif
    }
    
    • 向堆中添加CUDT实例:void CSndUList::insert(int64_t ts, const CUDT* u)
    void CSndUList::insert(int64_t ts, const CUDT* u)
    {
       CGuard listguard(m_ListLock);    //获取锁的guard
    
       // increase the heap array size if necessary
       if (m_iLastEntry == m_iArrayLength - 1)  //如果上一次使用的是最后一条,增加长度
       {
          CSNode** temp = NULL;
    
          try   //都是指针,调整起来,消耗也不是很大
          {         
             temp = new CSNode*[m_iArrayLength * 2];    //长度*2,然后将以往的都copy
          }
          catch(...)
          {
             return;
          }
    
          memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
          m_iArrayLength *= 2;
          delete [] m_pHeap;    //释放以前的操作
          m_pHeap = temp;
       }
    
       insert_(ts, u);  //真实的insert
    }
    
    • 向堆中添加CUDT实例:void CSndUList::insert_(int64_t ts, const CUDT* u)
    void CSndUList::insert_(int64_t ts, const CUDT* u)  //真实的insert
    {
       CSNode* n = u->m_pSNode; //反向获取CSNode
    
       // do not insert repeated node
       //-1意味着,这个CSNode没有在堆中,可以插入.>=1意味着在堆中,直接返回
       if (n->m_iHeapLoc >= 0)
          return;
    
       m_iLastEntry ++; //修改即将插入的位置
       m_pHeap[m_iLastEntry] = n;   //将这个指针指向的CSNode插入堆中
       n->m_llTimeStamp = ts;   //修改时间为ts
    
       int q = m_iLastEntry;
       int p = q;
       while (p != 0)   //堆化的过程,调整新插入的位置,根据ts的大小调整位置
       {
          p = (q - 1) >> 1;
          if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) //ts越大,越往上
          {
             CSNode* t = m_pHeap[p];
             m_pHeap[p] = m_pHeap[q];
             m_pHeap[q] = t;
             t->m_iHeapLoc = q;
             q = p;
          }
          else
             break;
       }
    
       n->m_iHeapLoc = q;   //这个变量还表示在当前堆中的层数吗? 反正大于0,就是存在于堆中了
    
       // an earlier event has been inserted, wake up sending worker
       if (n->m_iHeapLoc == 0)  //如果当前的CUDT*在最顶层,唤醒发送线程
          m_pTimer->interrupt();
    
       if (0 == m_iLastEntry)   //如果队列为空,唤醒发送队列??
       {
          #ifndef WINDOWS
             pthread_mutex_lock(m_pWindowLock);
             pthread_cond_signal(m_pWindowCond);
             pthread_mutex_unlock(m_pWindowLock);
          #else
             SetEvent(*m_pWindowCond);
          #endif
       }
    }
    
    • 更新CUDT的发送时间戳:void CSndUList::update(const CUDT* u, bool reschedule)
    void CSndUList::update(const CUDT* u, bool reschedule)  //更新CUDT*的发送时间戳
    {
       CGuard listguard(m_ListLock);
    
       CSNode* n = u->m_pSNode; //反向获取指针
    
       if (n->m_iHeapLoc >= 0)  //>0说明存在于堆中,需要调整
       {
          if (!reschedule)  //如果调整参数为false,直接退出
             return;
    
          if (n->m_iHeapLoc == 0)   //如果在堆顶部,唤醒发送队列,那还重新调整吗?
          {
             n->m_llTimeStamp = 1;  
             m_pTimer->interrupt();
             return;
          }
    
          remove_(u);   //删除已经存在的CUDT*
       }
    
       insert_(1, u);   //重新插入CUDT*
    }
    
    • 取得发送地址中下一个packet和addr,并将CUDT重新加入堆中:int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
    int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
    {
       CGuard listguard(m_ListLock);
    
       if (-1 == m_iLastEntry)  //索引有问题,直接返回
          return -1;
    
       // no pop until the next schedulled time
       uint64_t ts;
       CTimer::rdtsc(ts);   //获取当前时间
       if (ts < m_pHeap[0]->m_llTimeStamp)  //如果发送的时间小于当前的时间(在规定的时间内没有发送出去)
          return -1;
    
       CUDT* u = m_pHeap[0]->m_pUDT;    //从堆顶部获取并删除
       remove_(u);  //只是将flag调整为-1,不进行直接的删除
    
       if (!u->m_bConnected || u->m_bBroken)    //和CUDT中的函数挂钩了,回头调整注释
          return -1;
    
       // pack a packet from the socket
       if (u->packData(pkt, ts) <= 0)
          return -1;
    
       addr = u->m_pPeerAddr;
    
       // insert a new entry, ts is the next processing time
       if (ts > 0)
          insert_(ts, u);   //将当前时间设置为下一次的发送时间
    
       return 1;
    }
    
    • 获取下一次发哦少年宫的事件:uint64_t CSndUList::getNextProcTime()
    uint64_t CSndUList::getNextProcTime()
    {
       CGuard listguard(m_ListLock);
    
       if (-1 == m_iLastEntry)
          return 0;
    
       return m_pHeap[0]->m_llTimeStamp;    //获取下一次的处理时间
    }
    
    • 将CUDT从堆中删除:void CSndUList::remove_(const CUDT* u)
    void CSndUList::remove_(const CUDT* u)
    {
       CSNode* n = u->m_pSNode;     //反向获取CUDT*在堆中的表现形式
    
       if (n->m_iHeapLoc >= 0)  //如果存在于堆中,进行删除,堆的删除操作
       {
          // remove the node from heap
          m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
          m_iLastEntry --;
          m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
    
          int q = n->m_iHeapLoc;
          int p = q * 2 + 1;
          while (p <= m_iLastEntry)
          {
             if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
                p ++;
    
             if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
             {
                CSNode* t = m_pHeap[p];
                m_pHeap[p] = m_pHeap[q];
                m_pHeap[p]->m_iHeapLoc = p;
                m_pHeap[q] = t;
                m_pHeap[q]->m_iHeapLoc = q;
    
                q = p;
                p = q * 2 + 1;
             }
             else
                break;
          }
    
          n->m_iHeapLoc = -1;   //此时,这个CSNode不存在于堆中,只调整指针位置,不进行实际的删除
       }
    
       // the only event has been deleted, wake up immediately
       if (0 == m_iLastEntry)   //如果队列已经empty,唤醒队列
          m_pTimer->interrupt();
    }
    

    CSndQueue:Send Queue

    class CSndQueue
    {
    private:
       static void* worker(void* param);        //发送线程
       udt_pthread_t m_WorkerThread;
    
    private:
       CSndUList* m_pSndUList;		    // 堆化的Send List
       CChannel* m_pChannel;                 // The UDP channel for data sending
       CTimer* m_pTimer;			    // 定时器设施
    
       udt_pthread_mutex_t m_WindowLock;
       udt_pthread_cond_t m_WindowCond;
    
       volatile bool m_bClosing;		// 发送线程是否启动
       udt_pthread_cond_t m_ExitCond;
    };
    
    • 初始化:void CSndQueue::init(CChannel* c, CTimer* t)
    CSndQueue::CSndQueue():
    m_WorkerThread(),
    m_pSndUList(NULL),
    m_pChannel(NULL),
    m_pTimer(NULL),
    m_WindowLock(),
    m_WindowCond(),
    m_bClosing(false),
    m_ExitCond()
    {
       #ifndef WINDOWS
          pthread_cond_init(&m_WindowCond, NULL);
          pthread_mutex_init(&m_WindowLock, NULL);
       #else
          m_WindowLock = CreateMutex(NULL, false, NULL);
          m_WindowCond = CreateEvent(NULL, false, false, NULL);
          m_ExitCond = CreateEvent(NULL, false, false, NULL);
       #endif
    }
    
    void CSndQueue::init(CChannel* c, CTimer* t)    //讲真,SendQueue和RecvQueue共用UDP SOCKET的Timer
    {
       m_pChannel = c;
       m_pTimer = t;
       m_pSndUList = new CSndUList;
       m_pSndUList->m_pWindowLock = &m_WindowLock;  //初始化SendList控制变量
       m_pSndUList->m_pWindowCond = &m_WindowCond;
       m_pSndUList->m_pTimer = m_pTimer;
    
       #ifndef WINDOWS
          if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
          { //启动工作线程
             m_WorkerThread = 0;
             throw CUDTException(3, 1);
          }
       #else
          DWORD threadID;
          m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
          if (NULL == m_WorkerThread)
             throw CUDTException(3, 1);
       #endif
    }
    
    • 工作线程:
    void* CSndQueue::worker(void* param) //工作线程
    {
       CSndQueue* self = (CSndQueue*)param; //获取处理的Queue
    
       while (!self->m_bClosing)    //控制发送线程是否继续发送
       {
          uint64_t ts = self->m_pSndUList->getNextProcTime();   //获得发送队列下一个将要发送的包的具体信息
    
          if (ts > 0)   //如果还没有到发送时间
          {
             // wait until next processing time of the first socket on the list
             uint64_t currtime;
             CTimer::rdtsc(currtime);
             if (currtime < ts) //如果当前的时间小鱼发送时间,小睡一会
                self->m_pTimer->sleepto(ts);
    
             // it is time to send the next pkt
             sockaddr* addr;    //已经到了发送时间
             CPacket pkt;   
             if (self->m_pSndUList->pop(addr, pkt) < 0) //从发送队列中获取包和发送地址
                continue;
    
             self->m_pChannel->sendto(addr, pkt);   //调用Channel的UDP的封装发送
          }
          else
          {
              //如果没有包需要发送的时候,就在这块休眠
             // wait here if there is no sockets with data to be sent
             #ifndef WINDOWS
                pthread_mutex_lock(&self->m_WindowLock);
                if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
                   pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
                pthread_mutex_unlock(&self->m_WindowLock);
             #else
                WaitForSingleObject(self->m_WindowCond, INFINITE);
             #endif
          }
       }
    
       #ifndef WINDOWS
          return NULL;
       #else
          SetEvent(self->m_ExitCond);
          return 0;
       #endif
    }
    
    • 发送数据包:int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
    int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
    {
       m_pChannel->sendto(addr, packet);    //调用Channel发送
       return packet.getLength();
    }
    

    CRcvUList

    struct CRNode
    {
       CUDT* m_pUDT;                // Pointer to CUDT*
       uint64_t m_llTimeStamp;      // Time Stamp
    
       CRNode* m_pPrev;             // previous link
       CRNode* m_pNext;             // next link
    
       bool m_bOnList;              // 当前节点是否在双向链表上
    };
    
    class CRcvUList    //用于接收数据的双向链表
    {
    public:
       CRNode* m_pUList;		// the head node
    private:
       CRNode* m_pLast;		// the last node
    };
    
    • 向双向链表中插入CUDT实例:void CRcvUList::insert(const CUDT* u)
    void CRcvUList::insert(const CUDT* u)
    {
       CRNode* n = u->m_pRNode; //反向获取在双向链表中的表现形式
       CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间,gettimeofday()
    
       if (NULL == m_pUList)    //如果双向链表为空
       {
          // empty list, insert as the single node
          n->m_pPrev = n->m_pNext = NULL;
          m_pLast = m_pUList = n;
    
          return;
       }
    
       // always insert at the end for RcvUList
       n->m_pPrev = m_pLast;    //插入双向链表的末尾
       n->m_pNext = NULL;
       m_pLast->m_pNext = n;
       m_pLast = n;
    }
    
    • 向双向链表中移除CUDT实例:void CRcvUList::remove(const CUDT* u)
    void CRcvUList::remove(const CUDT* u)
    {
       CRNode* n = u->m_pRNode; //方向获取在链表中的表现形式
    
       if (!n->m_bOnList)   //如果不在链表中,直接返回
          return;
    
       if (NULL == n->m_pPrev)  //如果需要删除的结点是首部结点
       {
          // n is the first node
          m_pUList = n->m_pNext;
          if (NULL == m_pUList)
             m_pLast = NULL;
          else
             m_pUList->m_pPrev = NULL;
       }
       else     //反正不是真正的删除,只是在链表中摘除,然后调整是否在链表中的标志
       {
          n->m_pPrev->m_pNext = n->m_pNext;
          if (NULL == n->m_pNext)
          {
             // n is the last node
             m_pLast = n->m_pPrev;
          }
          else
             n->m_pNext->m_pPrev = n->m_pPrev;
       }
    
       n->m_pNext = n->m_pPrev = NULL;
    }
    
    • 更新CUDT在双向链表中的位置:void CRcvUList::update(const CUDT* u)
    void CRcvUList::update(const CUDT* u)
    {
       CRNode* n = u->m_pRNode; //反向获取在链表中的表现形式
    
       if (!n->m_bOnList)
          return;
    
       CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间
    
       // if n is the last node, do not need to change
       if (NULL == n->m_pNext)  //如果n是末尾的结点,不调整
          return;
    
       if (NULL == n->m_pPrev)  //如果n是头部结点
       {
          m_pUList = n->m_pNext;    //将下一个结点调整为头部结点
          m_pUList->m_pPrev = NULL;
       }
       else
       {
          n->m_pPrev->m_pNext = n->m_pNext; //否则在链表中剔除这个结点
          n->m_pNext->m_pPrev = n->m_pPrev;
       }
    
       n->m_pPrev = m_pLast;    //然后把这个结点添加到末尾
       n->m_pNext = NULL;
       m_pLast->m_pNext = n;
       m_pLast = n;
    }
    

    CHash

    class CHash
    {
    private:
       struct CBucket
       {
          int32_t m_iID;		// Socket ID
          CUDT* m_pUDT;		// Socket instance
    
          CBucket* m_pNext;		// next bucket
       } **m_pBucket;		// list of buckets (the hash table)
    
       int m_iHashSize;		// size of hash table
    };
    
    • 初始化:void CHash::init(int size)
    CHash::CHash():
    m_pBucket(NULL),
    m_iHashSize(0)
    {
    }
    
    void CHash::init(int size)
    {
       m_pBucket = new CBucket* [size]; //创建size个HASH ENTRY
    
       for (int i = 0; i < size; ++ i)
          m_pBucket[i] = NULL;  //每个HASH ENTRY指向空
    
       m_iHashSize = size;  //调整HASH SIZE
    }
    
    • 销毁:CHash::~CHash()
    CHash::~CHash()
    {
       for (int i = 0; i < m_iHashSize; ++ i)   //删除所有的Buckets
       {
          CBucket* b = m_pBucket[i];
          while (NULL != b)
          {
             CBucket* n = b->m_pNext;
             delete b;
             b = n;
          }
       }
    
       delete [] m_pBucket;
    }
    
    • 查找:CUDT* CHash::lookup(int32_t id)
    CUDT* CHash::lookup(int32_t id)
    {
       // simple hash function (% hash table size); suitable for socket descriptors
       CBucket* b = m_pBucket[id % m_iHashSize];
    
       while (NULL != b)    //寻找CUDT*
       {
          if (id == b->m_iID)
             return b->m_pUDT;
          b = b->m_pNext;
       }
    
       return NULL;
    }
    
    • 插入:void CHash::insert(int32_t id, CUDT* u)
    void CHash::insert(int32_t id, CUDT* u) //新加入的Bucket加入到最接近数组的底部
    {
       CBucket* b = m_pBucket[id % m_iHashSize];
    
       CBucket* n = new CBucket;
       n->m_iID = id;
       n->m_pUDT = u;
       n->m_pNext = b;
    
       m_pBucket[id % m_iHashSize] = n;
    }
    
    • 删除:void CHash::remove(int32_t id)
    void CHash::remove(int32_t id)
    {
       CBucket* b = m_pBucket[id % m_iHashSize];    //找到Bucket
       CBucket* p = NULL;   
    
       while (NULL != b)    //在桶中的链表中删除一个结点
       {
          if (id == b->m_iID)
          {
             if (NULL == p)
                m_pBucket[id % m_iHashSize] = b->m_pNext;
             else
                p->m_pNext = b->m_pNext;
    
             delete b;
    
             return;
          }
    
          p = b;
          b = b->m_pNext;
       }
    }
    

    CRcvQueue

    class CRcvQueue
    {
    private:
       static void* worker(void* param);    //接收线程
       udt_pthread_t m_WorkerThread;
    
    private:
       CUnitQueue m_UnitQueue;		// The received packet queue(就是那个类似于Hash的组织)
    
       CRcvUList* m_pRcvUList;		// 这个List中的UDT实例准备从Queue中读取数据
       CHash* m_pHash;			// HASH可以加速在List中寻找UDT实例
       CChannel* m_pChannel;		// UDP channel for receving packets
       CTimer* m_pTimer;			// 与发送队列共享Timer
    
       int m_iPayloadSize;                      // packet中的有效载荷
    
       volatile bool m_bClosing;              // 接收线程是否启动
       udt_pthread_cond_t m_ExitCond;
    
    private:
       udt_pthread_mutex_t m_LSLock;
       CUDT* m_pListener;                                   // pointer to the (unique, if any) listening UDT entity
       CRendezvousQueue* m_pRendezvousQueue;                // 汇合模式中的UDT SOCKET列表
    
       std::vector<CUDT*> m_vNewEntry;                                  // 新添加的条目
       udt_pthread_mutex_t m_IDLock;
    
       std::map<int32_t, std::queue<CPacket*> > m_mBuffer;	// 用于集合连接请求的临时缓冲区
       udt_pthread_mutex_t m_PassLock;
       udt_pthread_cond_t m_PassCond;
    };
    
    • 初始化:void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
    CRcvQueue::CRcvQueue():
    m_WorkerThread(),
    m_UnitQueue(),
    m_pRcvUList(NULL),
    m_pHash(NULL),
    m_pChannel(NULL),
    m_pTimer(NULL),
    m_iPayloadSize(),
    m_bClosing(false),
    m_ExitCond(),
    m_LSLock(),
    m_pListener(NULL),
    m_pRendezvousQueue(NULL),
    m_vNewEntry(),
    m_IDLock(),
    m_mBuffer(),
    m_PassLock(),
    m_PassCond()
    {
       #ifndef WINDOWS
          pthread_mutex_init(&m_PassLock, NULL);
          pthread_cond_init(&m_PassCond, NULL);
          pthread_mutex_init(&m_LSLock, NULL);
          pthread_mutex_init(&m_IDLock, NULL);
       #else
          m_PassLock = CreateMutex(NULL, false, NULL);
          m_PassCond = CreateEvent(NULL, false, false, NULL);
          m_LSLock = CreateMutex(NULL, false, NULL);
          m_IDLock = CreateMutex(NULL, false, NULL);
          m_ExitCond = CreateEvent(NULL, false, false, NULL);
       #endif
    }
    
    void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
    {
       m_iPayloadSize = payload;
    
       m_UnitQueue.init(qsize, payload, version);   
    
       m_pHash = new CHash;
       m_pHash->init(hsize);
    
       m_pChannel = cc;
       m_pTimer = t;
    
       m_pRcvUList = new CRcvUList;
       m_pRendezvousQueue = new CRendezvousQueue;
    
       #ifndef WINDOWS
          if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
          { //启动接收线程
             m_WorkerThread = 0;
             throw CUDTException(3, 1);
          }
       #else
          DWORD threadID;
          m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
          if (NULL == m_WorkerThread)
             throw CUDTException(3, 1);
       #endif
    }
    
    • 销毁:CRcvQueue::~CRcvQueue()
    CRcvQueue::~CRcvQueue()
    {
       m_bClosing = true;
    
       #ifndef WINDOWS
          if (0 != m_WorkerThread)  //终止接受线程
             pthread_join(m_WorkerThread, NULL);
          pthread_mutex_destroy(&m_PassLock);
          pthread_cond_destroy(&m_PassCond);
          pthread_mutex_destroy(&m_LSLock);
          pthread_mutex_destroy(&m_IDLock);
       #else
          if (NULL != m_WorkerThread)
             WaitForSingleObject(m_ExitCond, INFINITE);
          CloseHandle(m_WorkerThread);
          CloseHandle(m_PassLock);
          CloseHandle(m_PassCond);
          CloseHandle(m_LSLock);
          CloseHandle(m_IDLock);
          CloseHandle(m_ExitCond);
       #endif
    
       delete m_pRcvUList;
       delete m_pHash;
       delete m_pRendezvousQueue;
    
       // 删除队列中所有没有处理的信息
       for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
       {
          while (!i->second.empty())
          {
             CPacket* pkt = i->second.front();
             delete [] pkt->m_pcData;
             delete pkt;
             i->second.pop();
          }
       }
    }
    
    • 发送工作线程: void* CRcvQueue::worker(void* param)
    {
       CRcvQueue* self = (CRcvQueue*)param; //反向获取接收队列对象
    
       //准备好ID,地址之类的东西
       sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
       CUDT* u = NULL;
       int32_t id;
    
       while (!self->m_bClosing)
       {
          #ifdef NO_BUSY_WAITING
             self->m_pTimer->tick();
          #endif
    
          // check waiting list, if new socket, insert it to the list
          while (self->ifNewEntry())    //如果有新的CUDT*
          {
             CUDT* ne = self->getNewEntry();    //将新的CUDT*加入到合适的队列中
             if (NULL != ne)
             {
                self->m_pRcvUList->insert(ne);
                self->m_pHash->insert(ne->m_SocketID, ne);
             }
          }
    
          // 为了接收packet,获取一个可用的slot
          CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
          if (NULL == unit) //如果获取失败
          {
             // 没有足够的空间,读取这个packet后直接drop掉
             CPacket temp;
             temp.m_pcData = new char[self->m_iPayloadSize];
             temp.setLength(self->m_iPayloadSize);
             self->m_pChannel->recvfrom(addr, temp);
             delete [] temp.m_pcData;
             goto TIMER_CHECK;
          }
    
          unit->m_Packet.setLength(self->m_iPayloadSize);//设置packet的有效载荷
    
          //recv一个packet.如果返回-1,就相当于什么都没有收到 
          if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
             goto TIMER_CHECK;
    
          id = unit->m_Packet.m_iID;    //获取收到的packet的ID
    
          // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
          if (0 == id)//Connect request
          {
             if (NULL != self->m_pListener)
                self->m_pListener->listen(addr, unit->m_Packet);//通过这个CUDT*处理这个packet
             else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
             {  //同样是处理new connect
                // asynchronous connect: call connect here
                // otherwise wait for the UDT socket to retrieve this packet
                if (!u->m_bSynRecving)
                   u->connect(unit->m_Packet);
                else
                   self->storePkt(id, unit->m_Packet.clone());
             }
          }
          else if (id > 0)  //发送往一个socket的数据包
          {
             if (NULL != (u = self->m_pHash->lookup(id)))//找到UDT对应的CUDT*
             {
                if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
                {//对比地址
                   if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
                   {//如果这个CUDT*的状态正常
                      if (0 == unit->m_Packet.getFlag())//如果这是一个数据包
                         u->processData(unit);  //处理数据
                      else//如果这是一个控制包
                         u->processCtrl(unit->m_Packet);//处理控制信息
    
                      u->checkTimers();//检查定时器
                      self->m_pRcvUList->update(u);//将这个处理过的CUDT*插入List末尾
                   }
                }
             }
             else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
             {//如果是一个新的连接状态
                if (!u->m_bSynRecving)
                   u->connect(unit->m_Packet);//建立稳定的连接状态
                else
                   self->storePkt(id, unit->m_Packet.clone());//连接状态还没有稳定,先存储数据,稍后处理
             }
          }
    
          //当drop packet时或者没有free CUnit时,跳转到这块
    TIMER_CHECK:
          // take care of the timing event for all UDT sockets
    
          uint64_t currtime;
          CTimer::rdtsc(currtime);  //获取当前的时间
    
          CRNode* ul = self->m_pRcvUList->m_pUList;//获取头部的CUDT*
          uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
          while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
          {
             CUDT* udt = ul->m_pUDT;    //获取CUDT*
    
             if (udt->m_bConnected && !udt->m_bBroken && !udt->m_bClosing)
             {//如果这个CUDT*的状态正常,更新到Recv List的后面就行了
                udt->checkTimers(); 
                self->m_pRcvUList->update(udt);
             }
             else
             {//如果这个CUDT*的状态出现差错,直接删除
                // the socket must be removed from Hash table first, then RcvUList
                self->m_pHash->remove(udt->m_SocketID);
                self->m_pRcvUList->remove(udt);
                udt->m_pRNode->m_bOnList = false;
             }
    
             ul = self->m_pRcvUList->m_pUList;
          }
    
          //还没有进入正常的连接状态的CUDT*,发送探测包
          // Check connection requests status for all sockets in the RendezvousQueue.
          self->m_pRendezvousQueue->updateConnStatus();
       }
    
       //收尾工作
       if (AF_INET == self->m_UnitQueue.m_iIPversion)
          delete (sockaddr_in*)addr;
       else
          delete (sockaddr_in6*)addr;
    
       #ifndef WINDOWS
          return NULL;
       #else
          SetEvent(self->m_ExitCond);
          return 0;
       #endif
    }
    
    • 从Queue中获取一个Packet:int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
    int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
    {
       CGuard bufferlock(m_PassLock);
    
       map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
    
       if (i == m_mBuffer.end())    //如果描述这个UDT的Packet Queue为空
       {
          #ifndef WINDOWS
             uint64_t now = CTimer::getTime();
             timespec timeout;
    
             timeout.tv_sec = now / 1000000 + 1;
             timeout.tv_nsec = (now % 1000000) * 1000;
    
             pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);//就睡一会
          #else
             ReleaseMutex(m_PassLock);
             WaitForSingleObject(m_PassCond, 1000);
             WaitForSingleObject(m_PassLock, INFINITE);
          #endif
    
          i = m_mBuffer.find(id);   //被唤醒后还没有packet可读的话,设置packet,并返回-1
          if (i == m_mBuffer.end())
          {
             packet.setLength(-1);
             return -1;
          }
       }
    
       // retrieve the earliest packet
       CPacket* newpkt = i->second.front(); //获取一个包
    
       if (packet.getLength() < newpkt->getLength())//如果两个包的数据量有差错,返回-1
       {
          packet.setLength(-1);
          return -1;
       }
    
       // copy packet content
       // 将首部的packet拷贝出来
       memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
       memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
       packet.setLength(newpkt->getLength());
    
       delete [] newpkt->m_pcData;
       delete newpkt;
    
       // remove this message from queue,
       // if no more messages left for this socket, release its data structure
       i->second.pop();
       if (i->second.empty())   //如果队列为空的话,删除这个队列。对应上文,如果在m_buffer中没有找到Queue,就意味着没有Packet可读
          m_mBuffer.erase(i);
    
       return packet.getLength();
    }
    
    • 设置与取消Listener:int CRcvQueue::setListener(CUDT* u)void CRcvQueue::removeListener(const CUDT* u)
    int CRcvQueue::setListener(CUDT* u)
    {
       CGuard lslock(m_LSLock);
    
       if (NULL != m_pListener)
          return -1;
    
       m_pListener = u;
       return 0;
    }
    
    void CRcvQueue::removeListener(const CUDT* u)
    {
       CGuard lslock(m_LSLock);
    
       if (u == m_pListener)
          m_pListener = NULL;
    }
    
    • 从队列中删除UDT SOCKET以及为其维护的信息:void CRcvQueue::removeConnector(const UDTSOCKET& id)
    void CRcvQueue::removeConnector(const UDTSOCKET& id)    //删除和控制包有关的所有信息,这个队列中维护着和连接相关的信息
    {
       m_pRendezvousQueue->remove(id);
    
       CGuard bufferlock(m_PassLock);
    
       map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
       if (i != m_mBuffer.end())
       {
          while (!i->second.empty())
          {
             delete [] i->second.front()->m_pcData;
             delete i->second.front();
             i->second.pop();
          }
          m_mBuffer.erase(i);
       }
    }
    
    • 处理连接尚未完全建立的情况:void CRcvQueue::setNewEntry(CUDT* u) bool CRcvQueue::ifNewEntry() CUDT* CRcvQueue::getNewEntry()
    void CRcvQueue::setNewEntry(CUDT* u)        //加入新的CUDT*
    {
       CGuard listguard(m_IDLock);
       m_vNewEntry.push_back(u);
    }
    
    bool CRcvQueue::ifNewEntry()        //判断是否有新的entry
    {
       return !(m_vNewEntry.empty());
    }
    
    CUDT* CRcvQueue::getNewEntry()        //获取新的Entry
    {
       CGuard listguard(m_IDLock);
    
       if (m_vNewEntry.empty())
          return NULL;
    
       CUDT* u = (CUDT*)*(m_vNewEntry.begin());
       m_vNewEntry.erase(m_vNewEntry.begin());
    
       return u;
    }
    
  • 相关阅读:
    Jersey(1.19.1)
    Jersey(1.19.1)
    Jersey(1.19.1)
    Jersey(1.19.1)
    Jersey(1.19.1)
    Jersey(1.19.1)
    Jersey(1.19.1)
    Jersey(1.19.1)
    17. Letter Combinations of a Phone Number
    37.Sudoku Solver
  • 原文地址:https://www.cnblogs.com/ukernel/p/9191068.html
Copyright © 2011-2022 走看看