SendQueue与RecvQueue在代码中与UDP SOCKET相关。在用户将想要发送的数据提交给Buffer之后,由Buffer将数据打包,根据拥塞控制提供的时间计算,在合适的时间提交给SendQueue进行发送。在接收到数据包之后,通过事件驱动的模式通知用户从RecvQueue中拿去数据包。
删除了交会连接模式的代码。
SendQueue与RecvQueue依赖于几个通用的数据结构,先列出来哈,可能会与前面的描述有重复,怕来回查找麻烦,索性全部列出来:
CMultiplexer:每个UDP端口对应一个此对象,资源的实际持有者
struct CMultiplexer
{
CSndQueue* m_pSndQueue; // The sending queue
CRcvQueue* m_pRcvQueue; // The receiving queue
CChannel* m_pChannel; // The UDP channel for sending and receiving
CTimer* m_pTimer; // The timer
int m_iPort; // The UDP port number of this multiplexer
int m_iIPversion; // IP version
int m_iMSS; // Maximum Segment Size
int m_iRefCount; //与此资源复用器相关联的UDT实例的数量
bool m_bReusable; //这个资源复用器是否可以被共享
int m_iID; // multiplexer ID
};
CUnitQueue
struct CUnit
{
CPacket m_Packet; // packet
int m_iFlag; // 0: free 1:occupid, 2: msg已经read,但是还没有被free, 3: msg被丢弃
};
class CUnitQueue
{
private:
struct CQEntry
{
CUnit* m_pUnit; // unit queue
char* m_pBuffer; // data buffer
int m_iSize; // size of each queue
CQEntry* m_pNext;
}
*m_pQEntry, // 指向起始的Entry队列
*m_pCurrQueue, // 指向当前的Entry队列
*m_pLastQueue; // 指向最后一个Entry队列
CUnit* m_pAvailUnit; //最近访问的Unit*
int m_iSize; // 总共的Packets数量
int m_iCount; // 已经使用的Packets数量
int m_iMSS; // unit buffer size
int m_iIPversion; // IP version
};
- 初始化:
int CUnitQueue::init(int size, int mss, int version)
CUnitQueue::CUnitQueue():
m_pQEntry(NULL),
m_pCurrQueue(NULL),
m_pLastQueue(NULL),
m_iSize(0),
m_iCount(0),
m_iMSS(),
m_iIPversion()
{
}
int CUnitQueue::init(int size, int mss, int version)
{
CQEntry* tempq = NULL;
CUnit* tempu = NULL;
char* tempb = NULL;
try
{
tempq = new CQEntry; //初始化需要管理的内存
tempu = new CUnit [size];
tempb = new char [size * mss];
}
catch (...)
{
delete tempq; //出现异常后退出
delete [] tempu;
delete [] tempb;
return -1;
}
for (int i = 0; i < size; ++ i)
{
tempu[i].m_iFlag = 0; //0表示当前CUint是空闲的
tempu[i].m_Packet.m_pcData = tempb + i * mss; //初始化packet中的数据字段,只有使用权限
}
tempq->m_pUnit = tempu; //初始化队列
tempq->m_pBuffer = tempb; //初始化队列
tempq->m_iSize = size; //初始化大小
m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq; //初始化整体的队列
m_pQEntry->m_pNext = m_pQEntry; //初始化下一个指向自己
m_pAvailUnit = m_pCurrQueue->m_pUnit; //指向首个可用的CUnit
m_iSize = size; //整体的packet个数,CUint大小为0
m_iMSS = mss;
m_iIPversion = version;
return 0;
}
- 销毁:
CUnitQueue::~CUnitQueue()
CUnitQueue::~CUnitQueue()
{
CQEntry* p = m_pQEntry; //有size条队列,获取首条队列
while (p != NULL) //释放队列等的信息
{
delete [] p->m_pUnit;
delete [] p->m_pBuffer; //这个Buffer是一个CQEntry中的首部,指向这个CQEntry中所使用Buffer的起始位置,在分配的时候是一整块分配的,删除的时候也是一整块的删除
CQEntry* q = p;
if (p == m_pLastQueue)
p = NULL;
else
p = p->m_pNext;
delete q;
}
}
- 增加队列长度:
int CUnitQueue::increase()
int CUnitQueue::increase()
{
int real_count = 0;
CQEntry* p = m_pQEntry; //获取首条队列的指针
while (p != NULL)
{
CUnit* u = p->m_pUnit; //获取首条队列的首个CUnit
for (CUnit* end = u + p->m_iSize; u != end; ++ u)
if (u->m_iFlag != 0) //如果队列中的Cunit状态不是free
++ real_count; //真实的,使用中的CUnit数量++
if (p == m_pLastQueue)
p = NULL;
else
p = p->m_pNext;
}
m_iCount = real_count; //更新使用状况
if (double(m_iCount) / m_iSize < 0.9) //已经使用的数量/总数量 < 0.9
return -1; //就是说还有剩余的空间,不进行扩张
CQEntry* tempq = NULL; //创建一条新的队列,链接到末尾
CUnit* tempu = NULL;
char* tempb = NULL;
// all queues have the same size
int size = m_pQEntry->m_iSize;
try
{
tempq = new CQEntry;
tempu = new CUnit [size];
tempb = new char [size * m_iMSS];
}
catch (...)
{
delete tempq;
delete [] tempu;
delete [] tempb;
return -1;
}
for (int i = 0; i < size; ++ i)
{
tempu[i].m_iFlag = 0;
tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
}
tempq->m_pUnit = tempu;
tempq->m_pBuffer = tempb;
tempq->m_iSize = size;
m_pLastQueue->m_pNext = tempq; //链接到末尾
m_pLastQueue = tempq;
m_pLastQueue->m_pNext = m_pQEntry; //这是一条环形链表
m_iSize += size; //增加总共的数量
return 0;
}
- 获取下一个可用的CUnit:
CUnit* CUnitQueue::getNextAvailUnit()
CUnit* CUnitQueue::getNextAvailUnit()
{
if (m_iCount * 10 > m_iSize * 9) //如果已经使用的比例超过0.9,就增加数量
increase();
if (m_iCount >= m_iSize) //如果已经使用的数量超过总数量,就返回
return NULL;
CQEntry* entrance = m_pCurrQueue; //获取当前队列
do
{
for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
if (m_pAvailUnit->m_iFlag == 0) //在当前的队列中找到第一个free的CUnit
return m_pAvailUnit;
if (m_pCurrQueue->m_pUnit->m_iFlag == 0) //如果当前队列的第一个CUnit为free
{
m_pAvailUnit = m_pCurrQueue->m_pUnit; //更新avail CUnit并返回
return m_pAvailUnit;
}
m_pCurrQueue = m_pCurrQueue->m_pNext; //availCUint一直存在于CurrentQuene
m_pAvailUnit = m_pCurrQueue->m_pUnit;
} while (m_pCurrQueue != entrance);
increase(); //完了立马判断是否需要增加数量
return NULL;
}
CSndUList
struct CSNode
{
CUDT* m_pUDT; // 指向CUDT*的指针
uint64_t m_llTimeStamp; // 堆化时排序的时间戳
int m_iHeapLoc; // 堆的层次,-1意味着暂时不存在与当前堆中
};
class CSndUList
{
private:
CSNode** m_pHeap; // 堆化数组
int m_iArrayLength; // 堆数组长度
int m_iLastEntry; // 最近一次发送的位置
udt_pthread_mutex_t m_ListLock;
udt_pthread_mutex_t* m_pWindowLock;
udt_pthread_cond_t* m_pWindowCond;
CTimer* m_pTimer;
};
- 初始化:
CSndUList::CSndUList()
CSndUList::CSndUList():
m_pHeap(NULL),
m_iArrayLength(4096), //将队列长度预设为4096
m_iLastEntry(-1),
m_ListLock(),
m_pWindowLock(NULL),
m_pWindowCond(NULL),
m_pTimer(NULL)
{
m_pHeap = new CSNode*[m_iArrayLength]; //创建length条指针队列
#ifndef WINDOWS
pthread_mutex_init(&m_ListLock, NULL);
#else
m_ListLock = CreateMutex(NULL, false, NULL);
#endif
}
- 销毁:
CSndUList::~CSndUList()
CSndUList::~CSndUList()
{
delete [] m_pHeap; //销毁指针队列
#ifndef WINDOWS
pthread_mutex_destroy(&m_ListLock);
#else
CloseHandle(m_ListLock);
#endif
}
- 向堆中添加CUDT实例:
void CSndUList::insert(int64_t ts, const CUDT* u)
void CSndUList::insert(int64_t ts, const CUDT* u)
{
CGuard listguard(m_ListLock); //获取锁的guard
// increase the heap array size if necessary
if (m_iLastEntry == m_iArrayLength - 1) //如果上一次使用的是最后一条,增加长度
{
CSNode** temp = NULL;
try //都是指针,调整起来,消耗也不是很大
{
temp = new CSNode*[m_iArrayLength * 2]; //长度*2,然后将以往的都copy
}
catch(...)
{
return;
}
memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
m_iArrayLength *= 2;
delete [] m_pHeap; //释放以前的操作
m_pHeap = temp;
}
insert_(ts, u); //真实的insert
}
- 向堆中添加CUDT实例:
void CSndUList::insert_(int64_t ts, const CUDT* u)
void CSndUList::insert_(int64_t ts, const CUDT* u) //真实的insert
{
CSNode* n = u->m_pSNode; //反向获取CSNode
// do not insert repeated node
//-1意味着,这个CSNode没有在堆中,可以插入.>=1意味着在堆中,直接返回
if (n->m_iHeapLoc >= 0)
return;
m_iLastEntry ++; //修改即将插入的位置
m_pHeap[m_iLastEntry] = n; //将这个指针指向的CSNode插入堆中
n->m_llTimeStamp = ts; //修改时间为ts
int q = m_iLastEntry;
int p = q;
while (p != 0) //堆化的过程,调整新插入的位置,根据ts的大小调整位置
{
p = (q - 1) >> 1;
if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) //ts越大,越往上
{
CSNode* t = m_pHeap[p];
m_pHeap[p] = m_pHeap[q];
m_pHeap[q] = t;
t->m_iHeapLoc = q;
q = p;
}
else
break;
}
n->m_iHeapLoc = q; //这个变量还表示在当前堆中的层数吗? 反正大于0,就是存在于堆中了
// an earlier event has been inserted, wake up sending worker
if (n->m_iHeapLoc == 0) //如果当前的CUDT*在最顶层,唤醒发送线程
m_pTimer->interrupt();
if (0 == m_iLastEntry) //如果队列为空,唤醒发送队列??
{
#ifndef WINDOWS
pthread_mutex_lock(m_pWindowLock);
pthread_cond_signal(m_pWindowCond);
pthread_mutex_unlock(m_pWindowLock);
#else
SetEvent(*m_pWindowCond);
#endif
}
}
- 更新CUDT的发送时间戳:
void CSndUList::update(const CUDT* u, bool reschedule)
void CSndUList::update(const CUDT* u, bool reschedule) //更新CUDT*的发送时间戳
{
CGuard listguard(m_ListLock);
CSNode* n = u->m_pSNode; //反向获取指针
if (n->m_iHeapLoc >= 0) //>0说明存在于堆中,需要调整
{
if (!reschedule) //如果调整参数为false,直接退出
return;
if (n->m_iHeapLoc == 0) //如果在堆顶部,唤醒发送队列,那还重新调整吗?
{
n->m_llTimeStamp = 1;
m_pTimer->interrupt();
return;
}
remove_(u); //删除已经存在的CUDT*
}
insert_(1, u); //重新插入CUDT*
}
- 取得发送地址中下一个packet和addr,并将CUDT重新加入堆中:
int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
{
CGuard listguard(m_ListLock);
if (-1 == m_iLastEntry) //索引有问题,直接返回
return -1;
// no pop until the next schedulled time
uint64_t ts;
CTimer::rdtsc(ts); //获取当前时间
if (ts < m_pHeap[0]->m_llTimeStamp) //如果发送的时间小于当前的时间(在规定的时间内没有发送出去)
return -1;
CUDT* u = m_pHeap[0]->m_pUDT; //从堆顶部获取并删除
remove_(u); //只是将flag调整为-1,不进行直接的删除
if (!u->m_bConnected || u->m_bBroken) //和CUDT中的函数挂钩了,回头调整注释
return -1;
// pack a packet from the socket
if (u->packData(pkt, ts) <= 0)
return -1;
addr = u->m_pPeerAddr;
// insert a new entry, ts is the next processing time
if (ts > 0)
insert_(ts, u); //将当前时间设置为下一次的发送时间
return 1;
}
- 获取下一次发哦少年宫的事件:
uint64_t CSndUList::getNextProcTime()
uint64_t CSndUList::getNextProcTime()
{
CGuard listguard(m_ListLock);
if (-1 == m_iLastEntry)
return 0;
return m_pHeap[0]->m_llTimeStamp; //获取下一次的处理时间
}
- 将CUDT从堆中删除:
void CSndUList::remove_(const CUDT* u)
void CSndUList::remove_(const CUDT* u)
{
CSNode* n = u->m_pSNode; //反向获取CUDT*在堆中的表现形式
if (n->m_iHeapLoc >= 0) //如果存在于堆中,进行删除,堆的删除操作
{
// remove the node from heap
m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
m_iLastEntry --;
m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
int q = n->m_iHeapLoc;
int p = q * 2 + 1;
while (p <= m_iLastEntry)
{
if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
p ++;
if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
{
CSNode* t = m_pHeap[p];
m_pHeap[p] = m_pHeap[q];
m_pHeap[p]->m_iHeapLoc = p;
m_pHeap[q] = t;
m_pHeap[q]->m_iHeapLoc = q;
q = p;
p = q * 2 + 1;
}
else
break;
}
n->m_iHeapLoc = -1; //此时,这个CSNode不存在于堆中,只调整指针位置,不进行实际的删除
}
// the only event has been deleted, wake up immediately
if (0 == m_iLastEntry) //如果队列已经empty,唤醒队列
m_pTimer->interrupt();
}
CSndQueue:Send Queue
class CSndQueue
{
private:
static void* worker(void* param); //发送线程
udt_pthread_t m_WorkerThread;
private:
CSndUList* m_pSndUList; // 堆化的Send List
CChannel* m_pChannel; // The UDP channel for data sending
CTimer* m_pTimer; // 定时器设施
udt_pthread_mutex_t m_WindowLock;
udt_pthread_cond_t m_WindowCond;
volatile bool m_bClosing; // 发送线程是否启动
udt_pthread_cond_t m_ExitCond;
};
- 初始化:
void CSndQueue::init(CChannel* c, CTimer* t)
CSndQueue::CSndQueue():
m_WorkerThread(),
m_pSndUList(NULL),
m_pChannel(NULL),
m_pTimer(NULL),
m_WindowLock(),
m_WindowCond(),
m_bClosing(false),
m_ExitCond()
{
#ifndef WINDOWS
pthread_cond_init(&m_WindowCond, NULL);
pthread_mutex_init(&m_WindowLock, NULL);
#else
m_WindowLock = CreateMutex(NULL, false, NULL);
m_WindowCond = CreateEvent(NULL, false, false, NULL);
m_ExitCond = CreateEvent(NULL, false, false, NULL);
#endif
}
void CSndQueue::init(CChannel* c, CTimer* t) //讲真,SendQueue和RecvQueue共用UDP SOCKET的Timer
{
m_pChannel = c;
m_pTimer = t;
m_pSndUList = new CSndUList;
m_pSndUList->m_pWindowLock = &m_WindowLock; //初始化SendList控制变量
m_pSndUList->m_pWindowCond = &m_WindowCond;
m_pSndUList->m_pTimer = m_pTimer;
#ifndef WINDOWS
if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
{ //启动工作线程
m_WorkerThread = 0;
throw CUDTException(3, 1);
}
#else
DWORD threadID;
m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
if (NULL == m_WorkerThread)
throw CUDTException(3, 1);
#endif
}
- 工作线程:
void* CSndQueue::worker(void* param) //工作线程
{
CSndQueue* self = (CSndQueue*)param; //获取处理的Queue
while (!self->m_bClosing) //控制发送线程是否继续发送
{
uint64_t ts = self->m_pSndUList->getNextProcTime(); //获得发送队列下一个将要发送的包的具体信息
if (ts > 0) //如果还没有到发送时间
{
// wait until next processing time of the first socket on the list
uint64_t currtime;
CTimer::rdtsc(currtime);
if (currtime < ts) //如果当前的时间小鱼发送时间,小睡一会
self->m_pTimer->sleepto(ts);
// it is time to send the next pkt
sockaddr* addr; //已经到了发送时间
CPacket pkt;
if (self->m_pSndUList->pop(addr, pkt) < 0) //从发送队列中获取包和发送地址
continue;
self->m_pChannel->sendto(addr, pkt); //调用Channel的UDP的封装发送
}
else
{
//如果没有包需要发送的时候,就在这块休眠
// wait here if there is no sockets with data to be sent
#ifndef WINDOWS
pthread_mutex_lock(&self->m_WindowLock);
if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
pthread_mutex_unlock(&self->m_WindowLock);
#else
WaitForSingleObject(self->m_WindowCond, INFINITE);
#endif
}
}
#ifndef WINDOWS
return NULL;
#else
SetEvent(self->m_ExitCond);
return 0;
#endif
}
- 发送数据包:
int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
{
m_pChannel->sendto(addr, packet); //调用Channel发送
return packet.getLength();
}
CRcvUList
struct CRNode
{
CUDT* m_pUDT; // Pointer to CUDT*
uint64_t m_llTimeStamp; // Time Stamp
CRNode* m_pPrev; // previous link
CRNode* m_pNext; // next link
bool m_bOnList; // 当前节点是否在双向链表上
};
class CRcvUList //用于接收数据的双向链表
{
public:
CRNode* m_pUList; // the head node
private:
CRNode* m_pLast; // the last node
};
- 向双向链表中插入CUDT实例:
void CRcvUList::insert(const CUDT* u)
void CRcvUList::insert(const CUDT* u)
{
CRNode* n = u->m_pRNode; //反向获取在双向链表中的表现形式
CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间,gettimeofday()
if (NULL == m_pUList) //如果双向链表为空
{
// empty list, insert as the single node
n->m_pPrev = n->m_pNext = NULL;
m_pLast = m_pUList = n;
return;
}
// always insert at the end for RcvUList
n->m_pPrev = m_pLast; //插入双向链表的末尾
n->m_pNext = NULL;
m_pLast->m_pNext = n;
m_pLast = n;
}
- 向双向链表中移除CUDT实例:
void CRcvUList::remove(const CUDT* u)
void CRcvUList::remove(const CUDT* u)
{
CRNode* n = u->m_pRNode; //方向获取在链表中的表现形式
if (!n->m_bOnList) //如果不在链表中,直接返回
return;
if (NULL == n->m_pPrev) //如果需要删除的结点是首部结点
{
// n is the first node
m_pUList = n->m_pNext;
if (NULL == m_pUList)
m_pLast = NULL;
else
m_pUList->m_pPrev = NULL;
}
else //反正不是真正的删除,只是在链表中摘除,然后调整是否在链表中的标志
{
n->m_pPrev->m_pNext = n->m_pNext;
if (NULL == n->m_pNext)
{
// n is the last node
m_pLast = n->m_pPrev;
}
else
n->m_pNext->m_pPrev = n->m_pPrev;
}
n->m_pNext = n->m_pPrev = NULL;
}
- 更新CUDT在双向链表中的位置:
void CRcvUList::update(const CUDT* u)
void CRcvUList::update(const CUDT* u)
{
CRNode* n = u->m_pRNode; //反向获取在链表中的表现形式
if (!n->m_bOnList)
return;
CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间
// if n is the last node, do not need to change
if (NULL == n->m_pNext) //如果n是末尾的结点,不调整
return;
if (NULL == n->m_pPrev) //如果n是头部结点
{
m_pUList = n->m_pNext; //将下一个结点调整为头部结点
m_pUList->m_pPrev = NULL;
}
else
{
n->m_pPrev->m_pNext = n->m_pNext; //否则在链表中剔除这个结点
n->m_pNext->m_pPrev = n->m_pPrev;
}
n->m_pPrev = m_pLast; //然后把这个结点添加到末尾
n->m_pNext = NULL;
m_pLast->m_pNext = n;
m_pLast = n;
}
CHash
class CHash
{
private:
struct CBucket
{
int32_t m_iID; // Socket ID
CUDT* m_pUDT; // Socket instance
CBucket* m_pNext; // next bucket
} **m_pBucket; // list of buckets (the hash table)
int m_iHashSize; // size of hash table
};
- 初始化:
void CHash::init(int size)
CHash::CHash():
m_pBucket(NULL),
m_iHashSize(0)
{
}
void CHash::init(int size)
{
m_pBucket = new CBucket* [size]; //创建size个HASH ENTRY
for (int i = 0; i < size; ++ i)
m_pBucket[i] = NULL; //每个HASH ENTRY指向空
m_iHashSize = size; //调整HASH SIZE
}
- 销毁:
CHash::~CHash()
CHash::~CHash()
{
for (int i = 0; i < m_iHashSize; ++ i) //删除所有的Buckets
{
CBucket* b = m_pBucket[i];
while (NULL != b)
{
CBucket* n = b->m_pNext;
delete b;
b = n;
}
}
delete [] m_pBucket;
}
- 查找:
CUDT* CHash::lookup(int32_t id)
CUDT* CHash::lookup(int32_t id)
{
// simple hash function (% hash table size); suitable for socket descriptors
CBucket* b = m_pBucket[id % m_iHashSize];
while (NULL != b) //寻找CUDT*
{
if (id == b->m_iID)
return b->m_pUDT;
b = b->m_pNext;
}
return NULL;
}
- 插入:
void CHash::insert(int32_t id, CUDT* u)
void CHash::insert(int32_t id, CUDT* u) //新加入的Bucket加入到最接近数组的底部
{
CBucket* b = m_pBucket[id % m_iHashSize];
CBucket* n = new CBucket;
n->m_iID = id;
n->m_pUDT = u;
n->m_pNext = b;
m_pBucket[id % m_iHashSize] = n;
}
- 删除:
void CHash::remove(int32_t id)
void CHash::remove(int32_t id)
{
CBucket* b = m_pBucket[id % m_iHashSize]; //找到Bucket
CBucket* p = NULL;
while (NULL != b) //在桶中的链表中删除一个结点
{
if (id == b->m_iID)
{
if (NULL == p)
m_pBucket[id % m_iHashSize] = b->m_pNext;
else
p->m_pNext = b->m_pNext;
delete b;
return;
}
p = b;
b = b->m_pNext;
}
}
CRcvQueue
class CRcvQueue
{
private:
static void* worker(void* param); //接收线程
udt_pthread_t m_WorkerThread;
private:
CUnitQueue m_UnitQueue; // The received packet queue(就是那个类似于Hash的组织)
CRcvUList* m_pRcvUList; // 这个List中的UDT实例准备从Queue中读取数据
CHash* m_pHash; // HASH可以加速在List中寻找UDT实例
CChannel* m_pChannel; // UDP channel for receving packets
CTimer* m_pTimer; // 与发送队列共享Timer
int m_iPayloadSize; // packet中的有效载荷
volatile bool m_bClosing; // 接收线程是否启动
udt_pthread_cond_t m_ExitCond;
private:
udt_pthread_mutex_t m_LSLock;
CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity
CRendezvousQueue* m_pRendezvousQueue; // 汇合模式中的UDT SOCKET列表
std::vector<CUDT*> m_vNewEntry; // 新添加的条目
udt_pthread_mutex_t m_IDLock;
std::map<int32_t, std::queue<CPacket*> > m_mBuffer; // 用于集合连接请求的临时缓冲区
udt_pthread_mutex_t m_PassLock;
udt_pthread_cond_t m_PassCond;
};
- 初始化:
void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
CRcvQueue::CRcvQueue():
m_WorkerThread(),
m_UnitQueue(),
m_pRcvUList(NULL),
m_pHash(NULL),
m_pChannel(NULL),
m_pTimer(NULL),
m_iPayloadSize(),
m_bClosing(false),
m_ExitCond(),
m_LSLock(),
m_pListener(NULL),
m_pRendezvousQueue(NULL),
m_vNewEntry(),
m_IDLock(),
m_mBuffer(),
m_PassLock(),
m_PassCond()
{
#ifndef WINDOWS
pthread_mutex_init(&m_PassLock, NULL);
pthread_cond_init(&m_PassCond, NULL);
pthread_mutex_init(&m_LSLock, NULL);
pthread_mutex_init(&m_IDLock, NULL);
#else
m_PassLock = CreateMutex(NULL, false, NULL);
m_PassCond = CreateEvent(NULL, false, false, NULL);
m_LSLock = CreateMutex(NULL, false, NULL);
m_IDLock = CreateMutex(NULL, false, NULL);
m_ExitCond = CreateEvent(NULL, false, false, NULL);
#endif
}
void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
{
m_iPayloadSize = payload;
m_UnitQueue.init(qsize, payload, version);
m_pHash = new CHash;
m_pHash->init(hsize);
m_pChannel = cc;
m_pTimer = t;
m_pRcvUList = new CRcvUList;
m_pRendezvousQueue = new CRendezvousQueue;
#ifndef WINDOWS
if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
{ //启动接收线程
m_WorkerThread = 0;
throw CUDTException(3, 1);
}
#else
DWORD threadID;
m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
if (NULL == m_WorkerThread)
throw CUDTException(3, 1);
#endif
}
- 销毁:
CRcvQueue::~CRcvQueue()
CRcvQueue::~CRcvQueue()
{
m_bClosing = true;
#ifndef WINDOWS
if (0 != m_WorkerThread) //终止接受线程
pthread_join(m_WorkerThread, NULL);
pthread_mutex_destroy(&m_PassLock);
pthread_cond_destroy(&m_PassCond);
pthread_mutex_destroy(&m_LSLock);
pthread_mutex_destroy(&m_IDLock);
#else
if (NULL != m_WorkerThread)
WaitForSingleObject(m_ExitCond, INFINITE);
CloseHandle(m_WorkerThread);
CloseHandle(m_PassLock);
CloseHandle(m_PassCond);
CloseHandle(m_LSLock);
CloseHandle(m_IDLock);
CloseHandle(m_ExitCond);
#endif
delete m_pRcvUList;
delete m_pHash;
delete m_pRendezvousQueue;
// 删除队列中所有没有处理的信息
for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
{
while (!i->second.empty())
{
CPacket* pkt = i->second.front();
delete [] pkt->m_pcData;
delete pkt;
i->second.pop();
}
}
}
- 发送工作线程:
void* CRcvQueue::worker(void* param)
{
CRcvQueue* self = (CRcvQueue*)param; //反向获取接收队列对象
//准备好ID,地址之类的东西
sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
CUDT* u = NULL;
int32_t id;
while (!self->m_bClosing)
{
#ifdef NO_BUSY_WAITING
self->m_pTimer->tick();
#endif
// check waiting list, if new socket, insert it to the list
while (self->ifNewEntry()) //如果有新的CUDT*
{
CUDT* ne = self->getNewEntry(); //将新的CUDT*加入到合适的队列中
if (NULL != ne)
{
self->m_pRcvUList->insert(ne);
self->m_pHash->insert(ne->m_SocketID, ne);
}
}
// 为了接收packet,获取一个可用的slot
CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
if (NULL == unit) //如果获取失败
{
// 没有足够的空间,读取这个packet后直接drop掉
CPacket temp;
temp.m_pcData = new char[self->m_iPayloadSize];
temp.setLength(self->m_iPayloadSize);
self->m_pChannel->recvfrom(addr, temp);
delete [] temp.m_pcData;
goto TIMER_CHECK;
}
unit->m_Packet.setLength(self->m_iPayloadSize);//设置packet的有效载荷
//recv一个packet.如果返回-1,就相当于什么都没有收到
if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
goto TIMER_CHECK;
id = unit->m_Packet.m_iID; //获取收到的packet的ID
// ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
if (0 == id)//Connect request
{
if (NULL != self->m_pListener)
self->m_pListener->listen(addr, unit->m_Packet);//通过这个CUDT*处理这个packet
else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
{ //同样是处理new connect
// asynchronous connect: call connect here
// otherwise wait for the UDT socket to retrieve this packet
if (!u->m_bSynRecving)
u->connect(unit->m_Packet);
else
self->storePkt(id, unit->m_Packet.clone());
}
}
else if (id > 0) //发送往一个socket的数据包
{
if (NULL != (u = self->m_pHash->lookup(id)))//找到UDT对应的CUDT*
{
if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
{//对比地址
if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
{//如果这个CUDT*的状态正常
if (0 == unit->m_Packet.getFlag())//如果这是一个数据包
u->processData(unit); //处理数据
else//如果这是一个控制包
u->processCtrl(unit->m_Packet);//处理控制信息
u->checkTimers();//检查定时器
self->m_pRcvUList->update(u);//将这个处理过的CUDT*插入List末尾
}
}
}
else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
{//如果是一个新的连接状态
if (!u->m_bSynRecving)
u->connect(unit->m_Packet);//建立稳定的连接状态
else
self->storePkt(id, unit->m_Packet.clone());//连接状态还没有稳定,先存储数据,稍后处理
}
}
//当drop packet时或者没有free CUnit时,跳转到这块
TIMER_CHECK:
// take care of the timing event for all UDT sockets
uint64_t currtime;
CTimer::rdtsc(currtime); //获取当前的时间
CRNode* ul = self->m_pRcvUList->m_pUList;//获取头部的CUDT*
uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
{
CUDT* udt = ul->m_pUDT; //获取CUDT*
if (udt->m_bConnected && !udt->m_bBroken && !udt->m_bClosing)
{//如果这个CUDT*的状态正常,更新到Recv List的后面就行了
udt->checkTimers();
self->m_pRcvUList->update(udt);
}
else
{//如果这个CUDT*的状态出现差错,直接删除
// the socket must be removed from Hash table first, then RcvUList
self->m_pHash->remove(udt->m_SocketID);
self->m_pRcvUList->remove(udt);
udt->m_pRNode->m_bOnList = false;
}
ul = self->m_pRcvUList->m_pUList;
}
//还没有进入正常的连接状态的CUDT*,发送探测包
// Check connection requests status for all sockets in the RendezvousQueue.
self->m_pRendezvousQueue->updateConnStatus();
}
//收尾工作
if (AF_INET == self->m_UnitQueue.m_iIPversion)
delete (sockaddr_in*)addr;
else
delete (sockaddr_in6*)addr;
#ifndef WINDOWS
return NULL;
#else
SetEvent(self->m_ExitCond);
return 0;
#endif
}
- 从Queue中获取一个Packet:
int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
{
CGuard bufferlock(m_PassLock);
map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
if (i == m_mBuffer.end()) //如果描述这个UDT的Packet Queue为空
{
#ifndef WINDOWS
uint64_t now = CTimer::getTime();
timespec timeout;
timeout.tv_sec = now / 1000000 + 1;
timeout.tv_nsec = (now % 1000000) * 1000;
pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);//就睡一会
#else
ReleaseMutex(m_PassLock);
WaitForSingleObject(m_PassCond, 1000);
WaitForSingleObject(m_PassLock, INFINITE);
#endif
i = m_mBuffer.find(id); //被唤醒后还没有packet可读的话,设置packet,并返回-1
if (i == m_mBuffer.end())
{
packet.setLength(-1);
return -1;
}
}
// retrieve the earliest packet
CPacket* newpkt = i->second.front(); //获取一个包
if (packet.getLength() < newpkt->getLength())//如果两个包的数据量有差错,返回-1
{
packet.setLength(-1);
return -1;
}
// copy packet content
// 将首部的packet拷贝出来
memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
packet.setLength(newpkt->getLength());
delete [] newpkt->m_pcData;
delete newpkt;
// remove this message from queue,
// if no more messages left for this socket, release its data structure
i->second.pop();
if (i->second.empty()) //如果队列为空的话,删除这个队列。对应上文,如果在m_buffer中没有找到Queue,就意味着没有Packet可读
m_mBuffer.erase(i);
return packet.getLength();
}
- 设置与取消Listener:
int CRcvQueue::setListener(CUDT* u)
,void CRcvQueue::removeListener(const CUDT* u)
int CRcvQueue::setListener(CUDT* u)
{
CGuard lslock(m_LSLock);
if (NULL != m_pListener)
return -1;
m_pListener = u;
return 0;
}
void CRcvQueue::removeListener(const CUDT* u)
{
CGuard lslock(m_LSLock);
if (u == m_pListener)
m_pListener = NULL;
}
- 从队列中删除UDT SOCKET以及为其维护的信息:
void CRcvQueue::removeConnector(const UDTSOCKET& id)
void CRcvQueue::removeConnector(const UDTSOCKET& id) //删除和控制包有关的所有信息,这个队列中维护着和连接相关的信息
{
m_pRendezvousQueue->remove(id);
CGuard bufferlock(m_PassLock);
map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
if (i != m_mBuffer.end())
{
while (!i->second.empty())
{
delete [] i->second.front()->m_pcData;
delete i->second.front();
i->second.pop();
}
m_mBuffer.erase(i);
}
}
- 处理连接尚未完全建立的情况:
void CRcvQueue::setNewEntry(CUDT* u)
,bool CRcvQueue::ifNewEntry()
,CUDT* CRcvQueue::getNewEntry()
void CRcvQueue::setNewEntry(CUDT* u) //加入新的CUDT*
{
CGuard listguard(m_IDLock);
m_vNewEntry.push_back(u);
}
bool CRcvQueue::ifNewEntry() //判断是否有新的entry
{
return !(m_vNewEntry.empty());
}
CUDT* CRcvQueue::getNewEntry() //获取新的Entry
{
CGuard listguard(m_IDLock);
if (m_vNewEntry.empty())
return NULL;
CUDT* u = (CUDT*)*(m_vNewEntry.begin());
m_vNewEntry.erase(m_vNewEntry.begin());
return u;
}