zoukankan      html  css  js  c++  java
  • UDT源码剖析(二)之启动与结束

    UDT的启动例程

    启动例程顺序:void UDT::startup() -> void CUDT::startup() -> void CUDTUnited::startup() -> void* CUDTUnited::garbageCollect(void* p) -> void CUDTUnited::checkBrokenSockets() -> void CUDTUnited::removeSocket(const UDTSOCKET u) -> void CUDT::close() -> void CUDT::flush()

    • 对于一个完整的UDT SOCKET的清理六部曲:
      • 将CUDT*的状态设置为BROKEN
      • 调用CUDT中的close()
      • 更新UDT SOCKET的关闭时间
      • 将UDT SOCKET设置为Closed
      • 在Closed Array中添加当前UDT SOCKET,在GC线程中进行处理
      • 从全局的MAP中删除

    Order 0:void CUDTUnited::startup()

    • 功能A:真正干活的启动函数
        int CUDTUnited::startup()
        {
            CGuard gcinit(m_InitLock);    //拿住启动锁
    
            if (m_iInstanceCount++ > 0)    //不需要重复启动
                return 0;
    
            // Global initialization code
            #ifdef WINDOWS
                WORD wVersionRequested;
                WSADATA wsaData;
                wVersionRequested = MAKEWORD(2, 2);
    
                if (0 != WSAStartup(wVersionRequested, &wsaData))
                    throw CUDTException(1, 0,  WSAGetLastError());
            #endif
    
            //init CTimer::EventLock
    
            if (m_bGCStatus)    //设置GC线程的状态
                return true;
    
            m_bClosing = false;    //设置全局的状态
            #ifndef WINDOWS
                pthread_mutex_init(&m_GCStopLock, NULL);
                pthread_cond_init(&m_GCStopCond, NULL);
                pthread_create(&m_GCThread, NULL, garbageCollect, this);    //启动GC线程
            #else
                m_GCStopLock = CreateMutex(NULL, false, NULL);
                m_GCStopCond = CreateEvent(NULL, false, false, NULL);
                DWORD ThreadID;
                m_GCThread = CreateThread(NULL, 0, garbageCollect, this, 0, &ThreadID);
            #endif
    
            m_bGCStatus = true;    //设置GC线程状态
    
            return 0;
        }
    

    Order 1:void* CUDTUnited::garbageCollect(void* p)

    • 功能:GC线程(注:unet的实现中使用事件的模式清理资源)
        #ifndef WINDOWS
            void* CUDTUnited::garbageCollect(void* p)
        #else
            DWORD WINAPI CUDTUnited::garbageCollect(LPVOID p)
        #endif
        {
            CUDTUnited* self = (CUDTUnited*)p;    //获取CUDTUnited实例
    
            CGuard gcguard(self->m_GCStopLock);    //拿到垃圾清理的锁
    
            while (!self->m_bClosing)    //如果调整目前的CUDTUnited的状态为关闭状态,退出垃圾清理的无限循环
            {
                self->checkBrokenSockets();    //当UDT协议判断某一个UDT SOCKET的状态不正确时,会将其状态设置为BROKEN,并在这个函数中进行处理
    
                #ifdef WINDOWS
                    self->checkTLSValue();
                #endif
    
                #ifndef WINDOWS
                    timeval now;
                    timespec timeout;
                    gettimeofday(&now, 0);
                    timeout.tv_sec = now.tv_sec + 1;    //进入睡眠等待,然后等待下一次可以清理出现BROKEN状态的UDT SOCKET。具体的睡眠为时间为:1S
                    timeout.tv_nsec = now.tv_usec * 1000;
    
                    pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);    //然后调用当前的GC线程陷入睡眠等待被唤醒
                #else
                    WaitForSingleObject(self->m_GCStopCond, 1000);
               #endif
            }
    
            //剩下的步骤负责清理目前依旧残余的资源 
            // remove all sockets and multiplexers(复用器)
            CGuard::enterCS(self->m_ControlLock);
            for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i)    .//逐步遍历CUDTUnited中保留的UTD SOCKET
            {
                i->second->m_pUDT->m_bBroken = true;    //将CUDT*描述的状态设置为BROKEN,后续进行处理,然后调用CUDT中的close函数清理保存连接所消耗的资源
                i->second->m_pUDT->close();
                i->second->m_Status = CLOSED;    //设置状态为关闭
                i->second->m_TimeStamp = CTimer::getTime();    //调整最后一次操作UDT SOCKET的时间
                self->m_ClosedSockets[i->first] = i->second;    //将当前描述连接的CUDT*保存至CloseMap
    
                // 清理Lintener Queue
                map<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);    //从当前的UDT SOCKET中寻找自己的Listener
                if (ls == self->m_Sockets.end())    //如果没有找到Listener
                {
                    ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);    //在存活的UDT Map中没有找到的话,就在Close UDT Map中寻找
                    if (ls == self->m_ClosedSockets.end())    //如果没有找到,就不再处理存储在Listener两个队列中的资源
                        continue;
                }
    
                CGuard::enterCS(ls->second->m_AcceptLock);    //获取Listener中的锁
                ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);    //从连接完成但是取出UDT SOCKET的队列中清理
                ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);    //从已完成连接的队列清理
                CGuard::leaveCS(ls->second->m_AcceptLock);    
            }
            self->m_Sockets.clear();    //最后清理资源
    
            for (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j)    //最后再次遍历Close UDT SOCKET队列
            {
                j->second->m_TimeStamp = 0;    //将状态设置为0
            }
            CGuard::leaveCS(self->m_ControlLock);
    
            while (true)
            {
                self->checkBrokenSockets();    //之前遍历依旧存活的队列时,只是将即将清理的UDT SOCKET状态设置为BROKEN,此时,对上述为BROKEN状态的UDT SOCKET进行清理
    
                CGuard::enterCS(self->m_ControlLock);
                bool empty = self->m_ClosedSockets.empty();    //判断是否为空
                CGuard::leaveCS(self->m_ControlLock);
    
                if (empty)    //如果为empty,就可以直接退出
                    break;
    
                CTimer::sleep();    //不行的话,再歇一会,再次进行处理
            }
    
            #ifndef WINDOWS
                return NULL;
            #else
                return 0;
            #endif
        }
    

    Order 2:void CUDTUnited::checkBrokenSockets()

    • 功能A:真正干活的函数,用于清理处于BROKEN状态的UDT SOCKET
        void CUDTUnited::checkBrokenSockets()
        {
            CGuard cg(m_ControlLock);    //获取GC锁
    
            // set of sockets To Be Closed and To Be Removed
            vector<UDTSOCKET> tbc;    //收集处于Closeed状态的UDT SOCKET
            vector<UDTSOCKET> tbr;    //收集处于Removed状态的UDT SOCKET
    
            for (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)    //从当前的UDT SOCKET MAP中进行检索
            {
                // 检查处于BROKEN状态的UDT SOCKET
                if (i->second->m_pUDT->m_bBroken)    //如果处于BROKEN状态
                {
                    if (i->second->m_Status == LISTENING)    //如果是LISTENING UDT SOCKET
                    {
                        if (CTimer::getTime() - i->second->m_TimeStamp < 3000000)    //如果有客户端连接,等待额外的3S,等待下次处理
                            continue;
                    }
                    else if ((i->second->m_pUDT->m_pRcvBuffer != NULL) && (i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0))
                    {    //如果缓冲区中依旧有数据,应该等待更长的时间
                        continue;
                    }
    
                    //关闭断开的连接并启动清除计时器
                    i->second->m_Status = CLOSED;    //将状态设置为CLOSED
                    i->second->m_TimeStamp = CTimer::getTime();    //设置UDT SOCKET的关闭时间
                    tbc.push_back(i->first);    //将这个UDT SOCKET添加进Closed Array,稍后处理
                    m_ClosedSockets[i->first] = i->second;    //将这个UDT SOCKET添加进CLOSED UDT SOCKET MAP
    
                    // 清理Listener Queue
                    map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);
                    if (ls == m_Sockets.end())
                    {
                        ls = m_ClosedSockets.find(i->second->m_ListenSocket);
                        if (ls == m_ClosedSockets.end())
                           continue;
                    }
    
                    CGuard::enterCS(ls->second->m_AcceptLock);
                    ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
                    ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
                    CGuard::leaveCS(ls->second->m_AcceptLock);
                }
            }
    
            for (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j)    //清理CLOSED UDT SOCKET MAP中的实例
            {
                if (j->second->m_pUDT->m_ullLingerExpiration > 0)    //如果还没有到等待关闭时间
                {
                    //异步关闭:
                    if ((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime()))
                    {    //如果发送缓冲区为空,接收缓冲区为空或者等待关闭时间小于0,调整状态为CLOSED直接关闭
                        j->second->m_pUDT->m_ullLingerExpiration = 0;
                        j->second->m_pUDT->m_bClosing = true;    //更新Closed状态,由下一次启动的GC线程回收
                        j->second->m_TimeStamp = CTimer::getTime();    //更新关闭的时间
                    }
                }
    
                //如果已经超时1S,或者这个UDT SOCKET已经从接收链表中移除,就将其添加到Remove Array中 
                if ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList))
                {
                    tbr.push_back(j->first);
                }
            }
    
            //在全局Map中删除处于Closed状态的实例
            for (vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)
                m_Sockets.erase(*k);
    
            // 删除处于超时状态的UDT SOCKET
                for (vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
                    removeSocket(*l);
        }
    

    Order 3:void CUDTUnited::removeSocket(const UDTSOCKET u)

    • 功能A:清理处于Closed状态的UDT SOCKET
        void CUDTUnited::removeSocket(const UDTSOCKET u)
        {
            map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);
    
            // 如果是一个失效的UDT SCOKET,直接返回
            if (i == m_ClosedSockets.end())    r
                return;
    
            // 由于是多个UDT实例共享一个资源复用器,销毁时要减少引用计数
            const int mid = i->second->m_iMuxID;
    
            if (NULL != i->second->m_pQueuedSockets)    //如果是一个Listener
            {
                CGuard::enterCS(i->second->m_AcceptLock);
    
                //如果是一个Listener,关闭连接队列中所有没有accept()的UDT SOCKET,并在稍后删除它
                for (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q)
                {    //删除六部曲
                    m_Sockets[*q]->m_pUDT->m_bBroken = true;    //将CUDT*的状态设置为BROKEN
                    m_Sockets[*q]->m_pUDT->close();    //调用CUDT中的close()
                    m_Sockets[*q]->m_TimeStamp = CTimer::getTime();    //更新UDT SOCKET的关闭时间
                    m_Sockets[*q]->m_Status = CLOSED;    //将UDT SOCKET设置为Closed
                    m_ClosedSockets[*q] = m_Sockets[*q];    //在Closed Array中添加当前UDT SOCKET,在GC线程中进行处理
                    m_Sockets.erase(*q);    //从全局的MAP中删除
                }
    
                CGuard::leaveCS(i->second->m_AcceptLock);
            }
    
            // 从保存连接的map中删除连接
            map<int64_t, set<UDTSOCKET> >::iterator j = m_PeerRec.find((i->second->m_PeerID << 30) + i->second->m_iISN);
            if (j != m_PeerRec.end())
            {
                j->second.erase(u);
                if (j->second.empty())
                    m_PeerRec.erase(j);
            }
    
            // 删除当前的UDT SOCKET
            i->second->m_pUDT->close();
            delete i->second;
            m_ClosedSockets.erase(i);    //在CLOSED Map中也一并删除
    
            map<int, CMultiplexer>::iterator m;
            m = m_mMultiplexer.find(mid);
            if (m == m_mMultiplexer.end())    //如果这个资源复用器不存在,直接返回
            {
                //something is wrong!!!
                return;
            }
    
            m->second.m_iRefCount --;    //否则的话,减少这个资源复用器的引用计数
            if (0 == m->second.m_iRefCount)    //如果目前没有UDT SOCKET使用这个资源复用器
            {
                m->second.m_pChannel->close();    //与UDP SOCKET关联的Channel直接关闭
                delete m->second.m_pSndQueue;    //清理资源复用器的资源
                delete m->second.m_pRcvQueue;
                delete m->second.m_pTimer;
                delete m->second.m_pChannel;
                m_mMultiplexer.erase(m);
            }
        }
    

    Order 4:void CUDT::close()

    • 功能A:调用这个函数清理CUDT(保存连接的Class)中的资源
        void CUDT::close()
        {
            if (!m_bOpened)    //如果这个没有处于Opened状态直接返回
                return;
    
            if (0 != m_Linger.l_onoff)    //如果还没有到连接关闭时间
            {
                flush();
            }
    
            // 从SendQueue的发送队列中移除这个对象(CUDT)
            if (m_bConnected)
                m_pSndQueue->m_pSndUList->remove(this);
    
            // 清理IO事件
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);
    
            // 从所有的Epoll中删除
            try
            {
                for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i)
                    s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
            }
            catch (...)
            {
            }
    
            if (!m_bOpened)
                return;
    
            //当前处于Closing
            m_bClosing = true;
    
            CGuard cg(m_ConnectionLock);
    
            // 如果接收者与发送者还在等待数据,则发送信号
            releaseSynch();    //唤醒所有的发送/接收线程
    
            //如果是Listener并Listening状态,关闭Listening状态并从Recv Queue中移除 
            if (m_bListening)    
            {
                m_bListening = false;
                m_pRcvQueue->removeListener(this);
            }
            else if (m_bConnecting)    //如果处于连接状态,直接移除
            {
                m_pRcvQueue->removeConnector(m_SocketID);
            }
    
            if (m_bConnected)    //如果处于连接完成状态
            {
                if (!m_bShutdown)    //并且对对方没有发送shutdown,那么由我们发送shutdown
                    sendCtrl(5);
    
                m_pCC->close();    //并调用CCC的close()清理拥塞控制资源
    
                // 存储当前连接的状态
                CInfoBlock ib;
                ib.m_iIPversion = m_iIPversion;
                CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
                ib.m_iRTT = m_iRTT;
                ib.m_iBandwidth = m_iBandwidth;
                m_pCache->update(&ib);
    
                m_bConnected = false;    //关闭连接
            }
    
            // waiting all send and recv calls to stop
            CGuard sendguard(m_SendLock);
            CGuard recvguard(m_RecvLock);
    
            // CLOSED.
            m_bOpened = false;
    }
    

    Order 5:void CUDT::flush()

    • 功能A:当UDT SOCKET的Linger不为0时,调用这个函数
        void CUDT::flush()
        {
            uint64_t entertime = CTimer::getTime();    //获取当前时间
    
            while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
            {
                // 先前调用的close()已经检查了延迟,并且已经过期
                if (m_ullLingerExpiration >= entertime)
                    break;
    
                if (!m_bSynSending)
                {
                    // 如果该SOCKET启用了异步关闭机制,稍后使用GC清理资源,立即返回
                    if (0 == m_ullLingerExpiration)
                        m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;
    
                    return;
                }
    
                #ifndef WINDOWS
                    timespec ts;    //在这块延迟1S
                    ts.tv_sec = 0;
                    ts.tv_nsec = 1000000;
                    nanosleep(&ts, NULL);
                #else
                    Sleep(1);
                #endif
            }
        }
    

    UDT的清理例程

    清理例程顺序:void UDT::cleanup() -> void CUDT::cleanup() -> int CUDTUnited::cleanup()

    Order 0:int CUDTUnited::cleanup()

    • 功能A:在全局CUDTUnited中的清理例程
        int CUDTUnited::cleanup()
        {
            CGuard gcinit(m_InitLock);
    
            if (--m_iInstanceCount > 0)    //判断引用计数,如果>0,暂时不关闭
                return 0;
    
            if (!m_bGCStatus)
                return 0;
           
            //调整CUDTUnited状态,终止GC线程 
                m_bClosing = true;
            #ifndef WINDOWS
                pthread_cond_signal(&m_GCStopCond);
                pthread_join(m_GCThread, NULL);    //清理GC线程的资源
                pthread_mutex_destroy(&m_GCStopLock);
                pthread_cond_destroy(&m_GCStopCond);
            #else
                SetEvent(m_GCStopCond);
                WaitForSingleObject(m_GCThread, INFINITE);
                CloseHandle(m_GCThread);
                CloseHandle(m_GCStopLock);
                CloseHandle(m_GCStopCond);
            #endif
    
            m_bGCStatus = false;    //调整CUDTUnited状态
    
            // Global destruction code
            #ifdef WINDOWS
                WSACleanup();
            #endif
    
             return 0;
        }
    
  • 相关阅读:
    SGU 271 Book Pile (双端队列)
    POJ 3110 Jenny's First Exam (贪心)
    HDU 4310 Hero (贪心)
    ZOJ 2132 The Most Frequent Number (贪心)
    POJ 3388 Japanese Puzzle (二分)
    UVaLive 4628 Jack's socks (贪心)
    POJ 2433 Landscaping (贪心)
    CodeForces 946D Timetable (DP)
    Android Studio教程从入门到精通
    Android Tips – 填坑手册
  • 原文地址:https://www.cnblogs.com/ukernel/p/8993078.html
Copyright © 2011-2022 走看看