zoukankan      html  css  js  c++  java
  • (转)live555从RTSP服务器读取数据到使用接收到的数据流程分析

    本文在linux环境下编译live555工程,并用cgdb调试工具对live555工程中的testProgs目录下的openRTSP的执行过程进行了跟踪分析,直到将从socket端读取视频数据并保存为对应的视频和音频数据为止。

    进入testProgs目录,执行./openRTSP rtsp://xxxx/test.mp4

    对于RTSP协议的处理部分,可设置断点在setupStreams函数中,并跟踪即可进行分析。

    这里主要分析进入如下的while(1)循环中的代码

     
    1. void BasicTaskScheduler0::doEventLoop(char* watchVariable)   
    2. {  
    3.   // Repeatedly loop, handling readble sockets and timed events:  
    4.   while (1)   
    5.   {  
    6.     if (watchVariable != NULL && *watchVariable != 0) break;  
    7.     SingleStep();  
    8.   }  
    9. }  

    从这里可知,live555在客户端处理数据实际上是单线程的程序,不断执行SingleStep()函数中的代码。通过查看该函数代码里,下面一句代码为重点

     
    1. (*handler->handlerProc)(handler->clientData, resultConditionSet);  


     

    其中该条代码出现了两次,通过调试跟踪它的执行轨迹,第一次出现调用的函数是为了处理和RTSP服务器的通信协议的商定,而第二次出现调用的函数才是处理真正的视频和音频数据。对于RTSP通信协议的分析我们暂且不讨论,而直接进入第二次调用该函数的部分。

    在我们的调试过程中在执行到上面的函数时就直接调用到livemedia目录下的如下函数

    1. void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int /*mask*/)   
    2. {  
    3.   source->networkReadHandler1();  
    4. }  



    //下面这个函数实现的主要功能就是从socket端读取数据并存储数据

    1. void MultiFramedRTPSource::networkReadHandler1()   
    2. {  
    3.   BufferedPacket* bPacket = fPacketReadInProgress;  
    4.   if (bPacket == NULL)  
    5.   {  
    6.     // Normal case: Get a free BufferedPacket descriptor to hold the new network packet:  
    7.     //分配一块新的存储空间来存储从socket端读取的数据  
    8.     bPacket = fReorderingBuffer->getFreePacket(this);  
    9.   }  
    10.   
    11.   // Read the network packet, and perform sanity checks on the RTP header:  
    12.   Boolean readSuccess = False;  
    13.   do   
    14.   {  
    15.     Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;  
    16.     //fillInData()函数封装了从socket端获取数据的过程,到此函数执行完已经将数据保存到了bPacket对象中  
    17.     if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete))   
    18.    {  
    19.       if (bPacket->bytesAvailable() == 0)   
    20.       {  
    21.       envir() << "MultiFramedRTPSource error: Hit limit when reading incoming packet over TCP. Increase "MAX_PACKET_SIZE" ";  
    22.       }  
    23.       break;  
    24.    }  
    25.     if (packetReadWasIncomplete)  
    26.     {  
    27.       // We need additional read(s) before we can process the incoming packet:  
    28.       fPacketReadInProgress = bPacket;  
    29.       return;  
    30.     } else   
    31.     {  
    32.       fPacketReadInProgress = NULL;  
    33.     }  
    34.       
    35.     //省略关于RTP包的处理  
    36.     ...  
    37.     ...  
    38.     ...  
    39.     //fReorderingBuffer为MultiFramedRTPSource类中的对象,该对象建立了一个存储Packet数据包对象的链表  
    40.     //下面的storePacket()函数即将上面获取的数据包存储在链表中  
    41.     if (!fReorderingBuffer->storePacket(bPacket)) break;   
    42.   
    43.     readSuccess = True;  
    44.   } while (0);  
    45.   if (!readSuccess) fReorderingBuffer->freePacket(bPacket);  
    46.   
    47.   doGetNextFrame1();  
    48.   // If we didn't get proper data this time, we'll get another chance  
    49. }  


     

    //下面的这个函数则实现从上面函数中介绍的存储数据包链表的对象(即fReorderingBuffer)中取出数据包并调用相应函数使用它

    //代码1.1

    1. void MultiFramedRTPSource::doGetNextFrame1()   
    2. {  
    3.   while (fNeedDelivery)   
    4.   {  
    5.     // If we already have packet data available, then deliver it now.  
    6.     Boolean packetLossPrecededThis;   
    7.     //从fReorderingBuffer对象中取出一个数据包  
    8.     BufferedPacket* nextPacket  
    9.       = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);  
    10.     if (nextPacket == NULL) break;  
    11.   
    12.     fNeedDelivery = False;  
    13.   
    14.     if (nextPacket->useCount() == 0)   
    15.     {  
    16.       // Before using the packet, check whether it has a special header  
    17.       // that needs to be processed:  
    18.       unsigned specialHeaderSize;  
    19.       if (!processSpecialHeader(nextPacket, specialHeaderSize))  
    20.       {  
    21.         // Something's wrong with the header; reject the packet:  
    22.         fReorderingBuffer->releaseUsedPacket(nextPacket);  
    23.         fNeedDelivery = True;  
    24.         break;  
    25.       }  
    26.       nextPacket->skip(specialHeaderSize);  
    27.     }  
    28.   
    29.     // Check whether we're part of a multi-packet frame, and whether  
    30.     // there was packet loss that would render this packet unusable:  
    31.     if (fCurrentPacketBeginsFrame)   
    32.     {  
    33.       if (packetLossPrecededThis || fPacketLossInFragmentedFrame)   
    34.       {  
    35.         // We didn't get all of the previous frame.  
    36.         // Forget any data that we used from it:  
    37.         fTo = fSavedTo; fMaxSize = fSavedMaxSize;  
    38.         fFrameSize = 0;  
    39.       }  
    40.       fPacketLossInFragmentedFrame = False;  
    41.     } else if (packetLossPrecededThis)   
    42.     {  
    43.       // We're in a multi-packet frame, with preceding packet loss  
    44.       fPacketLossInFragmentedFrame = True;  
    45.     }  
    46.     if (fPacketLossInFragmentedFrame)  
    47.     {  
    48.       // This packet is unusable; reject it:  
    49.       fReorderingBuffer->releaseUsedPacket(nextPacket);  
    50.       fNeedDelivery = True;  
    51.       break;  
    52.     }  
    53.   
    54.     // The packet is usable. Deliver all or part of it to our caller:  
    55.     unsigned frameSize;  
    56.     //将上面取出的数据包拷贝到fTo指针所指向的地址  
    57.     nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,  
    58.             fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,  
    59.             fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,  
    60.             fCurPacketMarkerBit);  
    61.     fFrameSize += frameSize;  
    62.   
    63.     if (!nextPacket->hasUsableData())   
    64.     {  
    65.       // We're completely done with this packet now  
    66.       fReorderingBuffer->releaseUsedPacket(nextPacket);  
    67.     }  
    68.   
    69.     if (fCurrentPacketCompletesFrame) //如果完整的取出了一帧数据,则可调用需要该帧数据的函数去处理它  
    70.      {  
    71.       // We have all the data that the client wants.  
    72.       if (fNumTruncatedBytes > 0)   
    73.       {  
    74.     envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("  
    75.         << fSavedMaxSize << ").  "  
    76.         << fNumTruncatedBytes << " bytes of trailing data will be dropped! ";  
    77.       }  
    78.       // Call our own 'after getting' function, so that the downstream object can consume the data:  
    79.       if (fReorderingBuffer->isEmpty())   
    80.       {  
    81.         // Common case optimization: There are no more queued incoming packets, so this code will not get  
    82.         // executed again without having first returned to the event loop.  Call our 'after getting' function  
    83.         // directly, because there's no risk of a long chain of recursion (and thus stack overflow):  
    84.     afterGetting(this);  //调用函数去处理取出的数据帧  
    85.        } else   
    86.       {  
    87.     // Special case: Call our 'after getting' function via the event loop.  
    88.     nextTask() = envir().taskScheduler().scheduleDelayedTask(0,  
    89.                                  (TaskFunc*)FramedSource::afterGetting, this);  
    90.       }  
    91.     }  
    92.     else       
    93.     {  
    94.       // This packet contained fragmented data, and does not complete  
    95.       // the data that the client wants.  Keep getting data:  
    96.       fTo += frameSize; fMaxSize -= frameSize;  
    97.       fNeedDelivery = True;  
    98.     }  
    99.   }  
    100. }  


    //下面这个函数即开始调用执行需要该帧数据的函数

     
    1. void FramedSource::afterGetting(FramedSource* source)   
    2. {  
    3.   source->fIsCurrentlyAwaitingData = False;  
    4.       // indicates that we can be read again  
    5.       // Note that this needs to be done here, in case the "fAfterFunc"  
    6.       // called below tries to read another frame (which it usually will)  
    7.   
    8.   if (source->fAfterGettingFunc != NULL)     
     
    1.   {  
    2.     (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,  
    3.                    source->fFrameSize, source->fNumTruncatedBytes,  
    4.                    source->fPresentationTime,  
    5.                    source->fDurationInMicroseconds);  
    6.   }  
    7. }  


     

    上面的fAfterGettingFunc为我们自己注册的函数,如果运行的是testProgs中的openRTSP实例,则该函数指向下列代码中通过调用getNextFrame()注册的afterGettingFrame()函数

    1. Boolean FileSink::continuePlaying()  
    2. {  
    3.   if (fSource == NULL) return False;  
    4.   
    5.   fSource->getNextFrame(fBuffer, fBufferSize,  
    6.             afterGettingFrame, this,  
    7.             onSourceClosure, this);  
    8.   
    9.   return True;  
    10. }  


    如果运行的是testProgs中的testRTSPClient中的实例,则该函数指向这里注册的afterGettingFrame()函数

    1. Boolean DummySink::continuePlaying()  
    2. {  
    3.   if (fSource == NULL) return False; // sanity check (should not happen)  
    4.   
    5.   // Request the next frame of data from our input source.  "afterGettingFrame()" will get called later, when it arrives:  
    6.   fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,  
    7.                         afterGettingFrame, this,  
    8.                         onSourceClosure, this);  
    9.   return True;  
    10. }  


     

    从上面的代码中可以看到getNextFrame()函数的第一个参数为分别在各自类中定义的buffer,我们继续以openRTSP为运行程序来分析,fBuffer为FileSink类里定义的指针:unsigned char* fBuffer;

    这里我们先绕一个弯,看看getNextFrame()函数里做了什么

     
    1. void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,  
    2.                 afterGettingFunc* afterGettingFunc,  
    3.                 void* afterGettingClientData,  
    4.                 onCloseFunc* onCloseFunc,  
    5.                 void* onCloseClientData)   
    6. {  
    7.   // Make sure we're not already being read:  
    8.   if (fIsCurrentlyAwaitingData)     
    9.   {  
    10.     envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time! ";  
    11.     envir().internalError();  
    12.   }  
    13.   
    14.   fTo = to;  
    15.   fMaxSize = maxSize;  
    16.   fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()  
    17.   fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()  
    18.   fAfterGettingFunc = afterGettingFunc;  
    19.   fAfterGettingClientData = afterGettingClientData;  
    20.   fOnCloseFunc = onCloseFunc;  
    21.   fOnCloseClientData = onCloseClientData;  
    22.   fIsCurrentlyAwaitingData = True;  
    23.   
    24.   doGetNextFrame();  
    25. }  


     

    从代码可以知道上面getNextFrame()中传入的第一个参数fBuffer指向了指针fTo,而我们在前面分析代码1.1中的void MultiFramedRTPSource::doGetNextFrame1()函数中有下面一段代码:

    1. //将上面取出的数据包拷贝到fTo指针所指向的地址  
    2.  nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,  
    3.    fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,  
    4.    fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,  
    5.    fCurPacketMarkerBit);  


    实际上现在应该明白了,从getNextFrame()函数中传入的第一个参数fBuffer最终存储的即是从数据包链表对象中取出的数据,并且在调用上面的use()函数后就可以使用了。
    而在void MultiFramedRTPSource::doGetNextFrame1()函数中代码显示的最终调用我们注册的void FileSink::afterGettingFrame()正好是在use()函数调用之后的afterGetting(this)中调用。我们再看看afterGettingFrame()做了什么处理:

    1. void FileSink::afterGettingFrame(void* clientData, unsigned frameSize,  
    2.                  unsigned numTruncatedBytes,  
    3.                  struct timeval presentationTime,  
    4.                  unsigned /*durationInMicroseconds*/)  
    5. {  
    6.   FileSink* sink = (FileSink*)clientData;  
    7.   sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime);  
    8. }  
    9.   
    10. void FileSink::afterGettingFrame(unsigned frameSize,  
    11.                  unsigned numTruncatedBytes,  
    12.                  struct timeval presentationTime)   
    13. {  
    14.   if (numTruncatedBytes > 0)     
    15.   {  
    16.     envir() << "FileSink::afterGettingFrame(): The input frame data was too large for our buffer size ("  
    17.         << fBufferSize << ").  "  
    18.             << numTruncatedBytes << " bytes of trailing data was dropped!  Correct this by increasing the "bufferSize" parameter in the "createNew()" call to at least "  
    19.             << fBufferSize + numTruncatedBytes << " ";  
    20.   }  
    21.   addData(fBuffer, frameSize, presentationTime);  
    22.   
    23.   if (fOutFid == NULL || fflush(fOutFid) == EOF)     
    24.   {  
    25.     // The output file has closed.  Handle this the same way as if the  
    26.     // input source had closed:  
    27.     onSourceClosure(this);  
    28.   
    29.     stopPlaying();  
    30.     return;  
    31.   }  
    32.   
    33.   if (fPerFrameFileNameBuffer != NULL)     
    34.   {  
    35.     if (fOutFid != NULL) { fclose(fOutFid); fOutFid = NULL; }  
    36.   }  
    37.   
    38.   // Then try getting the next frame:  
    39.   continuePlaying();  
    40. }  


    从上面代码可以看到调用了addData()函数将数据保存到文件中,然后继续continuePlaying()又去获取下一帧数据然后处理,直到遇到循环结束然后依次退出调用函数。最后看看addData()函数的实现即可知:

    1. void FileSink::addData(unsigned char const* data, unsigned dataSize,  
    2.                struct timeval presentationTime)   
    3. {  
    4.   if (fPerFrameFileNameBuffer != NULL)     
    5.   {  
    6.     // Special case: Open a new file on-the-fly for this frame  
    7.     sprintf(fPerFrameFileNameBuffer, "%s-%lu.%06lu", fPerFrameFileNamePrefix,  
    8.         presentationTime.tv_sec, presentationTime.tv_usec);  
    9.     fOutFid = OpenOutputFile(envir(), fPerFrameFileNameBuffer);  
    10.   }  
    11.   
    12.   // Write to our file:  
    13. #ifdef TEST_LOSS  
    14.   static unsigned const framesPerPacket = 10;  
    15.   static unsigned const frameCount = 0;  
    16.   static Boolean const packetIsLost;  
    17.   if ((frameCount++)%framesPerPacket == 0)     
    18.   {  
    19.     packetIsLost = (our_random()%10 == 0); // simulate 10% packet loss #####  
    20.   }  
    21.   
    22.   if (!packetIsLost)  
    23. #endif  
    24.   if (fOutFid != NULL && data != NULL)    
    25.   {  
    26.     fwrite(data, 1, dataSize, fOutFid);  
    27.   }  
    28. }  


    最后调用系统函数fwrite()实现写入文件功能。

    总结:从上面的分析可知,如果要取得从RTSP服务器端接收并保存的数据帧,我们只需要定义一个类并实现如下格式两个的函数,并声明一个指针地址buffer用于指向数据帧,再在continuePlaying()函数中调用getNextFrame(buffer,...)即可。

     typedef void (afterGettingFunc)(void* clientData, unsigned frameSize,  
    1.           unsigned numTruncatedBytes,  
    2.           struct timeval presentationTime,  
    3.           unsigned durationInMicroseconds);  
    4. typedef void (onCloseFunc)(void* clientData);  


    然后再在afterGettingFunc的函数中即可使用buffer。.

  • 相关阅读:
    ado.net的基本特性
    The relationship's type
    the relationship's cardinality
    复杂心情中。。。
    the relationship's existence.
    ORACLE中国的短视
    ADO.net连接数据库
    虚拟基类的初始化
    递归实现全排列
    Factory Methods
  • 原文地址:https://www.cnblogs.com/lihaiping/p/4244903.html
Copyright © 2011-2022 走看看